Storm入门基础

Storm是什么

朋友们可能不知道storm是什么, storm是一款免费且开源的分布式实时计算系统,类似于hadoop

通过这篇文章,可以让你学到怎样创建storm topologies, 并部署他们到storm cluster里面。

Storm cluster的组成

一个storm cluster类似于hadoop cluster. 在hadoop上执行的是“MapReduce Jobs”, 在storm里面,你执行的叫“topolgies jobs”. 他们之间有一个关键的不同是: MapReduce job会最终完成,但是topology会一直处理消息除非你kill掉这个job。

 

在storm cluster里面有2种node:  master node 和 worker node.   master node上面有一个叫”Nimbus”的守护进程,类似于Hadoop上的“JobTracker”.  Nimbus进程的主要负责是分发代码到cluster,分配任务,监控故障。

每一个worker node上面有一个叫“supervisor”的守护进程,它负责监听从Nimbus分配来的任务,按需要启动和终止这些任务。每一个worker进程执行一个topology的子集; 一个执行的topology由横跨很多机器的worker进程组成。

storm-cluster

 

 

 

 

 

 

 

 

Nimbus和Supervisor的所有协调都由zookeepe cluster完成,而且 nimbus和supervisor守护进程都是故障自动修复和无状态的; 所有状态都保存在zookeeper或者是本地磁盘上。 也就是说,你可以kill -9 Nimbus或者supervisor进程,他们会自动启用备份方案,这种设计使得storm cluster非常的稳定。大家可能不是很清楚zookeeper, 阿虎可以给大家举个简单的例子:假如zookeeper cluster里面有三台机器A,B,C, 你向任何一台机器增删改数据,其他2台的机器的数据会自动更新同步,保证数据一致。为了保持数据一致性,zookeeper框架在分布式应用里会经常用到,例如Hadoop。

 

Topologies

想要做实时计算,你需要创建topologies。 一个topology就是一个计算图。 topology里面的每个节点都包含了处理逻辑,节点之间的连接(表示节点之间传递数据的流向)。

执行一个topology很简单。 首先,把你的代码和对应的依赖打包成一个jar包。 然后,执行下面这个命令

storm jar all-my-code.jar backtype.storm.MyTopology arg1 arg2

命令会执行backtype.storm.MyTopology这个类,并传人了2个参数 arg1,arg2。 这个类的主要功能是定义topology,提交这个topology到Nimbus。 storm jar负责连接Nimbus和上传你的jar包。 上面这种方式基于JVM是最简单的上传方式,当然你也可以利用其它语言创建和上传topology。

 

 Streams

storm种核心的抽象是stream, 一个stream是一个无边界的元组序列。 storm提供了转换一个stream到一个新stream的原语。

最基本的stream转换是“spouts”和”bolts”. 它们有你实现的接口去执行特定的逻辑。

spout是stream的源头。例如,一个spout可能从Kestrel队列里面读取一些元组,然后分发出去作为一个stream.   一个spout也可能是连接到twitter的API,分发推文的一个steam.

bolt 消费任何输入stream、会做一些处理,还可能发出新的stream。 bolt能做很多事情,例如执行一个函数,过滤元组,stream的聚合,stream的join,连接数据库等等。

spouts和bolts的网络被打包成了一个“topology”, 它是你提交给storm cluster执行的最高的抽象。 一个topology是一个stream转换图,每一个节点就是spout或者bolt.  图里面连线指明了哪些bolts订阅了哪些stream. 当一个spout或者bolt发射一个元组到stream里面时,它实际上把这个元组发送给了所有订阅这个stream的每个bolt.

 

topology

 

 

 

 

 

 

 

 

每一个节点都是可以并行的执行,可以在topology里面指定每个节点的并行度(阿虎: 就是说Bolt里面可以开多个线程跑)。

一个topology会一直执行,除非你kill它。storm会自动重新分配失败的任务,并且还会保证即使机器当掉和消息丢掉,也会没有数据丢失。

数据模型

storm使用元组作为它的数据模型,元组就是一个命名的数据列表,元组种的字段可以是任何类型的对象。storm支持所有基本的数据类型,strings, byte数组作为元组的字段值。也可以实现serializer创建自己的对象类型。

每个节点必须声明输出字段。例如下面的例子声明了2个元组,字段是“double”和“triple”

public class DoubleAndTripleBolt extends BaseRichBolt { private OutputCollectorBase _collector;

@Override
public void prepare(Map conf, TopologyContext context, OutputCollectorBase collector) {
    _collector = collector;
}

@Override
public void execute(Tuple input) {
    int val = input.getInteger(0);        
    _collector.emit(input, new Values(val*2, val*3));
    _collector.ack(input);
}

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields("double", "triple"));
}     
}

 

一个简单的topology

让我们来看一个简单的topology

 

TopologyBuilder builder = new TopologyBuilder();

builder.setSpout("words", new TestWordSpout(), 10);

builder.setBolt("exclaim1", new ExclamationBolt(), 3).shuffleGrouping("words");

builder.setBolt("exclaim2", new ExclamationBolt(), 2).shuffleGrouping("exclaim1");

 

