阿里的STORM——JSTORM

摘要:
解决应用的类和Jstorm的类发生冲突,应用的类在自己的类空间中Task内部异步化。JStorm是一个类似HadoopMapReduce的系统,用户按照指定的接口实现一个任务,然后将这个任务递交给JStorm系统,Jstorm将这个任务跑起来,并且按7*24小时运行起来,一旦中间一个worker发生意外故障,调度器立即分配一个新的worker替换这个失效的worker。因此,从应用的角度,JStorm应用是一种遵守某种编程规范的分布式应用。

看介绍文档貌似挺好:
https://github.com/alibaba/jstorm

Storm 和JStorm

阿里拥有自己的实时计算引擎

  1. 类似于hadoop 中的MR

  2. 开源storm响应太慢

  3. 开源社区的速度完全跟不上Ali的需求

  4. 降低未来运维成本

  5. 提供更多技术支持,加快内部业务响应速度

现有Storm无法满足一些需求

  1. 现有storm调度太简单粗暴,无法定制化

  2. Storm 任务分配不平衡

  3. RPC OOM一直没有解决

  4. 监控太简单

  5. 对ZK 访问频繁

JStorm相比Storm更稳定

  1. Nimbus 实现HA:当一台nimbus挂了,自动热切到备份nimbus

  2. 原生Storm RPC:Zeromq 使用堆外内存,导致OS 内存不够,Netty 导致OOM;JStorm底层RPC 采用netty + disruptor保证发送速度和接受速度是匹配的

  3. 新上线的任务不会冲击老的任务:新调度从cpu,memory,disk,net 四个角度对任务进行分配,已经分配好的新任务,无需去抢占老任务的cpu,memory,disk和net

  4. Supervisor主线

  5. Spout/Bolt 的open/prepar

  6. 所有IO, 序列化,反序列化

  7. 减少对ZK的访问量:去掉大量无用的watch;task的心跳时间延长一倍;Task心跳检测无需全ZK扫描。

JStorm相比Storm调度更强大

  1. 彻底解决了storm 任务分配不均衡问题

  2. 从4个维度进行任务分配:CPU、Memory、Disk、Net

  3. 默认一个task,一个cpu slot。当task消耗更多的cpu时,可以申请更多cpu slot

  4. 默认一个task,一个memory slot。当task需要更多内存时,可以申请更多内存slot

  5. 默认task,不申请disk slot。当task 磁盘IO较重时,可以申请disk slot

  6. 可以强制某个component的task 运行在不同的节点上

  7. 可以强制topology运行在单独一个节点上

  8. 可以自定义任务分配,提前预约任务分配到哪台机器上,哪个端口,多少个cpu slot,多少内存,是否申请磁盘

  9. 可以预约上一次成功运行时的任务分配,上次task分配了什么资源,这次还是使用这些资源

JStorm相比Storm性能更好

JStorm 0.9.0 性能非常的好,使用netty时单worker 发送最大速度为11万QPS,使用zeromq时,最大速度为12万QPS。

  • JStorm 0.9.0 在使用Netty的情况下,比Storm 0.9.0 使用netty情况下,快10%, 并且JStorm netty是稳定的而Storm 的Netty是不稳定的

  • 在使用ZeroMQ的情况下, JStorm 0.9.0 比Storm 0.9.0 快30%

性能提升的原因:

  1. Zeromq 减少一次内存拷贝

  2. 增加反序列化线程

  3. 重写采样代码,大幅减少采样影响

  4. 优化ack代码

  5. 优化缓冲map性能

  6. Java 比clojure更底层

JStorm的其他优化点

  1. 资源隔离。不同部门,使用不同的组名,每个组有自己的Quato;不同组的资源隔离;采用cgroups 硬隔离

  2. Classloader。解决应用的类和Jstorm的类发生冲突,应用的类在自己的类空间中

  3. Task 内部异步化。Worker 内部全流水线模式,Spout nextTuple和ack/fail运行在不同线程

具体如何实现,请参考本ID的的博文系列 【jstorm-源码解析】


