前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Spring Batch 批处理(6) - ItemProcessor

Spring Batch 批处理(6) - ItemProcessor

作者头像
chenchenchen
发布2020-05-26 17:17:52
1.4K0
发布2020-05-26 17:17:52
举报
文章被收录于专栏:chenchenchenchenchenchen

ItemProcessor

在开发过程中,我们经常需要读取数据后,经过一系列业务逻辑的操作,进而写入数据到指定持久化过程。Spring Batch为我们提供了ItemProcessor接口进行数据处理。

1.ItemProcessor:spring-batch中数据处理的过程

2.ItemProcessor主要用于实现业务逻辑,验证,过滤,等

3.Spring-batch为我们提供ItemProcessor<I,O>这个接口,传入一个类型I,然后由Processor处理成为O

代码语言:javascript
复制
public interface ItemProcessor<I, O> {
    O process(I item) throws Exception;
}

我们可以构建CompositeItemProcessor 的Bean, 在一个Step中可以使用多个Processor来按照顺序处理业务。 我们可以构建CompositeItemProcessor 的Bean,存储多个Processor,再与Step绑定;

代码语言:javascript
复制
  @Autowired
    private JobBuilderFactory jobBuilderFactory;
 
    @Autowired
    private StepBuilderFactory stepBuilderFactory;
 
    @Autowired
    private DataSource dataSource;
 
    @Autowired
    private ItemProcessor<Customer, Customer> firstNameUpperCaseProcessor;
 
    @Autowired
    private ItemProcessor<Customer, Customer> idFilterProcessor;
 
    @Bean
    public Job processorDemoJob() throws Exception {
        return jobBuilderFactory.get("processorDemoJob")
                .start(processorDemoStep())
                .build();
 
    }
 
    @Bean
    public Step processorDemoStep() throws Exception {
        return stepBuilderFactory.get("processorDemoStep")
                .<Customer,Customer>chunk(100)
                .reader(dbJdbcDemoReader())
                .processor(processorDemoProcessor())
                .writer(flatFileDemoFlatFileWriter())
                .build();
    }
 
    @Bean
    public CompositeItemProcessor<Customer,Customer> processorDemoProcessor(){
        CompositeItemProcessor<Customer,Customer> processor = new CompositeItemProcessor<>();
			  // 多种处理方式
        List<ItemProcessor<Customer,Customer>> list = new ArrayList<>();
        list.add(firstNameUpperCaseProcessor);
        list.add(idFilterProcessor);
        processor.setDelegates(list);
        
        return processor;
    }
 
    @Bean
    @StepScope
    public JdbcPagingItemReader<Customer> dbJdbcDemoReader() {
        JdbcPagingItemReader<Customer> reader = new JdbcPagingItemReader<>();
 
        reader.setDataSource(this.dataSource);
        reader.setFetchSize(100);
        reader.setRowMapper((rs,rowNum)->{
            return Customer.builder().id(rs.getLong("id"))
                    .firstName(rs.getString("firstName"))
                    .lastName(rs.getString("lastName"))
                    .birthdate(rs.getString("birthdate"))
                    .build();
 
        });
 
        MySqlPagingQueryProvider queryProvider = new MySqlPagingQueryProvider();
        queryProvider.setSelectClause("id, firstName, lastName, birthdate");
        queryProvider.setFromClause("from Customer");
        Map<String, Order> sortKeys = new HashMap<>(1);
        sortKeys.put("id", Order.ASCENDING);
        queryProvider.setSortKeys(sortKeys);
 
        reader.setQueryProvider(queryProvider);
 
        return reader;
 
    }
 
    @Bean
    public FlatFileItemWriter<Customer> flatFileDemoFlatFileWriter() throws Exception {
        FlatFileItemWriter<Customer> itemWriter = new FlatFileItemWriter<>();
        String path = File.createTempFile("customerInfo",".data").getAbsolutePath();
        System.out.println(">> file is created in: " + path);
        itemWriter.setResource(new FileSystemResource(path));
 
        itemWriter.setLineAggregator(new MyCustomerLineAggregator());
        itemWriter.afterPropertiesSet();
 
        return itemWriter;
 
    }

具体ItemProcessor实现

代码语言:javascript
复制
@Component
public class FirstNameUpperCaseProcessor implements ItemProcessor<Customer,Customer> {
    @Override
    public Customer process(Customer item) throws Exception {
		    // FirstName转大写
        return new Customer(item.getId(),item.getFirstName().toUpperCase(),item.getLastName(),
                item.getBirthdate());
    }
}
 

@Component
public class IdFilterProcessor implements ItemProcessor<Customer,Customer> {
    @Override
    public Customer process(Customer item) throws Exception {
		    // 过滤id为基数的数据
        if (item.getId() % 2 == 0){
            return item;
        } else {
            return null;
        }
    }
}

输出结果

file
file

参考:

https://blog.csdn.net/wuzhiwei549/article/details/88622281

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2020-03-10 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • ItemProcessor
相关产品与服务
批量计算
批量计算(BatchCompute,Batch)是为有大数据计算业务的企业、科研单位等提供高性价比且易用的计算服务。批量计算 Batch 可以根据用户提供的批处理规模,智能地管理作业和调动其所需的最佳资源。有了 Batch 的帮助,您可以将精力集中在如何分析和处理数据结果上。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档