前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Spring Batch实战(三)

Spring Batch实战(三)

作者头像
xdd
发布2022-07-12 14:27:34
1.2K0
发布2022-07-12 14:27:34
举报
文章被收录于专栏:java技术鸡汤java技术鸡汤

今天这篇文章,我们来了解一下SpringBatch的ItemReaders、ItemWriters、ItemStream以及怎么注册一个Step。前一篇文章我分析了一下怎么去从database中load数据使用ItemReader的一个子类JdbcPageQueryProvider,今天就进一步分析一下读取数据库数据源时的两个关键类ItemReader和ItemStream,以及写入数据库时的ItemWriter。

1、ItemReader

对于ItemReader,大家应该有个整体的认识,就是它是将许多不同的数据源数据来进行读取,然后使用ItemProcessor或者ItemWriter来写入到目标数据库或者NoSQL中,其中我们最长读取的三种类型就是:

(1)Flat文件:

怎么来理解这个Flat文件,我的个人理解是ItemReader是可以从该文件中按照行模式来读取数据,该文件要么就是有固定的格式,比如字段名,下面是value或者是每个记录的不同内容之间采用逗号之类的做隔离,比如下面两张图就是Flat File的两种类型:

带有字段定义的txt文件

这种是不带字段定义的txt文件。

(2)XML文件:

XML ItemReaders处理XML独立于用于解析、映射和验证对象的技术。输入数据允许根据XSD模式验证XML文件,具体例子见下图:

(3)关系型数据库:

访问数据库资源以返回可映射到对象进行处理的ResultSet。默认的SQL ItemReader实现调用一个RowMapper来返回对象。

下面是ItemReader的接口定义:

代码语言:javascript
复制
public interface ItemReader<T> {
    T read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException;
}

该接口只提供一个read方法,定义了ItemReader最基本的方法。调用它将返回一个数据项,如果没有剩余数据项则返回null。一个数据项可以表示文件中的一行、数据库中的一行或XML文件中的一个元素。通常期望这些被映射到一个领域对象(例如Trade、Foo或其他)。

如果底层资源是事务性的(例如JMS队列),那么在回滚场景中的后续调用中调用read方法可能返回相同的逻辑项。值得注意的是,缺少要由ItemReader处理的项并不会导致抛出异常。例如,配置了返回0结果的查询的数据库ItemReader在第一次调用read时返回null。

2、ItemWriter

ItemWriter在功能上类似于ItemReader,但具有反向操作。资源仍然需要定位、打开和关闭,但它们的不同之处在于ItemWriter是写入,而不是读取。对于数据库或队列,这些操作可能是插入、更新或发送。输出序列化的格式特定于每个批处理作业。

如下所示,是ItemWriter接口的定义:

代码语言:javascript
复制
public interface ItemWriter<T> {
    void write(List<? extends T> items) throws Exception;
}

ItemWriter是来把从ItemReader中读取的数据来批量写入目标数据源,SpringBatch的设计是希望批量写入,这里需要提一下chunk,它里面是可以设置每一批处理多少条记录的。

3、ItemStream

itemreader和itemwriter都很好地满足了各自的目的,但它们之间存在一个共同的问题,需要另一个接口。通常,作为批处理作业范围的一部分,需要打开和关闭读取器和写入器,并需要一种持久化状态的机制。ItemStream接口实现了这个目的,如下面的例子所示:

代码语言:javascript
复制
public interface ItemStream {
    void open(ExecutionContext executionContext) throws ItemStreamException;
    void update(ExecutionContext executionContext) throws ItemStreamException;
    void close() throws ItemStreamException;
}

实现了ItemStream的ItemReader的客户端应该在任何读取调用之前调用open,以打开任何资源(如文件)或获取连接。类似的限制也适用于实现ItemStream的ItemWriter。如果在ExecutionContext中找到了预期的数据,那么可以使用它在初始状态以外的位置启动ItemReader或ItemWriter。相反,调用close是为了确保在open期间分配的任何资源都被安全释放。调用update主要是为了确保当前保持的任何状态都被加载到提供的ExecutionContext中。在提交之前调用此方法,以确保在提交之前将当前状态持久化到数据库中。

4、如果数据源是文件类型,txt、csv,xml中之类,用SpringBatch又来怎么处理

demo-SpringBatch从txt文件读取内容

代码语言:javascript
复制
package com.example.springbatch.file;

import com.example.springbatch.item.writer.TxtFileWriter;
import com.example.springbatch.pojo.Product;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.mapping.DefaultLineMapper;
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;

@Configuration
public class TxtFileReaderJob {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;
    @Autowired
    private StepBuilderFactory stepBuilderFactory;
    @Autowired
    private TxtFileWriter txtFileWriter;
  
    @Bean
    public Job txtFileReadJob(){
        // 创建一个Job
        return jobBuilderFactory.get("txtFileReadJob")
                .start(txtFileReadStep())
                .build();
    }

    @Bean
    public Step txtFileReadStep() {
        // 创建一个Step,每一批次处理的数据个数为4条记录
        return stepBuilderFactory.get("txtFileReadStep")
                .<Product,Product>chunk(4)
                .reader(txtFileRead())
                .writer(txtFileWriter)
                .build();
    }

