A Look at Reactive Streams Processors

This article takes a look at reactive streams processors.

Processor categories

A processor is both a Publisher and a Subscriber. Project Reactor ships several processor implementations, which fall roughly into the following categories:

  • direct (DirectProcessor and UnicastProcessor)
  • synchronous (EmitterProcessor and ReplayProcessor)
  • asynchronous (TopicProcessor and WorkQueueProcessor)
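
To make the "both a Publisher and a Subscriber" point concrete, here is a minimal sketch that uses only the JDK's java.util.concurrent.Flow interfaces (JDK 9 or later). It is not how Reactor implements its processors: MappingProcessor and ProcessorRolesDemo are hypothetical names, the sketch supports a single downstream subscriber, and it ignores demand entirely.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.function.Function;

// Hypothetical, simplified processor: consumes T as a Subscriber and
// re-publishes R as a Publisher. One downstream only, no backpressure.
class MappingProcessor<T, R> implements Flow.Processor<T, R> {
    private final Function<T, R> mapper;
    private Flow.Subscriber<? super R> downstream;

    MappingProcessor(Function<T, R> mapper) {
        this.mapper = mapper;
    }

    // Publisher side: remember the downstream subscriber.
    @Override
    public void subscribe(Flow.Subscriber<? super R> subscriber) {
        downstream = subscriber;
        subscriber.onSubscribe(new Flow.Subscription() {
            @Override public void request(long n) { /* demand is ignored in this sketch */ }
            @Override public void cancel() { downstream = null; }
        });
    }

    // Subscriber side: ask upstream for everything, then forward each signal.
    @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
    @Override public void onNext(T item) { if (downstream != null) downstream.onNext(mapper.apply(item)); }
    @Override public void onError(Throwable t) { if (downstream != null) downstream.onError(t); }
    @Override public void onComplete() { if (downstream != null) downstream.onComplete(); }
}

class ProcessorRolesDemo {
    static List<String> run() {
        List<String> events = new ArrayList<>();
        MappingProcessor<Integer, Integer> processor = new MappingProcessor<>(e -> e * 10);
        processor.subscribe(new Flow.Subscriber<Integer>() {
            @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
            @Override public void onNext(Integer item) { events.add("next:" + item); }
            @Override public void onError(Throwable t) { events.add("error"); }
            @Override public void onComplete() { events.add("complete"); }
        });
        // Push data in by calling the Subscriber-side methods directly,
        // the same way the tests in this article call onNext on a processor.
        for (int i = 1; i <= 3; i++) {
            processor.onNext(i);
        }
        processor.onComplete();
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [next:10, next:20, next:30, complete]
    }
}
```

The same object is handed data through onNext (its Subscriber role) and hands data on through subscribe (its Publisher role), which is the shape every processor in the sections that follow shares.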

direct

DirectProcessor

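DirectProcessor does not support backpressure. If N elements are pushed through it and any one of its subscribers has requested fewer than N, an IllegalStateException is signaled to that subscriber. In the output below it surfaces as reactor.core.Exceptions$OverflowException, a subclass of IllegalStateException.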

    @Test
    public void testDirectProcessor(){
        DirectProcessor<Integer> directProcessor = DirectProcessor.create();
        Flux<Integer> flux = directProcessor
                .filter(e -> e % 2 == 0)
                .map(e -> e +1);
        flux.subscribe(new Subscriber<Integer>() {
            private Subscription s;
            @Override
            public void onSubscribe(Subscription s) {
                this.s = s;
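                // Intentionally no request here (s.request(2) stays commented out),
                // so the first value pushed has no demand to satisfy.
//                s.request(2);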
            }

            @Override
            public void onNext(Integer integer) {
                LOGGER.info("subscribe:{}",integer);
            }

            @Override
            public void onError(Throwable t) {
                LOGGER.error(t.getMessage(),t);
            }

            @Override
            public void onComplete() {

            }
        });

        IntStream.range(1,20)
                .forEach(e -> {
                    directProcessor.onNext(e);
                });

        directProcessor.onComplete();
        directProcessor.blockLast();
    }

Output:

16:00:11.201 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
16:00:11.216 [main] ERROR com.example.demo.ProcessorTest - Can't deliver value due to lack of requests
reactor.core.Exceptions$OverflowException: Can't deliver value due to lack of requests
    at reactor.core.Exceptions.failWithOverflow(Exceptions.java:215)
    at reactor.core.publisher.DirectProcessor$DirectInner.onNext(DirectProcessor.java:304)
    at reactor.core.publisher.DirectProcessor.onNext(DirectProcessor.java:106)
    at com.example.demo.ProcessorTest.lambda$testDirectProcessor$5(ProcessorTest.java:82)
    at java.util.stream.Streams$RangeIntSpliterator.forEachRemaining(Streams.java:110)
    at java.util.stream.IntPipeline$Head.forEach(IntPipeline.java:557)
    at com.example.demo.ProcessorTest.testDirectProcessor(ProcessorTest.java:81)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
    at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
    at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
    at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
    at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
    at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
    at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
    at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
    at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
    at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
    at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
    at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
    at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:69)
    at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:234)
    at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:74)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
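
The overflow rule can be modelled without Reactor. The sketch below is a deliberately simplified, JDK-only illustration, not DirectProcessor's actual code: DirectInnerSketch is a hypothetical class that tracks one subscriber's outstanding demand and records an error as soon as a value arrives with no demand left.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified, hypothetical model of a "direct" hand-off to one subscriber:
// no buffering, so a value that arrives without outstanding demand is an error.
class DirectInnerSketch {
    private long requested;          // outstanding demand from the subscriber
    private boolean terminated;
    final List<String> events = new ArrayList<>();

    void request(long n) {
        requested += n;
    }

    void onNext(int value) {
        if (terminated) {
            return;                  // ignore pushes after the error
        }
        if (requested > 0) {
            requested--;
            events.add("next:" + value);
        } else {
            terminated = true;
            IllegalStateException overflow =
                    new IllegalStateException("Can't deliver value due to lack of requests");
            events.add("error:" + overflow.getMessage());
        }
    }

    static List<String> demo() {
        DirectInnerSketch inner = new DirectInnerSketch();
        inner.request(2);            // the subscriber asks for 2 ...
        for (int i = 1; i <= 5; i++) {
            inner.onNext(i);         // ... but 5 values are pushed
        }
        return inner.events;
    }

    public static void main(String[] args) {
        // [next:1, next:2, error:Can't deliver value due to lack of requests]
        System.out.println(demo());
    }
}
```

In the test above the subscriber never requests anything, so the very first pushed value already hits the error branch.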

UnicastProcessor

  • UnicastProcessor supports backpressure, at the cost of allowing at most one subscriber. It is unbounded by default: if data is pushed before the subscriber has requested it, the processor buffers the data.
  • If a bounded queue is supplied, the processor rejects pushed data once the buffer is full and the subscriber has not requested enough. For this case the processor can also be built with a callback that is invoked for each rejected element.

    @Test
    public void testUnicastProcessor() throws InterruptedException {
        UnicastProcessor<Integer> unicastProcessor = UnicastProcessor.create(Queues.<Integer>get(8).get());
        Flux<Integer> flux = unicastProcessor
                .map(e -> e)
                .doOnError(e -> {
                    LOGGER.error(e.getMessage(),e);
                });

        IntStream.rangeClosed(1,12)
                .forEach(e -> {
                    LOGGER.info("emit:{}",e);
                    unicastProcessor.onNext(e);
                    try {
                        TimeUnit.SECONDS.sleep(1);
                    } catch (InterruptedException e1) {
                        e1.printStackTrace();
                    }
                });
        LOGGER.info("begin to sleep 7 seconds");
        TimeUnit.SECONDS.sleep(7);
        //UnicastProcessor allows only a single Subscriber
        flux.subscribe(e -> {
            LOGGER.info("flux subscriber:{}",e);
        });

        unicastProcessor.onComplete();
        TimeUnit.SECONDS.sleep(10);
//        unicastProcessor.blockLast(); // blockLast is itself a subscriber
    }

Sample output:

16:31:04.970 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
16:31:04.977 [main] INFO com.example.demo.ProcessorTest - emit:1
16:31:05.990 [main] INFO com.example.demo.ProcessorTest - emit:2
16:31:06.991 [main] INFO com.example.demo.ProcessorTest - emit:3
16:31:07.994 [main] INFO com.example.demo.ProcessorTest - emit:4
16:31:08.998 [main] INFO com.example.demo.ProcessorTest - emit:5
16:31:10.002 [main] INFO com.example.demo.ProcessorTest - emit:6
16:31:11.007 [main] INFO com.example.demo.ProcessorTest - emit:7
16:31:12.010 [main] INFO com.example.demo.ProcessorTest - emit:8
16:31:13.014 [main] INFO com.example.demo.ProcessorTest - emit:9
16:31:14.029 [main] INFO com.example.demo.ProcessorTest - emit:10
16:31:14.030 [main] DEBUG reactor.core.publisher.Operators - onNextDropped: 10
16:31:15.034 [main] INFO com.example.demo.ProcessorTest - emit:11
16:31:15.034 [main] DEBUG reactor.core.publisher.Operators - onNextDropped: 11
16:31:16.038 [main] INFO com.example.demo.ProcessorTest - emit:12
16:31:16.038 [main] DEBUG reactor.core.publisher.Operators - onNextDropped: 12
16:31:17.043 [main] INFO com.example.demo.ProcessorTest - begin to sleep 7 seconds
16:31:24.053 [main] INFO com.example.demo.ProcessorTest - flux subscriber:1
16:31:24.053 [main] INFO com.example.demo.ProcessorTest - flux subscriber:2
16:31:24.053 [main] INFO com.example.demo.ProcessorTest - flux subscriber:3
16:31:24.053 [main] INFO com.example.demo.ProcessorTest - flux subscriber:4
16:31:24.053 [main] INFO com.example.demo.ProcessorTest - flux subscriber:5
16:31:24.054 [main] INFO com.example.demo.ProcessorTest - flux subscriber:6
16:31:24.054 [main] INFO com.example.demo.ProcessorTest - flux subscriber:7
16:31:24.054 [main] INFO com.example.demo.ProcessorTest - flux subscriber:8
16:31:24.058 [main] ERROR com.example.demo.ProcessorTest - The receiver is overrun by more signals than expected (bounded queue...)
reactor.core.Exceptions$OverflowException: The receiver is overrun by more signals than expected (bounded queue...)
    at reactor.core.Exceptions.failWithOverflow(Exceptions.java:202)
    at reactor.core.publisher.UnicastProcessor.onNext(UnicastProcessor.java:330)
    at com.example.demo.ProcessorTest.lambda$testUnicastProcessor$8(ProcessorTest.java:108)
    at java.util.stream.Streams$RangeIntSpliterator.forEachRemaining(Streams.java:110)
    at java.util.stream.IntPipeline$Head.forEach(IntPipeline.java:557)
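
The bounded-queue behaviour can be sketched with plain JDK collections. This is an illustrative model only, under the assumption that "rejecting" simply means a failed offer on a fixed-capacity queue; BoundedUnicastSketch and its onRejected callback are hypothetical names. Unlike the Reactor run above, where the first rejected element terminates the processor with an overflow error and later elements are reported as dropped, the sketch just hands every rejected element to the callback.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.function.Consumer;

// Simplified, hypothetical model of a single-subscriber buffer with a bounded queue.
class BoundedUnicastSketch<T> {
    private final Queue<T> queue;
    private final Consumer<T> onRejected;   // invoked for every element that does not fit

    BoundedUnicastSketch(int capacity, Consumer<T> onRejected) {
        this.queue = new ArrayBlockingQueue<>(capacity);
        this.onRejected = onRejected;
    }

    // Buffer the element, or hand it to the rejection callback when the queue is full.
    void onNext(T value) {
        if (!queue.offer(value)) {
            onRejected.accept(value);
        }
    }

    // The single subscriber arrives late and drains whatever was buffered.
    List<T> drain() {
        List<T> out = new ArrayList<>();
        T next;
        while ((next = queue.poll()) != null) {
            out.add(next);
        }
        return out;
    }
}

class BoundedUnicastDemo {
    // Returns [delivered, rejected].
    static List<List<Integer>> run() {
        List<Integer> rejected = new ArrayList<>();
        BoundedUnicastSketch<Integer> processor = new BoundedUnicastSketch<>(8, rejected::add);
        for (int i = 1; i <= 12; i++) {
            processor.onNext(i);             // nobody is subscribed yet
        }
        List<Integer> delivered = processor.drain();
        return List.of(delivered, rejected);
    }

    public static void main(String[] args) {
        System.out.println(run()); // [[1, 2, 3, 4, 5, 6, 7, 8], [9, 10, 11, 12]]
    }
}
```

As in the log above, only the first eight elements reach the late subscriber.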

synchronous

EmitterProcessor

  • EmitterProcessor can emit to several subscribers while honoring backpressure for each of them. It can also subscribe to a Publisher and relay its signals synchronously.
  • It has a bufferSize parameter that determines how much data it buffers while there is no subscriber yet; once the buffer is full, onNext blocks until the data is consumed. The first subscriber to subscribe receives the buffered data, while later subscribers only receive data published after they subscribed.
  • Once all subscribers have cancelled, the processor clears its buffer and stops accepting new subscribers.

    @Test
    public void testEmitterProcessor() throws InterruptedException {
        int bufferSize = 3; // values below 8 are reset to 8
        FluxProcessor<Integer, Integer> processor = EmitterProcessor.create(bufferSize);
        Flux<Integer> flux1 = processor.map(e -> e);
        Flux<Integer> flux2 = processor.map(e -> e*10);

        IntStream.rangeClosed(1,8).forEach(e -> {
            LOGGER.info("emit:{}",e);
            processor.onNext(e); // blocks here if the unconsumed data exceeds bufferSize
        });

        flux1.subscribe(e -> {
            LOGGER.info("flux1 subscriber:{}",e);
        });

        IntStream.rangeClosed(9,10).forEach(e -> {
            LOGGER.info("emit:{}",e);
            processor.onNext(e);
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e1) {
                e1.printStackTrace();
            }
        });

        // this subscription is added later, so it only consumes data published afterwards
        flux2.subscribe(e -> {
            LOGGER.info("flux2 subscriber:{}",e);
        });

        processor.onNext(11);
        processor.onNext(12);

        processor.onComplete();
        processor.blockLast();
    }

Sample output:

17:27:01.008 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
17:27:01.044 [main] INFO com.example.demo.ProcessorTest - emit:1
17:27:01.084 [main] INFO com.example.demo.ProcessorTest - emit:2
17:27:01.084 [main] INFO com.example.demo.ProcessorTest - emit:3
17:27:01.084 [main] INFO com.example.demo.ProcessorTest - emit:4
17:27:01.084 [main] INFO com.example.demo.ProcessorTest - emit:5
17:27:01.084 [main] INFO com.example.demo.ProcessorTest - emit:6
17:27:01.084 [main] INFO com.example.demo.ProcessorTest - emit:7
17:27:01.084 [main] INFO com.example.demo.ProcessorTest - emit:8
17:27:01.086 [main] INFO com.example.demo.ProcessorTest - flux1 subscriber:1
17:27:01.086 [main] INFO com.example.demo.ProcessorTest - flux1 subscriber:2
17:27:01.087 [main] INFO com.example.demo.ProcessorTest - flux1 subscriber:3
17:27:01.087 [main] INFO com.example.demo.ProcessorTest - flux1 subscriber:4
17:27:01.087 [main] INFO com.example.demo.ProcessorTest - flux1 subscriber:5
17:27:01.087 [main] INFO com.example.demo.ProcessorTest - flux1 subscriber:6
17:27:01.087 [main] INFO com.example.demo.ProcessorTest - flux1 subscriber:7
17:27:01.087 [main] INFO com.example.demo.ProcessorTest - flux1 subscriber:8
17:27:01.088 [main] INFO com.example.demo.ProcessorTest - emit:9
17:27:01.088 [main] INFO com.example.demo.ProcessorTest - flux1 subscriber:9
17:27:02.091 [main] INFO com.example.demo.ProcessorTest - emit:10
17:27:02.092 [main] INFO com.example.demo.ProcessorTest - flux1 subscriber:10
17:27:03.096 [main] INFO com.example.demo.ProcessorTest - flux1 subscriber:11
17:27:03.096 [main] INFO com.example.demo.ProcessorTest - flux2 subscriber:110
17:27:03.096 [main] INFO com.example.demo.ProcessorTest - flux1 subscriber:12
17:27:03.096 [main] INFO com.example.demo.ProcessorTest - flux2 subscriber:120
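
The "first subscriber gets the backlog, later subscribers only get what comes next" rule is easy to lose in the log, so here is a small JDK-only model of it. It is a sketch under strong simplifications, not EmitterProcessor's implementation: EmitterSketch is a hypothetical class, subscribers are plain Consumer callbacks, and the blocking behaviour of a full buffer is left out entirely.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Simplified, hypothetical model: data pushed before the first subscription is buffered
// and handed to the first subscriber; later subscribers only see later data.
class EmitterSketch<T> {
    private final List<T> buffer = new ArrayList<>();
    private final List<Consumer<T>> subscribers = new ArrayList<>();

    void subscribe(Consumer<T> subscriber) {
        if (subscribers.isEmpty()) {
            buffer.forEach(subscriber);   // only the first subscriber drains the backlog
            buffer.clear();
        }
        subscribers.add(subscriber);
    }

    void onNext(T value) {
        if (subscribers.isEmpty()) {
            buffer.add(value);            // nobody is listening yet, keep it
        } else {
            subscribers.forEach(s -> s.accept(value));
        }
    }
}

class EmitterDemo {
    // Returns [what the first subscriber saw, what the late subscriber saw].
    static List<List<Integer>> run() {
        List<Integer> first = new ArrayList<>();
        List<Integer> second = new ArrayList<>();
        EmitterSketch<Integer> processor = new EmitterSketch<>();
        for (int i = 1; i <= 8; i++) {
            processor.onNext(i);                          // no subscriber yet
        }
        processor.subscribe(first::add);
        processor.onNext(9);
        processor.onNext(10);
        processor.subscribe(e -> second.add(e * 10));     // late subscriber
        processor.onNext(11);
        processor.onNext(12);
        return List.of(first, second);
    }

    public static void main(String[] args) {
        System.out.println(run()); // [[1, 2, ..., 12], [110, 120]]
    }
}
```

The model reproduces the shape of the log: the first subscriber sees 1 through 12, the late one only 110 and 120.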

ReplayProcessor

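ReplayProcessor caches elements, whether they are pushed directly through its sink or received from an upstream Publisher, and replays them to late subscribers. It comes in four configurations:

  • cacheLast: caches only the last element
  • create(int): caches the last N elements
  • createTimeout(Duration): stamps each element with a timestamp and caches only elements whose age is within the given TTL
  • createSizeOrTimeout(int,Duration): stamps each element with a timestamp and caches at most N elements whose age is within the given TTL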
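
The size and age limits listed above can be pictured as two eviction rules applied to one queue. The following is a minimal JDK-only sketch of that idea, not ReplayProcessor's implementation: ReplayCacheSketch is a hypothetical class, time is measured in milliseconds, and the clock is injected as a LongSupplier purely so the example is deterministic.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.LongSupplier;

// Simplified, hypothetical model of a replay cache bounded by size and by age.
class ReplayCacheSketch<T> {
    private static final class Stamped<V> {
        final V value;
        final long timestamp;
        Stamped(V value, long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    private final int maxSize;
    private final long ttlMillis;
    private final LongSupplier clock;   // injected so the example is deterministic
    private final Deque<Stamped<T>> history = new ArrayDeque<>();

    ReplayCacheSketch(int maxSize, long ttlMillis, LongSupplier clock) {
        this.maxSize = maxSize;
        this.ttlMillis = ttlMillis;
        this.clock = clock;
    }

    void onNext(T value) {
        history.addLast(new Stamped<>(value, clock.getAsLong()));
        evict();
    }

    // What a subscriber arriving right now would get replayed.
    List<T> replay() {
        evict();
        List<T> out = new ArrayList<>();
        for (Stamped<T> s : history) {
            out.add(s.value);
        }
        return out;
    }

    // Drop the oldest entries while there are too many or they are too old.
    private void evict() {
        long now = clock.getAsLong();
        while (!history.isEmpty()
                && (history.size() > maxSize || now - history.peekFirst().timestamp > ttlMillis)) {
            history.removeFirst();
        }
    }
}

class ReplayCacheDemo {
    static List<List<Integer>> run() {
        long[] now = {0};
        // Size-only behaviour, comparable to create(3): the huge TTL never expires anything.
        ReplayCacheSketch<Integer> lastThree = new ReplayCacheSketch<>(3, Long.MAX_VALUE, () -> now[0]);
        for (int i = 1; i <= 5; i++) {
            lastThree.onNext(i);
        }
        // Size-or-timeout behaviour: at most 3 elements, none older than 10 ms.
        ReplayCacheSketch<Integer> recent = new ReplayCacheSketch<>(3, 10, () -> now[0]);
        for (int i = 1; i <= 5; i++) {
            now[0] = i * 4;              // elements are stamped at t = 4, 8, 12, 16, 20
            recent.onNext(i);
        }
        now[0] = 25;                     // a late subscriber arrives at t = 25
        return List.of(lastThree.replay(), recent.replay());
    }

    public static void main(String[] args) {
        System.out.println(run()); // [[3, 4, 5], [4, 5]]
    }
}
```

In the second cache the size limit alone would keep 3, 4, and 5, but by t = 25 element 3 is 13 ms old and is evicted as well, which is the extra constraint the timeout variants add.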

Example:

    @Test
    public void testReplayProcessor() throws InterruptedException {
        ReplayProcessor<Integer> replayProcessor = ReplayProcessor.create(3);
        Flux<Integer> flux1 = replayProcessor
                .map(e -> e);
        Flux<Integer> flux2 = replayProcessor
                .map(e -> e);

        flux1.subscribe(e -> {
            LOGGER.info("flux1 subscriber:{}",e);
        });

        IntStream.rangeClosed(1,5)
                .forEach(e -> {
                    replayProcessor.onNext(e);
                    try {
                        TimeUnit.SECONDS.sleep(1);
                    } catch (InterruptedException e1) {
                        e1.printStackTrace();
                    }
                });

        LOGGER.info("finish publish data");
        TimeUnit.SECONDS.sleep(3);

        LOGGER.info("begin to subscribe flux2");
        flux2.subscribe(e -> {
            LOGGER.info("flux2 subscriber:{}",e);
        });

        replayProcessor.onComplete();
        replayProcessor.blockLast();
    }

Output:

15:13:39.415 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
15:13:39.438 [main] INFO com.example.demo.ProcessorTest - flux1 subscriber:1
15:13:40.445 [main] INFO com.example.demo.ProcessorTest - flux1 subscriber:2
15:13:41.449 [main] INFO com.example.demo.ProcessorTest - flux1 subscriber:3
15:13:42.454 [main] INFO com.example.demo.ProcessorTest - flux1 subscriber:4
15:13:43.459 [main] INFO com.example.demo.ProcessorTest - flux1 subscriber:5
15:13:44.463 [main] INFO com.example.demo.ProcessorTest - finish publish data
15:13:47.466 [main] INFO com.example.demo.ProcessorTest - begin to subscribe flux2
15:13:47.467 [main] INFO com.example.demo.ProcessorTest - flux2 subscriber:3
15:13:47.467 [main] INFO com.example.demo.ProcessorTest - flux2 subscriber:4
15:13:47.468 [main] INFO com.example.demo.ProcessorTest - flux2 subscriber:5

asynchronous

TopicProcessor

  • TopicProcessor is an asynchronous processor. When share is set to true, it can relay elements from multiple upstream publishers concurrently. If the upstream publisher is a concurrent stream, or if TopicProcessor's onNext, onComplete, and onError methods need to be called concurrently, share must be enabled. With share disabled, it is a processor that follows the Reactive Streams specification, and concurrent calls are not allowed.
  • TopicProcessor also supports broadcasting (fan-out) to multiple subscribers, binding one thread to each subscriber. The maximum number of subscribers is therefore limited by the executor's thread pool.
  • TopicProcessor uses a RingBuffer data structure to push data; each subscriber thread keeps track of its own consumption position in the RingBuffer.
  • TopicProcessor also has an autoCancel option, true by default, which means the upstream publisher is cancelled automatically once all subscribers have cancelled.

    @Test
    public void testTopicProcessor() throws InterruptedException {
        TopicProcessor<Integer> topicProcessor = TopicProcessor.<Integer>builder()
                .share(true)
//                .executor(Executors.newSingleThreadExecutor())
                .build();
        Flux<Integer> flux1 = topicProcessor
                .map(e -> e);
        Flux<Integer> flux2 = topicProcessor
                .map(e -> e);
        Flux<Integer> flux3 = topicProcessor
                .map(e -> e);

        AtomicInteger count = new AtomicInteger(0);
        flux1.subscribe(e -> {
            LOGGER.info("flux1 subscriber:{}",e);
            count.incrementAndGet();
        });
        flux2.subscribe(e -> {
            LOGGER.info("flux2 subscriber:{}",e);
        });
        flux3.subscribe(e -> {
            LOGGER.info("flux3 subscriber:{}",e);
        });

        IntStream.rangeClosed(1,100)
                .parallel()
                .forEach(e -> {
//                    LOGGER.info("emit:{}",e);
                    topicProcessor.onNext(e);
                });

        topicProcessor.onComplete();
        topicProcessor.blockLast();

        TimeUnit.SECONDS.sleep(10);
        System.out.println(count.get());
    }

Two things are worth noting:

  • share: behind the scenes, share sets the multiproducers property of EventLoopProcessor.

reactor-core-3.1.2.RELEASE-sources.jar!/reactor/core/publisher/EventLoopProcessor.java

    EventLoopProcessor(
            int bufferSize,
            @Nullable ThreadFactory threadFactory,
            @Nullable ExecutorService executor,
            ExecutorService requestExecutor,
            boolean autoCancel,
            boolean multiproducers,
            Supplier<Slot<IN>> factory,
            WaitStrategy strategy) {

        if (!Queues.isPowerOfTwo(bufferSize)) {
            throw new IllegalArgumentException("bufferSize must be a power of 2 : " + bufferSize);
        }

        if (bufferSize < 1){
            throw new IllegalArgumentException("bufferSize must be strictly positive, " +
                    "was: "+bufferSize);
        }

        this.autoCancel = autoCancel;

        contextClassLoader = new EventLoopContext(multiproducers);

        this.name = defaultName(threadFactory, getClass());

        this.requestTaskExecutor = Objects.requireNonNull(requestExecutor, "requestTaskExecutor");

        if (executor == null) {
            this.executor = Executors.newCachedThreadPool(threadFactory);
        }
        else {
            this.executor = executor;
        }

        if (multiproducers) {
            this.ringBuffer = RingBuffer.createMultiProducer(factory,
                    bufferSize,
                    strategy,
                    this);
        }
        else {
            this.ringBuffer = RingBuffer.createSingleProducer(factory,
                    bufferSize,
                    strategy,
                    this);
        }
    }

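If share is true, the ring buffer is created with createMultiProducer. In practice this means that if several threads call the processor's onNext without share enabled, there is a concurrency problem and data can be lost. In the code above, for example, if share(true) is commented out, the final value of count is not guaranteed to be 100, whereas with share set to true it is guaranteed to be 100.

  • executor: if executor(Executors.newSingleThreadExecutor()) is set, the subscribers of flux1, flux2, and flux3 run sequentially rather than concurrently.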
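
Why a multi-producer buffer avoids lost writes can be shown with a much smaller structure than Reactor's RingBuffer. The sketch below is an assumption-laden illustration, not the library's code: MultiProducerSketch is a hypothetical class in which every producer first claims a unique sequence number atomically and only then writes its slot. There is no consumer and no wrap-around protection, so it only holds as long as fewer elements are published than there are slots.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReferenceArray;
import java.util.stream.IntStream;

// Simplified, hypothetical multi-producer buffer: each producer claims a unique
// sequence number atomically, then writes into the slot for that sequence.
class MultiProducerSketch {
    private final AtomicReferenceArray<Integer> slots;
    private final int mask;
    private final AtomicLong nextSequence = new AtomicLong();

    MultiProducerSketch(int bufferSize) {
        // Same precondition as the constructor quoted above: a positive power of two.
        if (bufferSize < 1 || Integer.bitCount(bufferSize) != 1) {
            throw new IllegalArgumentException("bufferSize must be a power of 2 : " + bufferSize);
        }
        this.slots = new AtomicReferenceArray<>(bufferSize);
        this.mask = bufferSize - 1;
    }

    void publish(int value) {
        long sequence = nextSequence.getAndIncrement();   // atomic claim, safe across threads
        slots.set((int) (sequence & mask), value);        // no two producers share a sequence
    }

    long published() {
        return nextSequence.get();
    }

    int sum() {
        int total = 0;
        for (int i = 0; i < slots.length(); i++) {
            Integer v = slots.get(i);
            if (v != null) {
                total += v;
            }
        }
        return total;
    }

    static MultiProducerSketch demo() {
        MultiProducerSketch ring = new MultiProducerSketch(128);
        // Publish from several threads at once, as the TopicProcessor test does.
        IntStream.rangeClosed(1, 100).parallel().forEach(ring::publish);
        return ring;
    }

    public static void main(String[] args) {
        MultiProducerSketch ring = demo();
        System.out.println(ring.published() + " " + ring.sum()); // 100 5050
    }
}
```

If the claim were a plain `sequence++` on a non-atomic field, two threads could read the same sequence and overwrite each other's slot, which is the kind of loss described above for a single-producer buffer used from several threads.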

WorkQueueProcessor

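  • WorkQueueProcessor is also an asynchronous processor. When share is set to true, it can relay elements from multiple upstream publishers concurrently.
  • WorkQueueProcessor uses a RingBuffer data structure to push data.
  • WorkQueueProcessor does not create a thread for every subscriber, so it scales somewhat better than TopicProcessor. The maximum number of subscribers is limited by the executor's thread pool. It is best not to add too many subscribers to a WorkQueueProcessor, though, since that increases lock contention in the processor. Prefer a ThreadPoolExecutor or a ForkJoinPool: the processor can detect their capacity and throw an exception when there are too many subscribers.
  • WorkQueueProcessor does not fully follow the Reactive Streams specification, which is why it consumes fewer resources than TopicProcessor. As a trade-off, the requests of all subscribers are added together, and the processor relays each element to only one subscriber. Compared with TopicProcessor's fan-out broadcast, this resembles round-robin distribution, although fair round-robin is not guaranteed.
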
    @Test
    public void testWorkQueueProcessor(){
        WorkQueueProcessor<Integer> workQueueProcessor = WorkQueueProcessor.create();
        Flux<Integer> flux1 = workQueueProcessor
                .map(e -> e);
        Flux<Integer> flux2 = workQueueProcessor
                .map(e -> e);
        Flux<Integer> flux3 = workQueueProcessor
                .map(e -> e);

        flux1.subscribe(e -> {
            LOGGER.info("flux1 subscriber:{}",e);
        });
        flux2.subscribe(e -> {
            LOGGER.info("flux2 subscriber:{}",e);
        });
        flux3.subscribe(e -> {
            LOGGER.info("flux3 subscriber:{}",e);
        });

        IntStream.range(1,20)
                .forEach(e -> {
                    workQueueProcessor.onNext(e);
                });

        workQueueProcessor.onComplete();
        workQueueProcessor.blockLast();
    }

Sample output:

21:56:58.203 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
21:56:58.214 [main] DEBUG reactor.core.publisher.UnsafeSupport - Starting UnsafeSupport init in Java 1.8
21:56:58.215 [main] DEBUG reactor.core.publisher.UnsafeSupport - Unsafe is available
21:56:58.228 [WorkQueueProcessor-1] INFO com.example.demo.ProcessorTest - flux1 subscriber:1
21:56:58.228 [WorkQueueProcessor-3] INFO com.example.demo.ProcessorTest - flux3 subscriber:3
21:56:58.228 [WorkQueueProcessor-2] INFO com.example.demo.ProcessorTest - flux2 subscriber:2
21:56:58.229 [WorkQueueProcessor-1] INFO com.example.demo.ProcessorTest - flux1 subscriber:4
21:56:58.229 [WorkQueueProcessor-3] INFO com.example.demo.ProcessorTest - flux3 subscriber:5
21:56:58.229 [WorkQueueProcessor-2] INFO com.example.demo.ProcessorTest - flux2 subscriber:6
21:56:58.230 [WorkQueueProcessor-1] INFO com.example.demo.ProcessorTest - flux1 subscriber:7
21:56:58.230 [WorkQueueProcessor-3] INFO com.example.demo.ProcessorTest - flux3 subscriber:8
21:56:58.230 [WorkQueueProcessor-2] INFO com.example.demo.ProcessorTest - flux2 subscriber:9
21:56:58.230 [WorkQueueProcessor-1] INFO com.example.demo.ProcessorTest - flux1 subscriber:10
21:56:58.230 [WorkQueueProcessor-3] INFO com.example.demo.ProcessorTest - flux3 subscriber:11
21:56:58.230 [WorkQueueProcessor-2] INFO com.example.demo.ProcessorTest - flux2 subscriber:12
21:56:58.230 [WorkQueueProcessor-1] INFO com.example.demo.ProcessorTest - flux1 subscriber:13
21:56:58.230 [WorkQueueProcessor-3] INFO com.example.demo.ProcessorTest - flux3 subscriber:14
21:56:58.230 [WorkQueueProcessor-2] INFO com.example.demo.ProcessorTest - flux2 subscriber:15
21:56:58.230 [WorkQueueProcessor-3] INFO com.example.demo.ProcessorTest - flux3 subscriber:17
21:56:58.230 [WorkQueueProcessor-1] INFO com.example.demo.ProcessorTest - flux1 subscriber:16
21:56:58.230 [WorkQueueProcessor-3] INFO com.example.demo.ProcessorTest - flux3 subscriber:19
21:56:58.230 [WorkQueueProcessor-2] INFO com.example.demo.ProcessorTest - flux2 subscriber:18

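As the output shows, WorkQueueProcessor's subscribers behave like Kafka consumers that belong to the same group: the messages they consume add up to exactly the messages the publisher emitted, unlike TopicProcessor's broadcast-style delivery.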
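
The difference between the two delivery styles can be reduced to a few lines of plain Java. This is a single-threaded illustration with hypothetical names (DeliverySketch, fanOut, workQueue), not a model of either processor's threading. The strict rotation in workQueue is only there to keep the example deterministic; as noted above, the real WorkQueueProcessor does not guarantee fair round-robin.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified, single-threaded model of the two delivery styles.
class DeliverySketch {
    // Fan-out (TopicProcessor style): every subscriber receives every element.
    static List<List<Integer>> fanOut(int subscribers, List<Integer> items) {
        List<List<Integer>> received = newInboxes(subscribers);
        for (Integer item : items) {
            for (List<Integer> inbox : received) {
                inbox.add(item);
            }
        }
        return received;
    }

    // Work queue (WorkQueueProcessor style): every element goes to exactly one subscriber.
    // Strict rotation is used here only to keep the sketch deterministic.
    static List<List<Integer>> workQueue(int subscribers, List<Integer> items) {
        List<List<Integer>> received = newInboxes(subscribers);
        int next = 0;
        for (Integer item : items) {
            received.get(next).add(item);
            next = (next + 1) % subscribers;
        }
        return received;
    }

    private static List<List<Integer>> newInboxes(int n) {
        List<List<Integer>> inboxes = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            inboxes.add(new ArrayList<>());
        }
        return inboxes;
    }

    public static void main(String[] args) {
        List<Integer> items = List.of(1, 2, 3, 4, 5, 6);
        System.out.println(fanOut(3, items));    // three copies of [1, 2, 3, 4, 5, 6]
        System.out.println(workQueue(3, items)); // [[1, 4], [2, 5], [3, 6]]
    }
}
```

With fan-out the total number of deliveries is subscribers times elements; with the work queue it is exactly the number of elements, which is the consumer-group behaviour visible in the log.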


Originally published on the WeChat official account 码匠的流水账 (geek_luandun)

Original publication date: 2018-01-17
