一、Storm简介
1.引例
在介绍Storm之前,我们先看一个日志统计的例子:假如我们想要根据用户的访问日志统计使用斗鱼客户端的用户的地域分布情况,一般情况下我们会分这几步:
取出访问日志中客户端的IP
把IP转换成对应地域
按照地域进行统计
Hadoop貌似就可以轻松搞定:
map做ip提取,转换成地域
reduce以地域为key聚合,计数统计
从HDFS取出结果
如果有时效性要求呢?
小时级:还行,每小时跑一个MapReduceJob
10分钟:还凑合能跑
5分钟:够呛了,等槽位可能要几分钟呢
1分钟:算了吧,启动Job就要几十秒呢
秒级:…要满足秒级别的数据统计需求,需要
进程常驻运行;
数据在内存中
Storm正好适合这种需求。
2.特性
Storm是一个分布式实时流式计算平台。主要特性如下:
简单的编程模型:类似于MapReduce降低了并行批处理复杂性,Storm降低了实时处理的复杂性,只需实现几个接口即可(Spout实现ISpout接口,Bolt实现IBolt接口)。
支持多种语言:你可以在Storm之上使用各种编程语言。默认支持Clojure、Java、Ruby和Python。要增加对其他语言的支持,只需实现一个简单的Storm通信协议即可。
容错性:nimbus、supervisor都是无状态的,可以用kill-9来杀死Nimbus和Supervisor进程,然后再重启它们,任务照常进行;当worker失败后,supervisor会尝试在本机重启它。
分布式:计算是在多个线程、进程和服务器之间并行进行的。
持久性、可靠性:消息被持久化到本地磁盘,并且支持数据备份防止数据丢失。
可靠的消息处理:Storm保证每个消息至少能得到一次完整处理。任务失败时,它会负责从消息源重试消息(ack机制)。
快速、实时:Storm保证每个消息能能得到快速的处理。
3.与常用其他大数据计算平台对比
Stormvs.MapReduceStorm的一个拓扑常驻内存运行,MR作业运行完了进行就被kill了;storm是流式处理,MR是批处理;Storm数据在内存中不写磁盘,而MR会与磁盘进行交互;Storm的DAG(有向无环图)模型可以组合多个阶段,而MR只可以有MAP和REDUCE两个阶段。
Stormvs.SparkStreamingStorm处理的是每次传入的一条数据,SparkStreaming实际处理的是微批量数据。
二、Storm的架构和运行时原理
1.集群架构
如上图所示,一个典型的storm集群包含一个主控节点Nimbus,负责资源分配和任务调度;还有若干个子节点Supervisor,负责接受nimbus分配的任务,启动和停止属于自己管理的worker进程;Nimbus和Supervisor之间的所有协调工作都是通过Zookeeper集群完成。
2.Storm的容错(FaultTolerance)机制
Nimbus和Supervisor进程被设计成快速失败(failfast)的(当遇到异常的情况,进程就会挂掉)并且是无状态的(状态都保存在Zookeeper或者在磁盘上)。
Nimbus与Supervisor本身也是无状态的,状态信息是由zookeeper存储(实现了高可用,当nimbus挂掉,可以找另外一个节点启动nimbus进程,状态信息从zookeeper获得)。
在Nimbus进程失败后,可以快速重启恢复正常工作,不需要很长的时间来进行初始化和状态恢复。
当Nimbus从zookeeper得知有supervisor节点挂掉,可以将该节点的任务重新分配给其他子节点。
Nimbus在“某种程度”上属于单点故障的。在实际中,即使Nimbus进程挂掉,也不会有灾难性的事情发生。
当Nimbus挂掉会怎样?
已经存在的拓扑可以继续正常运行,但是不能提交新拓扑;
正在运行的worker进程仍然可以继续工作。而且当worker挂掉,Supervisor会一直重启worker。
失败的任务不会被分配到其他机器(是Nimbus的职责)上了
当一个Supervisor(slave节点)挂掉会怎样?
分配到这台机器的所有任务(task)会超时,Nimbus会把这些任务(task)重新分配给其他机器。当一个worker挂掉会怎么样?
当一个worker挂掉,Supervisor会重启它。如果启动一直失败那么此时worker也就不能和Nimbus保持心跳了,Nimbus会重新分配worker到其他机器
3.Storm的编程模型
Strom在运行中可分为spout与bolt两个组件,其中,数据源从spout开始,数据以tuple的方式发送到bolt,多个bolt可以串连起来,一个bolt也可以接入多个spot/bolt。运行时Topology如下图:
编程模型的一些基本概念:
元组
storm使用tuple(元组)来作为它的数据模型。每个tuple由一堆域(field)组成,每个域有一个值,并且每个值可以是任何类型。
一个tuple可以看作一个没有方法的java对象。总体来看,storm支持所有的基本类型、字符串以及字节数组作为tuple的值类型。
Spout
i.BaseRichSpout是实现IRichSpout接口的类,对上述必要的方法有默认的实现;
ii.如果业务需要自定义ack()、fail()等方法,选择实现IRichSpout接口;
iii.如果业务没有自定义需求,选择继承BaseRichSpout类,可以不实现并不一定需要用户实现的方法,简化开发。
i.open方法是初始化动作。允许你在该spout初始化时做一些动作,传入了上下文,方便取上下文的一些数据。
ii.close方法在该spout关闭前执行。
iii.activate和deactivate:一个spout可以被暂时激活和关闭,这两个方法分别在对应的时刻被调用。
iv.nextTuple用来发射数据。Spout中最重要的方法。
v.ack(Object)传入的Object其实是一个id,唯一表示一个tuple。该方法是这个id所对应的tuple被成功处理后执行。
vi.fail(Object)同ack,只不过是tuple处理失败时执行。
Spout是在一个topology中产生源数据流的组件。通常情况下spout会从外部数据源中读取数据,然后转换为topology内部的源数据。Spout是一个主动的角色,其接口中有个nextTuple()函数,storm框架会不停地调用此函数,用户只要在其中生成源数据即可。
实现Spout时,需要实现最顶层抽象ISpout接口里面的几个方法
实现Spout时,还需要实现I