首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >如何通过REST将传入数据转发到Quarkus中的SSE流

如何通过REST将传入数据转发到Quarkus中的SSE流
EN

Stack Overflow用户
提问于 2020-04-23 14:45:32
回答 2查看 558关注 0票数 2

在我的设置中,我希望通过SSE通道(服务器发送的事件)转发某些状态更改。状态更改是通过调用REST端点启动的。因此,我需要将传入的状态更改转发到SSE流。

在Quarkus中完成此操作的最佳/最简单方法是什么。

我能想到的一种解决方案是使用EventBus (https://quarkus.io/guides/reactive-messaging)。SSE端点将订阅状态更改并通过SSE通道将其推送。状态更改端点发布适当的事件。

这是一个可行的解决方案吗?还有其他(更简单的)解决方案吗?在任何情况下,我都需要使用反应性的东西来实现这一点吗?

任何帮助都是非常感谢的!

EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2020-04-23 22:46:03

Dmytro,谢谢你为我指明了正确的方向。我选择了与Kotlin有关的Mutiny。我的代码现在看起来像这样:

代码语言:javascript
运行
复制
data class DeviceStatus(var status: Status = Status.OFFLINE) {
    enum class Status {OFFLINE, CONNECTED, ANALYZING, MAINTENANCE}
}

@ApplicationScoped
class DeviceStatusService {
    var deviceStatusProcessor: PublishProcessor<DeviceStatus> = PublishProcessor.create()
    var deviceStatusQueue: Flowable<DeviceStatus> = Flowable.fromPublisher(deviceStatusProcessor)

    fun pushDeviceStatus(deviceStatus: DeviceStatus) {
        deviceStatusProcessor.onNext(deviceStatus)
    }

    fun getStream(): Multi<DeviceStatus> {
        return Multi.createFrom().publisher(deviceStatusQueue)
    }
}

@Path("/deviceStatus")
class DeviceStatusResource {
    private val LOGGER: Logger = Logger.getLogger("DeviceStatusResource")

    @Inject
    @field: Default
    lateinit var deviceStatusService: DeviceStatusService

    @POST
    @Consumes(MediaType.APPLICATION_JSON)
    fun status(status: DeviceStatus): Response {
        LOGGER.info("POST /deviceStatus " + status.status);
        deviceStatusService.pushDeviceStatus(status)
        return Response.ok().build();
    }

    @GET
    @Path("/eventStream")
    @Produces(MediaType.SERVER_SENT_EVENTS)
    @SseElementType(MediaType.APPLICATION_JSON)
    fun stream(): Multi<DeviceStatus>? {
        return deviceStatusService.getStream()
    }
}

作为最小的设置,该服务可以直接使用deviceStatusProcessor作为发布者。然而,Flowable增加了缓冲。欢迎对实施方案提出意见。

票数 1
EN

Stack Overflow用户

发布于 2020-04-23 16:43:59

最简单的方法是使用rxjava作为流提供者。首先,您需要添加rxjava依赖项。它既可以来自于quarkus中的反应式依赖关系,比如kafka,也可以直接使用它(如果你不需要任何流程库):

代码语言:javascript
运行
复制
        <dependency>
            <groupId>io.reactivex.rxjava2</groupId>
            <artifactId>rxjava</artifactId>
            <version>2.2.19</version>
        </dependency>

以下是每秒发送随机双精度值的示例:

代码语言:javascript
运行
复制
    @GET
    @Path("/stream")
    @Produces(MediaType.SERVER_SENT_EVENTS)
    @SseElementType("text/plain")
    public Publisher<Double> stream() {
        return Flowable.interval(1, TimeUnit.SECONDS).map(tick -> new Random().nextDouble());
    }

我们创建新的Flowable,它将每秒触发一次,并且在每一次滴答时,我们生成下一个随机的double。研究有关如何创建Flowable的任何其他选项,例如Flowable.fromFuture(),以使其适应特定的代码逻辑。

P.S上面的代码将在您每次查询此端点时生成新的Flowable,我创建它是为了节省空间,在您的情况下,我假设您只需构建一次事件源,并且每次查询端点时都使用相同的实例

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/61380899

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档