概叙 & 应用场景 JStorm 是一个分布式实时计算引擎。

JStorm 是一个类似Hadoop MapReduce的系统, 用户按照指定的接口实现一个任务,然后将这个任务递交给JStorm系统,Jstorm将这个任务跑起来,并且按7 * 24小时运行起来,一旦中间一个worker 发生意外故障, 调度器立即分配一个新的worker替换这个失效的worker。

因此,从应用的角度,JStorm 应用是一种遵守某种编程规范的分布式应用。从系统角度, JStorm一套类似MapReduce的调度系统。 从数据的角度, 是一套基于流水线的消息处理机制。

实时计算现在是大数据领域中最火爆的一个方向,因为人们对数据的要求越来越高,实时性要求也越来越快,传统的Hadoop Map Reduce,逐渐满足不了需求,因此在这个领域需求不断。

优点

在Storm和JStorm出现以前,市面上出现很多实时计算引擎,但自storm和JStorm出现后,基本上可以说一统江湖: 究其优点:

  • 开发非常迅速, 接口简单,容易上手,只要遵守Topology,Spout, Bolt的编程规范即可开发出一个扩展性极好的应用,底层rpc,worker之间冗余,数据分流之类的动作完全不用考虑。
  • 扩展性极好, 当一级处理单元速度,直接配置一下并发数,即可线性扩展性能
  • 健壮, 当worker失效或机器出现故障时, 自动分配新的worker替换失效worker
  • 数据准确性, 可以采用Acker机制,保证数据不丢失。 如果对精度有更多一步要求,采用事务机制,保证数据准确。
应用场景

JStorm处理数据的方式是基于消息的流水线处理, 因此特别适合无状态计算,也就是计算单元的依赖的数据全部在接受的消息中可以找到, 并且最好一个数据流不依赖另外一个数据流。

因此,常常用于

  • 日志分析,从日志中分析出特定的数据,并将分析的结果存入外部存储器如数据库。目前,主流日志分析技术就使用JStorm或Storm
  • 管道系统, 将一个数据从一个系统传输到另外一个系统, 比如将数据库同步到Hadoop
  • 消息转化器, 将接受到的消息按照某种格式进行转化,存储到另外一个系统如消息中间件
  • 统计分析器, 从日志或消息中,提炼出某个字段,然后做count或sum计算,最后将统计值存入外部存储器。中间处理过程可能更复杂。
如何安装
安装步骤
  • Downloads下载relase包
  • 搭建Zookeeper集群
  • 安装Python 2.6
  • 安装Java
  • 安装zeromq
  • 安装Jzmq
  • 配置$JSTORM_HOME/conf/storm.yaml
  • 搭建web ui
  • 启动JStorm集群

搭建Zookeeper集群

本处不细描叙Zookeeper安装步骤

搭建JStorm集群

安装python 2.6

-s $HOME/.pythonbrew/etc/bashrc&& source $HOME/.pythonbrew/etc/bashrc

pythonbrew install 2.6.7

pythonbrew switch 2.6.7

安装java

注意,如果当前系统是64位系统,则需要下载java 64位,如果是32为系统,则下载32位java

安装zeromq(如果不使用zeromq, 可以不安装zeromq)

wgethttp://download.zeromq.org/zeromq-2.1.7.tar.gz

tar zxf zeromq-2.1.7.tar.gz

cd zeromq-2.1.7

./configure

make

sudo make install

sudo ldconfig

如果没有root权限,或当前用户无sudo权限时,执行 “ ./configure --prefix=/home/xxxxx” 替换 “./configure”, 其中/home/xxxx 为安装目标目录

安装jzmq(如果不使用zeromq, 可以不安装jzmq)

git clone git://github.com/nathanmarz/jzmq.git

cd jzmq

./autogen.sh

./configure

make

make install

如果没有root权限,或当前用户无sudo权限时,执行 “ ./configure --prefix=/home/xxxx --with-zeromq=/home/xxxx” 替换 “./configure”, 其中/home/xxxx 为安装目标目录

安装JStorm

