前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Spring Batch实战(二)

Spring Batch实战(二)

作者头像
xdd
发布2022-07-12 14:27:01
8870
发布2022-07-12 14:27:01
举报
文章被收录于专栏:java技术鸡汤java技术鸡汤

1、SpringBatch支持三种类型的数据源来进行批处理:

  • 数据库类型
  • 文件类型
  • 消息类型

2、概念术语

(1)Job

在Spring Batch中,Job只是Step实例的容器。它将在逻辑上属于一个流的多个步骤组合在一起,并允许配置所有步骤的全局属性,比如可重新启动性。作业配置包含:

  • Job的名称
  • Step实例的定义和顺序
  • 任务是否可以重新启动
代码语言:javascript
复制
@Bean
public Job footballJob() {
    return this.jobBuilderFactory.get("footballJob")
                     .start(playerLoad())
                     .next(gameLoad())
                     .next(playerSummarization())
                     .build();
}

如上图所示,我们使用JobBuilderFactory来创建了一个Job,任务的名字叫footballJob,开始第一个Step是playerLoad(),第二个Step是gameLoad(),第三个Step是playerSummarization()。

(2)JobInstance

它的含义就是一个Job对应了一个批处理的Job实例。

(3)JobParameters

JobParameters对象保存一组用于启动批处理作业的参数。它们可以用于识别,甚至在运行过程中作为参考数据,如下图所示:

在前面的示例中,有两个实例,一个用于1月1日,另一个用于1月2日,实际上只有一个Job,但它有两个JobParameter对象:一个以JobParameters01-01-2017开始,另一个以参数01-02-2017开始。因此,它们之间的联系可以定义为:JobInstance = Job +标识JobParameters。这允许开发人员有效地控制JobInstance的定义方式,因为它们控制传入的参数

(4)JobExecution

代表这个Job执行的一些信息,比如Job执行的上下文,Job的创建时间和结束时间。

(5)Step

关于Step,你可以这么理解,一个任务就是一个Job,但是你的任务是由许多步骤组成的,在每个步骤里面会做一些逻辑处理,比如从数据源读取数据、对读取的数据进行清洗转换、最后将干净的数据写入目标数据源,这里的步骤就是Step,所以一个Job可以由一个Step或者多个Step组成。

(6)StepExecution

对应Step,它也有对应的StepExecution,它里面包含了一个Job执行的每个Step的执行上下文等信息。

3、实战

关于SpringBatch的批处理,大概处理流程如下图所示:

下面介绍一个多数据源数据的批处理例子

场景:存在多个数据源,需要将多个数据源的数据抽取同步到单个数据源,其中还需要在同步的过程中对多个数据源的数据进行抽取、清洗、转换等操作,把最终的结果写入单个数据源

1、Job和Step定义

代码语言:javascript
复制
package com.batch.example.demo.job;

import com.batch.example.demo.pojo.FundProduct;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @author 李勇勇
 * @version 1.0
 * @date 2021-05-24 09:54
 */
@Configuration
@Slf4j
public class FundProductSyncJob {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;
    @Autowired
    private StepBuilderFactory stepBuilderFactory;
    @Autowired
    private ItemReader<FundProduct> readFundProduct;
    @Autowired
    private ItemWriter<FundProduct> writeProduct;
    @Autowired
    private ItemProcessor<FundProduct, FundProduct> productProcessor;

    @Bean
    public Job productSyncJob() {
        return jobBuilderFactory.get("productSyncJob")
                .start(readFundProductStep())
                .build();
    }

    @Bean
    public Step readFundProductStep() {
        return stepBuilderFactory.get("readFundProductStep")
                .<FundProduct, FundProduct>chunk(1000)
                .reader(readFundProduct)
                .processor(productProcessor)
                .writer(writeProduct)
                .build();
    }
}

2、Reader定义

代码语言:javascript
复制
package com.batch.example.demo.read;

import com.batch.example.demo.pojo.FundProduct;
import java.util.HashMap;
import java.util.Map;
import javax.sql.DataSource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.item.database.JdbcPagingItemReader;
import org.springframework.batch.item.database.Order;
import org.springframework.batch.item.database.support.OraclePagingQueryProvider;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.core.JdbcTemplate;

