【转载】Spring Batch批处理框架深入解析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
配置到原 Job 中:

@Bean
public Job firstJob(Step step1) {
    return jobBuilderFactory.get("firstJob")
            .listener(jobCompletionListener) // 绑定监听器
            .start(step1)
            .end()
            .build();
}
1
2
3
4
5
6
7
8
2. 通过调度框架编排任务
使用 Spring Scheduler 或 Quartz 等工具,通过代码控制 Job 执行顺序。

示例:Spring Scheduler 编排任务:

import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

@Component
public class JobScheduler {

    @Autowired
    private JobLauncher jobLauncher;

    @Autowired
    @Qualifier("firstJob")
    private Job firstJob;

    @Autowired
    @Qualifier("secondJob")
    private Job secondJob;

    @Scheduled(cron = "0 0 3 * * ?") // 每天凌晨3点执行
    public void runJobsSequentially() throws Exception {
        // 执行第一个 Job
        JobParameters params1 = new JobParametersBuilder()
                .addLong("startAt", System.currentTimeMillis())
                .toJobParameters();
        jobLauncher.run(firstJob, params1);

        // 第一个 Job 成功后,执行第二个 Job
        JobParameters params2 = new JobParametersBuilder()
                .addLong("startAt", System.currentTimeMillis())
                .toJobParameters();
        jobLauncher.run(secondJob, params2);
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
3. 利用工作流引擎(如 Spring Cloud Data Flow)
通过可视化工具编排多个 Job,定义复杂的依赖关系。

实现步骤:

将每个 Job 打包为独立的 Spring Boot 应用。
在 Spring Cloud Data Flow 中定义任务流程:
task create --name jobFlow --definition "firstJob && secondJob"
1
通过 REST API 或 Dashboard 触发流程执行。
4. 通过数据库或文件传递数据
Job A 将结果写入数据库或文件,Job B 读取该数据继续处理。

示例:通过数据库传递数据:

Job A 写入数据到临时表:
CREATE TABLE temp_data (id INT, result VARCHAR(100));
1
Job B 读取临时表数据:
@Bean
public JdbcCursorItemReader<Data> reader(DataSource dataSource) {
    return new JdbcCursorItemReaderBuilder<Data>()
            .name("dataReader")
            .dataSource(dataSource)
            .sql("SELECT id, result FROM temp_data")
            .rowMapper((rs, rowNum) -> new Data(rs.getInt("id"), rs.getString("result")))
            .build();
}
1
2
3
4
5
6
7
8
9
三、关键注意事项
1. 避免重复执行同一 Job 实例
• 每次启动 Job 时,使用不同的 JobParameters(如时间戳)。
• 使用 RunIdIncrementer 自动生成唯一参数:

@Bean
public Job firstJob(Step step1) {
    return jobBuilderFactory.get("firstJob")
            .incrementer(new RunIdIncrementer()) // 自动生成唯一参数
            .start(step1)
            .end()
            .build();
}
1
2
3
4
5
6
7
8
2. 错误处理与事务一致性
• 若 Job A 失败,后续 Job 不应执行(通过监听器或调度逻辑控制)。
• 使用数据库事务确保临时数据的原子性:

@Bean
public Step step1(DataSource dataSource) {
    return stepBuilderFactory.get("step1")
            .<Data, Data>chunk(10)
            .reader(reader())
            .writer(writer(dataSource))
            .transactionManager(new DataSourceTransactionManager(dataSource))
            .build();
}
1
2
3
4
5
6
7
8
9
四、适用场景总结
场景    解决方案
简单顺序依赖    JobExecutionListener 或调度器
复杂条件分支    Spring Cloud Data Flow
跨任务数据传递    数据库/文件/消息队列
需要可视化编排    工作流引擎(如 Airflow、SCDF)
通过上述方法,可灵活解决批处理任务间的依赖问题,确保任务按需执行且数据一致。
————————————————

                                   
原文链接:https://blog.csdn.net/wendao76/article/details/146612925

评论
添加红包

请填写红包祝福语或标题

红包个数最小为10个

红包金额最低5元

当前余额3.43前往充值 >
需支付:10.00
成就一亿技术人!
领取后你会自动成为博主和红包主的粉丝 规则
hope_wisdom
发出的红包
实付
使用余额支付
点击重新获取
扫码支付
钱包余额 0

抵扣说明:

1.余额是钱包充值的虚拟货币,按照1:1的比例进行支付金额的抵扣。
2.余额无法直接购买下载,可以购买VIP、付费专栏及课程。

余额充值