flink教程-详解flink 1.11中的新部署模式-Application模式

背景

目前对于flink来说,生产环境一般有两个部署模式,一个是 session模式,一个是per job模式。

session模式

这种模式会预先在yarn或者或者k8s上启动一个flink集群,然后将任务提交到这个集群上,这种模式,集群中的任务使用相同的资源,如果某一个任务出现了问题导致整个集群挂掉,那就得重启集群中的所有任务,这样就会给集群造成很大的负面影响。

per job模式

考虑到集群的资源隔离情况,一般生产上的任务都会选择per job模式,也就是每个任务启动一个flink集群,各个集群之间独立运行,互不影响,且每个集群可以设置独立的配置。

per job模式的问题

目前,对于per job模式,jar包的解析、生成JobGraph是在客户端上执行的,然后将生成的jobgraph提交到集群。很多公司都会有自己的实时计算平台,用户可以使用这些平台提交flink任务,如果任务特别多的话,那么这些生成JobGraph、提交到集群的操作都会在实时平台所在的机器上执行,那么将会给服务器造成很大的压力。

此外这种模式提交任务的时候会把本地flink的所有jar包先上传到hdfs上相应 的临时目录,这个也会带来大量的网络的开销,所以如果任务特别多的情况下,平台的吞吐量将会直线下降。

引入application模式

所以针对flink per job模式的一些问题,flink 引入了一个新的部署模式–Application模式。 目前 Application 模式支持 Yarn 和 K8s 的部署方式,Yarn Application 模式会在客户端将运行任务需要的依赖都上传到 Flink Master,然后在 Master 端进行任务的提交。

此外,还支持远程的用户jar包来提交任务,比如可以将jar放到hdfs上,进一步减少上传jar所需的时间,从而减少部署作业的时间。

具体的使用命令是:

/bin/flink run-application -p 1 -t yarn-application \
-yD yarn.provided.lib.dirs="hdfs://localhost/flink/libs" \
hdfs://localhost/user-jars/HelloWold.jar

通过程序提交任务

当我们要做一个实时计算平台的时候,会需要通过程序来提交任务到集群,这时候需要我们自己封装一套API来实现提交flink任务到集群,目前主要的生产环境还是以yarn居多,所以我们今天讲讲怎么通过api的方式把一个任务以application的方法提交到yarn集群。

  • 引入相关的配置到classpath里
    core-site.xml
    hdfs-site.xml
    yarn-site.xml

  • 定义相关的配置参数

		//flink的本地配置目录,为了得到flink的配置
		String configurationDirectory = "/Users/user/work/flink/conf/";
		//存放flink集群相关的jar包目录
		String flinkLibs = "hdfs://hadoopcluster/data/flink/libs";
		//用户jar
		String userJarPath = "hdfs://hadoopcluster/data/flink/user-lib/TopSpeedWindowing.jar";
		String flinkDistJar = "hdfs://hadoopcluster/data/flink/libs/flink-yarn_2.11-1.11.0.jar";
  • 获取flink的配置

这里其实还可以设置很多的配置参数,比如yarn的队列名字等等,大家根据自己的需要来设置。

//		获取flink的配置
		Configuration flinkConfiguration = GlobalConfiguration.loadConfiguration(
				configurationDirectory);
				
	    //设置为application模式
		flinkConfiguration.set(
				DeploymentOptions.TARGET,
				YarnDeploymentTarget.APPLICATION.getName());
		//yarn application name
		flinkConfiguration.set(YarnConfigOptions.APPLICATION_NAME, "jobName");
		
		.........
  • 设置用户jar的参数和主类
//		设置用户jar的参数和主类
		ApplicationConfiguration appConfig = new ApplicationConfiguration(args, null);
  • 提交任务到集群
		YarnClusterDescriptor yarnClusterDescriptor = new YarnClusterDescriptor(
				flinkConfiguration,
				yarnConfiguration,
				yarnClient,
				clusterInformationRetriever,
				true);
		ClusterClientProvider<ApplicationId> clusterClientProvider = null;
		try {
			clusterClientProvider = yarnClusterDescriptor.deployApplicationCluster(
					clusterSpecification,
					appConfig);
		} catch (ClusterDeploymentException e){
			e.printStackTrace();
		}