假设以jstorm-0.9.3.zip为例

unzip jstorm-0.9.3.zip

vi ~/.bashrc

export JSTORM_HOME=/XXXXX/XXXX

export PATH=$PATH:$JSTORM_HOME/bin

配置$JSTORM_HOME/conf/storm.yaml

配置项:

  • storm.zookeeper.servers: 表示zookeeper 的地址,
  • nimbus.host: 表示nimbus的地址
  • storm.zookeeper.root: 表示jstorm在zookeeper中的根目录,当多个JStorm共享一个ZOOKEEPER时,需要设置该选项,默认即为“/jstorm”
  • storm.local.dir: 表示jstorm临时数据存放目录,需要保证jstorm程序对该目录有写权限
  • java.library.path: zeromq 和java zeromq library的安装目录,默认"/usr/local/lib:/opt/local/lib:/usr/lib"
  • supervisor.slots.ports: 表示supervisor 提供的端口slot列表,注意不要和其他端口发生冲突,默认是68xx,而storm的是67xx
  • supervisor.disk.slot: 表示提供数据目录,当一台机器有多块磁盘时,可以提供磁盘读写slot,方便有重IO操作的应用。
  • topology.enable.classloader: false, 默认关闭classloader,如果应用的jar与jstorm的依赖的jar发生冲突,比如应用使用thrift9,但jstorm使用thrift7时,就需要打开classloader
  • nimbus.groupfile.path: 如果需要做资源隔离,比如数据仓库使用多少资源,技术部使用多少资源,无线部门使用多少资源时,就需要打开分组功能, 设置一个配置文件的绝对路径,改配置文件如源码中group_file.ini所示
  • storm.local.dir: jstorm使用的本地临时目录,如果一台机器同时运行storm和jstorm的话, 则不要共用一个目录,必须将二者分离开

在提交jar的节点上执行:

#mkdir ~/.jstorm
#cp -f $JSTORM_HOME/conf/storm.yaml ~/.jstorm

安装JStorm web ui

必须使用tomcat 7.0 或以上版本, 注意不要忘记拷贝~/.jstorm/storm.yaml

web ui 可以和nimbus不在同一个节点

mkdir ~/.jstorm

cp -f $JSTORM_HOME/conf/storm.yaml ~/.jstorm

下载tomcat 7.x (以apache-tomcat-7.0.37 为例)

tar -xzf apache-tomcat-7.0.37.tar.gz

cd apache-tomcat-7.0.37

cd webapps

cp $JSTORM_HOME/jstorm-ui-0.9.3.war ./

mv ROOT ROOT.old

ln -s jstorm-ui-0.9.3 ROOT

cd ../bin

./startup.sh

启动JStorm

  • 在nimbus 节点上执行 “nohup jstorm nimbus &”, 查看$JSTORM_HOME/logs/nimbus.log检查有无错误
  • 在supervisor节点上执行 “nohup jstorm supervisor &”, 查看$JSTORM_HOME/logs/supervisor.log检查有无错误
基本概念

在JStorm中有对于流stream的抽象,流是一个不间断的无界的连续tuple,注意JStorm在建模事件流时,把流中的事件抽象为tuple即元组,后面会解释JStorm中如何使用tuple。

STREAM

Spout/Bolt

JStorm认为每个stream都有一个stream源,也就是原始元组的源头,所以它将这个源头抽象为spout,spout可能是连接消息中间件(如MetaQ, Kafka, TBNotify等),并不断发出消息,也可能是从某个队列中不断读取队列元素并装配为tuple发射。

有了源头即spout也就是有了stream,那么该如何处理stream内的tuple呢,同样的思想JStorm将tuple的中间处理过程抽象为Bolt,bolt可以消费任意数量的输入流,只要将流方向导向该bolt,同时它也可以发送新的流给其他bolt使用,这样一来,只要打开特定的spout(管口)再将spout中流出的tuple导向特定的bolt,又bolt对导入的流做处理后再导向其他bolt或者目的地。

