转载

微博推荐的storm实践

1. 概述

“天下武功,唯快不破”。推荐系统也一样,用户的实时行为和实时兴趣对于推荐系统来说是一个非常重要的因素,对于提升推荐效果和用户惊喜度都有很大的促进作用。

用户对一条微博进行转,评,赞的行为,对另外一个用户的关注和取消关注,对于基于关系图谱和兴趣图谱的推荐和广告来说,都是非常重要的信息。

在微博内部,这些信息是通过firehose这个队列系统来进行分发和流动,不过这些消息如何进行分析和处理落地,就是各个业务方的事情。

面对每天数亿的流量,我们需要一个低延迟、 高性能、 分布式、 可扩展 、容错、可以抵抗高峰流量(比如“周一见”这样的事件)的分布式流处理系统。

我们除了有自己的分布式流处理系统RIN之外,也尝试过使用大热的开源实时处理系统storm,目前就有几个非常重要的业务就运行在storm集群之上。除了刚上线出过几次小问题,线上跑了一年多,运行正常,没再出过问题。

本篇文章从以下几个方面介绍storm在微博推荐中的实践:

  • storm的简介
  • storm的环境搭建
  • storm的监控
  • 一个简单的业务介绍

2. storm简介

Storm是twitter开源的分布式实时计算系统,可以简单、可靠的处理大量的数据流。Storm有很多使用场景:如实时分析,在线机器学习,持续计算,分布式 RPC,ETL等等。Storm支持水平扩展, 具有高容错性,保证每个消息都会得到处理,而且处理速度很快(在一个小集群中,每个结点每秒可以处理数以百万计的消息)。

2.1 基础部件

微博推荐的storm实践

1)Nimbus 负责在集群里面发送代码,分配工作给机器,并且监控状态。 全局只有一个。

2)Supervisor 会监听分配给它那台机器的工作,根据需要启动/关闭工作进程Worker。

3) Zookeeper是Storm重点依赖的外部资源。 Nimbus和Supervisor甚至实际运行的Worker都是把心跳保存在Zookeeper上的。

4)Storm ui 是storm的监控界面,能清楚的看到所有逻辑节点的处理情况。

Nimbus和Supervisor之间的所有协调工作都是通过一个Zookeeper集群来完成。并且,nimbus进程和supervisor都是快速失败(fail-fast)和无状态的。所有的状态要么在Zookeeper里面, 要么在本地磁盘上。这也就意味着你可以用kill -9来杀死nimbus和supervisor进程, 然后再重启它们,它们可以继续工作, 就好像什么都没有发生过似的。这个设计使得storm不可思议的稳定。详细原理可以参看参考文献【4】

2.2 逻辑单元

Storm提交运行的程序称为Topology,结构如下图所示:

微博推荐的storm实践

Topology由Spout和Bolt构成。Spout是发出Tuple的结点,Bolt可以随意订阅某个Spout或者Bolt发出的Tuple。

Tuple是Topology处理的最小的消息单位,也就是一个任意对象的数组。每个tuple是一堆值,每个值有一个名字,并且每个值可以是任何类型。总体来看,storm支持所有的基本类型、字符串以及字节数组作为tuple的值类型。你也可以使用你自己定义的类型来作为值类型, 只要你实现对应的序列化器(serializer)。

每个tuple都由两种状态:fail和ack。Storm里面有一类特殊的task称为acker, 他们负责跟踪spout发出的每一个tuple的tuple树。当acker发现一个tuple树已经处理完成了。它会发送一个消息给产生这个tuple的那个task。

3. storm的环境搭建

我们使用的环境均为CentOS release 5.4,内核版本2.6.18,需要安装zookeeper集群(3台),storm集群(4台),分别依赖zeromq,jzmq,java6等。

3.1 安装依赖包

即便有参考文献【5】和参考文献【6】这样详细的资料,搭建过程也并非一帆风顺,基本的步骤不再赘述,只补充几个中间出问题的地方。

1) 安装zeromq

cd zeromq-2.2.0

./configure --with-pgm

make

make install

sudo ldconfig

cannot link with -luuid, install uuid-dev.(出错)

【需要安装util-linux-2.21.1】

2)安装jzmq-master

cd /tmp/storm_softwares/jzmq-master

./autogen.sh

./configure

make

make install

autoreconf: aclocal failed with exit status: 255(出错)

autogen.sh: error: autoreconf exited with status 0

【需要安装比较新的perl版本,比如5.18.2】

3.2 修改storm.yaml配置文件

在安装完相应的依赖文件之后,我们需要集群中每台机器的storm.yaml文件,下方是我们supervisor-2节点的配置文件内容:

storm.zookeeper.servers:

- "10.10.10.10"

- "10.10.10.11"

- "10.10.10.12"

storm.zookeeper.port: 2181

nimbus.host: "10.10.10.14"

storm.local.dir: "/data1/storm"

storm.local.hostname: "supervisor-2"

supervisor.slots.ports:

- 6700

- 6701

- 6702

- 6703

- 6704

其中:

