文章目录
本套教程针对Flink 1.12.0版本的核心模块进行源码级讲解,从任务提交流程、通讯过程、Task调度、内存模型四大方面入手,庖丁解牛逐行分析源码,手术刀级别剖析Flink内核架构!
任务提交流程
实例以yarn-per-job为例。
flink提交作业是通过flink run进行提交的,可以从提交脚本中看到启动类即程序的入口是:
org.apache.flink.client.cli.CliFrontend
查看其中的main方法,执行的逻辑简单总结如下:
- 获取flink的conf目录的路径
- 根据conf路径,加载配置
- 封装命令行接口:按顺序Generic、Yarn、Default
完成配置加载后,真正解析之行flink任务的是下午中的cli.parseAndRun(args)
:
点进去可以看到里面对应的多种操作,是取第一个参数进行判断,提交时我们用的是flink run
,明显对应下图run的位置:
简单看下run方法:
/**
* 执行run操作。
*
* @param args 运行操作的命令行参数。
*/
protected void run(String[] args) throws Exception {
LOG.info("Running 'run' command.");
/*TODO 获取run动作,默认的配置项*/
final Options commandOptions = CliFrontendParser.getRunCommandOptions();
/*TODO 根据用户指定的配置项,进行解析 例如-t -p -c等*/
final CommandLine commandLine = getCommandLine(commandOptions, args, true);
// evaluate help flag
if (commandLine.hasOption(HELP_OPTION.getOpt())) {
CliFrontendParser.printHelpForRun(customCommandLines);
return;
}
/*TODO 根据之前添加的顺序,挨个判断是否active:Generic、Yarn、Default*/
final CustomCommandLine activeCommandLine =
validateAndGetActiveCommandLine(checkNotNull(commandLine));
final ProgramOptions programOptions = ProgramOptions.create(commandLine);
/*TODO 获取 用户的jar包和其他依赖*/
final List<URL> jobJars = getJobJarAndDependencies(programOptions);
/*TODO 获取有效配置:HA的id、Target(session、per-job)、JobManager内存、TaskManager内存、每个TM的slot数...*/
final Configuration effectiveConfiguration = getEffectiveConfiguration(
activeCommandLine, commandLine, programOptions, jobJars);
LOG.debug("Effective executor configuration: {}", effectiveConfiguration);
final PackagedProgram program = getPackagedProgram(programOptions, effectiveConfiguration);
try {
/*TODO 执行程序*/
executeProgram(effectiveConfiguration, program);
} finally {
program.deleteExtractedLibraries();
}
}
在判断是否active的地方可以进去查看isActive方法:
@Override
public boolean isActive(CommandLine commandLine) {
final String jobManagerOption = commandLine.getOptionValue(addressOption.getOpt(), null);
/*TODO ID是固定的字符串 "yarn-cluster"*/
final boolean yarnJobManager = ID.equals(jobManagerOption);
/*TODO 判断是否存在 Yarn Session对应的 AppID*/
final boolean hasYarnAppId = commandLine.hasOption(applicationId.getOpt())
|| configuration.getOptional(YarnConfigOptions.APPLICATION_ID).isPresent();
final boolean hasYarnExecutor = YarnSessionClusterExecutor.NAME.equalsIgnoreCase(configuration.get(DeploymentOptions.TARGET))
|| YarnJobClusterExecutor.NAME.equalsIgnoreCase(configuration.get(DeploymentOptions.TARGET));
/*TODO -m yarn-cluster || yarn有appID,或者命令行指定了 || 执行器是yarn的 这三个条件满足一个就走yarn*/
return hasYarnExecutor || yarnJobManager || hasYarnAppId;
}
可以知道session模式的化会指定好applicationId。
如果yarn的不符合,就只能走default,DefaultCLI的isActive逻辑直接返回true。
yarn-per-job新老版本写法:
老版本(<=1.10):flink run -m yarn-cluster -c xxxxx xxx.jar
新版本(>=1.11):flink run -t yarn-per-job -c xxxxx xxx.jar
提交流程-执行用户代码
前面说了client提交的大概流程,我们知道程序开始执行其实是执行StreamExecutionEnvironment.execute()
开始的。这里会生成StreamGraph:
public JobExecutionResult execute(String jobName) throws Exception {
Preconditions.checkNotNull(jobName, "Streaming Job name should not be null.");
/*TODO 获取StreamGraph,并接着执行*/
return execute(getStreamGraph(jobName));
}
层层进去可以到AbstractJobClusterExecutor.execute
,这里将streamGraph转换为jobGragh:
@Override
public CompletableFuture<JobClient> execute(@Nonnull final Pipeline pipeline, @Nonnull final Configuration configuration, @Nonnull final ClassLoader userCodeClassloader) throws Exception {
/*TODO 将 流图(StreamGraph) 转换成 作业图(JobGraph)*/
final JobGraph jobGraph = PipelineExecutorUtils.getJobGraph(pipeline, configuration);
/*TODO 集群描述器:创建、启动了 YarnClient, 包含了一些yarn、flink的配置和环境信息*/
try (final ClusterDescriptor<ClusterID> clusterDescriptor = clusterClientFactory.createClusterDescriptor(configuration)) {
final ExecutionConfigAccessor configAccessor = ExecutionConfigAccessor.fromConfiguration(configuration);
/*TODO 集群特有资源配置:JobManager内存、TaskManager内存、每个Tm的slot数*/
final ClusterSpecification clusterSpecification = clusterClientFactory.getClusterSpecification(configuration);
final ClusterClientProvider