完整代码请参考:
https://github.com/zhangjun0x01/bigdata-examples/blob/master/flink/src/main/java/cluster/SubmitJobApplicationMode.java

Application模式源码解析

通过上面提交的脚本我们看到入口是从flink bin目录下flink命令开始的,我们看下这个文件的最后一行代码,也就是提交任务的入口类:org.apache.flink.client.cli.CliFrontend,接下来我们基于flink 1.11的源码简单梳理一下flink是如何把一个任务提交到yarn集群的。

exec $JAVA_RUN $JVM_ARGS $FLINK_ENV_JAVA_OPTS "${log_setting[@]}" -classpath "`manglePathList "$CC_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" org.apache.flink.client.cli.CliFrontend "$@"

入口

在CliFrontend的main方法里,我们看到做了这么几件事。

  1. 获取flink的配置目录
  2. 加载flink的配置
  3. 加载并解析命令行参数
  4. 通过CliFrontend.parseParameters方法来执行具体的操作
		// 1. find the configuration directory
		final String configurationDirectory = getConfigurationDirectoryFromEnv();

		// 2. load the global configuration
		final Configuration configuration = GlobalConfiguration.loadConfiguration(configurationDirectory);

		// 3. load the custom command lines
		final List<CustomCommandLine> customCommandLines = loadCustomCommandLines(
			configuration,
			configurationDirectory);

		try {
			final CliFrontend cli = new CliFrontend(
				configuration,
				customCommandLines);

			SecurityUtils.install(new SecurityConfiguration(cli.configuration));
			int retCode = SecurityUtils.getInstalledContext()
					.runSecured(() -> cli.parseParameters(args));
			System.exit(retCode);
		}

执行具体的操作

在parseParameters方法里,解析出来要执行的操作,然后通过一个switch来进入要执行的方法,我们这里是进入runApplication方法。

			switch (action) {
				case ACTION_RUN:
					run(params);
					return 0;
				case ACTION_RUN_APPLICATION:
					runApplication(params);
					return 0;
				case ACTION_LIST:
					list(params);
					return 0;
				 ..........
		    }		 

runApplication方法

在这个方法里,主要是用传进来的命令行参数构造出来flink的配置对象Configuration,以及application模式所需的配置ApplicationConfiguration,包括入口类,jar包参数,最后

   // 用传进来的命令行参数构造出来flink的配置对象Configuration
	final Configuration effectiveConfiguration = getEffectiveConfiguration(
				activeCommandLine, commandLine, programOptions, Collections.singletonList(uri.toString()));
				
		//构造包含入口类和jar包参数的配置ApplicationConfiguration
		final ApplicationConfiguration applicationConfiguration =
				new ApplicationConfiguration(programOptions.getProgramArgs(), programOptions.getEntryPointClassName());
				
		deployer.run(effectiveConfiguration, applicationConfiguration);

构造ClusterDescriptor

上面的方法会进入ApplicationClusterDeployer的run方法,在这里会根据配置使用工厂类构造不同的ClusterDescriptor,比如是k8s的话会构造KubernetesClusterDescriptor,部署在yarn的话会构造YarnClusterDescriptor。之后会通过deployApplicationCluster来部署application模式的flink程序。

Deploy Application Cluster

我们这里以yarn集群为例,进入YarnClusterDescriptor#deployApplicationCluster方法,在这个方法里,我们看到经过一些简单的检查之后,调用了private方法YarnClusterDescriptor#deployInternal,这个deployInternal是一个提供公共功能的方法,可以看下其他的部署模式,yarn session模式,per job模式,都是调用的这个方法,只是参数不同而已。

