转载

Apache Flink流作业提交流程分析

提交流程调用的关键方法链

用户编写的程序逻辑需要提交给Flink才能得到执行。本文来探讨一下客户程序如何提交给Flink。鉴于用户将自己利用Flink的API编写的逻辑打成相应的应用程序包(比如Jar)然后提交到一个目标Flink集群上去运行是比较主流的使用场景,因此我们的分析也基于这一场景进行。

Flink的API针对不同的执行环境有不同的 Environment 对象,这里我们主要基于常用的 RemoteStreamEnvironmentRemoteEnvironment 进行分析

在前面我们谈到了Flink中实现了“惰性求值”,只有当最终调用 execute 方法时,才会“真正”开始执行。因此, execute 方法是我们的切入点。

其源码位于 org.apache.flink.streaming.api.environment.RemoteStreamEnvironment

首先,我们来看一下其 execute 方法触发的关键方法调用链示意图:

Apache Flink流作业提交流程分析

根据上图的调用链,我们针对这些关键方法进行剖析,当然一些细节性的内容我们可能会暂时略过,这样可以保证主路径一直都很清晰。

getStreamGraph 方法用于获得一个 StreamGraph 的实例,该实例表示流的完整的拓扑结构并且包含了生成 JobGraph 所必要的相关信息(包含了 sourcesink 的集合以及这些在图中的“节点”抽象化的表示、一些虚拟的映射关系、执行和检查点的配置等)。

获得 StreamGraph 之后,通过调用 executeRemotely 方法进行远程执行。该方法首先根据获取到的用户程序包的路径以及类路径创建加载用户代码的类加载器:

ClassLoader usercodeClassLoader = JobWithJars.buildUserCodeClassLoader(
jarFiles, globalClasspaths, getClass().getClassLoader());

紧接着根据配置构建Client对象(Client对象是真正跟JobManager对接的内部代理):

Client client;
try {
client = new Client(configuration);
client.setPrintStatusDuringExecution(getConfig().isSysoutLoggingEnabled());
}catch (Exception e) {
throw new ProgramInvocationException("Cannot establish connection to JobManager: " + e.getMessage(), e);
}

后面的事情就此被 Client 接管:

try {   
return client.runBlocking(streamGraph, jarFiles, globalClasspaths, usercodeClassLoader);
}catch (ProgramInvocationException e) {
throw e;
}catch (Exception e) {
String term = e.getMessage() == null ? "." : (": " + e.getMessage());
throw new ProgramInvocationException("The program execution failed" + term, e);
}finally {
client.shutdown();
}

client 对象调用了 runBlocking 以阻塞式的行为“运行”用户程序并等待返回 JobExecutionResult 对象作为 Job 的执行结果。执行完成,最终在 finally 块中,调用 shutdown 方法关闭并释放资源。

runBlocking 被调用后,调用链跳转到Client类中。为了适配多种提交方式以及运行模式, runBlocking 方法有着非常多的重载。在当前的远程执行环境下, runBlocking 在多个重载方法之间跳转的过程中,会调用 getJobGraph 方法获得 JobGraph 的实例。 JobGraph 表示Flink dataflow 程序,它将会被 JobManager 所理解并接收。在某个 Job 被提交给 JobManager 之前,通过Flink提供的高层次的API都将会被转化为 JobGraph 表示。关于如何获得JobGraph的实现,我们后面会进行剖析。这里,让我们忽视这些细节,进入下一个关键方法。

runBlocking_1 其实是 runBlocking 方法的重载,这里加一个后缀标识,只是为了跟上面的 runBlocking 进行区别。runBlocking_1方法中,首先利用 LeaderRetrievalUtils 创建了 LeaderRetrievalService 这一服务对象:

LeaderRetrievalService leaderRetrievalService;
try {
leaderRetrievalService = LeaderRetrievalUtils.createLeaderRetrievalService(config);
} catch (Exception e) {
throw new ProgramInvocationException("Could not create the leader retrieval service.", e);
}