1)storm.zookeeper.servers和storm.zookeeper.port是zookeeper集群的ip和端口

2)nimbus.host是集群中nimbus的ip

3)storm.local.dir是存储nimbus和supervisor进程的一些状态信息,比如supervisor/stormdist/就存放着你的具体topology的信息,包括一些实时产生的日志。

4)storm.local.hostname指本机的hosename

5)supervisor.slots.ports是该节点可以运行的worker数量

3.3 启动

在Storm主控节点上运行 bin/storm nimbus >/dev/null 2>&1 &

在Storm各个工作节点上运行 bin/storm supervisor >/dev/null 2>&1 &

在Storm主控节点上运行 bin/storm ui >/dev/null 2>&1 &

Storm后台进程被启动后,将在Storm安装部署目录下的logs/子目录下生成各个进程的日志文件。确认无误之后,接下来,我们就可以通过storm jar 命令来向集群提交任务运行了。

4. storm的监控

虽然storm的ui的信息展示非常全面,但它毕竟是一个被动的信息展示页面,对于storm集群,我们还是需要做更加完善的监控和报警。

4.1 原则

1)Zookeeper是fail-fast的系统,只要出现什么错误就会退出,所以一定要监控

2)Zookeeper运行过程中会生成很多日志和快照文件,必须定期清理

3)对于每个进程都要有监控!storm是一个fail-fast系统,出现什么不可预知的错误的时候它都会退出的 。

4)除了对于storm的流量监控,还需要对业务的处理情况(qps,时延等)进行监控

4.2 主要功能

1)对zookeeper进行mock,看是否正常,如果down了需要及时重启。

2)监控supervisor数目是否正确,当supervisor挂掉的时候会发送警告。

3)监控nimbus是否正常运行,monitor会尝试连接nimbus,如果连接失败就认为nimbus挂掉。

4)监控topology是否正常运行,包括它是否正常部署,是否有运行中的任务。

5)对worker的日志进行统计,监控处理消息的时间与消息产生时间的时间间隔

4.3 实现方式

1)对于zookeeper,和woker的日志统计,可以通过一些简单shell程序来定期运行。

2)对于nimbus、topology和supervisor,基本的原理很简单,对supervisor和topology的监控是通过zookeeper来间接地监控,通过定期查看path是否存在。对nimbus的监控是每次起一个短连接连上去,连不上去即认为挂掉。我们便可以使用thrift的python客户端来获取cluster的信息,然后与监控和报警系统打通。代码就不贴了,可以看看参考文献【7】

5. 一个简单的业务介绍

对于一些特定的人,比如广告主,跟他相关的加减关注行为以及他的微博被转发、评论和赞都是非常重要的数据。目前有一个业务就是需要从微博的全站转发、评论、赞、加减关注的行为中,找到和这些人相关的行为供后续分析使用。

我们的topology设计下图(黑色数字为进程数):

微博推荐的storm实践

1)加关注、转发、评论和赞行为分别是四个不同的流源头,每个源头一个进程,将数据直接发给下游的Bolt;

2)每种行为的数据格式分别都不一样的,所以我们对每个来源的数据都进行特定的数据解析,以获取我们需要的信息tuple(操作人,被操作人,行为类型,行为时间等),传递给FilterBolt;

3) 我们只需要获取特定人相关的行为,所以通过FilterBolt进行相应的数据过滤操作;

4)sendBolt完成的功能就是将数据发送给下游业务方;

5)由于我们会记录行为的时间,所以并不在此进行严格的时序要求(比如加关注之立刻进行取消关注行为),统一都用shfflegrouping的方式进行消息的传递。

6. 结语

本文介绍了storm在微博推荐的一些实践工作,包括storm介绍、环境搭建、监控以及一个线上业务的设计。storm作为一个开源的分布式流处理系统,在逐步的迭代中也变得越来越稳定,并且有很多有志之士为其贡献自己的力量,比如前不久的集群空闲CPU飙高问题排查(详见参考文献【8】)。希望以后微博推荐也能更好的利用这一利器,有更加深入的实践和研究工作。时间有限,难免有错误和疏漏,有问题或者建议可以联系笔者 @夏村拓哉。

7. 参考资料

【1】《Storm Real-time Processing Cookbook》- Quinton Anderson

【2】《Getting Started with Storm》- Jonathan Leibiusky

【3】Implementing Real-Time Trending Topics with a Distributed Rolling Count Algorithm in Storm - Michael G

【4】http://xumingming.sinaapp.com/127/twitter-storm%E5%A6%82%E4%BD%95%E4%BF%9D%E8%AF%81%E6%B6%88%E6%81%AF%E4%B8%8D%E4%B8%A2%E5%A4%B1/

【5】http://xumingming.sinaapp.com/138/twitter-storm入门/

【6】http://blog.linezing.com/?p=1892

【7】http://tutorials.github.io/pages/retrieving-storm-data-from-nimbus.html#.U1TXHtwTj4Z

【8】http://daiwa.ninja/index.php/2015/07/18/storm-cpu-overload/

正文到此结束
Loading...