RxJava2学习笔记(2)

上一篇已经熟悉了Observable的基本用法,但是如果仅仅只是“生产-消费”的模型,这就体现不出优势了,java有100种办法可以玩这个:)

一、更简单的多线程

正常情况下,生产者与消费者都在同一个线程里处理,参考下面的代码:

final long start = System.currentTimeMillis();

Observable<String> fileSender = Observable.create(emitter -> {
    for (int i = 1; i < 6; i++) {
        Thread.sleep(1000);
        String temp = "thread:" + Thread.currentThread().getId() + " , file " + i + " 的内容";
        System.out.println(temp);
        emitter.onNext(temp);
    }
    emitter.onComplete();
});

Observer<String> fileHander = new Observer<String>() {
    @Override
    public void onSubscribe(@NonNull Disposable d) {
        System.out.println("准备处理文件...");
    }

    @Override
    public void onNext(@NonNull String s) {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("thread:" + Thread.currentThread().getId() + " , [" + s + "] 已处理!");
    }

    @Override
    public void onError(@NonNull Throwable e) {
        System.out.println("师傅,有妖怪!");
    }

    @Override
    public void onComplete() {
        System.out.println("总算完事儿,累屎大爷了!");
        long end = System.currentTimeMillis();
        System.out.println("耗时:" + (end - start));
    }
};

fileSender.subscribe(fileHander);

Thread.sleep(60000);

假设生产者在读取一堆文件,然后发给消费者处理,通常情况下,这类涉及IO的操作都是很耗时的,我们用sleep(1000)来模拟。

从输出结果上看,生产者与消费者的thread id相同,耗时约为10s。

fileSender.subscribe(fileHander);

如果上面这行,换成

fileSender.subscribeOn(Schedulers.io()) //生产者处理时,放在io线程中
        .observeOn(Schedulers.newThread()) //消费者处理时,用新线程
        .subscribe(fileHander); 

注:subscribeOn() 是生产者发送子弹的线程, observeOn() 则是消费者(靶子)收子弹的线程,如果有多个消费者,每次调用observeOn() 消费者线程便会切换一次

这样生产者、消费者就变成不同的线程了,跑一下看看:

可以看到二个线程id不一样,说明分别在不同的线程里,而且总耗时明显缩短了。

二、更平滑的链式调用

假设我们有一个经典的在线电商场景:用户提交订单后,马上跳到支付页面付款。传统写法,通常是中规中矩的封装2个方法,依次调用。用rxjava后,可以写得更流畅,先做点准备工作:

先定义二个服务接口:订单服务(OrderService)以及支付服务(PayService)

OrderService.java

public interface OrderService {
    Observable<CreateOrderResponse> createOrder(CreateOrderRequest request) throws Exception;
}

PayService.java

public interface PayService {
    Observable<PayResponse> payOrder(PayRequest request) throws Exception;
}

然后来二个实现:

OrderServiceImpl

public class OrderServiceImpl implements OrderService {

    @Override
    public Observable<CreateOrderResponse> createOrder(CreateOrderRequest request) throws InterruptedException {
        System.out.println("threadId:" + Thread.currentThread().getId() + ", 订单创建中:" + request.toString());
        CreateOrderResponse response = new CreateOrderResponse();
        response.setOrderNo(UUID.randomUUID().toString().replace("-", ""));
        response.setOrderStatus("NEW");
        response.setOrderAmount(request.getOrderAmount());
        response.setOrderDesc(request.getOrderDesc());
        return Observable.create(emitter -> emitter.onNext(response));
    }
}

PayServiceImpl

public class PayServiceImpl implements PayService {

    @Override
    public Observable<PayResponse> payOrder(PayRequest request) throws InterruptedException {
        System.out.println("threadId:" + Thread.currentThread().getId() + ", 正在请求支付:" + request);
        PayResponse response = new PayResponse();
        response.setSuccess(true);
        response.setOrderNo(request.getOrderNo());
        response.setTradeNo(UUID.randomUUID().toString().replace("-", ""));
        return Observable.create(emitter -> emitter.onNext(response));
    }
}

然后测试一把:

    @Test
    public void test1() throws Exception {
        OrderService orderService = new OrderServiceImpl();
        PayService payService = new PayServiceImpl();
        orderService.createOrder(new CreateOrderRequest("iphone X", new BigDecimal(8888.00))) //创建订单
                //将"创建订单的Response" 转换成 "支付订单的Response"
                .flatMap((Function<CreateOrderResponse, ObservableSource<PayResponse>>) response -> payService.payOrder(new PayRequest(response.getOrderNo(), response.getOrderAmount())))
                //支付完成的处理
                .subscribe(response -> System.out.println("threadId:" + Thread.currentThread().getId() + ", 支付完成"));
        Thread.sleep(1000);//等待执行完毕
    }

链式的写法,更符合阅读习惯,注:flatMap这个操作,通俗点讲,就是将一种口径的子弹,转换成另一种口径的子弹,然后再继续发射。

输出:

threadId:1, 订单创建中:CreateOrderRequest(orderDesc=iphone X, orderAmount=8888)
threadId:1, 正在请求支付:PayRequest(orderNo=81419b0580d547acbb53955978ace6b8, paymentAmount=8888)
threadId:1, 支付完成

