我们按照 storm 规范开发的 spout 和 bolt 需要使用 TopologyBuilder 构建成有向无环图(拓扑),并指定消息的分组方式,然后提交给 storm 集群执行,本篇我们将分析 topology 的构建和提交过程。前面分析 storm 的编程接口时曾介绍过 StormTopology 这个 thrift 类,topology 在构建完成之后会封装成一个 StormTopology 对象,并通过 RPC 方法提交给 storm 集群的 nimbus 节点。
拓扑结构在 storm 集群中以 StormTopology 对象的形式表示,这是一个 thrift 类,其定义如下:
struct StormTopology {
1: required map<string, SpoutSpec> spouts; // topology 中的 spout 集合
2: required map<string, Bolt> bolts; // topology 中的 bolt 集合
3: required map<string, StateSpoutSpec> state_spouts; // topology 中的 state spout 集合
}
属性 spouts 的 key 是 spout 对应的 ID,value 是 SpoutSpec 类型对象,这也是一个 thrift 类,封装了 spout 的序列化 ComponentObject 对象和通用组件 ComponentCommon 对象。属性 bolts 的 key 是 bolt 对应的 ID,value 是 Bolt 类型对象,Bolt 同样是一个 thrfit 类,封装了 bolt 的序列化 ComponentObject 对象和通用组件 ComponentCommon 对象。
ComponentObject 是一个 thrift 联合类型(union),在这里主要使用了 serialized_java 字段记录组件的序列化值:
union ComponentObject {
1: binary serialized_java; // 序列化后的 java 对象
2: ShellComponent shell; // ShellComponent 对象
3: JavaObject java_object; // java 对象
}
ComponentCommon 是对组件的抽象表示,spout 和 bolt 在 topology 中统称为组件,topology 构建过程中会将 spout 和 bolt 都封装成为 ComponentCommon 对象:
struct ComponentCommon {
// 组件将从哪些 GlobalStreamId 以何种分组方式接收数据
1: required map<GlobalStreamId, Grouping> inputs;
// 组件要输出的所有流,key 是 streamId
2: required map<string, StreamInfo> streams;
// 组件并行度(即多少个线程),这些线程可能分布在不同的机器或进程空间中
3: optional i32 parallelism_hint;
// 组件相关配置项
4: optional string json_conf;
}
StormTopology 作为 thrift 类在编译成 java 实现时比较冗长,所以 storm 提供了 TopologyBuilder 构造器类来简化 topology 的构造,其使用形式一般如下:
public class WordCountTopology implements ComponentId, FieldName {
private static final String TOPOLOGY_NAME = "wordcount-topology";
public static void main(String[] args) throws Exception {
SentenceSpout sentenceSpout = new SentenceSpout();
SentenceSplitBolt sentenceSplitBolt = new SentenceSplitBolt();
WordCountBolt wordCountBolt = new WordCountBolt();
ReportBolt reportBolt = new ReportBolt();
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout(SENTENCE_SPOUT_ID, sentenceSpout);
builder.setBolt(SENTENCE_SPLIT_BOLT_ID, sentenceSplitBolt).shuffleGrouping(SENTENCE_SPOUT_ID);
builder.setBolt(WORD_COUNT_BOLT_ID, wordCountBolt).fieldsGrouping(SENTENCE_SPLIT_BOLT_ID, new Fields(WORD));
builder.setBolt(REPORT_BOLT_ID, reportBolt).globalGrouping(WORD_COUNT_BOLT_ID);
Config config = new Config();
LocalCluster localCluster = new LocalCluster();
localCluster.submitTopology(TOPOLOGY_NAME, config, builder.createTopology());
TimeUnit.MINUTES.sleep(10);
localCluster.killTopology(TOPOLOGY_NAME);
localCluster.shutdown();
}
}
创建完 TopologyBuilder 实例之后,我们可以调用 setSpout
方法往 topology 中添加并设置 spout 组件,调用 setBolt
方法往 topology 中添加并设置 bolt 组件,并最后通过调用 createTopology
方法来完成 topology 的构建,该方法会返回一个 StormTopology 对象。TopologyBuilder 类中主要关注 3 个属性:
/** 记录拓扑范围内所有的 Bolt 对象 */ protected Map<String, IRichBolt> _bolts = new HashMap<>(); /** 记录拓扑范围内所有的 Spout 对象 */ protected Map<String, IRichSpout> _spouts = new HashMap<>(); /** 记录拓扑范围内封装所有的 Spout 和 Bolt 的 ComponentCommon 组件对象 */ protected Map<String, ComponentCommon> _commons = new HashMap<>();
下面来看一下 spout 和 bolt 的构造过程,即 setSpout
和 setBolt
方法,针对这两类方法,TopologyBuilder 都提供了多种重载版本,其中 setSpout
对应的底层实现如下:
public SpoutDeclarer setSpout(String id, IRichSpout spout, Number parallelism_hint) throws IllegalArgumentException {
// 保证 id 在 topology 范围内的全局唯一
this.validateUnusedId(id);
// 以 ComponentCommon 的形式封装组件,并记录到 _commons 属性中
this.initCommon(id, spout, parallelism_hint);
// 记录组件到 _spout 集合中
_spouts.put(id, spout);
return new SpoutGetter(id);
}
方法首先会验证 spoutId 在 topology 范围内的全局唯一性,即没有被已有的 spout 和 bolt 占用,否则会抛出 IllegalArgumentException 异常。 initCommon
方法会构造 spout 对应的 ComponentCommon 对象并记录到 _commons
属性中:
protected void initCommon(String id, IComponent component, Number parallelism) throws IllegalArgumentException {
ComponentCommon common = new ComponentCommon();
common.set_inputs(new HashMap<GlobalStreamId, Grouping>());
// 设置并行度
if (parallelism != null) {
int dop = parallelism.intValue();
if (dop < 1) {
throw new IllegalArgumentException("Parallelism must be positive.");
}
common.set_parallelism_hint(dop); // 设置组件并行度
} else {
// 如果没有设置的话,默认设置并行度为 1
common.set_parallelism_hint(1);
}
// 获取组件相关的配置并以 json 的形式记录到 ComponentCommon 对象中
Map conf = component.getComponentConfiguration();
if (conf != null) common.set_json_conf(JSONValue.toJSONString(conf));
_commons.put(id, common);
}
最后将 spout 对象记录到 _spouts
属性中,并构造当前 spout 对应的 SpoutGetter 对象。
SpoutGetter 可以理解为 spout 对应的属性配置器,用于为当前 spout 加载通用的配置和设置私有的属性值,并最终将所有的配置项序列化为 json 格式记录到封装当前 spout 的 ComponentCommon 对象中(json_conf 属性)。
protected class SpoutGetter extends ConfigGetter<SpoutDeclarer> implements SpoutDeclarer {
public SpoutGetter(String id) {
super(id);
}
}
SpoutGetter 类继承了 ConfigGetter 类,并实现了 SpoutDeclarer 接口,该接口主要是声明了一些 spout 组件相关的配置方法,具体的实现都在 ConfigGetter 的父类 BaseConfigurationDeclarer 中,实现比较简单,不展开说明。ConfigGetter 覆盖实现了父类的 addConfigurations
方法,并在该方法中将当前 spout 所有相关的配置项序列化成 json 记录到对应的 ComponentCommon 对象中:
@Override
public T addConfigurations(Map conf) {
if (conf != null && conf.containsKey(Config.TOPOLOGY_KRYO_REGISTER)) { // ${topology.kryo.register}
// 在通常的非事务流处理中,不允许设置组件的序列化方式
throw new IllegalArgumentException("Cannot set serializations for a component using fluent API");
}
String currConf = _commons.get(_id).get_json_conf();
// 将 currConf 与 conf 的配置项合并,并以 json string 的形式记录到对应组件的 json_conf 字段中
_commons.get(_id).set_json_conf(JStormUtils.mergeIntoJson(JStormUtils.parseJson(currConf), conf));
return (T) this;
}
下面继续分析 setBolt
方法,上一篇在介绍 Bolt 组件接口时我们知道 storm 提供了三种基础的 Bolt 组件类型,即 IBolt(or IRichBolt)、IBasicBolt,以及 IBatchBolt。针对每种 Bolt 类型,TopologyBuilder 都有提供相应版本的 setBolt
方法实现,下面以最常见的 IBolt 类型为例,对应的方法实现如下:
public BoltDeclarer setBolt(String id, IRichBolt bolt, Number parallelism_hint) throws IllegalArgumentException {
// 保证 id 在 topology 范围内的全局唯一
this.validateUnusedId(id);
// 以 ComponentCommon 形式封装组件,并记录到 _commons 属性中
this.initCommon(id, bolt, parallelism_hint);
// 记录 bolt 到 _bolt 集合中
_bolts.put(id, bolt);
return new BoltGetter(id);
}
流程上与 setSpout
大同小异,不再重复撰述,方法最终会将 bolt 对象记录到 _bolts
属性中,并构造当前 bolt 对应的 BoltGetter 对象。
前面我们分析了 SpoutGetter,知道其作用主要是为 spout 配置相关属性,BoltGetter 的作用同样如此,不过相对于 SpoutGetter 增加了消息分组方式的配置入口,最后同样将属性序列化为 json 格式记录到与组件相对应的 ComponentCommon 对象中。
在完成调用 setSpout
和 setBolt
往 topology 中添加 spout 和 bolt 组件之后,我们需要调用 createTopology
方法创建相应的 StormTopology 对象,该方法的实现如下:
public StormTopology createTopology() {
Map<String, Bolt> boltSpecs = new HashMap<>();
Map<String, SpoutSpec> spoutSpecs = new HashMap<>();
// 如果当前 topology 中含有 stateful-bolt,就为 topology 自动添加一个 CheckpointSpout
this.maybeAddCheckpointSpout();
// 遍历处理 bolt,封装 bolt 的序列化形式和 ComponentCommon 形式为 Bolt 对象,并记录到 boltSpecs 中
for (String boltId : _bolts.keySet()) {
IRichBolt bolt = _bolts.get(boltId);
// 如果当前 topology 中含有 stateful-bolt,那么针对 non-stateful bolt 都采用 CheckpointTupleForwarder 进行包装
bolt = this.maybeAddCheckpointTupleForwarder(bolt);
ComponentCommon common = this.getComponentCommon(boltId, bolt);
try {
this.maybeAddCheckpointInputs(common);
this.maybeAddWatermarkInputs(common, bolt);
// 封装 bolt 的序列化形式和 ComponentCommon 形式为 Bolt 对象,并记录到 boltSpecs 中
boltSpecs.put(boltId, new Bolt(ComponentObject.serialized_java(Utils.javaSerialize(bolt)), common));
} catch (RuntimeException wrapperCause) {
// 省略异常处理逻辑
throw wrapperCause;
}
}
// 遍历处理 spout,封装 spout 的序列化形式和 ComponentCommon 形式为 SpoutSpec 对象,并记录到 spoutSpecs 中
for (String spoutId : _spouts.keySet()) {
IRichSpout spout = _spouts.get(spoutId);
ComponentCommon common = this.getComponentCommon(spoutId, spout);
try {
// 封装 spout 的序列化形式和 ComponentCommon 形式为 SpoutSpec 对象,并记录到 spoutSpecs 中
spoutSpecs.put(spoutId, new SpoutSpec(ComponentObject.serialized_java(Utils.javaSerialize(spout)), common));
} catch (RuntimeException wrapperCause) {
// 省略异常处理逻辑
throw wrapperCause;
}
}
// 封装成为 stormTopology 对象返回
return new StormTopology(spoutSpecs, boltSpecs, new HashMap<String, StateSpoutSpec>());
}
整个方法的执行流程可以概括为:
当构建完 topology 之后,我们需要以任务的形式将其提交到 storm 集群运行。此外,为了方便调试,storm 也支持通过 LocalCluster 在本地提交运行任务,本节我们主要介绍如何向 storm 集群提交任务。
Storm 提供了 StormSubmitter 类用于向 storm 集群提交任务,并提供了两类方法: submitTopology
和 submitTopologyWithProgressBar
。后者是对前者的封装,在原版 storm 中用于支持显示任务的提交进度,但是这一设计在 jstorm 中被移除,所以两类方法实际上是等价的。接下来我们对 submitTopology
方法的实现进行分析,storm 为该方法提供了多个重载版本,对应的底层实现如下:
public static void submitTopology(String name, Map stormConf, StormTopology topology, SubmitOptions opts)
throws AlreadyAliveException, InvalidTopologyException {
// 验证配置是否为 json 格式
if (!Utils.isValidConf(stormConf)) {
throw new IllegalArgumentException("Storm conf is not valid. Must be json-serializable");
}
// 封装配置(构建 topology 期间添加的、提交 topology 时传入的,以及命令行参数)
Map userTotalConf = new HashMap();
userTotalConf.putAll(TopologyBuilder.getStormConf()); // add the configuration generated during topology building
userTotalConf.putAll(stormConf);
userTotalConf.putAll(Utils.readCommandLineOpts());
// 加载配置文件配置
Map conf = Utils.readStormConfig();
conf.putAll(stormConf);
putUserInfo(conf, stormConf);
try {
String serConf = Utils.to_json(userTotalConf); // 转换成 json 形式
if (localNimbus != null) {
// 本地模式
LOG.info("Submitting topology " + name + " in local mode");
localNimbus.submitTopology(name, null, serConf, topology);
} else {
// 集群模式
// 创建 Thrift 客户端
NimbusClient client = NimbusClient.getConfiguredClient(conf);
try {
// 是否允许热部署 ${topology.hot.deploy.enable}
boolean enableDeploy = ConfigExtension.getTopologyHotDeplogyEnable(userTotalConf);
// 是否是灰度发布 ${topology.upgrade}
boolean isUpgrade = ConfigExtension.isUpgradeTopology(userTotalConf);
// 是否允许动态更新
boolean dynamicUpdate = enableDeploy || isUpgrade;
if (topologyNameExists(client, conf, name) != dynamicUpdate) {
if (dynamicUpdate) {
// 动态更新,但是对应的 topology 不存在
throw new RuntimeException("Topology with name `" + name + "` does not exist on cluster");
} else {
// 提交新任务,但是对应的 topology 已经存在
throw new RuntimeException("Topology with name `" + name + "` already exists on cluster");
}
}
// 上传 jar 包
submitJar(client, conf);
LOG.info("Submitting topology " + name + " in distributed mode with conf " + serConf);
// 提交任务
if (opts != null) {
client.getClient().submitTopologyWithOpts(name, path, serConf, topology, opts);
} else {
// for backward compatibility
client.getClient().submitTopology(name, path, serConf, topology);
}
} finally {
client.close();
}
}
LOG.info("Finished submitting topology: " + name);
}
// 省略 catch 代码块
}
Storm 任务提交的过程本质上是一个与 nimbus 节点进行 RPC 通信的过程,整体流程可以概括为:
配置的加载与封装过程会验证配置是否为 json 格式,并聚合多个来源的配置封装成 map 集合。在任务提交之前会验证当前 topology 在远程集群的状态,如果当前操作是热部署或灰度发布,则必须保证对应的 topology 在远程集群已经存在,而对于新提交的 topology 来说,如果远程集群存在同名的 topology 则会禁止提交。
Storm 任务的提交分为两个步骤,首先上传 topology 对应的 jar 文件到 nimbus 服务器,上传成功之后才会调用远程方法通知 nimbus 有新任务加入,需要开始为该 topology 制定运行方案。下面先来看一下 jar 报上传的过程,该过程位于 submitJar
方法中:
private static void submitJar(NimbusClient client, Map conf) {
if (submittedJar == null) {
try {
LOG.info("Jar not uploaded to master yet. Submitting jar...");
// 获取对应的 client jar 名称,例如 jstorm-1.0.0-SNAPSHOT.jar
String localJar = System.getProperty("storm.jar");
// 为待上传的 jar 包创建存储路径和 Channel,并返回路径值
// ${storm.local.dir}/nimbus/inbox/${key}
path = client.getClient().beginFileUpload();
String[] pathCache = path.split("/");
// ${storm.local.dir}/nimbus/inbox/${key}/stormjar-${key}.jar
String uploadLocation = path + "/stormjar-" + pathCache[pathCache.length - 1] + ".jar";
// 如果设置了 lib jar 则先上传 lib jar
List<String> lib = (List<String>) conf.get(GenericOptionsParser.TOPOLOGY_LIB_NAME); // topology.lib.name
Map<String, String> libPath = (Map<String, String>) conf.get(GenericOptionsParser.TOPOLOGY_LIB_PATH); // topology.lib.path
if (lib != null && lib.size() != 0) {
for (String libName : lib) {
String jarPath = path + "/lib/" + libName;
client.getClient().beginLibUpload(jarPath);
submitJar(conf, libPath.get(libName), jarPath, client);
}
} else {
if (localJar == null) {
// no lib, no client jar
throw new RuntimeException("No client app jar found, please upload it");
}
}
// 上传 client jar
if (localJar != null) {
submittedJar = submitJar(conf, localJar, uploadLocation, client);
} else {
// no client jar, but with lib jar
client.getClient().finishFileUpload(uploadLocation);
}
} catch (Exception e) {
throw new RuntimeException(e);
}
} else {
LOG.info("Jar has already been uploaded to master. Will not submit again.");
}
}
方法首先会获取 topology 对应的 jar 文件名称(项目打包后对应的 jar 文件),然后调用 thrift 方法 beginFileUpload
为待上传的 jar 文件创建存储路径和传输通道,并返回对应的路径值。在 storm.thrift
文件中定义了一个 service 类型的 Nimbus 类,如果你对 thrift 熟悉就应该知道这是一个 service 接口声明,Nimbus 类声明了一些能够与 nimbus 节点进行远程通信的方法,相应方法实现位于 ServiceHandler 类中,可以在该类中找到 beginFileUpload
方法的实现:
public String beginFileUpload() throws TException {
String fileLoc = null;
try {
String key = UUID.randomUUID().toString();
String path = StormConfig.masterInbox(conf) + "/" + key; // ${storm.local.dir}/nimbus/inbox/${key}
FileUtils.forceMkdir(new File(path));
FileUtils.cleanDirectory(new File(path));
fileLoc = path + "/stormjar-" + key + ".jar"; // ${storm.local.dir}/nimbus/inbox/${key}/stormjar-${key}.jar
data.getUploaders().put(fileLoc, Channels.newChannel(new FileOutputStream(fileLoc)));
LOG.info("Begin upload file from client to " + fileLoc);
return path;
}
// 省略 catch 代码块
}
方法首先会基于 UUID 为本次需要上传的 jar 文件创建一个唯一的名称标识,然后在 nimbus 本地对应的目录下创建 jar 文件存储路径(如下),同时为该路径创建一个传输通道,并返回该路径(不包含文件名称):
${storm.local.dir}/nimbus/inbox/${key}/stormjar-${key}.jar
接下来就开始执行 jar 文件的上传逻辑,如果我们在自己的代码中提交 topology 时指定了一些依赖包,那么这里首先会上传这些依赖包,然后再上传主程序包。所有的文件上传都位于一个重载版本的 submitJar
方法中,该重载方法会调用远程 uploadChunk
方法执行具体的文件上传操作,并在上传完成之后调用远程 finishFileUpload
方法关闭对应的上传通道。整个过程就是将我们发布机上本地的 topology jar 文件上传到 nimbus 节点对应的本地路径 nimbus/inbox/${key}/stormjar-${key}.jar
下面,其中 key 是一个 UUID 唯一标识。
接下来方法会调用 submitTopology
方法提交 topology 任务,默认会设置 topology 的初始化状态为 ACTIVE。Nimbus 在接收到 RPC 请求之后开始对提交的任务制定运行方案,主要是依据 topology 配置和集群的运行状态为提交的任务分配 task、worker,以及 supervisor。如果成功则返回对应的 topologyId,否则会抛出相应的异常,我们将在下一篇中对整个 topology 任务分配过程进行深入分析。
(本篇完)