看介绍文档貌似挺好:
https://github.com/alibaba/jstorm
阿里拥有自己的实时计算引擎
类似于hadoop 中的MR
开源storm响应太慢
开源社区的速度完全跟不上Ali的需求
降低未来运维成本
提供更多技术支持,加快内部业务响应速度
现有Storm无法满足一些需求
现有storm调度太简单粗暴,无法定制化
Storm 任务分配不平衡
RPC OOM一直没有解决
监控太简单
对ZK 访问频繁
JStorm相比Storm更稳定
Nimbus 实现HA:当一台nimbus挂了,自动热切到备份nimbus
原生Storm RPC:Zeromq 使用堆外内存,导致OS 内存不够,Netty 导致OOM;JStorm底层RPC 采用netty + disruptor保证发送速度和接受速度是匹配的
新上线的任务不会冲击老的任务:新调度从cpu,memory,disk,net 四个角度对任务进行分配,已经分配好的新任务,无需去抢占老任务的cpu,memory,disk和net
Supervisor主线
Spout/Bolt 的open/prepar
所有IO, 序列化,反序列化
减少对ZK的访问量:去掉大量无用的watch;task的心跳时间延长一倍;Task心跳检测无需全ZK扫描。
JStorm相比Storm调度更强大
彻底解决了storm 任务分配不均衡问题
从4个维度进行任务分配:CPU、Memory、Disk、Net
默认一个task,一个cpu slot。当task消耗更多的cpu时,可以申请更多cpu slot
默认一个task,一个memory slot。当task需要更多内存时,可以申请更多内存slot
默认task,不申请disk slot。当task 磁盘IO较重时,可以申请disk slot
可以强制某个component的task 运行在不同的节点上
可以强制topology运行在单独一个节点上
可以自定义任务分配,提前预约任务分配到哪台机器上,哪个端口,多少个cpu slot,多少内存,是否申请磁盘
可以预约上一次成功运行时的任务分配,上次task分配了什么资源,这次还是使用这些资源
JStorm相比Storm性能更好
JStorm 0.9.0 性能非常的好,使用netty时单worker 发送最大速度为11万QPS,使用zeromq时,最大速度为12万QPS。
JStorm 0.9.0 在使用Netty的情况下,比Storm 0.9.0 使用netty情况下,快10%, 并且JStorm netty是稳定的而Storm 的Netty是不稳定的
在使用ZeroMQ的情况下, JStorm 0.9.0 比Storm 0.9.0 快30%
性能提升的原因:
Zeromq 减少一次内存拷贝
增加反序列化线程
重写采样代码,大幅减少采样影响
优化ack代码
优化缓冲map性能
Java 比clojure更底层
JStorm的其他优化点
资源隔离。不同部门,使用不同的组名,每个组有自己的Quato;不同组的资源隔离;采用cgroups 硬隔离
Classloader。解决应用的类和Jstorm的类发生冲突,应用的类在自己的类空间中
Task 内部异步化。Worker 内部全流水线模式,Spout nextTuple和ack/fail运行在不同线程
具体如何实现,请参考本ID的的博文系列 【jstorm-源码解析】
JStorm 是一个类似Hadoop MapReduce的系统, 用户按照指定的接口实现一个任务,然后将这个任务递交给JStorm系统,Jstorm将这个任务跑起来,并且按7 * 24小时运行起来,一旦中间一个worker 发生意外故障, 调度器立即分配一个新的worker替换这个失效的worker。
因此,从应用的角度,JStorm 应用是一种遵守某种编程规范的分布式应用。从系统角度, JStorm一套类似MapReduce的调度系统。 从数据的角度, 是一套基于流水线的消息处理机制。
实时计算现在是大数据领域中最火爆的一个方向,因为人们对数据的要求越来越高,实时性要求也越来越快,传统的Hadoop Map Reduce,逐渐满足不了需求,因此在这个领域需求不断。
优点在Storm和JStorm出现以前,市面上出现很多实时计算引擎,但自storm和JStorm出现后,基本上可以说一统江湖: 究其优点:
- 开发非常迅速, 接口简单,容易上手,只要遵守Topology,Spout, Bolt的编程规范即可开发出一个扩展性极好的应用,底层rpc,worker之间冗余,数据分流之类的动作完全不用考虑。
- 扩展性极好, 当一级处理单元速度,直接配置一下并发数,即可线性扩展性能
- 健壮, 当worker失效或机器出现故障时, 自动分配新的worker替换失效worker
- 数据准确性, 可以采用Acker机制,保证数据不丢失。 如果对精度有更多一步要求,采用事务机制,保证数据准确。
JStorm处理数据的方式是基于消息的流水线处理, 因此特别适合无状态计算,也就是计算单元的依赖的数据全部在接受的消息中可以找到, 并且最好一个数据流不依赖另外一个数据流。
因此,常常用于
- 日志分析,从日志中分析出特定的数据,并将分析的结果存入外部存储器如数据库。目前,主流日志分析技术就使用JStorm或Storm
- 管道系统, 将一个数据从一个系统传输到另外一个系统, 比如将数据库同步到Hadoop
- 消息转化器, 将接受到的消息按照某种格式进行转化,存储到另外一个系统如消息中间件
- 统计分析器, 从日志或消息中,提炼出某个字段,然后做count或sum计算,最后将统计值存入外部存储器。中间处理过程可能更复杂。
- 从Downloads下载relase包
- 搭建Zookeeper集群
- 安装Python 2.6
- 安装Java
- 安装zeromq
- 安装Jzmq
- 配置$JSTORM_HOME/conf/storm.yaml
- 搭建web ui
- 启动JStorm集群
搭建Zookeeper集群
本处不细描叙Zookeeper安装步骤
- 安装步骤麻烦参考”zookeeper 安装步骤“
- zookeeper配置麻烦参考“zookeeper 配置介绍”
搭建JStorm集群
安装python 2.6
- 如果当前系统提供python,可以不用安装python
- 自己可以参考python
- 也可以使用https://github.com/utahta/pythonbrew来安装python > curl -kLhttp://xrl.us/pythonbrewinstall| bash
-s $HOME/.pythonbrew/etc/bashrc&& source $HOME/.pythonbrew/etc/bashrc
pythonbrew install 2.6.7
pythonbrew switch 2.6.7
安装java
注意,如果当前系统是64位系统,则需要下载java 64位,如果是32为系统,则下载32位java
安装zeromq(如果不使用zeromq, 可以不安装zeromq)
wgethttp://download.zeromq.org/zeromq-2.1.7.tar.gz
tar zxf zeromq-2.1.7.tar.gz
cd zeromq-2.1.7
./configure
make
sudo make install
sudo ldconfig
如果没有root权限,或当前用户无sudo权限时,执行 “ ./configure --prefix=/home/xxxxx” 替换 “./configure”, 其中/home/xxxx 为安装目标目录
安装jzmq(如果不使用zeromq, 可以不安装jzmq)
git clone git://github.com/nathanmarz/jzmq.git
cd jzmq
./autogen.sh
./configure
make
make install
如果没有root权限,或当前用户无sudo权限时,执行 “ ./configure --prefix=/home/xxxx --with-zeromq=/home/xxxx” 替换 “./configure”, 其中/home/xxxx 为安装目标目录
安装JStorm
假设以jstorm-0.9.3.zip为例
unzip jstorm-0.9.3.zip
vi ~/.bashrc
export JSTORM_HOME=/XXXXX/XXXX
export PATH=$PATH:$JSTORM_HOME/bin
配置$JSTORM_HOME/conf/storm.yaml
配置项:
- storm.zookeeper.servers: 表示zookeeper 的地址,
- nimbus.host: 表示nimbus的地址
- storm.zookeeper.root: 表示jstorm在zookeeper中的根目录,当多个JStorm共享一个ZOOKEEPER时,需要设置该选项,默认即为“/jstorm”
- storm.local.dir: 表示jstorm临时数据存放目录,需要保证jstorm程序对该目录有写权限
- java.library.path: zeromq 和java zeromq library的安装目录,默认"/usr/local/lib:/opt/local/lib:/usr/lib"
- supervisor.slots.ports: 表示supervisor 提供的端口slot列表,注意不要和其他端口发生冲突,默认是68xx,而storm的是67xx
- supervisor.disk.slot: 表示提供数据目录,当一台机器有多块磁盘时,可以提供磁盘读写slot,方便有重IO操作的应用。
- topology.enable.classloader: false, 默认关闭classloader,如果应用的jar与jstorm的依赖的jar发生冲突,比如应用使用thrift9,但jstorm使用thrift7时,就需要打开classloader
- nimbus.groupfile.path: 如果需要做资源隔离,比如数据仓库使用多少资源,技术部使用多少资源,无线部门使用多少资源时,就需要打开分组功能, 设置一个配置文件的绝对路径,改配置文件如源码中group_file.ini所示
- storm.local.dir: jstorm使用的本地临时目录,如果一台机器同时运行storm和jstorm的话, 则不要共用一个目录,必须将二者分离开
在提交jar的节点上执行:
安装JStorm web ui
必须使用tomcat 7.0 或以上版本, 注意不要忘记拷贝~/.jstorm/storm.yaml
web ui 可以和nimbus不在同一个节点
mkdir ~/.jstorm
cp -f $JSTORM_HOME/conf/storm.yaml ~/.jstorm
下载tomcat 7.x (以apache-tomcat-7.0.37 为例)
tar -xzf apache-tomcat-7.0.37.tar.gz
cd apache-tomcat-7.0.37
cd webapps
cp $JSTORM_HOME/jstorm-ui-0.9.3.war ./
mv ROOT ROOT.old
ln -s jstorm-ui-0.9.3 ROOT
cd ../bin
./startup.sh
启动JStorm
- 在nimbus 节点上执行 “nohup jstorm nimbus &”, 查看$JSTORM_HOME/logs/nimbus.log检查有无错误
- 在supervisor节点上执行 “nohup jstorm supervisor &”, 查看$JSTORM_HOME/logs/supervisor.log检查有无错误
在JStorm中有对于流stream的抽象,流是一个不间断的无界的连续tuple,注意JStorm在建模事件流时,把流中的事件抽象为tuple即元组,后面会解释JStorm中如何使用tuple。
Spout/BoltJStorm认为每个stream都有一个stream源,也就是原始元组的源头,所以它将这个源头抽象为spout,spout可能是连接消息中间件(如MetaQ, Kafka, TBNotify等),并不断发出消息,也可能是从某个队列中不断读取队列元素并装配为tuple发射。
有了源头即spout也就是有了stream,那么该如何处理stream内的tuple呢,同样的思想JStorm将tuple的中间处理过程抽象为Bolt,bolt可以消费任意数量的输入流,只要将流方向导向该bolt,同时它也可以发送新的流给其他bolt使用,这样一来,只要打开特定的spout(管口)再将spout中流出的tuple导向特定的bolt,又bolt对导入的流做处理后再导向其他bolt或者目的地。
我们可以认为spout就是一个一个的水龙头,并且每个水龙头里流出的水是不同的,我们想拿到哪种水就拧开哪个水龙头,然后使用管道将水龙头的水导向到一个水处理器(bolt),水处理器处理后再使用管道导向另一个处理器或者存入容器中。
Topology对应上文的介绍,我们可以很容易的理解这幅图,这是一张有向无环图,JStorm将这个图抽象为Topology即拓扑(的确,拓扑结构是有向无环的),拓扑是Jstorm中最高层次的一个抽象概念,它可以被提交到Jstorm集群执行,一个拓扑就是一个数据流转换图,图中每个节点是一个spout或者bolt,图中的边表示bolt订阅了哪些流,当spout或者bolt发送元组到流时,它就发送元组到每个订阅了该流的bolt(这就意味着不需要我们手工拉管道,只要预先订阅,spout就会将流发到适当bolt上)。 插个位置说下Jstorm的topology实现,为了做实时计算,我们需要设计一个拓扑图,并实现其中的Bolt处理细节,JStorm中拓扑定义仅仅是一些Thrift结构体,这样一来我们就可以使用其他语言来创建和提交拓扑。
TupleJStorm将流中数据抽象为tuple,一个tuple就是一个值列表value list,list中的每个value都有一个name,并且该value可以是基本类型,字符类型,字节数组等,当然也可以是其他可序列化的类型。拓扑的每个节点都要说明它所发射出的元组的字段的name,其他节点只需要订阅该name就可以接收处理。
Worker/TaskWorker和Task是JStorm中任务的执行单元, 一个worker表示一个进程,一个task表示一个线程, 一个worker可以运行多个task。
资源slot在JStorm中,资源类型分为4种, CPU, Memory,Disk, Port, 不再局限于Storm的port。 即一个supervisor可以提供多少个CPU slot,多少个Memory slot, 多少个Disk slot, 多少个Port slot
- 一个worker就消耗一个Port slot, 默认一个task会消耗一个CPU slot和一个Memory slot
- 当task执行任务较重时,可以申请更多的CPU slot,
- 当task需要更多内存时,可以申请更多的内存slot,
- 当task 磁盘读写较多时,可以申请磁盘slot,则该磁盘slot给该task独享。
生成Topology
IRichSpout 为最简单的Spout接口
其中注意:
- spout对象必须是继承Serializable, 因此要求spout内所有数据结构必须是可序列化的
- spout可以有构造函数,但构造函数只执行一次,是在提交任务时,创建spout对象,因此在task分配到具体worker之前的初始化工作可以在此处完成,一旦完成,初始化的内容将携带到每一个task内(因为提交任务时将spout序列化到文件中去,在worker起来时再将spout从文件中反序列化出来)。
- open是当task起来后执行的初始化动作
- close是当task被shutdown后执行的动作
- activate 是当task被激活时,触发的动作
- deactivate 是task被deactive时,触发的动作
- nextTuple 是spout实现核心, nextuple完成自己的逻辑,即每一次取消息后,用collector 将消息emit出去。
- ack, 当spout收到一条ack消息时,触发的动作,详情可以参考ack机制
- fail, 当spout收到一条fail消息时,触发的动作,详情可以参考ack机制
- declareOutputFields, 定义spout发送数据,每个字段的含义
- getComponentConfiguration 获取本spout的component 配置
其中注意:
- bolt对象必须是继承Serializable, 因此要求spout内所有数据结构必须是可序列化的
- bolt可以有构造函数,但构造函数只执行一次,是在提交任务时,创建bolt对象,因此在task分配到具体worker之前的初始化工作可以在此处完成,一旦完成,初始化的内容将携带到每一个task内(因为提交任务时将bolt序列化到文件中去,在worker起来时再将bolt从文件中反序列化出来)。
- prepare是当task起来后执行的初始化动作
- cleanup是当task被shutdown后执行的动作
- execute是bolt实现核心, 完成自己的逻辑,即接受每一次取消息后,处理完,有可能用collector 将产生的新消息emit出去。 ** 在executor中,当程序处理一条消息时,需要执行collector.ack, 详情可以参考ack机制** 在executor中,当程序无法处理一条消息时或出错时,需要执行collector.fail ,详情可以参考ack机制
- declareOutputFields, 定义bolt发送数据,每个字段的含义
- getComponentConfiguration 获取本bolt的component 配置
在Maven中配置
如果找不到jstorm-client和jstorm-client-extension包,可以自己下载jstorm源码进行编译,请参考源码编译
打包时,需要将所有依赖打入到一个包中
jstorm jar xxxxxx.jar com.alibaba.xxxx.xx parameter
- xxxx.jar 为打包后的jar
- com.alibaba.xxxx.xx 为入口类,即提交任务的类
- parameter即为提交参数
JStorm VS Storm 请参看JStorm 0.9.0 介绍.pptx
JStorm 比Storm更稳定,更强大,更快, storm上跑的程序,一行代码不变可以运行在jstorm上。
Flume是一个成熟的系统,主要focus在管道上,将数据从一个数据源传输到另外一个数据源, 系统提供大量现成的插件做管道作用。当然也可以做一些计算和分析,但插件的开发没有Jstorm便捷和迅速。
S4就是一个半成品,健壮性还可以,但数据准确性较糟糕,无法保证数据不丢失,这个特性让S4 大受限制,也导致了S4开源很多年,但发展一直不是很迅速。
AKKA是一个actor模型,也是一个不错的系统,在这个actor模型基本上,你想做任何事情都没有问题,但问题是你需要做更多的工作,topology怎么生成,怎么序列化。数据怎么流(随机,还是group by)等等。
Spark是一个轻量的内存MR, 更偏重批量数据处理
0.9.0 性能测试 JStorm 0.9.0 性能非常的好, 使用netty时单worker 发送最大速度为11万QPS, 使用zeromq时,最大速度为12万QPS.
- JStorm 0.9.0 在使用Netty的情况下,比Storm 0.9.0 使用netty情况下,快10%, 并且JStorm netty是稳定的而Storm 的Netty是不稳定的
- 在使用ZeroMQ的情况下, JStorm 0.9.0 比Storm 0.9.0 快30%
- Zeromq 减少一次内存拷贝
- 增加反序列化线程
- 重写采样代码,大幅减少采样影响
- 优化ack代码
- 优化缓冲map性能
- Java 比clojure更底层
测试样例
测试样例为https://github.com/longdafeng/storm-examples
测试环境
5 台 16核, 98G 物理机
测试结果
- JStorm with netty, Spout 发送QPS 为 11万
- storm with netty, Spout 应用发送QPS 为 10万 (截图为上层应用的QPS, 没有包括发送到ack的QPS, Spout发送QPS 正好为上层应用QPS的2倍)
- JStorm with zeromq, Spout 发送QPS 为12万
- Storm with zeromq, Spout 发送QPS 为9万(截图为上层应用的QPS, 没有包括发送到ack的QPS, Spout发送QPS 正好为上层应用QPS的2倍)
cgroups是control groups的缩写,是Linux内核提供的一种可以限制, 记录, 隔离进程组(process groups)所使用的物理资源(如:cpu,memory,IO 等等)的机制。
在Jstorm中,我们使用cgroup进行cpu硬件资源的管理。使用前,需要做如下检查和配置。
- 检查/etc/passwd 文件中当前用户的uid和gid, 假设当前用户是admin, 则看/etc/passwd文件中admin的uid和gid是多少
cgroup功能在当前系统的内核版本是否支持
检查/etc/cgconfig.conf是否存在。如果不存在, 请“yum install libcgroup”,如果存在,设置cpu子系统的挂载目录位置, 以及修改该配置文件中相应的uid/gid为启动jstorm用户的uid/gid, 本例子中以500为例, 注意是根据第一步来进行设置的。
- 然后启动cgroup服务
Note: cgconfig.conf只能在root模式下修改。
或者直接执行命令
这是一个cgconfig.conf配置文件例子。比如jstorm的启动用户为admin,admin在当前 系统的uid/gid为500(查看/etc/passwd 可以查看到uid和gid),那么相对应cpu子系统的jstorm目录uid/gid也需要设置为相同的值。 以便jstorm有相应权限可以在这个目录下为jstorm的每个需要进行资源隔离的进程创建对应 的目录和进行相关设置。
. 在jstorm配置文件中打开cgroup, 配置storm.yaml
supervisor.enable.cgroup: true
参考性能优化
资源不够当报告 ”No supervisor resource is enough for component “, 则意味着资源不够 如果是仅仅是测试环境,可以将supervisor的cpu 和memory slot设置大,
在jstorm中, 一个task默认会消耗一个cpu slot和一个memory slot, 而一台机器上默认的cpu slot是(cpu 核数 -1), memory slot数(物理内存大小 * 75%/1g), 如果一个worker上运行task比较多时,需要将memory slot size设小(默认是1G), 比如512M, memory.slot.per.size: 535298048
所有spout,bolt,configuration, 发送的消息(Tuple)都必须实现Serializable, 否则就会出现序列化错误.
如果是spout或bolt的成员变量没有实现Serializable时,但又必须使用时, 可以对该变量申明时,增加transient 修饰符, 然后在open或prepare时,进行实例化
Log4j 冲突0.9.0 开始,JStorm依旧使用Log4J,但storm使用Logbak,因此应用程序如果有依赖log4j-over-slf4j.jar, 则需要exclude 所有log4j-over-slf4j.jar依赖,下个版本将自定义classloader,就不用担心这个问题。
如果应用程序使用和JStorm相同的jar 但版本不一样时,建议打开classloader, 修改配置文件
topology.enable.classloader: true
或者
ConfigExtension.setEnableTopologyClassLoader(conf, true);
JStorm默认是关掉classloader,因此JStorm会强制使用JStorm依赖的jar
提交任务后,等待几分钟后,web ui始终没有显示对应的task有3种情况:
用户程序初始化太慢
如果有用户程序的日志输出,则表明是用户的初始化太慢或者出错,查看日志即可。 另外对于MetaQ 1.x的应用程序,Spout会recover ~/.meta_recover/目录下文件,可以直接删除这些消费失败的问题,加速启动。
通常是用户jar冲突或初始化发生问题
打开supervisor 日志,找出启动worker命令,单独执行,然后检查是否有问题。类似下图:
检查是不是storm和jstorm使用相同的本地目录
检查配置项 ”storm.local.dir“, 是不是storm和jstorm使用相同的本地目录,如果相同,则将二者分开
提示端口被绑定有2种情况:
多个worker抢占一个端口
假设是6800 端口被占, 可以执行命令 “ps -ef|grep 6800” 检查是否有多个进程, 如果有多个进程,则手动杀死他们
系统打开太多的connection
Linux对外连接端口数限制,TCP client对外发起连接数达到28000左右时,就开始大量抛异常,需要
# echo "10000 65535" > /proc/sys/net/ipv4/ip_local_port_range