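I'm looking for an example of reading server-sent events (SSE) with the plain JDK 11+ HTTP client, without any extra dependencies. I also can't find anything about SSE in the documentation.
Any hints?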
Posted on 2020-09-09 16:50:39
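Edit 2: Updated the code example to handle the data: part of the protocol. There are also event:, id:, and retry: parts (see the link above), but I'm not going to add handling for those.
I couldn't find an official BodySubscriber for SSE, but writing one isn't that hard. Here is a rough suggestion (but note the TODOs):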
import static java.nio.charset.StandardCharsets.UTF_8;

import java.net.http.HttpResponse.BodySubscriber;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Flow.Subscription;
import java.util.function.Consumer;
import java.util.regex.Pattern;

public class SseSubscriber implements BodySubscriber<Void>
{
    protected static final Pattern dataLinePattern = Pattern.compile( "^data: ?(.*)$" );

    protected static String extractMessageData( String[] messageLines )
    {
        var s = new StringBuilder( );
        for ( var line : messageLines )
        {
            var m = dataLinePattern.matcher( line );
            if ( m.matches( ) )
            {
                // Per the SSE format, multiple data: lines in one message are joined with "\n"
                s.append( m.group( 1 ) ).append( '\n' );
            }
        }
        // Drop the trailing "\n" added after the last data: line
        if ( s.length( ) > 0 )
        {
            s.setLength( s.length( ) - 1 );
        }
        return s.toString( );
    }

    protected final Consumer<? super String> messageDataConsumer;
    protected final CompletableFuture<Void> future;
    protected volatile Subscription subscription;
    protected volatile String deferredText;

    public SseSubscriber( Consumer<? super String> messageDataConsumer )
    {
        this.messageDataConsumer = messageDataConsumer;
        this.future = new CompletableFuture<>( );
        this.subscription = null;
        this.deferredText = null;
    }

    @Override
    public void onSubscribe( Subscription subscription )
    {
        this.subscription = subscription;
        try
        {
            this.deferredText = "";
            this.subscription.request( 1 );
        }
        catch ( Exception e )
        {
            this.future.completeExceptionally( e );
            this.subscription.cancel( );
        }
    }

    @Override
    public void onNext( List<ByteBuffer> buffers )
    {
        try
        {
            // Volatile read
            var deferredText = this.deferredText;
            for ( var buffer : buffers )
            {
                // TODO: A multi-byte char can be split across buffers; decoding each
                // buffer on its own would then yield replacement characters
                var s = deferredText + UTF_8.decode( buffer );
                // -1 means don't discard trailing empty tokens ... so the final token will
                // be whatever is left after the last \n\n (possibly the empty string, but
                // not necessarily), which is the part we need to defer until the next loop
                // iteration
                var tokens = s.split( "\n\n", -1 );
                // Final token gets deferred, not processed here
                for ( var i = 0; i < tokens.length - 1; i++ )
                {
                    var message = tokens[ i ];
                    var lines = message.split( "\n" );
                    var data = extractMessageData( lines );
                    this.messageDataConsumer.accept( data );
                    // TODO: Handle lines that start with "event:", "id:", "retry:"
                }
                // Defer the final token
                deferredText = tokens[ tokens.length - 1 ];
            }
            // Volatile write
            this.deferredText = deferredText;
            this.subscription.request( 1 );
        }
        catch ( Exception e )
        {
            this.future.completeExceptionally( e );
            this.subscription.cancel( );
        }
    }

    @Override
    public void onError( Throwable e )
    {
        this.future.completeExceptionally( e );
    }

    @Override
    public void onComplete( )
    {
        try
        {
            this.future.complete( null );
        }
        catch ( Exception e )
        {
            this.future.completeExceptionally( e );
        }
    }

    @Override
    public CompletionStage<Void> getBody( )
    {
        return this.future;
    }
}

Then use it:
var req = HttpRequest.newBuilder( )
    .GET( )
    .uri( new URI( "http://service/path/to/events" ) )
    .setHeader( "Accept", "text/event-stream" )
    .build( );

this.client.sendAsync( req, respInfo ->
{
    if ( respInfo.statusCode( ) == 200 )
    {
        return new SseSubscriber( messageData ->
        {
            // TODO: Handle messageData
        } );
    }
    else
    {
        throw new RuntimeException( "Request failed" );
    }
} );

https://stackoverflow.com/questions/63500818
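For the TODO about lines that start with "event:", "id:", and "retry:", something along the following lines might work. This is only a sketch: SseEvent and SseMessageParser are hypothetical names that are not part of the JDK or of the subscriber above, and the field rules (one optional space after the colon, lines starting with a colon are comments, data lines joined with "\n", retry accepted only when it is all digits) are my reading of the server-sent events format. It parses a single message, meaning the text between two blank lines. Tracking the last event ID across messages and actually reconnecting after the retry delay are left out.

```java
import java.util.OptionalLong;

/** Hypothetical value class for one parsed SSE message (not a JDK type). */
final class SseEvent
{
    final String event;             // event type; "message" when no event: field is present
    final String data;              // data: lines joined with "\n"
    final String id;                // value of the last id: field, or null if absent
    final OptionalLong retryMillis; // retry: field, present only if it was all ASCII digits

    SseEvent( String event, String data, String id, OptionalLong retryMillis )
    {
        this.event = event;
        this.data = data;
        this.id = id;
        this.retryMillis = retryMillis;
    }
}

/** Hypothetical helper that parses the text of a single SSE message. */
final class SseMessageParser
{
    static SseEvent parse( String message )
    {
        String event = "message";
        String id = null;
        OptionalLong retry = OptionalLong.empty( );
        StringBuilder data = new StringBuilder( );

        // The format allows CRLF, CR, or LF as line terminators
        for ( String line : message.split( "\r\n|\r|\n" ) )
        {
            // Skip blank lines and comment lines (often used as keep-alives)
            if ( line.isEmpty( ) || line.startsWith( ":" ) ) continue;

            int colon = line.indexOf( ':' );
            String field = colon < 0 ? line : line.substring( 0, colon );
            String value = colon < 0 ? "" : line.substring( colon + 1 );
            // Exactly one leading space after the colon is stripped
            if ( value.startsWith( " " ) ) value = value.substring( 1 );

            switch ( field )
            {
                case "event":
                    event = value.isEmpty( ) ? "message" : value;
                    break;
                case "data":
                    data.append( value ).append( '\n' );
                    break;
                case "id":
                    // An id containing a NUL character is ignored
                    if ( value.indexOf( '\0' ) < 0 ) id = value;
                    break;
                case "retry":
                    // Up to 18 digits, so Long.parseLong cannot overflow
                    if ( value.matches( "[0-9]{1,18}" ) ) retry = OptionalLong.of( Long.parseLong( value ) );
                    break;
                default:
                    break; // unknown fields are ignored
            }
        }
        // Drop the trailing "\n" added after the last data: line
        if ( data.length( ) > 0 ) data.setLength( data.length( ) - 1 );
        return new SseEvent( event, data.toString( ), id, retry );
    }
}
```

In the subscriber, the call to extractMessageData( lines ) could then be swapped for SseMessageParser.parse( message ), with the consumer type changed to Consumer<? super SseEvent>.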
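On the TODO in onNext about multi-byte characters: as far as I can tell it is not safe to assume they never get split, because the body arrives as raw bytes and a buffer boundary can fall in the middle of a UTF-8 sequence. A possible way around it is a stateful CharsetDecoder that keeps the incomplete tail bytes until the next buffer arrives. The sketch below assumes a hypothetical helper class named IncrementalUtf8Decoder (my own name, not a JDK class). Malformed input is replaced rather than rejected, which is a choice made for this example.

```java
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.CodingErrorAction;
import java.nio.charset.StandardCharsets;

/** Hypothetical helper: decodes UTF-8 across buffer boundaries. */
final class IncrementalUtf8Decoder
{
    private final CharsetDecoder decoder = StandardCharsets.UTF_8.newDecoder( )
        .onMalformedInput( CodingErrorAction.REPLACE )
        .onUnmappableCharacter( CodingErrorAction.REPLACE );

    // Bytes of an incomplete multi-byte sequence left over from the previous call
    private ByteBuffer leftover = ByteBuffer.allocate( 0 );

    /** Decodes every complete character in the chunk and keeps any incomplete tail for the next call. */
    String decode( ByteBuffer chunk )
    {
        // Prepend the bytes that could not be decoded last time
        ByteBuffer in = ByteBuffer.allocate( leftover.remaining( ) + chunk.remaining( ) );
        in.put( leftover ).put( chunk ).flip( );

        // UTF-8 never produces more chars than it consumes bytes
        CharBuffer out = CharBuffer.allocate( in.remaining( ) );

        // endOfInput = false: a truncated trailing sequence stays unread in "in"
        // instead of being treated as malformed
        decoder.decode( in, out, false );

        // Save whatever was left unread for the next call
        leftover = ByteBuffer.allocate( in.remaining( ) );
        leftover.put( in ).flip( );

        out.flip( );
        return out.toString( );
    }
}
```

In the subscriber this would mean holding one IncrementalUtf8Decoder in a field and calling decode( buffer ) where UTF_8.decode( buffer ) is used now. Bytes still held back when the stream completes are simply dropped in this sketch.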