Flink内核源码解析(出自B站尚硅谷)


本套教程针对Flink 1.12.0版本的核心模块进行源码级讲解,从任务提交流程、通讯过程、Task调度、内存模型四大方面入手,庖丁解牛逐行分析源码,手术刀级别剖析Flink内核架构!

任务提交流程

在这里插入图片描述
实例以yarn-per-job为例。

flink提交作业是通过flink run进行提交的,可以从提交脚本中看到启动类即程序的入口是:

org.apache.flink.client.cli.CliFrontend

查看其中的main方法,执行的逻辑简单总结如下:

  1. 获取flink的conf目录的路径
  2. 根据conf路径,加载配置
  3. 封装命令行接口:按顺序Generic、Yarn、Default

完成配置加载后,真正解析之行flink任务的是下午中的cli.parseAndRun(args):
在这里插入图片描述
点进去可以看到里面对应的多种操作,是取第一个参数进行判断,提交时我们用的是flink run,明显对应下图run的位置:
在这里插入图片描述
简单看下run方法:

	/**
	 * 执行run操作。
	 *
	 * @param args 运行操作的命令行参数。
	 */
	protected void run(String[] args) throws Exception {
   
		LOG.info("Running 'run' command.");

		/*TODO 获取run动作,默认的配置项*/
		final Options commandOptions = CliFrontendParser.getRunCommandOptions();

		/*TODO 根据用户指定的配置项,进行解析 例如-t -p -c等*/
		final CommandLine commandLine = getCommandLine(commandOptions, args, true);

		// evaluate help flag
		if (commandLine.hasOption(HELP_OPTION.getOpt())) {
   
			CliFrontendParser.printHelpForRun(customCommandLines);
			return;
		}

		/*TODO 根据之前添加的顺序,挨个判断是否active:Generic、Yarn、Default*/
		final CustomCommandLine activeCommandLine =
				validateAndGetActiveCommandLine(checkNotNull(commandLine));

		final ProgramOptions programOptions = ProgramOptions.create(commandLine);

		/*TODO 获取 用户的jar包和其他依赖*/
		final List<URL> jobJars = getJobJarAndDependencies(programOptions);

		/*TODO 获取有效配置:HA的id、Target(session、per-job)、JobManager内存、TaskManager内存、每个TM的slot数...*/
		final Configuration effectiveConfiguration = getEffectiveConfiguration(
				activeCommandLine, commandLine, programOptions, jobJars);

		LOG.debug("Effective executor configuration: {}", effectiveConfiguration);

		final PackagedProgram program = getPackagedProgram(programOptions, effectiveConfiguration);

		try {
   
			/*TODO 执行程序*/
			executeProgram(effectiveConfiguration, program);
		} finally {
   
			program.deleteExtractedLibraries();
		}
	}

在判断是否active的地方可以进去查看isActive方法:

	@Override
	public boolean isActive(CommandLine commandLine) {
   
		final String jobManagerOption = commandLine.getOptionValue(addressOption.getOpt(), null);
		/*TODO ID是固定的字符串 "yarn-cluster"*/
		final boolean yarnJobManager = ID.equals(jobManagerOption);
		/*TODO 判断是否存在 Yarn Session对应的 AppID*/
		final boolean hasYarnAppId = commandLine.hasOption(applicationId.getOpt())
			|| configuration.getOptional(YarnConfigOptions.APPLICATION_ID).isPresent();
		final boolean hasYarnExecutor = YarnSessionClusterExecutor.NAME.equalsIgnoreCase(configuration.get(DeploymentOptions.TARGET))
			|| YarnJobClusterExecutor.NAME.equalsIgnoreCase(configuration.get(DeploymentOptions.TARGET));
		/*TODO -m yarn-cluster || yarn有appID,或者命令行指定了 || 执行器是yarn的 这三个条件满足一个就走yarn*/
		return hasYarnExecutor || yarnJobManager || hasYarnAppId;
	}

可以知道session模式的化会指定好applicationId。

如果yarn的不符合,就只能走default,DefaultCLI的isActive逻辑直接返回true。

yarn-per-job新老版本写法:
老版本(<=1.10):flink run -m yarn-cluster -c xxxxx xxx.jar
新版本(>=1.11):flink run -t yarn-per-job -c xxxxx xxx.jar

提交流程-执行用户代码

前面说了client提交的大概流程,我们知道程序开始执行其实是执行StreamExecutionEnvironment.execute()开始的。这里会生成StreamGraph:

public JobExecutionResult execute(String jobName) throws Exception {
   
	Preconditions.checkNotNull(jobName, "Streaming Job name should not be null.");

	/*TODO 获取StreamGraph,并接着执行*/
	return execute(getStreamGraph(jobName));
}

层层进去可以到AbstractJobClusterExecutor.execute,这里将streamGraph转换为jobGragh:

	@Override
	public CompletableFuture<JobClient> execute(@Nonnull final Pipeline pipeline, @Nonnull final Configuration configuration, @Nonnull final ClassLoader userCodeClassloader) throws Exception {
   
		/*TODO 将 流图(StreamGraph) 转换成 作业图(JobGraph)*/
		final JobGraph jobGraph = PipelineExecutorUtils.getJobGraph(pipeline, configuration);

		/*TODO 集群描述器:创建、启动了 YarnClient, 包含了一些yarn、flink的配置和环境信息*/
		try (final ClusterDescriptor<ClusterID> clusterDescriptor = clusterClientFactory.createClusterDescriptor(configuration)) {
   
			final ExecutionConfigAccessor configAccessor = ExecutionConfigAccessor.fromConfiguration(configuration);

			/*TODO 集群特有资源配置:JobManager内存、TaskManager内存、每个Tm的slot数*/
			final ClusterSpecification clusterSpecification = clusterClientFactory.getClusterSpecification(configuration);

			final ClusterClientProvider
评论
添加红包

请填写红包祝福语或标题

红包个数最小为10个

红包金额最低5元

当前余额3.43前往充值 >
需支付:10.00
成就一亿技术人!
领取后你会自动成为博主和红包主的粉丝 规则
hope_wisdom
发出的红包
实付
使用余额支付
点击重新获取
扫码支付
钱包余额 0

抵扣说明:

1.余额是钱包充值的虚拟货币,按照1:1的比例进行支付金额的抵扣。
2.余额无法直接购买下载,可以购买VIP、付费专栏及课程。

余额充值