首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >项目反应堆,在创建lambda之外使用一个通量接收器。

项目反应堆,在创建lambda之外使用一个通量接收器。
EN

Stack Overflow用户
提问于 2019-05-09 16:03:34
回答 2查看 4.9K关注 0票数 6
  • 当我的服务启动时,我想构建一个简单的管道。
  • 我想隔离Flux接收器或处理器,以发出事件。
  • 事件将来自多个线程,应该根据管道的subscribeOn()规范进行处理,但是所有的事情似乎都运行在main线程上。
  • 什么是最好的方法?我把我的尝试附在下面。
  • (我使用的是反应堆芯v3.2.8.reactor.)
代码语言:javascript
运行
复制
import org.junit.jupiter.api.Test;

import reactor.core.publisher.DirectProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxProcessor;
import reactor.core.publisher.FluxSink;
import reactor.core.scheduler.Schedulers;

/**
 * I want to construct my React pipelines during creation,
 * then emit events over the lifetime of my services.
 */
public class React1Test
{
    /**
     * Attempt 1 - use a DirectProcessor and send items to it.
     * Doesn't work though - seems to always run on the main thread.
     */
    @Test
    public void testReact1() throws InterruptedException
    {
        // Create the flux and sink.
        FluxProcessor<String, String> fluxProcessor = DirectProcessor.<String>create().serialize();
        FluxSink<String> sink = fluxProcessor.sink();

        // Create the pipeline.
        fluxProcessor
            .doOnNext(str -> showDebugMsg(str))   // What thread do ops work on?
            .subscribeOn(Schedulers.elastic())
            .subscribe(str -> showDebugMsg(str)); // What thread does subscribe run on?

        // Give the multi-thread pipeline a second.
        Thread.sleep(1000);

        // Time passes ... things happen ...
        // Pass a few messages to the sink, emulating events.
        sink.next("a");
        sink.next("b");
        sink.next("c");

        // It's multi-thread so wait a sec to receive.
        Thread.sleep(1000);
    }

    // Used down below during Flux.create().
    private FluxSink<String> sink2;

    /**
     * Attempt 2 - use Flux.create() and its FluxSink object.
     * Also seems to always run on the main thread.
     */
    @Test
    public void testReact2() throws InterruptedException
    {
        // Create the flux and sink.
        Flux.<String>create(sink -> sink2 = sink)
            .doOnNext(str -> showDebugMsg(str))   // What thread do ops work on?
            .subscribeOn(Schedulers.elastic())
            .subscribe(str -> showDebugMsg(str)); // What thread does subscribe run on?

        // Give the multi-thread pipeline a second.
        Thread.sleep(1000);

        // Pass a few messages to the sink.
        sink2.next("a");
        sink2.next("b");
        sink2.next("c");

        // It's multi-thread so wait a sec to receive.
        Thread.sleep(1000);
    }

    // Show us what thread we're on.
    private static void showDebugMsg(String msg)
    {
        System.out.println(String.format("%s [%s]", msg, Thread.currentThread().getName()));
    }
}

产出总是:

代码语言:javascript
运行
复制
a [main]
a [main]
b [main]
b [main]
c [main]
c [main]

但我所期望的是:

代码语言:javascript
运行
复制
a [elastic-1]
a [elastic-1]
b [elastic-2]
b [elastic-2]
c [elastic-3]
c [elastic-3]

提前谢谢。

EN

回答 2

Stack Overflow用户

发布于 2019-05-09 16:35:54

您看到[main]是因为您正在从主线程调用onNext。您使用的subscribeOn仅用于订阅(当create的lambda被触发时)。如果使用publishOn而不是subscribeOn,您将看到elastic-*线程被记录在案。

此外,考虑使用处理器,将从Flux.create和类似操作符获得的sink存储为字段是不可取的。

票数 2
EN

Stack Overflow用户

发布于 2019-05-09 18:51:50

  • 您可以使用parallel()runOn()而不是subscribeOn()来让sink.next()运行多线程。
  • bsideup也是正确的--您可以使用publishOn()强制下游操作符在一个不同的Scheduler线程上运行。

以下是我更新的代码:

代码语言:javascript
运行
复制
import org.junit.jupiter.api.Test;

import reactor.core.publisher.DirectProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxProcessor;
import reactor.core.publisher.FluxSink;
import reactor.core.scheduler.Schedulers;

/**
 * I want to construct my React pipelines during creation,
 * then emit events over the lifetime of my services.
 */
public class React1Test
{
    /**
     * Version 1 - use a DirectProcessor to dynamically emit items.
     */
    @Test
    public void testReact1() throws InterruptedException
    {
        // Create the flux and sink.
        FluxProcessor<String, String> fluxProcessor = DirectProcessor.<String>create().serialize();
        FluxSink<String> sink = fluxProcessor.sink();

        // Create the pipeline.
        fluxProcessor
            .parallel()
            .runOn(Schedulers.elastic())
            .doOnNext(str -> showDebugMsg(str))   // What thread do ops work on?
            .subscribe(str -> showDebugMsg(str)); // What thread does subscribe run on?

        // Give the multi-thread pipeline a second.
        Thread.sleep(1000);

        // Time passes ... things happen ...
        // Pass a few messages to the sink, emulating events.
        sink.next("a");
        sink.next("b");
        sink.next("c");

        // It's multi-thread so wait a sec to receive.
        Thread.sleep(1000);
    }

    // Used down below during Flux.create().
    private FluxSink<String> sink2;

    /**
     * Version 2 - use Flux.create() and its FluxSink object.
     */
    @Test
    public void testReact2() throws InterruptedException
    {
        // Create the flux and sink.
        Flux.<String>create(sink -> sink2 = sink)
            .parallel()
            .runOn(Schedulers.elastic())
            .doOnNext(str -> showDebugMsg(str))   // What thread do ops work on?
            .subscribe(str -> showDebugMsg(str)); // What thread does subscribe run on?

        // Give the multi-thread pipeline a second.
        Thread.sleep(1000);

        // Pass a few messages to the sink.
        sink2.next("a");
        sink2.next("b");
        sink2.next("c");

        // It's multi-thread so wait a sec to receive.
        Thread.sleep(1000);
    }

    // Show us what thread we're on.
    private static void showDebugMsg(String msg)
    {
        System.out.println(String.format("%s [%s]", msg, Thread.currentThread().getName()));
    }
}

这两个版本都产生所需的多线程输出:

代码语言:javascript
运行
复制
a [elastic-2]
b [elastic-3]
c [elastic-4]
b [elastic-3]
a [elastic-2]
c [elastic-4]
票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/56063468

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档