我们可以认为spout就是一个一个的水龙头,并且每个水龙头里流出的水是不同的,我们想拿到哪种水就拧开哪个水龙头,然后使用管道将水龙头的水导向到一个水处理器(bolt),水处理器处理后再使用管道导向另一个处理器或者存入容器中。

spoutbolt

Topology

topology

对应上文的介绍,我们可以很容易的理解这幅图,这是一张有向无环图,JStorm将这个图抽象为Topology即拓扑(的确,拓扑结构是有向无环的),拓扑是Jstorm中最高层次的一个抽象概念,它可以被提交到Jstorm集群执行,一个拓扑就是一个数据流转换图,图中每个节点是一个spout或者bolt,图中的边表示bolt订阅了哪些流,当spout或者bolt发送元组到流时,它就发送元组到每个订阅了该流的bolt(这就意味着不需要我们手工拉管道,只要预先订阅,spout就会将流发到适当bolt上)。 插个位置说下Jstorm的topology实现,为了做实时计算,我们需要设计一个拓扑图,并实现其中的Bolt处理细节,JStorm中拓扑定义仅仅是一些Thrift结构体,这样一来我们就可以使用其他语言来创建和提交拓扑。

Tuple

JStorm将流中数据抽象为tuple,一个tuple就是一个值列表value list,list中的每个value都有一个name,并且该value可以是基本类型,字符类型,字节数组等,当然也可以是其他可序列化的类型。拓扑的每个节点都要说明它所发射出的元组的字段的name,其他节点只需要订阅该name就可以接收处理。

Worker/Task

Worker和Task是JStorm中任务的执行单元, 一个worker表示一个进程,一个task表示一个线程, 一个worker可以运行多个task。

资源slot

在JStorm中,资源类型分为4种, CPU, Memory,Disk, Port, 不再局限于Storm的port。 即一个supervisor可以提供多少个CPU slot,多少个Memory slot, 多少个Disk slot, 多少个Port slot

  • 一个worker就消耗一个Port slot, 默认一个task会消耗一个CPU slot和一个Memory slot
  • 当task执行任务较重时,可以申请更多的CPU slot,
  • 当task需要更多内存时,可以申请更多的内存slot,
  • 当task 磁盘读写较多时,可以申请磁盘slot,则该磁盘slot给该task独享。
应用例子
最简单的JStorm例子分为4个步骤:

生成Topology

Mapconf=newHashMp();//topology所有自定义的配置均放入这个MapTopologyBuilderbuilder=newTopologyBuilder();//创建topology的生成器intspoutParal=get("spout.parallel",1);//获取spout的并发设置SpoutDeclarerspout=builder.setSpout(SequenceTopologyDef.SEQUENCE_SPOUT_NAME
,
newSequenceSpout(), spoutParal);//创建Spout, 其中new SequenceSpout() 为真正spout对象,SequenceTopologyDef.SEQUENCE_SPOUT_NAME 为spout的名字,注意名字中不要含有空格intboltParal=get("bolt.parallel",1);//获取bolt的并发设置BoltDeclarertotalBolt=builder.setBolt(SequenceTopologyDef.TOTAL_BOLT_NAME,newTotalCount(),
boltParal)
.shuffleGrouping(SequenceTopologyDef.SEQUENCE_SPOUT_NAME);//创建bolt, SequenceTopologyDef.TOTAL_BOLT_NAME 为bolt名字,TotalCount 为bolt对象,boltParal为bolt并发数,//shuffleGrouping(SequenceTopologyDef.SEQUENCE_SPOUT_NAME),//表示接收SequenceTopologyDef.SEQUENCE_SPOUT_NAME的数据,并且以shuffle方式,//即每个spout随机轮询发送tuple到下一级bolt中intackerParal=get("acker.parallel",1);Config.setNumAckers(conf, ackerParal);//设置表示acker的并发数intworkerNum=get("worker.num",10);
conf.put(Config.TOPOLOGY_WORKERS, workerNum);//表示整个topology将使用几个worker
conf.put(Config.STORM_CLUSTER_MODE,"distributed");//设置topolog模式为分布式,这样topology就可以放到JStorm集群上运行StormSubmitter.submitTopology(streamName, conf,
builder.createTopology());//提交topology
IRichSpout

