聊聊reactor extra的retry

本文主要研究一下reactor extra的retry

maven

        <dependency>
            <groupId>io.projectreactor.addons</groupId>
            <artifactId>reactor-extra</artifactId>
            <version>3.1.4.RELEASE</version>
        </dependency>

实例

        TcpClient client = TcpClient.create("localhost", 8888);
        client.newHandler((inbound,outbound) -> {
                return outbound.sendString(Mono.just("Hello World!")).then(inbound.receive()
                        .asString().next().log().then());
        }).doOnError(e -> e.printStackTrace())
                .subscribe();

上面这个TcpClient,在server没有启动的情况下连接会直接报错

io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: localhost/127.0.0.1:8888
    at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
    at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
    at io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:325)
    at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:340)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:633)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:580)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459)
    at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:886)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.net.ConnectException: Connection refused
    ... 10 more

连接失败重连

简单重试

        client.newHandler((inbound,outbound) -> {
                return outbound.sendString(Mono.just("Hello World!")).then(inbound.receive()
                        .asString().next().log().then());
        }).doOnError(e -> e.printStackTrace())
                .retry(3)
                .subscribe();

retry可以直接指定重试次数

高级重试

        client.newHandler((inbound,outbound) -> {
                return outbound.sendString(Mono.just("Hello World!")).then(inbound.receive()
                        .asString().next().log().then());
        }).doOnError(e -> e.printStackTrace())
                .retryWhen(Retry.allBut(RuntimeException.class)
                .retryMax(5000)
                .fixedBackoff(Duration.ofSeconds(5))
                .doOnRetry(e -> {
                    e.exception().printStackTrace();
                })
        )
                .subscribe();

利用reactor extra项目中的Retry帮助类,可以轻松指定高级重试策略,比如fixedBackoff,亦或是exponentialBackoff等

        client.newHandler((inbound,outbound) -> {
                return outbound.sendString(Mono.just("Hello World!")).then(inbound.receive()
                        .asString().next().log().then());
        }).doOnError(e -> e.printStackTrace())
                .retryWhen(Retry.allBut(RuntimeException.class)
                .retryMax(5000)           .exponentialBackoffWithJitter(Duration.ofMillis(100),Duration.ofMillis(500))
                .doOnRetry(e -> {
                    e.exception().printStackTrace();
                })
        )
                .subscribe();

这里使用了exponentialBackoffWithJitter,第一个参数是firstBackoff时间,第二个参数是maxBackoff,也就是maxBackoffInterval,如果为null则相当于Duration.ofSeconds(Long.MAX_VALUE)

Retry

reactor-extra-3.1.4.RELEASE-sources.jar!/reactor/retry/Retry.java

    /**
     * Returns a retry function that retries errors resulting from all exceptions except
     * the specified non-retriable exceptions. More constraints may be added using
     * {@link #retryMax(int)} or {@link #timeout(Duration)}.
     *
     * @param nonRetriableExceptions exceptions that may not be retried
     * @return retry function that retries all exceptions except the specified non-retriable exceptions.
     */
    @SafeVarargs
    static <T> Retry<T> allBut(final Class<? extends Throwable>... nonRetriableExceptions) {
        Predicate<? super RetryContext<T>> predicate = context -> {
            Throwable exception = context.exception();
            if (exception == null)
                return true;
            for (Class<? extends Throwable> clazz : nonRetriableExceptions) {
                if (clazz.isInstance(exception))
                    return false;
            }
            return true;
        };
        return DefaultRetry.<T>create(predicate);
    }

可以看到使用DefaultRetry来创建

reactor-extra-3.1.4.RELEASE-sources.jar!/reactor/retry/DefaultRetry.java

    public static <T> DefaultRetry<T> create(Predicate<? super RetryContext<T>> retryPredicate) {
        return new DefaultRetry<T>(retryPredicate,
                1,
                null,
                Backoff.zero(),
                Jitter.noJitter(),
                null,
                NOOP_ON_RETRY,
                (T) null);
    }

注意这里的maxIterations默认为1,也就是如果不指定retryMax,相当于高级重试策略就白费了,这个要额外注意一下。

小结

Reactor Extra提供的Retry工具类非常好用,值得实验一下。

doc

  • Reactor Extra

原文发布于微信公众号 - 码匠的流水账(geek_luandun)

原文发表时间:2018-02-10

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏拂晓风起

jQuery 和 json 简单例子(注意callback函数的处理!!) (servlet返回json,jquery更新,java json)

1223
来自专栏函数式编程语言及工具

SDP(2):ScalikeJDBC-Connection Pool Configuration

  scalikeJDBC可以通过配置文件来设置连接池及全局系统参数。对配置文件的解析是通过TypesafeConfig工具库实现的。默认加载classpath...

3574
来自专栏菩提树下的杨过

C#以post方式调用struts rest-plugin service的问题

struts2: 玩转 rest-plugin 一文中,学习了用struts2开发restful service的方法,发现用c#以post方式调用时各种报错,...

2969
来自专栏小樱的经验随笔

HDU 2438 Turn the corner(三分查找)

托一个学弟的福,学了一下他的最简便三分写法,然后找了一道三分的题验证了下,AC了一题,写法确实方便,还是我太弱了,漫漫AC路!各路大神,以后你们有啥好的简便写法...

2965
来自专栏码匠的流水账

聊聊jesque的WorkerImpl与WorkerPool

Resque是一个使用redis来创建后台任务的ruby组件。而jesque是其java版本。通常用来做延时队列。

751
来自专栏lgp20151222

SSH上一个随笔的基础上添加上hibernate支持

熟悉的pom.xml其中lo4g和slf4j这两个包第一眼看上去有点莫名奇妙,我也是这么觉得的,实际作用是在后台输出sql语句,不导入hibernate就会报错...

861
来自专栏恰童鞋骚年

Hadoop学习笔记—12.MapReduce中的常见算法

    "数据去重"主要是为了掌握和利用并行化思想来对数据进行有意义的筛选。统计大数据集上的数据种类个数、从网站日志中计算访问地等这些看似庞杂的任务都会涉及数据...

1442
来自专栏码匠的流水账

聊聊rocketmq的PushConsumerImpl

io/openmessaging/rocketmq/consumer/PushConsumerImpl.java

1862
来自专栏函数式编程语言及工具

Akka(8): 分布式运算:Remoting-远程查找式

  Akka是一种消息驱动运算模式,它实现跨JVM程序运算的方式是通过能跨JVM的消息系统来调动分布在不同JVM上ActorSystem中的Actor进行运算,...

4369

.NET中的密钥加密

本教程将演示如何通过System.Security.Cryptography在.NET Framework 1.1中实现对称加密/密钥加密。

6168

扫码关注云+社区

领取腾讯云代金券