可以看到,默认情况下,创建订单/支付订单在同一个线程中,结合前面学到的知识,也可以将它们划分到不同的线程里:(虽然就这个场景而言,这样做的意义不大,因为支付前,肯定要等订单先提交,这个没办法并发处理,这里只是意思一下,可以这样做而已)

    @Test
    public void test2() throws Exception {
        OrderService orderService = new OrderServiceImpl();
        PayService payService = new PayServiceImpl();
        orderService.createOrder(new CreateOrderRequest("iphone X", new BigDecimal(8888.00)))
                .subscribeOn(Schedulers.newThread())  //(生产者)创建订单时,使用新线程
                .observeOn(Schedulers.newThread()) //(消费者1)接收订单时,使用新线程
                .flatMap((Function<CreateOrderResponse, ObservableSource<PayResponse>>) response -> payService.payOrder(new PayRequest(response.getOrderNo(), response.getOrderAmount())))
                .observeOn(Schedulers.newThread()) //(消费者2)接收支付结果时,使用新线程
                .subscribe(response -> System.out.println("threadId:" + Thread.currentThread().getId() + ", 支付完成"));
        Thread.sleep(1000);//等待执行完毕
    }

输出:

threadId:1, 订单创建中:CreateOrderRequest(orderDesc=iphone X, orderAmount=8888)
threadId:13, 正在请求支付:PayRequest(orderNo=d5ff7890f22f486bb1bf8aa8e4f0a3bf, paymentAmount=8888)
threadId:14, 支付完成

从threadId看,已经是不同的线程了。

上面的代码,都没考虑到出错的情况,如果支付时出异常了,rxjava如何处理呢?

先改下支付的实现,人为抛个异常:

public class PayServiceImpl implements PayService {

    @Override
    public Observable<PayResponse> payOrder(PayRequest request) throws Exception {
        throw new Exception("支付失败!");
    }
}

rxjava里有一个重载版本,见: io.reactivex.Observable

    @CheckReturnValue
    @SchedulerSupport("none")
    public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError) {
        return this.subscribe(onNext, onError, Functions.EMPTY_ACTION, Functions.emptyConsumer());
    }

使用这个版本即可:

    @Test
    public void test3() throws Exception {
        OrderService orderService = new OrderServiceImpl();
        PayService payService = new PayServiceImpl();
        orderService.createOrder(new CreateOrderRequest("iphone X", new BigDecimal(8888.00)))
                .flatMap((Function<CreateOrderResponse, ObservableSource<PayResponse>>) response -> payService.payOrder(new PayRequest(response.getOrderNo(), response.getOrderAmount())))
                .subscribe(response -> System.out.println("threadId:" + Thread.currentThread().getId() + ", 支付完成"),
                        //异常处理
                        err -> System.out.println("支付出错啦:" + err.getMessage()));
        Thread.sleep(1000);//等待执行完毕
    }

输出:

threadId:1, 订单创建中:CreateOrderRequest(orderDesc=iphone X, orderAmount=8888)
支付出错啦:支付失败!  

如果想在订单创建完后,先做些处理,再进行支付,可以这么写:

    @Test
    public void test4() throws Exception {
        OrderService orderService = new OrderServiceImpl();
        PayService payService = new PayServiceImpl();
        orderService.createOrder(new CreateOrderRequest("iphone X", new BigDecimal(8888.00)))
                //订单创建完成后的处理
                .doOnNext(response -> System.out.println("订单创建完成:" + response))
                .flatMap((Function<CreateOrderResponse, ObservableSource<PayResponse>>) response -> payService.payOrder(new PayRequest(response.getOrderNo(), response.getOrderAmount())))
                .subscribe(response -> System.out.println("threadId:" + Thread.currentThread().getId() + ", 支付完成"),
                        err -> System.out.println("支付出错啦:" + err.getMessage()));
        Thread.sleep(1000);//等待执行完毕
    }

输出:

threadId:1, 订单创建中:CreateOrderRequest(orderDesc=iphone X, orderAmount=8888)
订单创建完成:CreateOrderResponse(orderNo=8c194b1d07c044a8af3771159e1bb2bf, orderDesc=iphone X, orderAmount=8888, orderStatus=NEW)
支付出错啦:支付失败!

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏码神联盟

Shiro系列 | 《Shiro开发详细教程》第五章:Shiro编码加密

在涉及到密码存储问题上,应该加密或者生成密码摘要存储,而不是存储明文密码。为避免数据泄露对用户造成很大的损失,应该加密或者生成不可逆的摘要方式存储。

12320
来自专栏刘君君

很方便的密码加密算法BCrypt

1.9K50
来自专栏Java与Android技术栈

RxJava处理业务异常的几种方式关于异常处理业务异常总结

运行时异常: RuntimeException类及其子类都被称为运行时异常,这种异常的特点是Java编译器不去检查它,也就是说,当程序中可能出现这类异常时,即...

39830
来自专栏ccylovehs

原生js格式化json工具

50310
来自专栏蘑菇先生的技术笔记

qt5中信号和槽的新语法

30750
来自专栏Danny的专栏

【J2SE快速进阶】——多线程之synchronized

版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/huyuyang6688/article/...

9820
来自专栏用户画像

Ping++ 支付接口对接

14310
来自专栏IT综合技术分享

使用Dropwizard搭建一个Hello World

官方例子中main方法写在Application类中,我这里将其提出来了,方便大家理解。

25830
来自专栏C#

DotNet加密方式解析--散列加密

   没时间扯淡了,赶紧上车吧。    在现代社会中,信息安全对于每一个人都是至关重要的,例如我们的银行账户安全、支付宝和微信账户安全、以及邮箱等等,说到信息安...

24380
来自专栏三好码农的三亩自留地

Java动态代理-实战

说动态代理之前,要先搞明白什么是代理,代理的字面意思已经很容易理解了,我们这里撇开其他的解释,我们只谈设计模式中的代理模式

31120

扫码关注云+社区

领取腾讯云代金券