SyncSpout简介 SyncSpout是上海华瑞银行(SHRB)大数据团队开发的,用来构造可交互的、同步的Storm拓扑的组件。我们在做实时推荐系统中,希望将Storm的并发性和分布式计算能力应用到“请求-响应”范式中, 比如客户的某次购买行为能够以消息的形式发送到storm拓扑中,storm在指定时间返回推荐结果,也就是说storm需要具有可交互性。基于这样的背景,大数据团队开发了SyncSpout组件, 该组件可以接收客户端异步的消息,经过Storm拓扑异步计算,在指定时间内返回给客户端。
// 创建客户端
val client = new SyncSpoutClient(topName)
// 初始化
client.init()
// 向远程storm集群发送消息,并在1000毫秒内返回,若超时则返回null指针
val syncResult = client.ask(ClientMsg("这是发送的消息,可以是任意类型"),1000).asInstanceOf[String]
println(s"返回消息是[$syncResult],可以是任意类型")
val builder = new TopologyBuilder()
// ActorSpout用于接收消息
builder.setSpout("syncSpout",SyncSpout(),2)
// SimpleBolt用于处理消息
builder.setBolt("simpleBolt",new SimpleBolt(),2).setNumTasks(4).shuffleGrouping("syncSpout")
// SendBolt用于返回消息
builder.setBolt("sendBolt",new SendBolt(),2).shuffleGrouping("simpleBolt")
val cluster = new LocalCluster()
val topName = "SyncSpoutTop"
val conf = new Config()
conf.setNumWorkers(2)
cluster.submitTopology(topName,conf,builder.createTopology())
println( "SyncSpout 启动成功!" )
E-MAIL: 365781062@qq.com
公司E-MAIL: wushaojie@shrbank.com
GitHub: https://github.com/shrbank/SyncSpout