首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >SSE 11+客户端示例(w/o依赖项)

SSE 11+客户端示例(w/o依赖项)
EN

Stack Overflow用户
提问于 2020-08-20 08:04:25
回答 1查看 2.7K关注 0票数 2

我正在寻找使用普通JDK11+ http客户端读取服务器发送事件的示例,而不需要额外的依赖项。我在文献资料中也找不到有关sse的任何信息。

有什么暗示吗?

EN

回答 1

Stack Overflow用户

发布于 2020-09-09 16:50:39

编辑1:信息这里这里关于传入数据的格式。

编辑2:更新了代码示例以处理协议的data:部分。还有event:id:retry:部件(请参阅上面的链接),但我不打算为这些部件添加处理。

我找不到一个正式的BodySubscriber来做SSE,但是写一个也没那么难。下面是一个粗略的建议(但请注意TODOs):

代码语言:javascript
运行
复制
public class SseSubscriber implements BodySubscriber<Void>
{
    protected static final Pattern dataLinePattern = Pattern.compile( "^data: ?(.*)$" );

    protected static String extractMessageData( String[] messageLines )
    {
        var s = new StringBuilder( );
        for ( var line : messageLines )
        {
            var m = dataLinePattern.matcher( line );
            if ( m.matches( ) )
            {
                s.append( m.group( 1 ) );
            }
        }
        return s.toString( );
    }

    protected final Consumer<? super String> messageDataConsumer;
    protected final CompletableFuture<Void> future;
    protected volatile Subscription subscription;
    protected volatile String deferredText;

    public SseSubscriber( Consumer<? super String> messageDataConsumer )
    {
        this.messageDataConsumer = messageDataConsumer;
        this.future = new CompletableFuture<>( );
        this.subscription = null;
        this.deferredText = null;
    }

    @Override
    public void onSubscribe( Subscription subscription )
    {
        this.subscription = subscription;
        try
        {
            this.deferredText = "";
            this.subscription.request( 1 );
        }
        catch ( Exception e )
        {
            this.future.completeExceptionally( e );
            this.subscription.cancel( );
        }
    }

    @Override
    public void onNext( List<ByteBuffer> buffers )
    {
        try
        {
            // Volatile read
            var deferredText = this.deferredText;

            for ( var buffer : buffers )
            {
                // TODO: Safe to assume multi-byte chars don't get split across buffers?
                var s = deferredText + UTF_8.decode( buffer );

                // -1 means don't discard trailing empty tokens ... so the final token will
                // be whatever is left after the last \n\n (possibly the empty string, but
                // not necessarily), which is the part we need to defer until the next loop
                // iteration
                var tokens = s.split( "\n\n", -1 );

                // Final token gets deferred, not processed here
                for ( var i = 0; i < tokens.length - 1; i++ )
                {
                    var message = tokens[ i ];
                    var lines = message.split( "\n" );
                    var data = extractMessageData( lines );
                    this.messageDataConsumer.accept( data );
                    // TODO: Handle lines that start with "event:", "id:", "retry:"
                }

                // Defer the final token
                deferredText = tokens[ tokens.length - 1 ];
            }

            // Volatile write
            this.deferredText = deferredText;

            this.subscription.request( 1 );
        }
        catch ( Exception e )
        {
            this.future.completeExceptionally( e );
            this.subscription.cancel( );
        }
    }

    @Override
    public void onError( Throwable e )
    {
        this.future.completeExceptionally( e );
    }

    @Override
    public void onComplete( )
    {
        try
        {
            this.future.complete( null );
        }
        catch ( Exception e )
        {
            this.future.completeExceptionally( e );
        }
    }

    @Override
    public CompletionStage<Void> getBody( )
    {
        return this.future;
    }
}

然后使用它:

代码语言:javascript
运行
复制
var req = HttpRequest.newBuilder( )
                     .GET( )
                     .uri( new URI( "http://service/path/to/events" )
                     .setHeader( "Accept", "text/event-stream" )
                     .build( );

this.client.sendAsync( req, respInfo ->
{
    if ( respInfo.statusCode( ) == 200 )
    {
        return new SseSubscriber( messageData ->
        {
            // TODO: Handle messageData
        } );
    }
    else
    {
        throw new RuntimeException( "Request failed" );
    }
} );
票数 2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/63500818

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档