这个topology里面包含了一个spout和2个bolts。 spout发射words, 每一个bolt在它的输入里面添加字符串”!!!”,  如果spout发射一个元组 [“bob”] 和 [“john”], 第二个bolt就会发射[“bob!!!!!!”] 和[“john!!!!!!”].

这个topology使用setSpout和setBolt方法定义节点。方法的三个参数分别是:用户指定的ID,处理逻辑对象和这个节点的并行度。words,exclaim1,exclaim2都是用户ID。

其中TestWordSpout必须要实现 IRichSpout接口,ExclamationBolt必须要实现IRichBolt接口。

最后一个参数是可选的,它指明了会有多少个线程执行那个组件,如果你不指定,storm默认分配一个线程给那个节点。

setBolt方法返回一个 InputDeclarer对象,它用来定义Bolt的输入。在这里exclaim1 声明了它想读取所有words发出的元组;  exclaim2 声明了它想读取所有exclaim1 发射的元组。“shuffle grouping” 意思是元组应该随机的分发给bolt的任务。如果你想让exclaim2读取words和exclaim1的元组,可以这样定义exclaim2

 

builder.setBolt("exclaim2", new ExclamationBolt(), 5) .shuffleGrouping("words") .shuffleGrouping("exclaim1");

 

让我们更深入的看看spout和bolt的实现,spout的职责是给topology发送新的消息。TestWordSpout每100ms随机从列表[“nathan”, “mike”, “jackson”, “golda”, “bertels”]中选一个单词作为一个元组发射,下面是nextTuple()方法的实现:

void nextTuple() {

   Utils.sleep(100);

   final String[] words = new String[] {"nathan", "mike", "jackson", "golda", "bertels"};

   final Random rand = new Random();

   final String word = words[rand.nextInt(words.length)];

   _collector.emit(new Values(word));

}

ExclamationBol在它的输入中添加一个字符串 “!!!”, 下面是完整的实现:

public static class ExclamationBolt implements IRichBolt { 

OutputCollector _collector;

public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
    _collector = collector;
}

public void execute(Tuple tuple) {
    _collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
    _collector.ack(tuple);
}

public void cleanup() {
}

public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields("word"));
}

public Map getComponentConfiguration() {
    return null;
} 
}

prepare方法简单的保存OutputCollector 以备在excute方法中使用,OutputCollector是用来从当前这个bolt发射元组用的。

execute方法就是从一个bolt里面获取元组,把元组的第一个值 添加“!!!” ,然后发射出去。 如果你想知道这个元组来自哪个源头,可以使用Tuple#getSourceComponent方法获取。

emit和ack方法保证了数据的不丢失,至于为什么,后面解释!

当bolt停止的时候会调用cleanup方法,释放一些打开的资源。注意:这个方法不保证在cluster里面一定会执行,在本地模式还是可以使用。

declareOutputFields 方法声明了ExclamationBolt发射一个叫“word”的元组。

cleanupgetComponentConfiguration 通常不需要实现,可以直接继承BaseRichBolt实现一些基本的功能。

本地模式执行ExclamationTopology

storm有2种执行模式:本地模式和分布式模式。本地模式下,storm完全通过线程来模拟worker节点,本地模式下可以很好的调试和开发,可以参考下面的资料了解详细

Local mode: http://storm.incubator.apache.org/documentation/Local-mode.html

cluster mode:  http://storm.incubator.apache.org/documentation/Running-topologies-on-a-production-cluster.html

下面是在本地模式执行ExclamationTopology 的代码片段

Config conf = new Config(); 
conf.setDebug(true); //告诉storm要log每一条message,本地模式调试比较有用。
conf.setNumWorkers(2); //指定2个workers, 注意:每个worker上面跑多少个线程是由setBolt和setSpout指定
LocalCluster cluster = new LocalCluster(); //定义一个cluster
cluster.submitTopology("test", conf, builder.createTopology()); //提交topology
Utils.sleep(10000); 
cluster.killTopology(“test”); 
cluster.shutdown();

 

Stream groupings

本节是讲述一个topology是怎样在组件之间传输元组数据的,记住一点,spout和bolt都是并行的执行,下面的图展示了一个topology在任务级别的数据传输图:

topology-tasks

 

 

 

 

 

 

 

其中一个空心圆代表一个task,即一个线程,每个组件可能会有多个线程,从图中可以看到storm的数据会拷贝多份在不同组件中传输。

 

文章转载自:http://www.ahuoo.com/?p=1556

 

发表评论

电子邮件地址不会被公开。 必填项已用*标注

To create code blocks or other preformatted text, indent by four spaces:

    This will be displayed in a monospaced font. The first four 
    spaces will be stripped off, but all other whitespace
    will be preserved.
    
    Markdown is turned off in code blocks:
     [This is not a link](http://example.com)

To create not a block, but an inline code span, use backticks:

Here is some inline `code`.

For more help see http://daringfireball.net/projects/markdown/syntax

您可以使用这些HTML标签和属性: <a href="" title=""> <abbr title=""> <acronym title=""> <b> <blockquote cite=""> <cite> <code> <del datetime=""> <em> <i> <q cite=""> <s> <strike> <strong>