IRichSpout 为最简单的Spout接口

IRichSpout
{
@Overridepublicvoidopen(Mapconf,TopologyContextcontext,SpoutOutputCollector
collector) {
}
@Overridepublicvoid
close() {
}
@Overridepublicvoid
activate() {
}
@Overridepublicvoid
deactivate() {
}
@Overridepublicvoid
nextTuple() {
}
@Overridepublicvoidack(Object
msgId) {
}
@Overridepublicvoidfail(Object
msgId) {
}
@OverridepublicvoiddeclareOutputFields(OutputFieldsDeclarer
declarer) {
}
@OverridepublicMap<String,Object>
getComponentConfiguration() {
returnnull;
}

其中注意:

  • spout对象必须是继承Serializable, 因此要求spout内所有数据结构必须是可序列化的
  • spout可以有构造函数,但构造函数只执行一次,是在提交任务时,创建spout对象,因此在task分配到具体worker之前的初始化工作可以在此处完成,一旦完成,初始化的内容将携带到每一个task内(因为提交任务时将spout序列化到文件中去,在worker起来时再将spout从文件中反序列化出来)。
  • open是当task起来后执行的初始化动作
  • close是当task被shutdown后执行的动作
  • activate 是当task被激活时,触发的动作
  • deactivate 是task被deactive时,触发的动作
  • nextTuple 是spout实现核心, nextuple完成自己的逻辑,即每一次取消息后,用collector 将消息emit出去。
  • ack, 当spout收到一条ack消息时,触发的动作,详情可以参考ack机制
  • fail, 当spout收到一条fail消息时,触发的动作,详情可以参考ack机制
  • declareOutputFields, 定义spout发送数据,每个字段的含义
  • getComponentConfiguration 获取本spout的component 配置
Bolt
IRichBolt
{
@Overridepublicvoidprepare(MapstormConf,TopologyContextcontext,OutputCollector
collector) {
}
@Overridepublicvoidexecute(Tuple
input) {
}
@Overridepublicvoid
cleanup() {
}
@OverridepublicvoiddeclareOutputFields(OutputFieldsDeclarer
declarer) {
}
@OverridepublicMap<String,Object>
getComponentConfiguration() {
returnnull;
}
}

其中注意:

  • bolt对象必须是继承Serializable, 因此要求spout内所有数据结构必须是可序列化的
  • bolt可以有构造函数,但构造函数只执行一次,是在提交任务时,创建bolt对象,因此在task分配到具体worker之前的初始化工作可以在此处完成,一旦完成,初始化的内容将携带到每一个task内(因为提交任务时将bolt序列化到文件中去,在worker起来时再将bolt从文件中反序列化出来)。
  • prepare是当task起来后执行的初始化动作
  • cleanup是当task被shutdown后执行的动作
  • execute是bolt实现核心, 完成自己的逻辑,即接受每一次取消息后,处理完,有可能用collector 将产生的新消息emit出去。 ** 在executor中,当程序处理一条消息时,需要执行collector.ack, 详情可以参考ack机制** 在executor中,当程序无法处理一条消息时或出错时,需要执行collector.fail ,详情可以参考ack机制
  • declareOutputFields, 定义bolt发送数据,每个字段的含义
  • getComponentConfiguration 获取本bolt的component 配置
编译

在Maven中配置

<dependency>
<groupId>com.alibaba.jstorm</groupId>
<artifactId>jstorm-client</artifactId>
<version>0.9.3.1</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.alibaba.jstorm</groupId>
<artifactId>jstorm-client-extension</artifactId>
<version>0.9.3.1</version>
<scope>provided</scope>
</dependency>

如果找不到jstorm-client和jstorm-client-extension包,可以自己下载jstorm源码进行编译,请参考源码编译

打包时,需要将所有依赖打入到一个包中

<build>
<plugins>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<archive>
<manifest>
<mainClass>storm.starter.SequenceTopology</mainClass>
</manifest>
</archive>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.6</source>
<target>1.6</target>
</configuration>
</plugin>
</plugins>
</build>
提交jar JStorm vs Storm vs flume vs S4 选型

jstorm jar xxxxxx.jar com.alibaba.xxxx.xx parameter

  • xxxx.jar 为打包后的jar
  • com.alibaba.xxxx.xx 为入口类,即提交任务的类
  • parameter即为提交参数

JStorm VS Storm 请参看JStorm 0.9.0 介绍.pptx

JStorm 比Storm更稳定,更强大,更快, storm上跑的程序,一行代码不变可以运行在jstorm上。

Flume是一个成熟的系统,主要focus在管道上,将数据从一个数据源传输到另外一个数据源, 系统提供大量现成的插件做管道作用。当然也可以做一些计算和分析,但插件的开发没有Jstorm便捷和迅速。

S4就是一个半成品,健壮性还可以,但数据准确性较糟糕,无法保证数据不丢失,这个特性让S4 大受限制,也导致了S4开源很多年,但发展一直不是很迅速。

AKKA是一个actor模型,也是一个不错的系统,在这个actor模型基本上,你想做任何事情都没有问题,但问题是你需要做更多的工作,topology怎么生成,怎么序列化。数据怎么流(随机,还是group by)等等。

Spark是一个轻量的内存MR, 更偏重批量数据处理


0.9.0 性能测试 JStorm 0.9.0 性能非常的好, 使用netty时单worker 发送最大速度为11万QPS, 使用zeromq时,最大速度为12万QPS.
结论
  • JStorm 0.9.0 在使用Netty的情况下,比Storm 0.9.0 使用netty情况下,快10%, 并且JStorm netty是稳定的而Storm 的Netty是不稳定的
  • 在使用ZeroMQ的情况下, JStorm 0.9.0 比Storm 0.9.0 快30%
原因
  • Zeromq 减少一次内存拷贝
  • 增加反序列化线程
  • 重写采样代码,大幅减少采样影响
  • 优化ack代码
  • 优化缓冲map性能
  • Java 比clojure更底层
测试

测试样例

测试样例为https://github.com/longdafeng/storm-examples

测试环境

5 台 16核, 98G 物理机

uname -a :
Linux dwcache1 2.6.32-220.23.1.tb735.el5.x86_64 #1 SMP Tue Aug 14 16:03:04 CST 2012 x86_64 x86_64 x86_64 GNU/Linux

测试结果

  • JStorm with netty, Spout 发送QPS 为 11万

jstorm.0.9.0.netty

  • storm with netty, Spout 应用发送QPS 为 10万 (截图为上层应用的QPS, 没有包括发送到ack的QPS, Spout发送QPS 正好为上层应用QPS的2倍)

storm.0.9.0.netty

  • JStorm with zeromq, Spout 发送QPS 为12万

jstorm.0.9.0.zmq

  • Storm with zeromq, Spout 发送QPS 为9万(截图为上层应用的QPS, 没有包括发送到ack的QPS, Spout发送QPS 正好为上层应用QPS的2倍)

storm.0.9.0.zmq


资源硬隔离

cgroups是control groups的缩写,是Linux内核提供的一种可以限制, 记录, 隔离进程组(process groups)所使用的物理资源(如:cpu,memory,IO 等等)的机制。

在Jstorm中,我们使用cgroup进行cpu硬件资源的管理。使用前,需要做如下检查和配置。

  • 检查/etc/passwd 文件中当前用户的uid和gid, 假设当前用户是admin, 则看/etc/passwd文件中admin的uid和gid是多少
  • cgroup功能在当前系统的内核版本是否支持

    检查/etc/cgconfig.conf是否存在。如果不存在, 请“yum install libcgroup”,如果存在,设置cpu子系统的挂载目录位置, 以及修改该配置文件中相应的uid/gid为启动jstorm用户的uid/gid, 本例子中以500为例, 注意是根据第一步来进行设置的。

mount {
cpu = /cgroup/cpu;
}
group jstorm {
perm {
task {
uid = 500;
gid = 500;
}
admin {
uid = 500;
gid = 500;
}
}
cpu {
}
}
  • 然后启动cgroup服务
service cgconfig restart
chkconfig --level 23456 cgconfig on

Note: cgconfig.conf只能在root模式下修改。

或者直接执行命令

这是一个cgconfig.conf配置文件例子。比如jstorm的启动用户为admin,admin在当前 系统的uid/gid为500(查看/etc/passwd 可以查看到uid和gid),那么相对应cpu子系统的jstorm目录uid/gid也需要设置为相同的值。 以便jstorm有相应权限可以在这个目录下为jstorm的每个需要进行资源隔离的进程创建对应 的目录和进行相关设置。

mkdir /cgroup/cpu
mount -t cgroup -o cpu none /cgroup/cpu
mkdir /cgroup/cpu/jstorm
chown admin:admin /cgroup/cpu/jstorm

. 在jstorm配置文件中打开cgroup, 配置storm.yaml

supervisor.enable.cgroup: true

常见问题
性能问题

参考性能优化

资源不够

当报告 ”No supervisor resource is enough for component “, 则意味着资源不够 如果是仅仅是测试环境,可以将supervisor的cpu 和memory slot设置大,

在jstorm中, 一个task默认会消耗一个cpu slot和一个memory slot, 而一台机器上默认的cpu slot是(cpu 核数 -1), memory slot数(物理内存大小 * 75%/1g), 如果一个worker上运行task比较多时,需要将memory slot size设小(默认是1G), 比如512M, memory.slot.per.size: 535298048

#if it is null, then it will be detect by system
supervisor.cpu.slot.num: null
#if it is null, then it will be detect by system
supervisor.mem.slot.num: null
# support disk slot
# if it is null, it will use $(storm.local.dir)/worker_shared_data
supervisor.disk.slot: null
序列化问题

所有spout,bolt,configuration, 发送的消息(Tuple)都必须实现Serializable, 否则就会出现序列化错误.

如果是spout或bolt的成员变量没有实现Serializable时,但又必须使用时, 可以对该变量申明时,增加transient 修饰符, 然后在open或prepare时,进行实例化

seriliazble_error

Log4j 冲突

0.9.0 开始,JStorm依旧使用Log4J,但storm使用Logbak,因此应用程序如果有依赖log4j-over-slf4j.jar, 则需要exclude 所有log4j-over-slf4j.jar依赖,下个版本将自定义classloader,就不用担心这个问题。

SLF4J: Detected both log4j-over-slf4j.jar AND slf4j-log4j12.jar on the class path, preempting StackOverflowError.
SLF4J: See also
http://www.slf4j.org/codes.html#log4jDelegationLoop for more details.
Exception in thread "main" java.lang.ExceptionInInitializerError
at org.apache.log4j.Logger.getLogger(Logger.java:39)
at org.apache.log4j.Logger.getLogger(Logger.java:43)
at com.alibaba.jstorm.daemon.worker.Worker.<clinit>(Worker.java:32)
Caused by: java.lang.IllegalStateException: Detected both log4j-over-slf4j.jar AND slf4j-log4j12.jar on the class path, preempting StackOverflowError. See also
http://www.slf4j.org/codes.html#log4jDelegationLoop for more details.
at org.apache.log4j.Log4jLoggerFactory.<clinit>(Log4jLoggerFactory.java:49)
... 3 more
Could not find the main class: com.alibaba.jstorm.daemon.worker.Worker. Program will exit.
类冲突

如果应用程序使用和JStorm相同的jar 但版本不一样时,建议打开classloader, 修改配置文件

topology.enable.classloader: true

或者

ConfigExtension.setEnableTopologyClassLoader(conf, true);

JStorm默认是关掉classloader,因此JStorm会强制使用JStorm依赖的jar

提交任务后,等待几分钟后,web ui始终没有显示对应的task

有3种情况:

用户程序初始化太慢

如果有用户程序的日志输出,则表明是用户的初始化太慢或者出错,查看日志即可。 另外对于MetaQ 1.x的应用程序,Spout会recover ~/.meta_recover/目录下文件,可以直接删除这些消费失败的问题,加速启动。

通常是用户jar冲突或初始化发生问题

打开supervisor 日志,找出启动worker命令,单独执行,然后检查是否有问题。类似下图:

fail_start_worker

检查是不是storm和jstorm使用相同的本地目录

检查配置项 ”storm.local.dir“, 是不是storm和jstorm使用相同的本地目录,如果相同,则将二者分开

提示端口被绑定

有2种情况:

多个worker抢占一个端口

假设是6800 端口被占, 可以执行命令 “ps -ef|grep 6800” 检查是否有多个进程, 如果有多个进程,则手动杀死他们

系统打开太多的connection

Linux对外连接端口数限制,TCP client对外发起连接数达到28000左右时,就开始大量抛异常,需要

# echo "10000 65535" > /proc/sys/net/ipv4/ip_local_port_range

免责声明:文章转载自《阿里的STORM——JSTORM》仅用于学习参考。如对内容有疑问,请及时联系本站处理。

上篇cocoapods安装及常用命令Spring通过SchedulerFactoryBean实现调度任务的配置下篇

宿迁高防,2C2G15M,22元/月;香港BGP,2C5G5M,25元/月 雨云优惠码:MjYwNzM=

相关文章

RabbitMQ消费消息的两种模式:推和拉

前言 在rabbitmq中有两种消息处理的模式,一种是推模式/订阅模式/投递模式(也叫push模式),消费者调用channel.basicConsume方法订阅队列后,由RabbitMQ主动将消息推送给订阅队列的消费者;另一种是拉模式/检索模式(也叫pull模式),需要消费者调用channel.basicGet方法,主动从指定队列中拉取消息。 推模式:消...

Netty入门

一、是什么 Netty是一个高性能、异步事件驱动、基于Java NIO的异步的可扩展的客户端/服务器网络编程框架。 Netty提供了对 TCP、UDP 和文件传输的支持,作为一个异步 NIO 框架,Netty 的所有 IO 操作都是异步非阻塞的,通过 Future-Listener 机制,用户可以方便的主动获取或者通过通知机制获得 IO 操作结果。 Net...

【RabbitMQ】一文带你搞定springboot整合RabbitMQ涉及消息的发送确认,消息的消费确认机制,延时队列的实现

说明 这一篇里,我们将继续介绍RabbitMQ的高级特性,通过本篇的学习,你将收获: 什么是延时队列 延时队列使用场景 RabbitMQ中的TTL 如何利用RabbitMQ来实现延时队列 本文大纲 什么是延迟队列 延时队列,首先,它是一种队列,队列意味着内部的元素是有序的,元素出队和入队是有方向性的,元素从一端进入,从另一端取出。 其次,延时队列,最...

asp.net中序列化和反序列化json的两种常用方式

使用System.Web.Script.Serialization.JavaScriptSerializer类       JavaScriptSerializer类为.net类库自带,.net3.5及以后版本都可以使用,该类位于System.Web.Extensions.dll中,如需使用该类,必须添加引用。      (1) 序列化 p...

ubuntu下如何检查nvidia显卡驱动是否安装OK?

答:使用sudo lshw -c video即可,笔者的输出如下: jello~$ sudo lshw -c video*-display description: VGA compatible controller product: GP106 [GeForce GTX 2080 80GB...

解决ASP.NET MVC返回的JsonResult 中 日期类型数据格式问题,和返回的属性名称转为“驼峰命名法”和循环引用问题

DateTime类型数据格式问题 问题 在使用ASP.NET MVC 在写项目的时候发现,返回给前端的JSON数据,日期类型是 Date(121454578784541) 的格式,需要前端来转换一下才能用来使用。 C#对象属性名称转换成JSON自动转成“驼峰命名法” 问题 在C#中推荐的属性命名方式是“帕斯卡命名法”【首字母大写】但是在前端推荐命名方式为“...