当我需要以流的方式处理响应时,我应该如何处理挂起使用Java 11中包含的HTTP客户机发送HTTP响应体的服务器?
HttpClient httpClient = HttpClient.newBuilder()
.connectTimeout(Duration.ofSeconds(2))
.build();
HttpRequest httpRequest = HttpRequest.newBuilder(URI.create("http://example.com"))
.timeout(Duration.ofSeconds(5))
.build();
HttpResponse<Stream<String>> httpResponse = httpClient
.send(httpRequest, HttpResponse.BodyHandlers.ofLines());
Stream<String> responseLineStream = httpResponse.body();
responseLineStream.count();
在上述守则中:
BodyHandler
,在收到状态行和标头时,会考虑接收响应。这意味着当代码执行时,在7秒内就会抛出一个异常,或者我们已经到达最后一行。但是,最后一行不受任何超时的限制。如果服务器停止发送响应体,则最后一行将永远阻塞。
在这种情况下,我怎样才能避免最后一句呢?
发布于 2021-05-19 04:31:34
我猜这将留给流的使用者,因为这是处理逻辑的一部分,所以仍然可以使用CompletableFuture
来处理主体处理。
HttpResponse<Stream<String>> httpResponse = httpClient.send(httpRequest,
HttpResponse.BodyHandlers.ofLines());
Stream<String> responseLineStream = httpResponse.body();
CompletableFuture<Long> future = CompletableFuture.supplyAsync(() -> responseLineStream.count());
long count = future.get(3, TimeUnit.SECONDS);
或者仅仅是一个由Java Executor
执行的Future
。
发布于 2021-06-04 20:05:04
解决这一问题的一种方法是在接收整个身体所需的时间上设置一个超时。这就是M A的解决方案。正如您已经注意到的,如果超时计算,您应该关闭流,这样连接就会正确地释放,而不是挂在后台。一种更普遍的方法是实现一个BodySubscriber
,当它在超时时间内不被上游完成时,它会异常地完成它自己。这使得不必只为定时等待或关闭流而生成线程。下面是一个适当的实现。
class TimeoutBodySubscriber<T> implements BodySubscriber<T> {
private final BodySubscriber<T> downstream;
private final Duration timeout;
private Subscription subscription;
/** Make sure downstream isn't called after we receive an onComplete or onError. */
private boolean done;
TimeoutBodySubscriber(BodySubscriber<T> downstream, Duration timeout) {
this.downstream = downstream;
this.timeout = timeout;
}
@Override
public CompletionStage<T> getBody() {
return downstream.getBody();
}
@Override
public synchronized void onSubscribe(Subscription subscription) {
this.subscription = requireNonNull(subscription);
downstream.onSubscribe(subscription);
// Schedule an error completion to be fired when timeout evaluates
CompletableFuture.delayedExecutor(timeout.toMillis(), TimeUnit.MILLISECONDS)
.execute(this::onTimeout);
}
private synchronized void onTimeout() {
if (!done) {
done = true;
downstream.onError(new HttpTimeoutException("body completion timed out"));
// Cancel subscription to release the connection, so it doesn't keep hanging in background
subscription.cancel();
}
}
@Override
public synchronized void onNext(List<ByteBuffer> item) {
if (!done) {
downstream.onNext(item);
}
}
@Override
public synchronized void onError(Throwable throwable) {
if (!done) {
done = true;
downstream.onError(throwable);
}
}
@Override
public synchronized void onComplete() {
if (!done) {
done = true;
downstream.onComplete();
}
}
static <T> BodyHandler<T> withBodyTimeout(BodyHandler<T> handler, Duration timeout) {
return responseInfo -> new TimeoutBodySubscriber<>(handler.apply(responseInfo), timeout);
}
}
它可用于以下方面:
Duration timeout = Duration.ofSeconds(10);
HttpResponse<Stream<String>> httpResponse = httpClient
.send(httpRequest, TimeoutBodySubscriber.withTimeout(HttpResponse.BodyHandlers.ofLines(), timeout));
另一种方法是使用读取超时。这更灵活,因为只要服务器保持活动状态(即继续发送内容),响应就不会超时。如果在超时期间没有接收到下一个请求的信号,那么您需要一个BodySubscriber
来完成自己的任务。这一点实现起来稍微复杂一些。如果依赖关系良好,可以使用甲醇。如所述,它实现了读取超时。
Duration timeout = Duration.ofSeconds(3);
HttpResponse<Stream<String>> httpResponse = httpClient
.send(httpRequest, MoreBodyHandlers.withReadTimeout(HttpResponse.BodyHandlers.ofLines(), timeout));
另一种策略是将两者结合使用:当服务器变得不活跃或身体需要太长时间才能完成时,就立即超时。
https://stackoverflow.com/questions/67602169
复制相似问题