我们简单看下这个方法:

	/**
	 * This method will block until the ApplicationMaster/JobManager have been deployed on YARN.
	 *
	 * @param clusterSpecification 一些配置参数
	 * @param applicationName yarn job的名字
	 * @param yarnClusterEntrypoint 入口类
	 * @param jobGraph 程序的jobGraph,可为空
	 * @param detached 是否是隔离模式
	 */
	private ClusterClientProvider<ApplicationId> deployInternal(
			ClusterSpecification clusterSpecification,
			String applicationName,
			String yarnClusterEntrypoint,
			@Nullable JobGraph jobGraph,
			boolean detached) throws Exception {

在这个方法里,将会根据不同的部署模式做一些必要的检查,然后启动yarn容器的操作。比如per job模式,上传flink jar包等等,都是在这个方法完成的。此外,该方法会一直阻塞到ApplicationMaster/JobManager部署成功,之后会进入用户程序的入口类ApplicationClusterEntryPoint来执行用户程序。

ApplicationClusterEntryPoint

yarn组件启动完成之后,开始执行用户的程序,在这个类里,会做以下的一些工作:

  • 下载必要的jar或者resources
  • 进行leader选举,决定谁执 main 方法
  • 用户程序退出时终止集群
  • 保证HA和容错

application模式提交任务到yarn集群,大概的流程就先讲到这里,flink任务执行的流程,后续再写篇文章专门介绍。

更多精彩信息,欢迎关注我的公众号【大数据技术与应用实战】
在这里插入图片描述

### 解决 IntelliJ IDEA 中 `@Autowired` 注解导致的红色波浪线错误 在使用 Spring 框架时,如果遇到 `@Autowired` 注解下的依赖注入对象显示为红色波浪线错误或者黄色警告的情况,通常是由以下几个原因引起的: #### 1. **Spring 插件未启用** 如果 Spring 支持插件未被激活,则可能导致 IDE 无法识别 `@Autowired` 或其他 Spring 特定的功能。可以通过以下方式解决问题: - 打开设置菜单:`File -> Settings -> Plugins`。 - 确认已安装并启用了名为 “Spring Framework Support” 的官方插件[^1]。 #### 2. **项目配置文件缺失或不正确** Spring 需要通过 XML 文件、Java Config 类或其他形式来定义 Bean 定义。如果没有正确加载这些配置文件,可能会导致 `@Autowired` 报错。 - 确保项目的 `applicationContext.xml` 或者基于 Java 的配置类(带有 `@Configuration` 和 `@Bean` 注解)已被正确定义和引入。 - 对于 Spring Boot 项目,确认是否存在 `spring.factories` 文件以及是否包含了必要的组件扫描路径[^3]。 #### 3. **模块依赖关系问题** 当前模块可能缺少对 Spring Core 或 Context 组件库的有效引用。这可能是由于 Maven/Gradle 构建工具中的依赖项声明不足造成的。 - 检查 `pom.xml` (Maven) 或 `build.gradle` (Gradle),确保包含如下核心依赖之一: ```xml <!-- For Maven --> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-context</artifactId> <version>${spring.version}</version> </dependency> ``` ```gradle // For Gradle implementation 'org.springframework:spring-context:${springVersion}' ``` -项目依赖树以应用更改:右键点击项目根目录 -> `Maven -> Reload Project` 或运行命令 `./gradlew build --refresh-dependencies`。 #### 4. **IDE 缓存损坏** Intellij IDEA 的缓存机制有时会因各种因素而失效,从而引发误报错误。清除缓存可以有效缓解此类情况。 - 使用快捷组合键 `Ctrl + Alt + Shift + S` 进入项目结构对话框;也可以尝试执行操作序列:`File -> Invalidate Caches / Restart... -> Invalidate and Restart`. #### 5. **启动异常影响正常解析** 若之前存在类似 `com.intellij.diagnostic.PluginException` 的严重初始化失败日志记录,则表明某些关键服务未能成功加载,进而干扰到后续功能表现[^2]。建议重下载最稳定版本的 IDEA 并按照标准流程完成初次部署工作。 ```java // 示例代码片段展示如何正确运用 @Autowired 注解实现自动装配 @Service public class StudentService { private final Repository repository; public StudentService(@Qualifier("specificRepository") Repository repo){ this.repository = repo; } } @Component class SpecificComponent{ @Autowired private transient StudentService studentService; // 此处应无任何编译期告警现象发生 } ```
评论 5
添加红包

请填写红包祝福语或标题

红包个数最小为10个

红包金额最低5元

当前余额3.43前往充值 >
需支付:10.00
成就一亿技术人!
领取后你会自动成为博主和红包主的粉丝 规则
hope_wisdom
发出的红包
实付
使用余额支付
点击重新获取
扫码支付
钱包余额 0

抵扣说明:

1.余额是钱包充值的虚拟货币,按照1:1的比例进行支付金额的抵扣。
2.余额无法直接购买下载,可以购买VIP、付费专栏及课程。

余额充值