Can partitioning and parallel steps be combined in Spring Batch?

Stack Overflow user
Asked on 2018-09-08 13:58:39
2 answers · 1.8K views · 0 following · 1 vote

I would like to know whether the following is possible in Spring Batch:

Step1

Step2 (flow) -> flow1, flow2, flow3

Step3

where each of the flows is partitioned:

flow1 -> partitioned with a grid size of 5

flow2 -> partitioned with a grid size of 5

flow3 -> partitioned with a grid size of 5

return jobBuilderFactory.get("dataLoad")
            .incrementer(new RunIdIncrementer())
            .listener(listener)
            .start(step1())
            .next(step2())
            .next(step3())
            .build()
            .build();
@Bean
public Flow step2() {
    Flow subflow1 = new FlowBuilder<Flow>("readTable1Flow").from(readTable1()).end();
    Flow subflow2 = new FlowBuilder<Flow>("readTable2Flow").from(readTable2()).end();
    Flow subflow3 = new FlowBuilder<Flow>("readTable3Flow").from(readTable3()).end();

    return new FlowBuilder<Flow>("splitflow").split(new SimpleAsyncTaskExecutor())
            .add(subflow1, subflow2, subflow3).build();
}
@Bean
public Step readTable1() {
    return stepBuilderFactory.get("readTable1Step")
            .partitioner(slaveStep1().getName(), partitioner1())
            .partitionHandler(slaveStep1Handler())
            .build();
}

@Bean
public Step readTable2() {
    return stepBuilderFactory.get("readTable2Step")
            .partitioner(slaveStep2().getName(), partitioner2())
            .partitionHandler(slaveStep2Handler())
            .build();
}
@Bean
public Step readTable3() {
    return stepBuilderFactory.get("readTable3Step")
            .partitioner(slaveStep3().getName(), partitioner2())
            .partitionHandler(slaveStep3Handler())
            .build();
}

2 Answers

Stack Overflow user

Answered on 2018-09-10 16:15:57

Yes, this is possible. You can use a split flow in which each of the steps running in parallel is a partitioned step. Here is an example:

import java.util.HashMap;
import java.util.Map;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.job.builder.FlowBuilder;
import org.springframework.batch.core.job.flow.Flow;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.partition.support.Partitioner;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.SimpleAsyncTaskExecutor;

@Configuration
@EnableBatchProcessing
public class MyJob {

    @Autowired
    private JobBuilderFactory jobs;

    @Autowired
    private StepBuilderFactory steps;

    @Bean
    public Step step1() {
        return steps.get("step1")
                .tasklet((contribution, chunkContext) -> {
                    System.out.println(Thread.currentThread().getName() + ": step1");
                    return RepeatStatus.FINISHED;
                })
                .build();
    }

    @Bean
    public Flow step2() {
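        // Each sub-flow wraps one partitioned master step; the split runs all three in parallel on taskExecutor().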
        Flow subflow1 = new FlowBuilder<Flow>("step21_master").from(step21_master()).end();
        Flow subflow2 = new FlowBuilder<Flow>("step22_master").from(step22_master()).end();
        Flow subflow3 = new FlowBuilder<Flow>("step23_master").from(step23_master()).end();

        return new FlowBuilder<Flow>("splitflow").split(taskExecutor())
                .add(subflow1, subflow2, subflow3).build();
    }

    @Bean
    public Step step21_master() {
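        // Master step: creates 3 partitions via the partitioner and runs workerStep() for each on taskExecutor().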
        return steps.get("step21_master")
                .partitioner("workerStep", partitioner("step21_master"))
                .step(workerStep())
                .gridSize(3)
                .taskExecutor(taskExecutor())
                .build();
    }

    @Bean
    public Step step22_master() {
        return steps.get("step22_master")
                .partitioner("workerStep", partitioner("step22_master"))
                .step(workerStep())
                .gridSize(3)
                .taskExecutor(taskExecutor())
                .build();
    }

    @Bean
    public Step step23_master() {
        return steps.get("step23_master")
                .partitioner("workerStep", partitioner("step23_master"))
                .step(workerStep())
                .gridSize(3)
                .taskExecutor(taskExecutor())
                .build();
    }

    @Bean
    public Step step3() {
        return steps.get("step3")
                .tasklet((contribution, chunkContext) -> {
                    System.out.println(Thread.currentThread().getName() + ": step3");
                    return RepeatStatus.FINISHED;
                })
                .build();
    }

    @Bean
    public Step workerStep() {
        return steps.get("workerStep")
                .tasklet(getTasklet(null))
                .build();
    }

    @Bean
    @StepScope
    public Tasklet getTasklet(@Value("#{stepExecutionContext['data']}") String partitionData) {
        return (contribution, chunkContext) -> {
            System.out.println(Thread.currentThread().getName() + " processing partitionData = " + partitionData);
            return RepeatStatus.FINISHED;
        };
    }

    @Bean
    public Job job() {
        return jobs.get("job")
                .flow(step1()).on("*").to(step2())
                .next(step3())
                .build()
                .build();
    }

    @Bean
    public SimpleAsyncTaskExecutor taskExecutor() {
        return new SimpleAsyncTaskExecutor();
    }

    public Partitioner partitioner(String stepName) {
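        // One ExecutionContext per partition; each carries a 'data' entry read by the @StepScope worker tasklet.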
        return gridSize -> {
            Map<String, ExecutionContext> map = new HashMap<>(gridSize);
            for (int i = 0; i < gridSize; i++) {
                ExecutionContext executionContext = new ExecutionContext();
                executionContext.put("data", stepName + ":data" + i);
                String key = stepName + ":partition" + i;
                map.put(key, executionContext);
            }
            return map;
        };
    }

    public static void main(String[] args) throws Exception {
        ApplicationContext context = new AnnotationConfigApplicationContext(MyJob.class);
        JobLauncher jobLauncher = context.getBean(JobLauncher.class);
        Job job = context.getBean(Job.class);
        jobLauncher.run(job, new JobParameters());
    }

}

In this example, step2 is a split flow (as in your example), where each sub-flow is a partitioned step (a master step) with 3 worker steps. If you run this example, you should see something like the following:

[main] INFO org.springframework.batch.core.launch.support.SimpleJobLauncher - Job: [FlowJob: [name=job]] launched with the following parameters: [{}]
[main] INFO org.springframework.batch.core.job.SimpleStepHandler - Executing step: [step1]
main: step1
[SimpleAsyncTaskExecutor-1] INFO org.springframework.batch.core.job.SimpleStepHandler - Executing step: [step21_master]
[SimpleAsyncTaskExecutor-3] INFO org.springframework.batch.core.job.SimpleStepHandler - Executing step: [step23_master]
[SimpleAsyncTaskExecutor-2] INFO org.springframework.batch.core.job.SimpleStepHandler - Executing step: [step22_master]
SimpleAsyncTaskExecutor-4 processing partitionData = step21_master:data2
SimpleAsyncTaskExecutor-12 processing partitionData = step22_master:data2
SimpleAsyncTaskExecutor-11 processing partitionData = step22_master:data0
SimpleAsyncTaskExecutor-10 processing partitionData = step22_master:data1
SimpleAsyncTaskExecutor-9 processing partitionData = step23_master:data1
SimpleAsyncTaskExecutor-8 processing partitionData = step23_master:data2
SimpleAsyncTaskExecutor-7 processing partitionData = step23_master:data0
SimpleAsyncTaskExecutor-5 processing partitionData = step21_master:data0
SimpleAsyncTaskExecutor-6 processing partitionData = step21_master:data1
[main] INFO org.springframework.batch.core.job.SimpleStepHandler - Executing step: [step3]
main: step3
[main] INFO org.springframework.batch.core.launch.support.SimpleJobLauncher - Job: [FlowJob: [name=job]] completed with the following parameters: [{}] and the following status: [COMPLETED]

step1, step2 and step3 run in sequence, where step2 is split into 3 sub-steps running in parallel, and each sub-step is a master step whose 3 worker steps also run in parallel.
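A side note beyond the original answer: SimpleAsyncTaskExecutor starts a new thread for every sub-flow and every partition, which is fine for a demo but unbounded. If you want to cap the thread count, the taskExecutor() bean above could be swapped for a bounded ThreadPoolTaskExecutor. This is only a sketch with illustrative pool sizes; the pool must be large enough to run the 3 sub-flows and their 3x3 workers concurrently, otherwise the split can stall waiting for worker threads.

@Bean
public ThreadPoolTaskExecutor taskExecutor() {
    // org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor
    // Bounded alternative to SimpleAsyncTaskExecutor; sizes are illustrative.
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    executor.setCorePoolSize(12); // 3 sub-flows + 9 partition workers running concurrently
    executor.setMaxPoolSize(12);
    executor.setThreadNamePrefix("batch-");
    executor.initialize();
    return executor;
}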

Votes: 1

Stack Overflow user

Answered on 2018-09-11 05:07:27

I got it working now. The reason I couldn't get it to work was a thread deadlock: Spring Batch was trying to insert/update its metadata in the database (HSQLDB), and each flow ended up in a waiting state. Once I switched to a different database, it worked. Thanks Mahmoud for the information!!
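For reference, the fix described here comes down to pointing the batch job repository at a database that handles the concurrent metadata inserts/updates made by the parallel flows. A minimal sketch of such a configuration, assuming Spring Batch 4.x with @EnableBatchProcessing and H2 on the classpath (the schema script ships with spring-batch-core; the bean and the choice of H2 are illustrative):

@Bean
public DataSource dataSource() {
    // @EnableBatchProcessing picks up this DataSource for the batch metadata tables,
    // i.e. the tables the parallel flows insert into and update concurrently.
    return new EmbeddedDatabaseBuilder() // org.springframework.jdbc.datasource.embedded
            .setType(EmbeddedDatabaseType.H2)
            .addScript("/org/springframework/batch/core/schema-h2.sql")
            .build();
}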

Votes: 0
Original link: https://stackoverflow.com/questions/52232490