    @Bean
    public FlatFileItemReader<Product> txtFileRead() {
        // 此处使用FlatFileItemReader来读取txt文件内容
        FlatFileItemReader<Product> flatFileItemReader = new FlatFileItemReader<>();
        ClassPathResource resource = new ClassPathResource("product.txt");
        flatFileItemReader.setResource(resource);
        // 此处之所以使用setLinesToSkip,主要作用是我们的txt文件内容第一行事标题
        // 我们只需要从第一行数据开始读取即可
        flatFileItemReader.setLinesToSkip(1);
        // 下面的内容就是解析txt文件内容
        DelimitedLineTokenizer delimitedLineTokenizer = new DelimitedLineTokenizer();
        delimitedLineTokenizer.setNames(new String[]{"id","productName","productCode","productCompany"});
        DefaultLineMapper<Product> defaultLineMapper = new DefaultLineMapper<>();
        defaultLineMapper.setLineTokenizer(delimitedLineTokenizer);
        defaultLineMapper.setFieldSetMapper(fieldSet -> {
            Product product = new Product();
            product.setId(fieldSet.readLong("id"));
            product.setProductName(fieldSet.readString("productName"));
            product.setProductCode(fieldSet.readString("productCode"));
            product.setProductCompany(fieldSet.readString("productCompany"));
            return product;
        });
        defaultLineMapper.afterPropertiesSet();
        flatFileItemReader.setLineMapper(defaultLineMapper);
        return flatFileItemReader;
    }
}
代码语言:javascript
复制
package com.example.springbatch.item.writer;

import com.example.springbatch.pojo.Product;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.item.ItemWriter;
import org.springframework.stereotype.Component;

import java.util.List;

@Component
@Slf4j
public class TxtFileWriter implements ItemWriter<Product> {
    @Override
    public void write(List<? extends Product> items) throws Exception {
        // 此处的writer主要是将reader的结果打印出来
        items.forEach(p->log.info("基金产品信息:{}",p));
    }
}

Demo-SpringBatch将内容写入txt文件

代码语言:javascript
复制
package com.example.springbatch.file;

import com.example.springbatch.pojo.Product;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@Slf4j
public class WriteDataToFile {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;
    @Autowired
    private StepBuilderFactory stepBuilderFactory;
    @Autowired
    private ItemReader<Product> readDataBaseData;
    @Autowired
    private ItemWriter<Product> writeToTxtFile;

    @Bean
    public Job writeDataToFileJob(){
        return jobBuilderFactory.get("writeDataToFileJob")
                .start(writeDataToFileStep())
                .build();
    }

    @Bean
    public Step writeDataToFileStep() {
        return stepBuilderFactory.get("writeDataToFileStep")
                .<Product,Product>chunk(5)
                .reader(readDataBaseData)
                .writer(writeToTxtFile)
                .build();
    }
}

下面是writeToTxtFile的实现:

代码语言:javascript
复制
package com.example.springbatch.config;

import com.example.springbatch.pojo.Product;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.item.file.FlatFileItemWriter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.FileSystemResource;

@Configuration
@Slf4j
public class WriteToTxtFileConfig {

    @Bean
    public FlatFileItemWriter<Product> writeToTxtFile() throws Exception {
        FlatFileItemWriter<Product> writer = new FlatFileItemWriter<>();
        String filePath = "/Users/yongyongli/reading/msad/product.txt";
        writer.setResource(new FileSystemResource(filePath));
        writer.setLineAggregator(item -> {
            ObjectMapper objectMapper = new ObjectMapper();
            String result = null;
            try {
                result = objectMapper.writeValueAsString(item);
            } catch (JsonProcessingException e) {
                log.error("异常",e);
            }
            return result;
        });
        writer.afterPropertiesSet();
        return writer;
    }
}

Demo-SpringBatch从XML文件读取内容

代码语言:javascript
复制
@Bean
    public StaxEventItemReader<Product> xmlFileRead() {
        StaxEventItemReader<Product> staxEventItemReader =new StaxEventItemReader<>();
        staxEventItemReader.setResource(new ClassPathResource("product.xml"));
        staxEventItemReader.setFragmentRootElementName("product");
        XStreamMarshaller xStreamMarshaller = new XStreamMarshaller();
        Map<String, Class> aliases = new HashMap<>();
        aliases.put("product",Product.class);
        xStreamMarshaller.setAliases(aliases);
        staxEventItemReader.setUnmarshaller(xStreamMarshaller);
        return staxEventItemReader;
    }

Demo-SpringBatch将内容写入XML文件

代码语言:javascript
复制
@Bean
    public StaxEventItemWriter<Product> writeDataToXml() throws Exception {
        StaxEventItemWriter<Product> writer = new StaxEventItemWriter<>();
        XStreamMarshaller xStreamMarshaller = new XStreamMarshaller() ;
        Map<String,Class> aliases = new HashMap<>();
        aliases.put("product",Product.class);
        xStreamMarshaller.setAliases(aliases);
        writer.setRootTagName("products");
        writer.setMarshaller(xStreamMarshaller);
        String filePath = "/Users/yongyongli/reading/msad/product01.xml";
        writer.setResource(new FileSystemResource(filePath));
        writer.afterPropertiesSet();
        return writer;
    }

今天就主要分享了SpringBatch中的ItemReader、ItemWriter、ItemStream,并在结尾附加了SpringBatch的一些实战案例,从txt文件、xml等数据源读取数据,把数据写入到xml或者txt文件中去,希望对大家了解SpringBatch有所帮助,如果文章存在错误之处,还请指正。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2021-05-28,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 java技术鸡汤 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
数据库
云数据库为企业提供了完善的关系型数据库、非关系型数据库、分析型数据库和数据库生态工具。您可以通过产品选择和组合搭建,轻松实现高可靠、高可用性、高性能等数据库需求。云数据库服务也可大幅减少您的运维工作量,更专注于业务发展,让企业一站式享受数据上云及分布式架构的技术红利!
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档