转载

Storm集群的DRPC模式

欲速则不达

积土成山,风雨兴焉;积水成渊,蛟龙生焉;积善成德,而神明自得,圣心备焉。故不积跬步,无以至千里;不积小流,无以成江海。骐骥一跃,不能十步;驽马十驾,功在不舍。锲而舍之,朽木不折;锲而不舍,金石可镂。蚓无爪牙之利,筋骨之强,上食埃土 ,下饮黄泉 ,用心一也。蟹六跪而二螯,非蛇鳝之穴无可寄托者,用心躁也。

博客园 首页 博问 闪存 新随笔 联系 订阅 Storm集群的DRPC模式 管理

随笔-18  评论-2  文章-0  trackbacks-0

Storm集群的DRPC模式

storm的DRPC模式的作用是实现从远程调用storm集群的计算资源,而不需要连接到集群的某一个节点。OK。那么storm实现DRPC主要是使用LinearDRPCTopologyBuilder这个类。下面就先来看看一个简单的例子,它的源码的github上。

import backtype.storm.Config; import backtype.storm.LocalCluster; import backtype.storm.LocalDRPC; import backtype.storm.StormSubmitter; import backtype.storm.drpc.LinearDRPCTopologyBuilder; import backtype.storm.topology.BasicOutputCollector; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseBasicBolt; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Tuple; import backtype.storm.tuple.Values;   public class BasicDRPCTopology {   public static class ExclaimBolt extends BaseBasicBolt {     //主要需要覆写execute方法和declareoutputfields方法     @Override     public void execute(Tuple tuple, BasicOutputCollector collector) {       String input = tuple.getString(1);       collector.emit(new Values(tuple.getValue(0), input + "!"));     }      @Override     public void declareOutputFields(OutputFieldsDeclarer declarer) {       declarer.declare(new Fields("id", "result"));     }    }    public static void main(String[] args) throws Exception {     LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder("exclamation");//实现DRPC模式     builder.addBolt(new ExclaimBolt(), 3);      Config conf = new Config();      if (args == null || args.length == 0) {       LocalDRPC drpc = new LocalDRPC();       LocalCluster cluster = new LocalCluster();        cluster.submitTopology("drpc-demo", conf, builder.createLocalTopology(drpc));        for (String word : new String[]{ "hello", "goodbye" }) {         System.out.println("Result for /"" + word + "/": " + drpc.execute("exclamation", word));       }        cluster.shutdown();       drpc.shutdown();     }     else {       conf.setNumWorkers(3);       StormSubmitter.submitTopology(args[0], conf, builder.createRemoteTopology());     }   } }

这段代码主要实现的功能是给接收到的每一个输入后面添加一个感叹号。ok,这样就可以编译提交了。

不过在这之前需要先配置storm集群的drpc server的ip。如图。主要是下面的server的ip需要配置好。并且集群的每一个节点的配置文件都需要配置这项参数!

Storm集群的DRPC模式

然后即可使用storm drpc &命令启动drpc模式。(这里的分工是172.17.150.6为客户端,其余的172.17.150.7(.8,.11)为集群的三个节点,.11是nimbus节点。)

OK,那接下来就使用客户端向集群提交Topology。如图。使用客户端向集群提交名为exclaim的Topology。里面设置的worker数为3。

Storm集群的DRPC模式

从下图可以看到两个supervisor分别有一个是运行两个worker,有一个是运行一个worker。

Storm集群的DRPC模式

Storm集群的DRPC模式

ok,下面是客户端调用远程资源进行计算的程序。主要是声明DRPCClient的ip以及端口,以及指定执行的方法名和传入的参数(client.execute("exclamation",word))。

Storm集群的DRPC模式

运行结果如下。

Storm集群的DRPC模式

OK,整个DRPC的过程就是这样。

谢谢大家!本人水平有限,请不吝指正!

posted on 2015-03-07 17:22 Uber 阅读( ... ) 评论( ... )编辑 收藏

刷新评论刷新页面返回顶部

博客园首页 博问 新闻 闪存 程序员招聘 知识库

正文到此结束
Loading...