前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >配置 Spring Batch 批处理失败重试

配置 Spring Batch 批处理失败重试

作者头像
程序猿DD
发布2023-04-04 15:10:58
1.2K0
发布2023-04-04 15:10:58
举报
文章被收录于专栏:程序猿DD

点击上方蓝色“程序猿DD”,选择“设为星标”

回复“资源”获取独家整理的学习资料!

1. 引言

默认情况下,Spring批处理作业在执行过程中出现任何错误都会失败。然而有些时候,为了提高应用程序的弹性,我们就需要处理这类间歇性的故障。在这篇短文中,我们就来一起探讨 如何在Spring批处理框架中配置重试逻辑

如果对spring batch不了解,可以参考以前的一篇文章:

开车!Spring Batch 入门级示例教程! (http://mp.weixin.qq.com/s?__biz=MzIzNzYxNDYzNw==&mid=2247483959&idx=1&sn=65e9d738892483c9eaf9b54bfb3b8455&chksm=e8c4a175dfb32863959106f81ae65ff6194b3cf5efbbf5aeeb58293b238b542394f23f9eb6ed&scene=21#wechat_redirect)

2. 简单举例

假设有一个批处理作业,它读取一个CSV文件作为输入:

代码语言:javascript
复制
username, userid, transaction_date, transaction_amount
sammy, 1234, 31/10/2015, 10000
john, 9999, 3/12/2015, 12321

然后,它通过访问REST端点来处理每条记录,获取用户的 age 和 postCode 属性:

代码语言:javascript
复制
public class RetryItemProcessor implements ItemProcessor<Transaction, Transaction> {
    
    @Override
    public Transaction process(Transaction transaction) throws IOException {
        log.info("RetryItemProcessor, attempting to process: {}", transaction);
        HttpResponse response = fetchMoreUserDetails(transaction.getUserId());
        //parse user's age and postCode from response and update transaction
        ...
        return transaction;
    }
    ...
}

最后,它生成并输出一个合并的XML

代码语言:javascript
复制
<transactionRecord>
    <transactionRecord>
        <amount>10000.0</amount>
        <transactionDate>2015-10-31 00:00:00</transactionDate>
        <userId>1234</userId>
        <username>sammy</username>
        <age>10</age>
        <postCode>430222</postCode>
    </transactionRecord>
    ...
</transactionRecord>

3. ItemProcessor 中添加重试

现在假设,如果到REST端点的连接由于某些网络速度慢而超时,该怎么办?如果发生这种情况,则我们的批处理工作将失败。

在这种情况下,我们希望失败的 item 处理重试几次。因此,接下来我将批处理作业配置为:在出现故障时执行最多三次重试

代码语言:javascript
复制
@Bean
public Step retryStep(
  ItemProcessor<Transaction, Transaction> processor,
  ItemWriter<Transaction> writer) throws ParseException {
    return stepBuilderFactory
      .get("retryStep")
      .<Transaction, Transaction>chunk(10)
      .reader(itemReader(inputCsv))
      .processor(processor)
      .writer(writer)
      .faultTolerant()
      .retryLimit(3)
      .retry(ConnectTimeoutException.class)
      .retry(DeadlockLoserDataAccessException.class)
      .build();
}

这里调用了 faultTolerant() 来启用重试功能。另外,我们使用 retry 和 retryLimit 分别定义符合重试条件的异常和 item 的最大重试次数

4. 测试重试次数

假设我们有一个测试场景,其中返回 age 和 postCode 的REST端点关闭了一段时间。在这个测试场景中,我们只对前两个 API 调用获取一个 ConnectTimeoutException ,而第三个调用将成功:

代码语言:javascript
复制
@Test
public void whenEndpointFailsTwicePasses3rdTime_thenSuccess() throws Exception {
    FileSystemResource expectedResult = new FileSystemResource(EXPECTED_OUTPUT);
    FileSystemResource actualResult = new FileSystemResource(TEST_OUTPUT);

    when(httpResponse.getEntity())
      .thenReturn(new StringEntity("{ \"age\":10, \"postCode\":\"430222\" }"));
 
    //fails for first two calls and passes third time onwards
    when(httpClient.execute(any()))
      .thenThrow(new ConnectTimeoutException("Timeout count 1"))
      .thenThrow(new ConnectTimeoutException("Timeout count 2"))
      .thenReturn(httpResponse);

    JobExecution jobExecution = jobLauncherTestUtils
      .launchJob(defaultJobParameters());
    JobInstance actualJobInstance = jobExecution.getJobInstance();
    ExitStatus actualJobExitStatus = jobExecution.getExitStatus();

    assertThat(actualJobInstance.getJobName(), is("retryBatchJob"));
    assertThat(actualJobExitStatus.getExitCode(), is("COMPLETED"));
    AssertFile.assertFileEquals(expectedResult, actualResult);
}

在这里,我们的工作成功地完成了。另外,从日志中可以明显看出 第一条记录 id=1234 失败了两次,最后在第三次重试时成功了

代码语言:javascript
复制
19:06:57.742 [main] INFO  o.s.batch.core.job.SimpleStepHandler - Executing step: [retryStep]
19:06:57.758 [main] INFO  o.b.batch.service.RetryItemProcessor - Attempting to process user with id=1234
19:06:57.758 [main] INFO  o.b.batch.service.RetryItemProcessor - Attempting to process user with id=1234
19:06:57.758 [main] INFO  o.b.batch.service.RetryItemProcessor - Attempting to process user with id=1234
19:06:57.758 [main] INFO  o.b.batch.service.RetryItemProcessor - Attempting to process user with id=9999
19:06:57.773 [main] INFO  o.s.batch.core.step.AbstractStep - Step: [retryStep] executed in 31ms

同样,看下另一个测试用例,当所有重试次数都用完时会发生什么:

代码语言:javascript
复制
@Test
public void whenEndpointAlwaysFail_thenJobFails() throws Exception {
    when(httpClient.execute(any()))
      .thenThrow(new ConnectTimeoutException("Endpoint is down"));

    JobExecution jobExecution = jobLauncherTestUtils
      .launchJob(defaultJobParameters());
    JobInstance actualJobInstance = jobExecution.getJobInstance();
    ExitStatus actualJobExitStatus = jobExecution.getExitStatus();

    assertThat(actualJobInstance.getJobName(), is("retryBatchJob"));
    assertThat(actualJobExitStatus.getExitCode(), is("FAILED"));
    assertThat(actualJobExitStatus.getExitDescription(),
      containsString("org.apache.http.conn.ConnectTimeoutException"));
}

在这个测试用例中,在作业因 ConnectTimeoutException 而失败之前,会尝试对第一条记录重试三次。

5. 使用XML配置重试

最后,让我们看一下与上述配置等价的XML:

代码语言:javascript
复制
<batch:job id="retryBatchJob">
    <batch:step id="retryStep">
        <batch:tasklet>
            <batch:chunk reader="itemReader" writer="itemWriter"
              processor="retryItemProcessor" commit-interval="10"
              retry-limit="3">
                <batch:retryable-exception-classes>
                    <batch:include class="org.apache.http.conn.ConnectTimeoutException"/>
                    <batch:include class="org.springframework.dao.DeadlockLoserDataAccessException"/>
                </batch:retryable-exception-classes>
            </batch:chunk>
        </batch:tasklet>
    </batch:step>
</batch:job>

6. 简单总结

在本文中,我们学习了如何在Spring批处理中配置重试逻辑,其中包括使用Java和XML配置。以及使用单元测试来观察重试在实践中是如何工作的。

推荐关注本文作者

【往期推荐】

居然还有这种游戏...是不是有点刺激过头了啊...

2020-12-19

Spring Boot 2.4版本前后的分组配置变化及对多环境配置结构的影响

2020-12-19

赞!推荐一款神仙颜值的 Redis 客户端工具

2020-12-19

左滑右滑,在VS Code里滑个妹纸给你写喜欢的代码?

2020-12-18

教你一分钟内导出 Grafana 所有的 Dashboard

2020-12-18

注意!我们熟知的“摩尔定律”被废了...

2020-12-17

深度内容

推荐加入

欢迎加入知识星球,一起探讨技术架构,交流技术人生。

加入方式,长按下方二维码:

已在知识星球更新如下:

素质二连,走一个

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2020-12-20,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 程序猿DD 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1. 引言
  • 2. 简单举例
  • 3. ItemProcessor 中添加重试
  • 4. 测试重试次数
  • 5. 使用XML配置重试
  • 6. 简单总结
相关产品与服务
腾讯云服务器利旧
云服务器(Cloud Virtual Machine,CVM)提供安全可靠的弹性计算服务。 您可以实时扩展或缩减计算资源,适应变化的业务需求,并只需按实际使用的资源计费。使用 CVM 可以极大降低您的软硬件采购成本,简化 IT 运维工作。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档