/**
 * @author 李勇勇
 * @version 1.0
 * @date 2021-05-24 09:55
 */
@Configuration
@Slf4j
public class FundProductReader {

    @Autowired
    @Qualifier("chDataSource")
    private DataSource chDataSource;

    @Autowired
    @Qualifier("chJdbcTemplate")
    private JdbcTemplate chJdbcTemplate;

    @Bean
    public JdbcPagingItemReader<FundProduct> readFundProduct() {
        JdbcPagingItemReader<FundProduct> itemReader = new JdbcPagingItemReader<>();
        itemReader.setDataSource(chDataSource);
        itemReader.setFetchSize(1000);
        itemReader.setRowMapper((rs, rowNum) -> {
            FundProduct product = new FundProduct();
            product.setProductCode(rs.getString(2));
            product.setProductName(rs.getString(3));
            product.setProductType(rs.getString(4));
            product.setFundManagerCode(rs.getString(5));
            product.setFundTrusteeCode(rs.getString(6));
            return product;
        });
        OraclePagingQueryProvider queryProvider = new OraclePagingQueryProvider();
        queryProvider.setSelectClause("select ID,FSYMBOL,FDSNAME,FDTYPE,KEEPERCODE,TRUSTEECODE");
        queryProvider.setFromClause(" from TQ_FD_BASICINFO  ");
        Map<String, Order> sortedMap = new HashMap<>();
        sortedMap.put("ID", Order.DESCENDING);
        queryProvider.setSortKeys(sortedMap);
        itemReader.setQueryProvider(queryProvider);
        return itemReader;
    }
}

3、Processor定义

代码语言:javascript
复制
package com.batch.example.demo.processor;

import com.batch.example.demo.pojo.FundProduct;
import java.util.UUID;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.stereotype.Component;

/**
 * @author 李勇勇
 * @version 1.0
 * @date 2021-05-24 09:54
 */
@Component
@Slf4j
public class FundProductProcessor implements ItemProcessor<FundProduct, FundProduct> {

    @Override
    public FundProduct process(FundProduct fundProduct) throws Exception {
        FundProduct product = new FundProduct();
        // 基金代码标准化
        product.setId(UUID.randomUUID().toString().replace("-", ""));
        product.setProductCode("standard" + fundProduct.getProductCode());
        product.setFundTrusteeCode(fundProduct.getFundTrusteeCode());
        product.setFundManagerCode(fundProduct.getFundManagerCode());
        product.setProductType(fundProduct.getProductType());
        product.setProductName(fundProduct.getProductName());
        return product;
    }
}

4、Writer定义

代码语言:javascript
复制
package com.batch.example.demo.write;

import com.batch.example.demo.pojo.FundProduct;
import javax.sql.DataSource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @author 李勇勇
 * @version 1.0
 * @date 2021-05-24 09:55
 */
@Configuration
@Slf4j
public class FundProductWriter {

    @Autowired
    @Qualifier("primaryDataSource")
    private DataSource primaryDataSource;

    @Bean
    public JdbcBatchItemWriter<FundProduct> writeProduct() {
        JdbcBatchItemWriter<FundProduct> batchItemWriter = new JdbcBatchItemWriter<>();
        batchItemWriter.setDataSource(primaryDataSource);
        batchItemWriter.setSql(" INSERT INTO product (id, product_code, product_name, product_type, "
                + "fund_manager_code, fund_trustee_code) VALUES (:id, :productCode, :productName, "
                + ":productType, :fundManagerCode, :fundTrusteeCode) ");
        batchItemWriter.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>());
        return batchItemWriter;
    }
}

5、多数据源定义

代码语言:javascript
复制
package com.batch.example.demo.config;

import javax.sql.DataSource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.jdbc.DataSourceProperties;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.jdbc.core.JdbcTemplate;

/**
 * @author 李勇勇
 * @version 1.0
 * @date 2021-05-24 09:06
 */
@Slf4j
@Configuration
public class DynamicDataSourceConfig {

