This article analyzes the source code of Apache Flink's task submission and execution flow in cluster mode.
For the local-mode submission flow, see the previous article 《Flink源码解析 | 从Example出发:读懂本地任务执行流程》.
The entry point for cluster-mode submission is the same as for local mode, so we again start from the SocketWindowWordCount example.
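For context, a condensed version of that example is sketched below. This is simplified from the official Flink example (the hostname and port are hard-coded instead of being read from --hostname/--port) and assumes the Flink 1.x DataStream API:

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class SocketWindowWordCount {

    public static void main(String[] args) throws Exception {
        // In cluster mode this returns a StreamContextEnvironment, as analyzed below.
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Tuple2<String, Integer>> counts = env
            .socketTextStream("localhost", 9000)
            .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                @Override
                public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
                    for (String word : line.split("\\s")) {
                        out.collect(Tuple2.of(word, 1));
                    }
                }
            })
            .keyBy(0)
            .timeWindow(Time.seconds(5))
            .sum(1);

        counts.print();

        // execute() is where the cluster submission flow analyzed in this article starts.
        env.execute("Socket Window WordCount");
    }
}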
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Inside StreamExecutionEnvironment.getExecutionEnvironment(), Flink decides which concrete environment to return; when the job is submitted to a cluster, the object obtained here is a StreamContextEnvironment.
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

if (env instanceof ContextEnvironment) {
    return new StreamContextEnvironment((ContextEnvironment) env);
} else if (env instanceof OptimizerPlanEnvironment || env instanceof PreviewPlanEnvironment) {
    return new StreamPlanEnvironment(env);
} else {
    return createLocalEnvironment();
}
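Whether a ContextEnvironment shows up here is decided on the client side: before invoking the user's main(), the client installs a ContextEnvironmentFactory, and ExecutionEnvironment.getExecutionEnvironment() asks that factory first. Roughly (paraphrased from the Flink 1.x source, so treat the exact code as an approximation):

// ExecutionEnvironment.getExecutionEnvironment(), approximately:
public static ExecutionEnvironment getExecutionEnvironment() {
    // contextEnvironmentFactory is set by the submitting client (a ContextEnvironmentFactory);
    // without it we fall back to a local environment.
    return contextEnvironmentFactory == null
        ? createLocalEnvironment()
        : contextEnvironmentFactory.createExecutionEnvironment();
}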
@Override
public JobExecutionResult execute(String jobName) throws Exception {
    Preconditions.checkNotNull(jobName, "Streaming Job name should not be null.");

    StreamGraph streamGraph = this.getStreamGraph();
    streamGraph.setJobName(jobName);

    transformations.clear();

    // execute the programs
    if (ctx instanceof DetachedEnvironment) {
        LOG.warn("Job was executed in detached mode, the results will be available on completion.");
        ((DetachedEnvironment) ctx).setDetachedPlan(streamGraph);
        return DetachedEnvironment.DetachedJobExecutionResult.INSTANCE;
    } else {
        return ctx
            .getClient()
            .run(streamGraph, ctx.getJars(), ctx.getClasspaths(), ctx.getUserCodeClassLoader(), ctx.getSavepointRestoreSettings())
            .getJobExecutionResult();
    }
}
If detached mode is not specified, the else branch is executed:
Obtain the ClusterClient and call its run() method
Pass in the jars, classpaths, user-code class loader and savepoint restore settings
Obtain the JobExecutionResult
public JobSubmissionResult run(PackagedProgram prog, int parallelism)
        throws ProgramInvocationException, ProgramMissingJobException {
    ...
    if (prog.isUsingProgramEntryPoint()) {
        ...
    } else if (prog.isUsingInteractiveMode()) {
        ...
    }
}
When the job is submitted, ClusterClient.run() checks whether the program should be run through its program entry point or in interactive mode.
public boolean isUsingInteractiveMode() {
    return this.program == null;
}

public boolean isUsingProgramEntryPoint() {
    return this.program != null;
}
The distinction is simply whether the program field is null.
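For reference, program is only non-null when the JAR's entry class implements Flink's Program interface from flink-core; a class that merely defines main() leaves it null and is executed in interactive mode. The interface itself is tiny:

// org.apache.flink.api.common.Program: entry classes implementing this interface
// are run through the "program entry point" branch above.
public interface Program {
    Plan getPlan(String... args);
}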
The flink run script calls the run() method of the CliFrontend class.
When run() starts the program, it calls buildProgram() to assign program, which is then passed to runProgram() to continue the call chain.
final Options commandOptions = CliFrontendParser.getRunCommandOptions();
final Options commandLineOptions = CliFrontendParser.mergeOptions(commandOptions, customCommandLineOptions);
final CommandLine commandLine = CliFrontendParser.parse(commandLineOptions, args, true);
final RunOptions runOptions = new RunOptions(commandLine);
final PackagedProgram program;
try {
    LOG.info("Building program from JAR file");
    program = buildProgram(runOptions);
}
...
runProgram(customCommandLine, commandLine, runOptions, program);
...
As the following code shows, buildProgram() is where the JAR we submit gets wrapped into a PackagedProgram.
PackagedProgram buildProgram(ProgramOptions options) throws FileNotFoundException, ProgramInvocationException {
    String[] programArgs = options.getProgramArgs();
    String jarFilePath = options.getJarFilePath();
    List<URL> classpaths = options.getClasspaths();

    if (jarFilePath == null) {
        throw new IllegalArgumentException("The program JAR file was not specified.");
    }

    File jarFile = new File(jarFilePath);

    // Check if JAR file exists
    if (!jarFile.exists()) {
        throw new FileNotFoundException("JAR file does not exist: " + jarFile);
    }
    else if (!jarFile.isFile()) {
        throw new FileNotFoundException("JAR file is not a file: " + jarFile);
    }

    // Get assembler class
    String entryPointClass = options.getEntryPointClassName();
    PackagedProgram program = entryPointClass == null ?
            new PackagedProgram(jarFile, classpaths, programArgs) :
            new PackagedProgram(jarFile, classpaths, entryPointClass, programArgs);

    program.setSavepointRestoreSettings(options.getSavepointRestoreSettings());

    return program;
}
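To make the wiring concrete, the following hypothetical snippet builds a PackagedProgram by hand the same way buildProgram() does, using the constructors seen above (the JAR path and program arguments are made up for illustration):

// Hypothetical example: roughly what buildProgram() produces for
// "flink run SocketWindowWordCount.jar --port 9000" (path and args are invented).
File jarFile = new File("/path/to/SocketWindowWordCount.jar");
List<URL> classpaths = Collections.emptyList();
String[] programArgs = new String[] {"--port", "9000"};

// No -c/--class option was given, so the entry point comes from the JAR's manifest.
PackagedProgram program = new PackagedProgram(jarFile, classpaths, programArgs);
program.setSavepointRestoreSettings(SavepointRestoreSettings.none());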
After several layers of run() calls, the submit method of StandaloneClusterClient is finally invoked to submit the job and return a JobSubmissionResult.
public JobSubmissionResult run(JobWithJars jobWithJars, int parallelism, SavepointRestoreSettings savepointSettings)
        throws CompilerException, ProgramInvocationException {
    ClassLoader classLoader = jobWithJars.getUserCodeClassLoader();
    if (classLoader == null) {
        throw new IllegalArgumentException("The given JobWithJars does not provide a usercode class loader.");
    }

    OptimizedPlan optPlan = getOptimizedPlan(compiler, jobWithJars, parallelism);
    return run(optPlan, jobWithJars.getJarFiles(), jobWithJars.getClasspaths(), classLoader, savepointSettings);
}

...

public JobSubmissionResult run(FlinkPlan compiledPlan,
        List<URL> libraries, List<URL> classpaths, ClassLoader classLoader, SavepointRestoreSettings savepointSettings)
        throws ProgramInvocationException {
    JobGraph job = getJobGraph(flinkConfig, compiledPlan, libraries, classpaths, savepointSettings);
    return submitJob(job, classLoader);
}
The flow above involves two key run() methods.
The first run() method:
Obtains the user-code class loader of the submitted JAR
Builds the OptimizedPlan
Passes it on to the second run() method
The second run() method:
Creates the JobGraph (see the sketch right after this list)
Submits the job and returns a JobSubmissionResult (ClusterClient's submitJob() is an abstract method; what actually runs here is StandaloneClusterClient's submitJob())
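The JobGraph creation mentioned above happens in ClusterClient.getJobGraph(). A rough paraphrase of the streaming case looks like the following (reconstructed from the Flink 1.x client, so take it as an approximation; the real method also handles batch OptimizedPlans via the JobGraphGenerator):

// Streaming path of ClusterClient.getJobGraph(), approximately:
// a StreamGraph is a StreamingPlan, so it can produce its JobGraph directly.
JobGraph job = ((StreamingPlan) compiledPlan).getJobGraph();
job.setSavepointRestoreSettings(savepointSettings);

// Attach user JARs and extra classpath entries so the cluster can load user code.
for (URL jar : libraries) {
    try {
        job.addJar(new Path(jar.toURI()));
    } catch (URISyntaxException e) {
        throw new RuntimeException("URL is invalid. This should not happen.", e);
    }
}
job.setClasspaths(classpaths);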
@Override
public JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader classLoader)
        throws ProgramInvocationException {
    if (isDetached()) {
        return super.runDetached(jobGraph, classLoader);
    } else {
        return super.run(jobGraph, classLoader);
    }
}
When executing the flink run command, adding -d on the command line takes the runDetached() path; otherwise run() is used.
public JobExecutionResult run(JobGraph jobGraph, ClassLoader classLoader) throws ProgramInvocationException {
    waitForClusterToBeReady();

    final ActorSystem actorSystem;

    try {
        actorSystem = actorSystemLoader.get();
    } catch (FlinkException fe) {
        throw new ProgramInvocationException("Could not start the ActorSystem needed to talk to the " +
            "JobManager.", jobGraph.getJobID(), fe);
    }

    try {
        logAndSysout("Submitting job with JobID: " + jobGraph.getJobID() + ". Waiting for job completion.");
        this.lastJobExecutionResult = JobClient.submitJobAndWait(
            actorSystem,
            flinkConfig,
            highAvailabilityServices,
            jobGraph,
            timeout,
            printStatusDuringExecution,
            classLoader);

        return lastJobExecutionResult;
    } catch (JobExecutionException e) {
        throw new ProgramInvocationException("The program execution failed: " + e.getMessage(), jobGraph.getJobID(), e);
    }
}
At this point Flink's ActorSystem comes into play; we will analyze the ActorSystem in a later article.
The logic above consists of the following steps:
Wait for the cluster to become ready
Obtain the ActorSystem instance
Pass the actorSystem, flinkConfig, jobGraph and other parameters to JobClient.submitJobAndWait(), which runs the job and waits for the JobExecutionResult
public JobSubmissionResult runDetached(JobGraph jobGraph, ClassLoader classLoader) throws ProgramInvocationException {
    waitForClusterToBeReady();

    final ActorGateway jobManagerGateway;
    try {
        jobManagerGateway = getJobManagerGateway();
    } catch (Exception e) {
        throw new ProgramInvocationException("Failed to retrieve the JobManager gateway.",
            jobGraph.getJobID(), e);
    }

    try {
        logAndSysout("Submitting Job with JobID: " + jobGraph.getJobID() + ". Returning after job submission.");
        JobClient.submitJobDetached(
            new AkkaJobManagerGateway(jobManagerGateway),
            flinkConfig,
            jobGraph,
            Time.milliseconds(timeout.toMillis()),
            classLoader);

        return new JobSubmissionResult(jobGraph.getJobID());
    } catch (JobExecutionException e) {
        throw new ProgramInvocationException("The program execution failed: " + e.getMessage(),
            jobGraph.getJobID(), e);
    }
}
The runDetached flow is similar to that of run. The difference is that runDetached talks to the JobManager through an ActorGateway, whereas run goes through the ActorSystem.
On the detached path, a ContextEnvironmentFactory is created, factory.getLastEnvCreated() returns the DetachedEnvironment, and its finalizeExecute() method is invoked; finalizeExecute() calls run() on the instantiated ClusterClient, which in turn calls submitJob() and returns the JobSubmissionResult.
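A rough sketch of what finalizeExecute() boils down to (paraphrased rather than the exact source; the field names are from memory of the Flink 1.x DetachedEnvironment):

// DetachedEnvironment.finalizeExecute(), approximately: replay the captured plan
// through the same ClusterClient.run() overload used in the attached case.
JobSubmissionResult finalizeExecute() throws ProgramInvocationException {
    // detachedPlan was captured earlier by setDetachedPlan(streamGraph) in execute().
    return client.run(detachedPlan, jarFilesToAttach, classpathsToAttach, userCodeClassLoader, savepointSettings);
}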
We will not expand on this part further here; interested readers can dig into the source code themselves.