 
 
WeChat official account: 深广大数据Club
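This article analyzes the source code behind the execution flow of job submission when Apache Flink runs in cluster mode.
For the source code analysis of job submission in local mode, see the previous article, "Flink Source Code Analysis | Starting from an Example: Understanding the Local Job Execution Flow".
In cluster mode, the entry point for job submission and execution is the same as in local mode.
Once again, we start from SocketWindowWordCount: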
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
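getExecutionEnvironment() checks which kind of ExecutionEnvironment is currently in effect and returns the matching stream environment. For a job submitted to a cluster, the object obtained here is a StreamContextEnvironment.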
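public static StreamExecutionEnvironment getExecutionEnvironment() {
    ...
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    if (env instanceof ContextEnvironment) {
        // context installed by the client (e.g. flink run): wrap it for streaming
        return new StreamContextEnvironment((ContextEnvironment) env);
    } else if (env instanceof OptimizerPlanEnvironment || env instanceof PreviewPlanEnvironment) {
        // plan-extraction environments
        return new StreamPlanEnvironment(env);
    } else {
        // no context installed: run locally
        return createLocalEnvironment();
    }
}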
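Here is a minimal Java sketch of the same dispatch. EnvSelection and its nested ContextEnv, PlanEnv and LocalEnv types are hypothetical stand-ins, not Flink classes. The method only returns the name of the stream environment that would be built, and it assumes the branch order shown in the code above.

```java
// Hypothetical stand-ins for Flink's batch environments; not the real Flink classes.
class EnvSelection {
    interface BatchEnv {}
    static class ContextEnv implements BatchEnv {}   // what the CLI installs before calling main()
    static class PlanEnv implements BatchEnv {}      // plays the role of OptimizerPlanEnvironment / PreviewPlanEnvironment
    static class LocalEnv implements BatchEnv {}     // fallback when no context is installed

    // Same branch order as getExecutionEnvironment(); returns the name of the stream environment chosen.
    static String streamEnvFor(BatchEnv env) {
        if (env instanceof ContextEnv) {
            return "StreamContextEnvironment";
        } else if (env instanceof PlanEnv) {
            return "StreamPlanEnvironment";
        } else {
            return "LocalStreamEnvironment";
        }
    }

    public static void main(String[] args) {
        System.out.println(streamEnvFor(new ContextEnv())); // StreamContextEnvironment
        System.out.println(streamEnvFor(new LocalEnv()));   // LocalStreamEnvironment
    }
}
```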
 
When the user program then calls env.execute(), the call ends up in StreamContextEnvironment.execute():
@Override
public JobExecutionResult execute(String jobName) throws Exception {
    Preconditions.checkNotNull(jobName, "Streaming Job name should not be null.");
    StreamGraph streamGraph = this.getStreamGraph();
    streamGraph.setJobName(jobName);
    transformations.clear();
    // execute the programs
    if (ctx instanceof DetachedEnvironment) {
        LOG.warn("Job was executed in detached mode, the results will be available on completion.");
        ((DetachedEnvironment) ctx).setDetachedPlan(streamGraph);
        return DetachedEnvironment.DetachedJobExecutionResult.INSTANCE;
    } else {
        return ctx
            .getClient()
            .run(streamGraph, ctx.getJars(), ctx.getClasspaths(), ctx.getUserCodeClassLoader(), ctx.getSavepointRestoreSettings())
            .getJobExecutionResult();
    }
}
 
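If detached mode is not specified, the code in the else block runs:
Get the ClusterClient and call its run() method.
Pass in the jars, classpaths, user-code class loader and savepoint restore settings.
Obtain the JobExecutionResult.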
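The sketch below models the two branches of execute(), assuming plans can be reduced to plain strings. ExecuteBranch, FakeClient, Context and DETACHED_PLACEHOLDER are hypothetical names. The placeholder only plays the role of DetachedEnvironment.DetachedJobExecutionResult.INSTANCE. What the sketch shows is the key difference between the branches: the detached branch only records the plan, while the attached branch submits through the client straight away.

```java
// Hypothetical model of the branch in StreamContextEnvironment.execute(); all names are illustrative.
class ExecuteBranch {
    // Stand-in for ClusterClient: counts submissions and returns a fake result.
    static class FakeClient {
        int submissions = 0;

        String run(String plan) {
            submissions++;
            return "result-of-" + plan;   // plays the role of getJobExecutionResult()
        }
    }

    // Stand-in for the ContextEnvironment held in ctx.
    static class Context {
        final boolean detached;
        final FakeClient client = new FakeClient();
        String detachedPlan;              // filled only in detached mode

        Context(boolean detached) {
            this.detached = detached;
        }
    }

    // Placeholder standing in for DetachedEnvironment.DetachedJobExecutionResult.INSTANCE.
    static final String DETACHED_PLACEHOLDER = "detached-placeholder";

    static String execute(Context ctx, String streamGraph) {
        if (ctx.detached) {
            // Detached: remember the plan and return at once; nothing is submitted yet.
            ctx.detachedPlan = streamGraph;
            return DETACHED_PLACEHOLDER;
        }
        // Attached: submit through the client and return its result.
        return ctx.client.run(streamGraph);
    }

    public static void main(String[] args) {
        System.out.println(execute(new Context(false), "wordcount")); // result-of-wordcount
        System.out.println(execute(new Context(true), "wordcount"));  // detached-placeholder
    }
}
```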
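// ClusterClient#run(PackagedProgram, int), abridged
public JobSubmissionResult run(PackagedProgram prog, int parallelism)
        throws ProgramInvocationException, ProgramMissingJobException {
    ...
    if (prog.isUsingProgramEntryPoint()) {
        // entry class implements Program: obtain the plan from it directly
        ...
    } else if (prog.isUsingInteractiveMode()) {
        // plain main() class: install a ContextEnvironmentFactory and invoke main()
        ...
    }
}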
 
When the job is submitted, the client checks whether to use interactive mode or the program entry point:
    public boolean isUsingInteractiveMode() {
        return this.program == null;
    }
    public boolean isUsingProgramEntryPoint() {
        return this.program != null;
    }
 
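The deciding condition is simply whether program is null. PackagedProgram only assigns program when the entry class implements Flink's Program interface. An ordinary main()-based job such as SocketWindowWordCount leaves it null and therefore runs in interactive mode.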
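The null check can be illustrated with a small reflection-based sketch. The Program interface here is a simplified stand-in for Flink's (the real one declares a getPlan method). InvocationMode, PlanProducingJob and MainMethodJob are hypothetical. As described above, the sketch assumes that program is only set when the entry class implements Program.

```java
// Hypothetical sketch of how PackagedProgram decides between the two invocation modes.
class InvocationMode {
    // Simplified stand-in for Flink's Program interface (the real one declares Plan getPlan(String... args)).
    interface Program {}

    // Entry class that implements Program: "program entry point" style.
    static class PlanProducingJob implements Program {}

    // Ordinary main() class, the way SocketWindowWordCount is written.
    static class MainMethodJob {
        public static void main(String[] args) {}
    }

    // Only instantiate the entry class when it implements Program; otherwise leave program null.
    static Program programFor(Class<?> entryClass) {
        if (!Program.class.isAssignableFrom(entryClass)) {
            return null;
        }
        try {
            return (Program) entryClass.getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Cannot instantiate " + entryClass.getName(), e);
        }
    }

    // Same rule as isUsingProgramEntryPoint() / isUsingInteractiveMode(): program != null vs program == null.
    static String mode(Class<?> entryClass) {
        return programFor(entryClass) != null ? "program entry point" : "interactive mode";
    }

    public static void main(String[] args) {
        System.out.println(mode(PlanProducingJob.class)); // program entry point
        System.out.println(mode(MainMethodJob.class));    // interactive mode
    }
}
```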
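The flink run script invokes the run method of the CliFrontend class.
run() first calls buildProgram() to create the PackagedProgram held in the local program variable, then passes it on to runProgram(), which continues the call chain.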
// CliFrontend#run(String[] args), abridged
final Options commandOptions = CliFrontendParser.getRunCommandOptions();
final Options commandLineOptions = CliFrontendParser.mergeOptions(commandOptions, customCommandLineOptions);
final CommandLine commandLine = CliFrontendParser.parse(commandLineOptions, args, true);
final RunOptions runOptions = new RunOptions(commandLine);
final PackagedProgram program;
try {
    LOG.info("Building program from JAR file");
    program = buildProgram(runOptions);
}
...
runProgram(customCommandLine, commandLine, runOptions, program);
...
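The `true` passed to CliFrontendParser.parse(...) appears to be the stop-at-non-option flag of the underlying Apache Commons CLI parser. That flag is what lets arguments such as SocketWindowWordCount's `--port` reach the user program instead of being parsed as Flink options. The sketch below imitates that behaviour in plain Java. RunArgs is a hypothetical class, and only the -d, -c and -p options are modelled.

```java
import java.util.Arrays;

// Hypothetical mini-parser illustrating "stop at the first non-option": options first,
// then the JAR path, then program arguments that are passed through untouched.
// Only -d, -c and -p are modelled; the real CLI supports many more options.
class RunArgs {
    boolean detached;
    String entryClass;        // null: the entry class would come from the JAR manifest
    Integer parallelism;      // null: use the configured default
    String jarFilePath;
    String[] programArgs = new String[0];

    static RunArgs parse(String[] args) {
        RunArgs r = new RunArgs();
        int i = 0;
        while (i < args.length && args[i].startsWith("-")) {
            switch (args[i]) {
                case "-d":
                    r.detached = true;
                    i += 1;
                    break;
                case "-c":
                    r.entryClass = valueAfter(args, i);
                    i += 2;
                    break;
                case "-p":
                    r.parallelism = Integer.parseInt(valueAfter(args, i));
                    i += 2;
                    break;
                default:
                    throw new IllegalArgumentException("Unknown option: " + args[i]);
            }
        }
        // The first non-option token is the JAR; everything after it belongs to the user program.
        if (i < args.length) {
            r.jarFilePath = args[i];
            r.programArgs = Arrays.copyOfRange(args, i + 1, args.length);
        }
        return r;
    }

    private static String valueAfter(String[] args, int i) {
        if (i + 1 >= args.length) {
            throw new IllegalArgumentException("Missing value for " + args[i]);
        }
        return args[i + 1];
    }

    public static void main(String[] args) {
        RunArgs r = parse(new String[] {"-d", "-p", "2", "job.jar", "--port", "9000"});
        System.out.println(r.detached + " " + r.parallelism + " " + r.jarFilePath + " " + Arrays.toString(r.programArgs));
        // true 2 job.jar [--port, 9000]
    }
}
```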
 
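As the code below shows, buildProgram() is what handles the JAR we submit. It validates the JAR path and wraps the JAR, its classpaths and the program arguments in a PackagedProgram.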
PackagedProgram buildProgram(ProgramOptions options) throws FileNotFoundException, ProgramInvocationException {
    String[] programArgs = options.getProgramArgs();
    String jarFilePath = options.getJarFilePath();
    List<URL> classpaths = options.getClasspaths();
    if (jarFilePath == null) {
        throw new IllegalArgumentException("The program JAR file was not specified.");
    }
    File jarFile = new File(jarFilePath);
    // Check if JAR file exists
    if (!jarFile.exists()) {
        throw new FileNotFoundException("JAR file does not exist: " + jarFile);
    } else if (!jarFile.isFile()) {
        throw new FileNotFoundException("JAR file is not a file: " + jarFile);
    }
    // Get assembler class
    String entryPointClass = options.getEntryPointClassName();
    PackagedProgram program = entryPointClass == null ?
            new PackagedProgram(jarFile, classpaths, programArgs) :
            new PackagedProgram(jarFile, classpaths, entryPointClass, programArgs);
    program.setSavepointRestoreSettings(options.getSavepointRestoreSettings());
    return program;
}
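The validation order in buildProgram() can be reproduced with java.io.File alone. In this sketch, BuildProgramSketch and ProgramSpec are hypothetical, with ProgramSpec standing in for PackagedProgram. The constructor selection and the savepoint settings are left out.

```java
import java.io.File;
import java.io.FileNotFoundException;

// Hypothetical mirror of the checks in buildProgram(); ProgramSpec stands in for PackagedProgram.
class BuildProgramSketch {
    static final class ProgramSpec {
        final File jarFile;
        final String entryPointClass;   // null: Flink would look the entry class up in the JAR manifest
        final String[] programArgs;

        ProgramSpec(File jarFile, String entryPointClass, String[] programArgs) {
            this.jarFile = jarFile;
            this.entryPointClass = entryPointClass;
            this.programArgs = programArgs;
        }
    }

    static ProgramSpec build(String jarFilePath, String entryPointClass, String[] programArgs)
            throws FileNotFoundException {
        if (jarFilePath == null) {
            throw new IllegalArgumentException("The program JAR file was not specified.");
        }
        File jarFile = new File(jarFilePath);
        // Same order as the original: existence first, then "is a regular file".
        if (!jarFile.exists()) {
            throw new FileNotFoundException("JAR file does not exist: " + jarFile);
        } else if (!jarFile.isFile()) {
            throw new FileNotFoundException("JAR file is not a file: " + jarFile);
        }
        return new ProgramSpec(jarFile, entryPointClass, programArgs);
    }

    public static void main(String[] args) {
        try {
            build(System.getProperty("java.io.tmpdir"), null, new String[0]);
        } catch (FileNotFoundException e) {
            System.out.println(e.getMessage()); // a directory is rejected as "not a file"
        }
    }
}
```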
 
After passing through several layers of run() methods, the job is finally submitted through StandaloneClusterClient's submitJob() method, which returns a JobSubmissionResult.
public JobSubmissionResult run(JobWithJars jobWithJars, int parallelism, SavepointRestoreSettings savepointSettings)
        throws CompilerException, ProgramInvocationException {
    ClassLoader classLoader = jobWithJars.getUserCodeClassLoader();
    if (classLoader == null) {
        throw new IllegalArgumentException("The given JobWithJars does not provide a usercode class loader.");
    }
    OptimizedPlan optPlan = getOptimizedPlan(compiler, jobWithJars, parallelism);
    return run(optPlan, jobWithJars.getJarFiles(), jobWithJars.getClasspaths(), classLoader, savepointSettings);
}
...
public JobSubmissionResult run(FlinkPlan compiledPlan,
        List<URL> libraries, List<URL> classpaths, ClassLoader classLoader, SavepointRestoreSettings savepointSettings)
        throws ProgramInvocationException {
    JobGraph job = getJobGraph(flinkConfig, compiledPlan, libraries, classpaths, savepointSettings);
    return submitJob(job, classLoader);
}
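The chaining of the two run() methods can be traced with a toy model in which every plan and graph is just a string. TwoStageRun is hypothetical. Its trace list records which step ran. A batch job passes through the optimizer first. A plan that is already compiled, which is the role a StreamGraph plays when StreamContextEnvironment.execute() calls client.run(streamGraph, ...), enters the second method directly.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the two chained run() methods; plans and graphs are plain strings here.
class TwoStageRun {
    final List<String> trace = new ArrayList<>();

    // First run(): build an optimized plan from the packaged job, then delegate.
    String run(String jobWithJars, int parallelism) {
        trace.add("getOptimizedPlan");
        String optimizedPlan = "OptimizedPlan(" + jobWithJars + ", p=" + parallelism + ")";
        return run(optimizedPlan);
    }

    // Second run(): accepts any compiled plan (an OptimizedPlan or a StreamGraph in Flink),
    // turns it into a JobGraph and submits it.
    String run(String compiledPlan) {
        trace.add("getJobGraph");
        String jobGraph = "JobGraph[" + compiledPlan + "]";
        return submitJob(jobGraph);
    }

    // In Flink this is abstract and implemented by a concrete client; here it just echoes.
    String submitJob(String jobGraph) {
        trace.add("submitJob");
        return "JobSubmissionResult for " + jobGraph;
    }

    public static void main(String[] args) {
        TwoStageRun batch = new TwoStageRun();
        System.out.println(batch.run("WordCount.jar", 4));
        System.out.println(batch.trace);     // [getOptimizedPlan, getJobGraph, submitJob]

        TwoStageRun streaming = new TwoStageRun();
        streaming.run("StreamGraph(SocketWindowWordCount)");
        System.out.println(streaming.trace); // [getJobGraph, submitJob]
    }
}
```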
 
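The flow above involves two key run() methods. A streaming job submitted from StreamContextEnvironment.execute() enters at the second one directly, because the StreamGraph it passes is itself a FlinkPlan.
The first run() method:
Obtains the user-code class loader for the classes in the submitted JAR.
Builds the OptimizedPlan via getOptimizedPlan().
Passes the plan on to the second run() method.
The second run() method:
Creates the JobGraph via getJobGraph().
Submits the job and returns a JobSubmissionResult. submitJob() is an abstract method of ClusterClient; the implementation actually called here is StandaloneClusterClient.submitJob().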
@Override
public JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader classLoader)
        throws ProgramInvocationException {
    if (isDetached()) {
        return super.runDetached(jobGraph, classLoader);
    } else {
        return super.run(jobGraph, classLoader);
    }
}
 
When running the flink run command, passing -d on the command line takes the runDetached() path; otherwise, run() is used.
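The dispatch in submitJob() follows the template-method pattern: the base client defines the shared paths and leaves submitJob() abstract. The hypothetical sketch below assumes the -d flag simply ends up as a boolean on the client. ClientSketch, StandaloneClientSketch and runBlocking are illustrative names, not Flink's.

```java
// Hypothetical template-method sketch: the base client leaves submitJob() abstract,
// and a concrete client picks the blocking or detached path from a flag set by -d.
abstract class ClientSketch {
    private final boolean detached;

    ClientSketch(boolean detached) {
        this.detached = detached;
    }

    boolean isDetached() {
        return detached;
    }

    // Shared paths, standing in for ClusterClient.run(JobGraph, ClassLoader) and runDetached(...).
    String runBlocking(String jobGraph) {
        return "JobExecutionResult(" + jobGraph + ")";
    }

    String runDetached(String jobGraph) {
        return "JobSubmissionResult(" + jobGraph + ")";
    }

    abstract String submitJob(String jobGraph);

    public static void main(String[] args) {
        System.out.println(new StandaloneClientSketch(true).submitJob("job"));  // JobSubmissionResult(job)
        System.out.println(new StandaloneClientSketch(false).submitJob("job")); // JobExecutionResult(job)
    }
}

class StandaloneClientSketch extends ClientSketch {
    StandaloneClientSketch(boolean detached) {
        super(detached);
    }

    @Override
    String submitJob(String jobGraph) {
        // -d on the command line corresponds to detached == true here
        return isDetached() ? runDetached(jobGraph) : runBlocking(jobGraph);
    }
}
```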
public JobExecutionResult run(JobGraph jobGraph, ClassLoader classLoader) throws ProgramInvocationException {
    waitForClusterToBeReady();
    final ActorSystem actorSystem;
    try {
        actorSystem = actorSystemLoader.get();
    } catch (FlinkException fe) {
        throw new ProgramInvocationException("Could not start the ActorSystem needed to talk to the " +
            "JobManager.", jobGraph.getJobID(), fe);
    }
    try {
        logAndSysout("Submitting job with JobID: " + jobGraph.getJobID() + ". Waiting for job completion.");
        this.lastJobExecutionResult = JobClient.submitJobAndWait(
            actorSystem,
            flinkConfig,
            highAvailabilityServices,
            jobGraph,
            timeout,
            printStatusDuringExecution,
            classLoader);
        return lastJobExecutionResult;
    } catch (JobExecutionException e) {
        throw new ProgramInvocationException("The program execution failed: " + e.getMessage(), jobGraph.getJobID(), e);
    }
}
 
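At this point Flink's ActorSystem comes into play. The ActorSystem will be analyzed in a later article.
The logic above consists of the following steps:
Wait until the cluster is ready.
Obtain the ActorSystem instance.
Pass actorSystem, flinkConfig, jobGraph and the other parameters to JobClient.submitJobAndWait(), which submits the job and waits for it to return a JobExecutionResult.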
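The three steps map onto a short method. This is a sketch with hypothetical types. Cluster and FakeCluster stand in for the cluster connection, JobFailed stands in for JobExecutionException, and InvocationFailed stands in for ProgramInvocationException. The readiness check is plain polling with an upper bound, not Flink's actual waitForClusterToBeReady() logic.

```java
// Hypothetical sketch of the three steps in the blocking run(); every type here is illustrative.
class AttachedRunSketch {
    // Stands in for JobExecutionException.
    static class JobFailed extends Exception {
        JobFailed(String message) { super(message); }
    }

    // Stands in for ProgramInvocationException.
    static class InvocationFailed extends Exception {
        InvocationFailed(String message, Throwable cause) { super(message, cause); }
    }

    interface Cluster {
        boolean isReady();
        Object connect() throws Exception;                                      // role of actorSystemLoader.get()
        String submitAndWait(Object connection, String jobId) throws JobFailed; // role of JobClient.submitJobAndWait(...)
    }

    static String run(Cluster cluster, String jobId, int maxPolls) throws InvocationFailed {
        // Step 1: wait for the cluster (plain polling with an upper bound).
        int polls = 0;
        while (!cluster.isReady()) {
            if (++polls >= maxPolls) {
                throw new InvocationFailed("Cluster not ready for job " + jobId, null);
            }
        }
        // Step 2: obtain the communication handle.
        final Object connection;
        try {
            connection = cluster.connect();
        } catch (Exception e) {
            throw new InvocationFailed("Could not connect to the JobManager.", e);
        }
        // Step 3: submit and block; job failures are rethrown as invocation failures.
        try {
            return cluster.submitAndWait(connection, jobId);
        } catch (JobFailed e) {
            throw new InvocationFailed("The program execution failed: " + e.getMessage(), e);
        }
    }

    // Fake cluster: reports "not ready" for the first notReadyPolls checks.
    static class FakeCluster implements Cluster {
        private int notReadyPolls;
        private final boolean jobFails;

        FakeCluster(int notReadyPolls, boolean jobFails) {
            this.notReadyPolls = notReadyPolls;
            this.jobFails = jobFails;
        }

        public boolean isReady() { return notReadyPolls-- <= 0; }

        public Object connect() { return new Object(); }

        public String submitAndWait(Object connection, String jobId) throws JobFailed {
            if (jobFails) {
                throw new JobFailed("job " + jobId + " failed");
            }
            return "JobExecutionResult(" + jobId + ")";
        }
    }

    public static void main(String[] args) throws InvocationFailed {
        System.out.println(run(new FakeCluster(2, false), "job-1", 5)); // JobExecutionResult(job-1)
    }
}
```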
public JobSubmissionResult runDetached(JobGraph jobGraph, ClassLoader classLoader) throws ProgramInvocationException {
    waitForClusterToBeReady();
    final ActorGateway jobManagerGateway;
    try {
        jobManagerGateway = getJobManagerGateway();
    } catch (Exception e) {
        throw new ProgramInvocationException("Failed to retrieve the JobManager gateway.",
            jobGraph.getJobID(), e);
    }
    try {
        logAndSysout("Submitting Job with JobID: " + jobGraph.getJobID() + ". Returning after job submission.");
        JobClient.submitJobDetached(
            new AkkaJobManagerGateway(jobManagerGateway),
            flinkConfig,
            jobGraph,
            Time.milliseconds(timeout.toMillis()),
            classLoader);
        return new JobSubmissionResult(jobGraph.getJobID());
    } catch (JobExecutionException e) {
        throw new ProgramInvocationException("The program execution failed: " + e.getMessage(),
            jobGraph.getJobID(), e);
    }
}
 
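runDetached() follows a flow similar to run(). The difference is that runDetached() talks to the JobManager through an ActorGateway (wrapped in an AkkaJobManagerGateway) rather than an ActorSystem. It also returns a JobSubmissionResult carrying only the JobID right after submission, whereas run() blocks in JobClient.submitJobAndWait() until the job finishes.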
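The practical difference between the two paths is whether the caller waits for the job to finish. A hypothetical sketch built on CompletableFuture makes this visible: runAttached joins on the job's completion future, while runDetached returns as soon as the job is accepted. FakeCluster and both methods are illustrative, not Flink APIs.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

// Hypothetical contrast between blocking and detached submission; not Flink's API.
class SubmitModes {
    // Fake cluster: accepting a job starts it asynchronously and hands back its completion future.
    static final class FakeCluster {
        private final Supplier<String> jobBody;
        CompletableFuture<String> lastJob;

        FakeCluster(Supplier<String> jobBody) {
            this.jobBody = jobBody;
        }

        CompletableFuture<String> accept(String jobId) {
            lastJob = CompletableFuture.supplyAsync(jobBody);
            return lastJob;
        }
    }

    // Blocking style (role of submitJobAndWait): wait for the job's result.
    static String runAttached(FakeCluster cluster, String jobId) {
        return cluster.accept(jobId).join();
    }

    // Detached style (role of submitJobDetached): return once the job is accepted; only the JobID is known.
    static String runDetached(FakeCluster cluster, String jobId) {
        cluster.accept(jobId);
        return "JobSubmissionResult(" + jobId + ")";
    }

    public static void main(String[] args) {
        System.out.println(runAttached(new FakeCluster(() -> "finished"), "job-1")); // finished
        System.out.println(runDetached(new FakeCluster(() -> "finished"), "job-2")); // JobSubmissionResult(job-2)
    }
}
```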
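When a program runs in detached interactive mode, ClusterClient creates a ContextEnvironmentFactory, obtains the DetachedEnvironment through factory.getLastEnvCreated() once main() has returned, and calls its finalizeExecute() method. finalizeExecute() calls run() on the ClusterClient instance, which in turn calls submitJob() and returns a JobSubmissionResult.
We won't go into this part in detail here; interested readers can look at the source code themselves.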
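The detached interactive hand-off described above can be sketched as a two-phase process. Client, DetachedEnv and Factory are hypothetical stand-ins for ClusterClient, DetachedEnvironment and ContextEnvironmentFactory, and the user's main() is modelled as a Consumer. The point of the sketch is that nothing is submitted while main() runs; submission happens only afterwards, via finalizeExecute().

```java
import java.util.function.Consumer;

// Hypothetical sketch of the detached interactive hand-off: main() only records the plan,
// and submission happens after main() has returned. All names are illustrative stand-ins.
class DetachedHandOff {
    // Stand-in for ClusterClient: counts submissions.
    static final class Client {
        int submitted = 0;

        String run(String plan) {
            submitted++;
            return "JobSubmissionResult(" + plan + ")";
        }
    }

    // Stand-in for DetachedEnvironment: execute() stores the plan, finalizeExecute() submits it.
    static final class DetachedEnv {
        private final Client client;
        private String detachedPlan;

        DetachedEnv(Client client) { this.client = client; }

        void execute(String plan) { detachedPlan = plan; }

        String finalizeExecute() { return client.run(detachedPlan); }
    }

    // Stand-in for ContextEnvironmentFactory: remembers the last environment it created.
    static final class Factory {
        private final Client client;
        private DetachedEnv lastEnvCreated;

        Factory(Client client) { this.client = client; }

        DetachedEnv create() {
            lastEnvCreated = new DetachedEnv(client);
            return lastEnvCreated;
        }

        DetachedEnv getLastEnvCreated() { return lastEnvCreated; }
    }

    static String runInteractiveDetached(Client client, Consumer<Factory> userMain) {
        Factory factory = new Factory(client);
        userMain.accept(factory);                      // phase 1: run the user's main(), which only records a plan
        DetachedEnv env = factory.getLastEnvCreated();
        if (env == null) {
            throw new IllegalStateException("The program didn't contain a Flink job.");
        }
        return env.finalizeExecute();                  // phase 2: submit after main() has returned
    }

    public static void main(String[] args) {
        Client client = new Client();
        String result = runInteractiveDetached(client, f -> f.create().execute("SocketWindowWordCount"));
        System.out.println(result + ", submissions=" + client.submitted);
        // JobSubmissionResult(SocketWindowWordCount), submissions=1
    }
}
```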
 