    @Bean
    @Primary
    @ConfigurationProperties("spring.datasource.primary")
    public DataSourceProperties primaryDataSourceProperties() {
        return new DataSourceProperties();
    }

    @Bean("chDataSourceProperties")
    @ConfigurationProperties("spring.datasource.ch")
    public DataSourceProperties chDataSourceProperties() {
        return new DataSourceProperties();
    }

    @Bean
    @Primary
    public DataSource primaryDataSource(@Autowired DataSourceProperties props) {
        return props.initializeDataSourceBuilder().build();
    }

    @Bean("chDataSource")
    public DataSource chDataSource(@Autowired @Qualifier("chDataSourceProperties") DataSourceProperties props) {
        return props.initializeDataSourceBuilder().build();
    }

    @Bean
    @Primary
    public JdbcTemplate primaryJdbcTemplate(DataSource dataSource) {
        return new JdbcTemplate(dataSource);
    }

    @Bean(name = "chJdbcTemplate")
    public JdbcTemplate chJdbcTemplate(@Qualifier("chDataSource") DataSource dataSource) {
        return new JdbcTemplate(dataSource);
    }
}

6、配置信息

代码语言:javascript
复制
# 多数据源配置
# target postgresql database
spring.datasource.primary.driver-class-name=org.postgresql.Driver
spring.datasource.primary.url=jdbc:postgresql://host:port/dbname
spring.datasource.primary.username=
spring.datasource.primary.password=
spring.datasource.schema=classpath:/org/springframework/batch/core/schema-drop-postgresql.sql
spring.batch.jdbc.initialize-schema=always
spring.datasource.primary.type=com.alibaba.druid.pool.DruidDataSource
# 财汇 oracle
spring.datasource.ch.url=jdbc:oracle:thin:@ip:port:instance
spring.datasource.ch.username=
spring.datasource.ch.password=
spring.datasource.ch.driver-class-name=oracle.jdbc.OracleDriver
spring.datasource.ch.type=com.alibaba.druid.pool.DruidDataSource
# wind oracle
spring.datasource.wind.url=jdbc:oracle:thin:@ip:port:instance
spring.datasource.wind.username=
spring.datasource.wind.password=
spring.datasource.wind.driver-class-name=oracle.jdbc.OracleDriver
spring.datasource.wind.type=com.alibaba.druid.pool.DruidDataSource

7、建表语句

代码语言:javascript
复制
create table product
(
    id                varchar(32) not null
        constraint product_pkey
            primary key,
    product_code      varchar(128),
    product_name      varchar(1024),
    product_type      varchar(128),
    fund_manager_code varchar(128),
    fund_trustee_code varchar(128)
);

comment on table product is '产品表';

comment on column product.id is '产品id';

comment on column product.product_code is '基金代码';

comment on column product.product_name is '基金简称';

comment on column product.product_type is '基金类型';

comment on column product.fund_manager_code is '基金管理人code';

comment on column product.fund_trustee_code is '基金托管人code';

alter table product
    owner to product_dev3;

9、数据对象定义

代码语言:javascript
复制
package com.batch.example.demo.pojo;

import lombok.Getter;
import lombok.Setter;
import lombok.ToString;

/**
 * @author 李勇勇
 * @version 1.0
 * @date 2021-05-24 09:51
 */
@Getter
@Setter
@ToString
public class FundProduct {


    private String id;

    /**
     * 基金代码
     */
    private String productCode;

    /**
     * 基金简称
     */
    private String productName;
    /**
     * 基金类型
     */
    private String productType;
    /**
     * 基金管理人代码
     */
    private String fundManagerCode;
    /**
     * 基金托管人代码
     */
    private String fundTrusteeCode;


}

这篇文章主要是对SpringBatch中的其余概念做补充,并写了一个springbatch对于多数据源数据抽取转换的一个demo,供大家参考。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2021-05-25,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 java技术鸡汤 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
批量计算
批量计算(BatchCompute,Batch)是为有大数据计算业务的企业、科研单位等提供高性价比且易用的计算服务。批量计算 Batch 可以根据用户提供的批处理规模,智能地管理作业和调动其所需的最佳资源。有了 Batch 的帮助,您可以将精力集中在如何分析和处理数据结果上。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档