Storm是什么
朋友们可能不知道storm是什么, storm是一款免费且开源的分布式实时计算系统,类似于hadoop
通过这篇文章,可以让你学到怎样创建storm topologies, 并部署他们到storm cluster里面。
Storm cluster的组成
一个storm cluster类似于hadoop cluster. 在hadoop上执行的是“MapReduce Jobs”, 在storm里面,你执行的叫“topolgies jobs”. 他们之间有一个关键的不同是: MapReduce job会最终完成,但是topology会一直处理消息除非你kill掉这个job。
在storm cluster里面有2种node: master node 和 worker node. master node上面有一个叫”Nimbus”的守护进程,类似于Hadoop上的“JobTracker”. Nimbus进程的主要负责是分发代码到cluster,分配任务,监控故障。
每一个worker node上面有一个叫“supervisor”的守护进程,它负责监听从Nimbus分配来的任务,按需要启动和终止这些任务。每一个worker进程执行一个topology的子集; 一个执行的topology由横跨很多机器的worker进程组成。
Nimbus和Supervisor的所有协调都由zookeepe cluster完成,而且 nimbus和supervisor守护进程都是故障自动修复和无状态的; 所有状态都保存在zookeeper或者是本地磁盘上。 也就是说,你可以kill -9 Nimbus或者supervisor进程,他们会自动启用备份方案,这种设计使得storm cluster非常的稳定。大家可能不是很清楚zookeeper, 阿虎可以给大家举个简单的例子:假如zookeeper cluster里面有三台机器A,B,C, 你向任何一台机器增删改数据,其他2台的机器的数据会自动更新同步,保证数据一致。为了保持数据一致性,zookeeper框架在分布式应用里会经常用到,例如Hadoop。
Topologies
想要做实时计算,你需要创建topologies。 一个topology就是一个计算图。 topology里面的每个节点都包含了处理逻辑,节点之间的连接(表示节点之间传递数据的流向)。
执行一个topology很简单。 首先,把你的代码和对应的依赖打包成一个jar包。 然后,执行下面这个命令
storm jar all-my-code.jar backtype.storm.MyTopology arg1 arg2
命令会执行backtype.storm.MyTopology这个类,并传人了2个参数 arg1,arg2。 这个类的主要功能是定义topology,提交这个topology到Nimbus。 storm jar负责连接Nimbus和上传你的jar包。 上面这种方式基于JVM是最简单的上传方式,当然你也可以利用其它语言创建和上传topology。
Streams
storm种核心的抽象是stream, 一个stream是一个无边界的元组序列。 storm提供了转换一个stream到一个新stream的原语。
最基本的stream转换是“spouts”和”bolts”. 它们有你实现的接口去执行特定的逻辑。
spout是stream的源头。例如,一个spout可能从Kestrel队列里面读取一些元组,然后分发出去作为一个stream. 一个spout也可能是连接到twitter的API,分发推文的一个steam.
bolt 消费任何输入stream、会做一些处理,还可能发出新的stream。 bolt能做很多事情,例如执行一个函数,过滤元组,stream的聚合,stream的join,连接数据库等等。
spouts和bolts的网络被打包成了一个“topology”, 它是你提交给storm cluster执行的最高的抽象。 一个topology是一个stream转换图,每一个节点就是spout或者bolt. 图里面连线指明了哪些bolts订阅了哪些stream. 当一个spout或者bolt发射一个元组到stream里面时,它实际上把这个元组发送给了所有订阅这个stream的每个bolt.
每一个节点都是可以并行的执行,可以在topology里面指定每个节点的并行度(阿虎: 就是说Bolt里面可以开多个线程跑)。
一个topology会一直执行,除非你kill它。storm会自动重新分配失败的任务,并且还会保证即使机器当掉和消息丢掉,也会没有数据丢失。
数据模型
storm使用元组作为它的数据模型,元组就是一个命名的数据列表,元组种的字段可以是任何类型的对象。storm支持所有基本的数据类型,strings, byte数组作为元组的字段值。也可以实现serializer创建自己的对象类型。
每个节点必须声明输出字段。例如下面的例子声明了2个元组,字段是“double”和“triple”
public class DoubleAndTripleBolt extends BaseRichBolt { private OutputCollectorBase _collector; @Override public void prepare(Map conf, TopologyContext context, OutputCollectorBase collector) { _collector = collector; } @Override public void execute(Tuple input) { int val = input.getInteger(0); _collector.emit(input, new Values(val*2, val*3)); _collector.ack(input); } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("double", "triple")); } }
一个简单的topology
让我们来看一个简单的topology
TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("words", new TestWordSpout(), 10); builder.setBolt("exclaim1", new ExclamationBolt(), 3).shuffleGrouping("words"); builder.setBolt("exclaim2", new ExclamationBolt(), 2).shuffleGrouping("exclaim1");
这个topology里面包含了一个spout和2个bolts。 spout发射words, 每一个bolt在它的输入里面添加字符串”!!!”, 如果spout发射一个元组 [“bob”] 和 [“john”], 第二个bolt就会发射[“bob!!!!!!”] 和[“john!!!!!!”].
这个topology使用setSpout和setBolt方法定义节点。方法的三个参数分别是:用户指定的ID,处理逻辑对象和这个节点的并行度。words,exclaim1,exclaim2都是用户ID。
其中TestWordSpout必须要实现 IRichSpout接口,ExclamationBolt必须要实现IRichBolt接口。
最后一个参数是可选的,它指明了会有多少个线程执行那个组件,如果你不指定,storm默认分配一个线程给那个节点。
setBolt
方法返回一个 InputDeclarer对象,它用来定义Bolt的输入。在这里exclaim1 声明了它想读取所有words发出的元组; exclaim2 声明了它想读取所有exclaim1 发射的元组。“shuffle grouping” 意思是元组应该随机的分发给bolt的任务。如果你想让exclaim2读取words和exclaim1的元组,可以这样定义exclaim2
builder.setBolt("exclaim2", new ExclamationBolt(), 5) .shuffleGrouping("words") .shuffleGrouping("exclaim1");
让我们更深入的看看spout和bolt的实现,spout的职责是给topology发送新的消息。TestWordSpout
每100ms随机从列表[“nathan”, “mike”, “jackson”, “golda”, “bertels”]中选一个单词作为一个元组发射,下面是nextTuple()方法的实现:
void nextTuple() { Utils.sleep(100); final String[] words = new String[] {"nathan", "mike", "jackson", "golda", "bertels"}; final Random rand = new Random(); final String word = words[rand.nextInt(words.length)]; _collector.emit(new Values(word)); }
ExclamationBol在它的输入中添加一个字符串 “!!!”, 下面是完整的实现:
public static class ExclamationBolt implements IRichBolt { OutputCollector _collector; public void prepare(Map conf, TopologyContext context, OutputCollector collector) { _collector = collector; } public void execute(Tuple tuple) { _collector.emit(tuple, new Values(tuple.getString(0) + "!!!")); _collector.ack(tuple); } public void cleanup() { } public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word")); } public Map getComponentConfiguration() { return null; } }
prepare方法简单的保存OutputCollector
以备在excute方法中使用,OutputCollector
是用来从当前这个bolt发射元组用的。
execute方法就是从一个bolt里面获取元组,把元组的第一个值 添加“!!!” ,然后发射出去。 如果你想知道这个元组来自哪个源头,可以使用Tuple#getSourceComponent
方法获取。
emit和ack方法保证了数据的不丢失,至于为什么,后面解释!
当bolt停止的时候会调用cleanup方法,释放一些打开的资源。注意:这个方法不保证在cluster里面一定会执行,在本地模式还是可以使用。
declareOutputFields
方法声明了ExclamationBolt
发射一个叫“word”的元组。
cleanup
和
getComponentConfiguration
通常不需要实现,可以直接继承BaseRichBolt实现一些基本的功能。
本地模式执行ExclamationTopology
storm有2种执行模式:本地模式和分布式模式。本地模式下,storm完全通过线程来模拟worker节点,本地模式下可以很好的调试和开发,可以参考下面的资料了解详细
Local mode: http://storm.incubator.apache.org/documentation/Local-mode.html
cluster mode: http://storm.incubator.apache.org/documentation/Running-topologies-on-a-production-cluster.html
下面是在本地模式执行ExclamationTopology
的代码片段
Config conf = new Config(); conf.setDebug(true); //告诉storm要log每一条message,本地模式调试比较有用。 conf.setNumWorkers(2); //指定2个workers, 注意:每个worker上面跑多少个线程是由setBolt和setSpout指定 LocalCluster cluster = new LocalCluster(); //定义一个cluster cluster.submitTopology("test", conf, builder.createTopology()); //提交topology Utils.sleep(10000); cluster.killTopology(“test”); cluster.shutdown();
Stream groupings
本节是讲述一个topology是怎样在组件之间传输元组数据的,记住一点,spout和bolt都是并行的执行,下面的图展示了一个topology在任务级别的数据传输图:
其中一个空心圆代表一个task,即一个线程,每个组件可能会有多个线程,从图中可以看到storm的数据会拷贝多份在不同组件中传输。
文章转载自:http://www.ahuoo.com/?p=1556