1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
配置到原 Job 中:
@Bean
public Job firstJob(Step step1) {
return jobBuilderFactory.get("firstJob")
.listener(jobCompletionListener) // 绑定监听器
.start(step1)
.end()
.build();
}
1
2
3
4
5
6
7
8
2. 通过调度框架编排任务
使用 Spring Scheduler 或 Quartz 等工具,通过代码控制 Job 执行顺序。
示例:Spring Scheduler 编排任务:
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
@Component
public class JobScheduler {
@Autowired
private JobLauncher jobLauncher;
@Autowired
@Qualifier("firstJob")
private Job firstJob;
@Autowired
@Qualifier("secondJob")
private Job secondJob;
@Scheduled(cron = "0 0 3 * * ?") // 每天凌晨3点执行
public void runJobsSequentially() throws Exception {
// 执行第一个 Job
JobParameters params1 = new JobParametersBuilder()
.addLong("startAt", System.currentTimeMillis())
.toJobParameters();
jobLauncher.run(firstJob, params1);
// 第一个 Job 成功后,执行第二个 Job
JobParameters params2 = new JobParametersBuilder()
.addLong("startAt", System.currentTimeMillis())
.toJobParameters();
jobLauncher.run(secondJob, params2);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
3. 利用工作流引擎(如 Spring Cloud Data Flow)
通过可视化工具编排多个 Job,定义复杂的依赖关系。
实现步骤:
将每个 Job 打包为独立的 Spring Boot 应用。
在 Spring Cloud Data Flow 中定义任务流程:
task create --name jobFlow --definition "firstJob && secondJob"
1
通过 REST API 或 Dashboard 触发流程执行。
4. 通过数据库或文件传递数据
Job A 将结果写入数据库或文件,Job B 读取该数据继续处理。
示例:通过数据库传递数据:
Job A 写入数据到临时表:
CREATE TABLE temp_data (id INT, result VARCHAR(100));
1
Job B 读取临时表数据:
@Bean
public JdbcCursorItemReader<Data> reader(DataSource dataSource) {
return new JdbcCursorItemReaderBuilder<Data>()
.name("dataReader")
.dataSource(dataSource)
.sql("SELECT id, result FROM temp_data")
.rowMapper((rs, rowNum) -> new Data(rs.getInt("id"), rs.getString("result")))
.build();
}
1
2
3
4
5
6
7
8
9
三、关键注意事项
1. 避免重复执行同一 Job 实例
• 每次启动 Job 时,使用不同的 JobParameters(如时间戳)。
• 使用 RunIdIncrementer 自动生成唯一参数:
@Bean
public Job firstJob(Step step1) {
return jobBuilderFactory.get("firstJob")
.incrementer(new RunIdIncrementer()) // 自动生成唯一参数
.start(step1)
.end()
.build();
}
1
2
3
4
5
6
7
8
2. 错误处理与事务一致性
• 若 Job A 失败,后续 Job 不应执行(通过监听器或调度逻辑控制)。
• 使用数据库事务确保临时数据的原子性:
@Bean
public Step step1(DataSource dataSource) {
return stepBuilderFactory.get("step1")
.<Data, Data>chunk(10)
.reader(reader())
.writer(writer(dataSource))
.transactionManager(new DataSourceTransactionManager(dataSource))
.build();
}
1
2
3
4
5
6
7
8
9
四、适用场景总结
场景 解决方案
简单顺序依赖 JobExecutionListener 或调度器
复杂条件分支 Spring Cloud Data Flow
跨任务数据传递 数据库/文件/消息队列
需要可视化编排 工作流引擎(如 Airflow、SCDF)
通过上述方法,可灵活解决批处理任务间的依赖问题,确保任务按需执行且数据一致。
————————————————
原文链接:https://blog.csdn.net/wendao76/article/details/146612925