在我的设置中,我希望通过SSE通道(服务器发送的事件)转发某些状态更改。状态更改是通过调用REST端点启动的。因此,我需要将传入的状态更改转发到SSE流。
在Quarkus中完成此操作的最佳/最简单方法是什么。
我能想到的一种解决方案是使用EventBus (https://quarkus.io/guides/reactive-messaging)。SSE端点将订阅状态更改并通过SSE通道将其推送。状态更改端点发布适当的事件。
这是一个可行的解决方案吗?还有其他(更简单的)解决方案吗?在任何情况下,我都需要使用反应性的东西来实现这一点吗?
任何帮助都是非常感谢的!
发布于 2020-04-23 22:46:03
Dmytro,谢谢你为我指明了正确的方向。我选择了与Kotlin有关的Mutiny。我的代码现在看起来像这样:
data class DeviceStatus(var status: Status = Status.OFFLINE) {
enum class Status {OFFLINE, CONNECTED, ANALYZING, MAINTENANCE}
}
@ApplicationScoped
class DeviceStatusService {
var deviceStatusProcessor: PublishProcessor<DeviceStatus> = PublishProcessor.create()
var deviceStatusQueue: Flowable<DeviceStatus> = Flowable.fromPublisher(deviceStatusProcessor)
fun pushDeviceStatus(deviceStatus: DeviceStatus) {
deviceStatusProcessor.onNext(deviceStatus)
}
fun getStream(): Multi<DeviceStatus> {
return Multi.createFrom().publisher(deviceStatusQueue)
}
}
@Path("/deviceStatus")
class DeviceStatusResource {
private val LOGGER: Logger = Logger.getLogger("DeviceStatusResource")
@Inject
@field: Default
lateinit var deviceStatusService: DeviceStatusService
@POST
@Consumes(MediaType.APPLICATION_JSON)
fun status(status: DeviceStatus): Response {
LOGGER.info("POST /deviceStatus " + status.status);
deviceStatusService.pushDeviceStatus(status)
return Response.ok().build();
}
@GET
@Path("/eventStream")
@Produces(MediaType.SERVER_SENT_EVENTS)
@SseElementType(MediaType.APPLICATION_JSON)
fun stream(): Multi<DeviceStatus>? {
return deviceStatusService.getStream()
}
}
作为最小的设置,该服务可以直接使用deviceStatusProcessor作为发布者。然而,Flowable增加了缓冲。欢迎对实施方案提出意见。
发布于 2020-04-23 16:43:59
最简单的方法是使用rxjava作为流提供者。首先,您需要添加rxjava依赖项。它既可以来自于quarkus中的反应式依赖关系,比如kafka,也可以直接使用它(如果你不需要任何流程库):
<dependency>
<groupId>io.reactivex.rxjava2</groupId>
<artifactId>rxjava</artifactId>
<version>2.2.19</version>
</dependency>
以下是每秒发送随机双精度值的示例:
@GET
@Path("/stream")
@Produces(MediaType.SERVER_SENT_EVENTS)
@SseElementType("text/plain")
public Publisher<Double> stream() {
return Flowable.interval(1, TimeUnit.SECONDS).map(tick -> new Random().nextDouble());
}
我们创建新的Flowable,它将每秒触发一次,并且在每一次滴答时,我们生成下一个随机的double。研究有关如何创建Flowable的任何其他选项,例如Flowable.fromFuture()
,以使其适应特定的代码逻辑。
P.S上面的代码将在您每次查询此端点时生成新的Flowable,我创建它是为了节省空间,在您的情况下,我假设您只需构建一次事件源,并且每次查询端点时都使用相同的实例
https://stackoverflow.com/questions/61380899
复制相似问题