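Preface
For Spring Batch concepts and basic usage, see the article "spring batch精选,一文吃透spring batch". This article covers more advanced material: Spring Batch's scaling options. These are Multithreaded Step (one Step executed by multiple threads), Parallel Step (several Steps executed in parallel on multiple threads), Remote Chunking (chunks processed in a distributed way on remote nodes) and Partitioning Step (the data is split into partitions that are processed separately). This article focuses on the Partitioning Step. The example built here can run as a master, as a slave, or in mixed mode (master and slave in one instance), and spreading the partitions over several instances can greatly reduce the run time of jobs that are too slow on a single machine.
Source code for this article's project: https://gitee.com/kailing/partitionjob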
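How Spring Batch remote partitioned Steps work
The master node splits the data into ranges according to some rule (for example by ID or by hash) and publishes these ranges to a message broker (ActiveMQ, RabbitMQ). Slave nodes listen to the broker, take each message, process the data range it describes and send back the result. This is shown in the figure below: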
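The flow above can be sketched without Spring or a broker. The following is a hypothetical, in-memory model and not part of the project: two `BlockingQueue`s stand in for the broker's request and reply queues, a thread pool stands in for the slave nodes, and "processing" a range simply counts its IDs, assuming every ID in the range exists.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical in-memory model of remote partitioning. The two BlockingQueues
// stand in for the message broker (e.g. RabbitMQ); pool threads stand in for slave nodes.
class RemotePartitionDemo {

    // A partition request from the master: an inclusive ID range.
    static final class Range {
        final String name;
        final int min;
        final int max;
        Range(String name, int min, int max) { this.name = name; this.min = min; this.max = max; }
    }

    // A slave's reply for one partition.
    static final class Result {
        final String name;
        final int processed;
        Result(String name, int processed) { this.name = name; this.processed = processed; }
    }

    static List<Result> run(List<Range> ranges, int slaveCount) {
        BlockingQueue<Range> requests = new LinkedBlockingQueue<>();
        BlockingQueue<Result> replies = new LinkedBlockingQueue<>();
        ExecutorService slaves = Executors.newFixedThreadPool(slaveCount);

        // Each slave listens on the request queue, processes a range and replies.
        for (int i = 0; i < slaveCount; i++) {
            slaves.execute(() -> {
                try {
                    while (true) {
                        Range r = requests.take();         // block until work arrives
                        int processed = r.max - r.min + 1; // stand-in for read/process/write
                        replies.put(new Result(r.name, processed));
                    }
                } catch (InterruptedException e) {
                    // Interrupted by shutdownNow(): stop listening.
                }
            });
        }

        List<Result> results = new ArrayList<>();
        try {
            for (Range r : ranges) {
                requests.put(r);             // master publishes every partition
            }
            for (int i = 0; i < ranges.size(); i++) {
                results.add(replies.take()); // master waits for one reply per partition
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("master interrupted while waiting for replies", e);
        } finally {
            slaves.shutdownNow();            // interrupt the idle slaves so their threads end
        }
        return results;
    }
}
```

Calling `RemotePartitionDemo.run` with three ranges and two slaves returns three results in completion order, which is not necessarily the order in which the ranges were sent.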
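The following steps implement the Spring Batch remote partitioning example according to this principle.
Step 1: Add the dependencies
See: https://gitee.com/kailing/partitionjob/blob/master/pom.xml
The key dependency for a partitioned job is spring-batch-integration, which provides the remote communication capability.
Step 2: Distribute the data from the master node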
// Only instances running the "master" or "mixed" profile define the job
@Profile({"master", "mixed"})
@Bean
public Job job(@Qualifier("masterStep") Step masterStep) {
    return jobBuilderFactory.get("endOfDayjob")
            .start(masterStep)
            .incrementer(new BatchIncrementer())
            .listener(new JobListener())
            .build();
}
// The master step does not process data itself: it splits the data with the
// partitioner and hands each partition of slaveStep to the PartitionHandler
@Bean("masterStep")
public Step masterStep(@Qualifier("slaveStep") Step slaveStep,
                       PartitionHandler partitionHandler,
                       DataSource dataSource) {
    return stepBuilderFactory.get("masterStep")
            .partitioner(slaveStep.getName(), new ColumnRangePartitioner(dataSource))
            .step(slaveStep)
            .partitionHandler(partitionHandler)
            .build();
}
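The key part of the master node is its Step: it needs the name of the slave Step and a partitioner. The partitioner implements the Partitioner interface and returns a Map<String, ExecutionContext> that fully describes the partitions the slaves must process: the ExecutionContext of each entry holds the data boundaries of one partition. What you put into the ExecutionContext depends on your business logic; here, the partitions are bounded by data ID. The Partitioner implementation is as follows: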
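import java.util.HashMap;
import java.util.Map;
import javax.sql.DataSource;
import org.springframework.batch.core.partition.support.Partitioner;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.jdbc.core.JdbcOperations;
import org.springframework.jdbc.core.JdbcTemplate;
/**
 * Created by kl on 2018/3/1.
 * Content: partitions the data by ID range
 */
public class ColumnRangePartitioner implements Partitioner {
    private final JdbcOperations jdbcTemplate;
    ColumnRangePartitioner(DataSource dataSource) {
        this.jdbcTemplate = new JdbcTemplate(dataSource);
    }
    @Override
    public Map<String, ExecutionContext> partition(int gridSize) {
        Map<String, ExecutionContext> result = new HashMap<>();
        // Integer rather than int: MIN/MAX return NULL on an empty table
        Integer min = jdbcTemplate.queryForObject("SELECT MIN(arcid) FROM kl_article", Integer.class);
        Integer max = jdbcTemplate.queryForObject("SELECT MAX(arcid) FROM kl_article", Integer.class);
        if (min == null || max == null) {
            return result; // no data, no partitions
        }
        // Range width chosen so that at most gridSize ranges cover [min, max]
        int targetSize = (max - min) / gridSize + 1;
        int number = 0;
        int start = min;
        int end = start + targetSize - 1;
        while (start <= max) {
            ExecutionContext value = new ExecutionContext();
            result.put("partition" + number, value);
            if (end >= max) {
                end = max; // clip the last range to the maximum ID
            }
            // These keys are read back by the slave's reader from stepExecutionContext
            value.putInt("minValue", start);
            value.putInt("maxValue", end);
            start += targetSize;
            end += targetSize;
            number++;
        }
        return result;
    }
}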
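To see what the partitioner produces without a database, its splitting arithmetic can be pulled into a plain function. `RangeSplitter` below is a hypothetical helper and not part of the project: it takes min and max directly instead of querying `kl_article`, and models each `ExecutionContext` as an `int[]{minValue, maxValue}`. It uses a `LinkedHashMap` only so the partitions come out in order; Spring Batch does not depend on the order.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: the splitting arithmetic of ColumnRangePartitioner without
// JDBC. min and max are passed in, and each ExecutionContext is modelled as an
// int[]{minValue, maxValue}.
class RangeSplitter {
    static Map<String, int[]> split(int min, int max, int gridSize) {
        int targetSize = (max - min) / gridSize + 1;       // same formula as the partitioner
        Map<String, int[]> result = new LinkedHashMap<>(); // insertion order, for readable output
        int number = 0;
        for (int start = min; start <= max; start += targetSize) {
            int end = Math.min(start + targetSize - 1, max); // clip the last range to max
            result.put("partition" + number, new int[] {start, end});
            number++;
        }
        return result;
    }
}
```

For IDs 1 to 100 and `gridSize = 3`, `targetSize` is 99 / 3 + 1 = 34, giving the ranges `[1, 34]`, `[35, 68]` and `[69, 100]`. When the ID span is smaller than `gridSize`, fewer partitions than `gridSize` are created: `split(1, 2, 5)` yields only two.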
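Step 3: Spring Integration configuration
Spring Batch Integration provides the communication needed for remote partitioning. Spring Integration has a rich set of channel adapters (for example JMS and AMQP), so remote partitioning can be built on ActiveMQ, RabbitMQ and other brokers. This article uses RabbitMQ as the messaging middleware; installing RabbitMQ is out of scope here. The code below configures the MQ connection as well as the queues, channels and message adapters used for partitioning.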
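import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Profile;
import org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter;
import org.springframework.integration.amqp.outbound.AmqpOutboundEndpoint;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.channel.NullChannel;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.core.MessagingTemplate;
import org.springframework.messaging.PollableChannel;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
/**
 * Created by kl on 2018/3/1.
 * Content: remote partition communication
 */
@Configuration
@ConfigurationProperties(prefix = "spring.rabbit")
public class IntegrationConfiguration {
    // Bound from the spring.rabbit.* properties. @ConfigurationProperties binds
    // through setters, so every field below also needs a setter (not shown here).
    private String host;
    private Integer port = 5672;
    private String username;
    private String password;
    private String virtualHost;
    private int connRecvThreads = 5;
    private int channelCacheSize = 10;
    // RabbitMQ connection shared by the sending and the listening side
    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host, port);
        connectionFactory.setUsername(username);
        connectionFactory.setPassword(password);
        connectionFactory.setVirtualHost(virtualHost);
        // Executor handed to the underlying RabbitMQ client for its connections
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(connRecvThreads);
        executor.initialize();
        connectionFactory.setExecutor(executor);
        connectionFactory.setPublisherConfirms(true);
        connectionFactory.setChannelCacheSize(channelCacheSize);
        return connectionFactory;
    }
    // Messaging gateway for partition requests; its default channel is outboundRequests
    @Bean
    public MessagingTemplate messageTemplate() {
        MessagingTemplate messagingTemplate = new MessagingTemplate(outboundRequests());
        messagingTemplate.setReceiveTimeout(60000000L); // 60,000,000 ms, about 16.7 hours
        return messagingTemplate;
    }
    // Master side: partition requests are sent to this channel
    @Bean
    public DirectChannel outboundRequests() {
        return new DirectChannel();
    }
    // Forwards messages from outboundRequests to RabbitMQ with the routing key partition.requests
    @Bean
    @ServiceActivator(inputChannel = "outboundRequests")
    public AmqpOutboundEndpoint amqpOutboundEndpoint(AmqpTemplate template) {
        AmqpOutboundEndpoint endpoint = new AmqpOutboundEndpoint(template);
        endpoint.setExpectReply(true);
        endpoint.setOutputChannel(inboundRequests());
        endpoint.setRoutingKey("partition.requests");
        return endpoint;
    }
    // Declares the non-durable queue partition.requests
    @Bean
    public Queue requestQueue() {
        return new Queue("partition.requests", false);
    }
    // Slave side: turns messages from the partition.requests queue into messages on inboundRequests
    @Bean
    @Profile({"slave", "mixed"})
    public AmqpInboundChannelAdapter inbound(SimpleMessageListenerContainer listenerContainer) {
        AmqpInboundChannelAdapter adapter = new AmqpInboundChannelAdapter(listenerContainer);
        adapter.setOutputChannel(inboundRequests());
        adapter.afterPropertiesSet();
        return adapter;
    }
    // Listens on partition.requests; it does not start on its own, so only instances
    // that have the inbound adapter (slave, mixed) start it and consume requests
    @Bean
    public SimpleMessageListenerContainer container(ConnectionFactory connectionFactory) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
        container.setQueueNames("partition.requests");
        container.setAutoStartup(false);
        return container;
    }
    // Output channel of the slave's StepExecutionRequestHandler; NullChannel discards what it receives
    @Bean
    public PollableChannel outboundStaging() {
        return new NullChannel();
    }
    // Buffers incoming partition requests until the StepExecutionRequestHandler consumes them
    @Bean
    public QueueChannel inboundRequests() {
        return new QueueChannel();
    }
}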
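The configuration uses two kinds of channel: `outboundRequests` is a `DirectChannel` and `inboundRequests` is a `QueueChannel`. The toy classes below are hypothetical (they are not Spring Integration classes) and only illustrate the difference: a direct channel invokes its single subscriber immediately on the sender's thread, while a queue channel buffers messages until a consumer polls for them. In Spring Integration, an endpoint consuming a `QueueChannel` is therefore driven by a poller.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Consumer;

// Hypothetical toy models, NOT Spring Integration classes, of the two channel
// kinds used in IntegrationConfiguration.
class ToyChannels {

    // Like DirectChannel: send() hands the message straight to the subscriber,
    // on the sender's own thread; sending with no subscriber is an error.
    static final class ToyDirectChannel<T> {
        private Consumer<T> subscriber;

        void subscribe(Consumer<T> subscriber) {
            this.subscriber = subscriber;
        }

        void send(T message) {
            if (subscriber == null) {
                throw new IllegalStateException("no subscriber");
            }
            subscriber.accept(message);
        }
    }

    // Like QueueChannel: send() only buffers the message; a consumer has to poll for it.
    static final class ToyQueueChannel<T> {
        private final Queue<T> buffer = new ConcurrentLinkedQueue<>();

        void send(T message) {
            buffer.add(message);
        }

        // Non-blocking poll, comparable to QueueChannel.receive(0): null when empty.
        T receiveNow() {
            return buffer.poll();
        }
    }
}
```

This mirrors why a send on `outboundRequests` reaches the `AmqpOutboundEndpoint` synchronously, while requests arriving on a slave wait in `inboundRequests` until the `StepExecutionRequestHandler` endpoint picks them up.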
Step 4: The slave nodes receive and process the partitions
// Slave side: consumes StepExecutionRequest messages from inboundRequests and runs the
// requested step. applicationContext and jobExplorer are fields of the enclosing class (not shown).
@Bean
@Profile({"slave","mixed"})
@ServiceActivator(inputChannel = "inboundRequests", outputChannel = "outboundStaging")
public StepExecutionRequestHandler stepExecutionRequestHandler() {
    StepExecutionRequestHandler stepExecutionRequestHandler = new StepExecutionRequestHandler();
    // Resolves the step name carried by the request (here "slaveStep") to a Step bean
    BeanFactoryStepLocator stepLocator = new BeanFactoryStepLocator();
    stepLocator.setBeanFactory(this.applicationContext);
    stepExecutionRequestHandler.setStepLocator(stepLocator);
    // Loads the StepExecution the master saved in the job repository, so master
    // and slaves must share the same job repository database
    stepExecutionRequestHandler.setJobExplorer(this.jobExplorer);
    return stepExecutionRequestHandler;
}
@Bean("slaveStep")
public Step slaveStep(MyProcessorItem processorItem,
                      JpaPagingItemReader<Article> reader) {
    CompositeItemProcessor<Article, Article> itemProcessor = new CompositeItemProcessor<>();
    List<ItemProcessor<Article, Article>> processorList = new ArrayList<>();
    processorList.add(processorItem);
    itemProcessor.setDelegates(processorList);
    return stepBuilderFactory.get("slaveStep")
            .<Article, Article>chunk(1000) // commit interval: items per transaction
            .reader(reader)
            .processor(itemProcessor)
            .writer(new PrintWriterItem())
            .build();
}
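The key part of the slave node is the StepExecutionRequestHandler: it receives the partition requests from the message broker and runs the requested step, whose step-scoped ItemReader obtains the data boundaries of its partition from the partition information, as shown in the following ItemReader: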
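// @StepScope: a new reader is created for each partition's step execution, so the
// late-bound stepExecutionContext values below belong to that partition.
// destroyMethod = "" disables Spring's inferred close() destroy callback; the step
// already closes the reader as an ItemStream.
@Bean(destroyMethod = "")
@StepScope
public JpaPagingItemReader<Article> jpaPagingItemReader(
        @Value("#{stepExecutionContext['minValue']}") Long minValue,
        @Value("#{stepExecutionContext['maxValue']}") Long maxValue) {
    System.err.println("Received partition parameters [" + minValue + "->" + maxValue + "]");
    JpaPagingItemReader<Article> reader = new JpaPagingItemReader<>();
    JpaNativeQueryProvider<Article> queryProvider = new JpaNativeQueryProvider<>();
    // ORDER BY keeps paging deterministic; without it, pages may overlap or skip rows
    String sql = "select * from kl_article where arcid >= :minValue and arcid <= :maxValue order by arcid";
    queryProvider.setSqlQuery(sql);
    queryProvider.setEntityClass(Article.class);
    reader.setQueryProvider(queryProvider);
    Map<String, Object> queryParams = new HashMap<>();
    queryParams.put("minValue", minValue);
    queryParams.put("maxValue", maxValue);
    reader.setParameterValues(queryParams);
    reader.setEntityManagerFactory(entityManagerFactory);
    return reader;
}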
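The minValue and maxValue used here are exactly the values that the master node's partitioner (ColumnRangePartitioner) put into each partition's ExecutionContext.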
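Because the reader selects rows by ID value, each partition covers an equal span of IDs, not an equal number of rows. The hypothetical `PartitionCoverage` helper below (not part of the project) applies the reader's `arcid >= :minValue and arcid <= :maxValue` condition to an in-memory list of IDs, which makes it easy to check that every ID is read by exactly one partition and to see how gaps in the IDs skew partition sizes.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper: applies the reader's condition
// "arcid >= :minValue and arcid <= :maxValue" to an in-memory list of IDs
// for each partition (modelled as int[]{minValue, maxValue}).
class PartitionCoverage {
    static Map<String, List<Integer>> assign(List<Integer> ids, Map<String, int[]> partitions) {
        Map<String, List<Integer>> byPartition = new LinkedHashMap<>();
        for (Map.Entry<String, int[]> entry : partitions.entrySet()) {
            int minValue = entry.getValue()[0];
            int maxValue = entry.getValue()[1];
            List<Integer> selected = new ArrayList<>();
            for (int id : ids) {
                if (id >= minValue && id <= maxValue) {
                    selected.add(id);
                }
            }
            byPartition.put(entry.getKey(), selected);
        }
        return byPartition;
    }
}
```

With IDs `1, 2, 5, 7, 10` and the ranges `[1, 3]`, `[4, 6]`, `[7, 9]` and `[10, 10]`, the partitions receive two, one, one and one rows respectively: every ID exactly once, but unevenly.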
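Summary
This completes the Spring Batch remote partitioning example. Note that the same application can act as a master, a slave, or both; the role is controlled by Spring profiles, which is what annotations such as @Profile({"master", "mixed"}) are for. So when testing, remember to set the active profile in Spring Boot, for example spring.profiles.active=slave.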
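The role switch works because Spring registers a bean annotated with `@Profile({"a", "b"})` only when at least one of the listed profiles is active, while beans without `@Profile` are always registered. The hypothetical `ProfileMatrix.isActive` helper below is not a Spring API; it mimics just that rule (ignoring profile expressions such as `!master`) to show which of this article's profile-guarded beans each setting turns on.

```java
import java.util.Set;

// Hypothetical helper (not a Spring API) mimicking the basic @Profile rule:
// a bean annotated @Profile({"a", "b"}) is registered if any listed profile is active.
// Profile expressions such as "!master" are deliberately not supported here.
class ProfileMatrix {
    static boolean isActive(Set<String> activeProfiles, String... beanProfiles) {
        for (String profile : beanProfiles) {
            if (activeProfiles.contains(profile)) {
                return true;
            }
        }
        return false;
    }
}
```

With `spring.profiles.active=slave`, `isActive(active, "master", "mixed")` is false, so the `job` bean is skipped, while `isActive(active, "slave", "mixed")` is true, so the inbound adapter and the `StepExecutionRequestHandler` are registered; `mixed` enables both groups.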
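Original content statement: this article was published on the Tencent Cloud Developer Community with the author's authorization and may not be reproduced without permission.
In case of infringement, please contact cloudcommunity@tencent.com for removal.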