顾名思义, LeaderRetrievalService 在Flink中提供查找主节点的服务。它会根据Flink的配置信息(主要是recovery.mode来判断基于哪种恢复机制来创建该服务。当前有两种模式:一种是 Standalone 的独立运行模式;另一种是基于 Zookeeper 的高可用模式)。Flink提供了一个称之为 LeaderRetrievalListener 的回调接口来获得主节点的信息。接下来,就是调用 JobClientsubmitJobAndWait 方法将产生的 JobGraph 以及主节点查找的服务对象等相关信息提交给 JobManager 并等待返回结果:

try {   
this.lastJobID = jobGraph.getJobID();
return JobClient.submitJobAndWait(actorSystem, leaderRetrievalService, jobGraph,
timeout, printStatusDuringExecution, classLoader)
;

} catch (JobExecutionException e) {
throw new ProgramInvocationException("The program execution failed: " + e.getMessage(), e);
}

上面的 submitJobAndWait 方法的第一个参数 actorSystemActorSystem 的实例。在构造 Client 对象时创建,在Job提交并获得返回结果后通过调用 Clientshutdown 方法关闭:

public void shutdown() {   
if (!this.actorSystem.isTerminated()) {
this.actorSystem.shutdown();
this.actorSystem.awaitTermination();
}
}

该方法的调用见上面 executeRemotely 方法的代码段的finally语句块。

JobClient 的出现可能会让你产生疑惑——它跟 Client 是什么关系?作用是什么?下面这幅示意图可以用来解释这些疑问:

Apache Flink流作业提交流程分析

上面这幅图展示了 Client 对象与其他几个对象的关系。 JobClient 在其中起到了“桥接”作用,它在基于API的编程层面上桥接了同步的方法调用和异步的消息通信。更具体得说, JobClient 可以看做是一个“静态类”提供了一些静态方法,这里我们主要关注上面的 submitJobAndWait 方法,该方法内部封装了 Actor 之间的异步通信(具体的通信对象是 JobClientActor ,它负责跟 JobManagerActorSystemActor 进行通信),并以阻塞的形式返回结果。而 Client 只需调用 JobClient 的这些方法,而无需关注其内部是如何实现的。

通过调用 JobClient 的静态方法 submitJobAndWait ,会触发基于 AkkaActor 之间的消息通信来完成后续的提交JobGraph的动作。 JobClient 提交 Job 的基于消息交互的抽象示意图如下:

Apache Flink流作业提交流程分析

总体来说这里总共有两个 ActorSystem ,一个归属于 Client ,另一个归属于 JobManager 。在 submitJobAndWait 方法中,其首先会创建一个 JobClientActorActorRef

ActorRef jobClientActor = actorSystem.actorOf(jobClientActorProps);

然后向其发起一个 SubmitJobAndWait 消息,该消息将 JobGraph 的实例提交给 jobClientActor 。该消息的发起模式是 ask ,它表示需要一个应答消息。

JobClient向JobClientActor发送消息的代码段如下所示:

Future<Object> future = Patterns.ask(jobClientActor,      
new JobClientMessages.SubmitJobAndWait(jobGraph),
new Timeout(AkkaUtils.INF_TIMEOUT()));
answer = Await.result(future, AkkaUtils.INF_TIMEOUT());

JobClient 会阻塞等待该 future 返回结果。在得到返回结果answer之后,先进行解析判断它是 Job 被成功执行返回的结果还是失败返回的结果。

小结

至此,Client提交Streaming Job的关键方法调用路径已梳理完成。这里为了突出主路线,同时避免被太多的实现细节干扰,我们暂时忽略了一些重要数据结构和关键概念的解读。不过,后续我们会对它们进行分析。

微信扫码关注公众号:Apache_Flink

Apache Flink流作业提交流程分析

QQ扫码关注QQ群:Apache Flink学习交流群(123414680)

Apache Flink流作业提交流程分析

原文  http://vinoyang.com/2016/07/17/flink-streaming-job-submit/
正文到此结束
Loading...