问题描述
Servlet-3.0API允许分离请求/响应上下文并稍后对其作出答复。
但是,如果我试图编写大量数据,如下所示:
AsyncContext ac = getWaitingContext() ;
ServletOutputStream out = ac.getResponse().getOutputStream();
out.print(some_big_data);
out.flush()
它实际上可能会阻塞Tomcat 7和Jetty 8的测试用例--而且确实会阻塞。教程建议创建一个线程池来处理这样的设置--与传统的10K体系结构相比,它通常是相反的。
但是,如果我有10000个打开的连接和一个线程池(比如10个线程),那么即使有1%的低速连接或只是阻塞连接的客户端,也足以阻塞线程池并完全阻止彗星响应或显著减慢它的速度。
预期的做法是获得“编写就绪”通知或I/O完成通知,而不是继续推送数据。
如何使用Servlet3.0 API完成这一任务,即如何获得:
如果Servlet-3.0API不支持这一点,那么是否有任何特定于Web的API (如Jetty或Tomcat CometEvent)允许真正异步地处理此类事件,而无需使用线程池伪造异步I/O。
有人知道吗?
如果这是不可能的,你能用参考文档来确认吗?
示例代码中的问题演示
我在下面附加了模拟事件流的代码。
备注:
ServletOutputStream
的IOException
来检测断开连接的客户端。keep-alive
消息,以确保客户端仍然在那里。在这样的示例中,我显式地定义了大小为1的线程池,以显示问题:
curl http://localhost:8080/path/to/app
运行(两次)curd -d m=message http://localhost:8080/path/to/app
发送数据curd -d m=message http://localhost:8080/path/to/app
我想在不使用线程池的情况下解决这样的问题,因为有了1000-5000个打开的连接,我可以非常快地耗尽线程池。
下面的示例代码。
import java.io.IOException;
import java.util.HashSet;
import java.util.Iterator;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.LinkedBlockingQueue;
import javax.servlet.AsyncContext;
import javax.servlet.ServletConfig;
import javax.servlet.ServletException;
import javax.servlet.annotation.WebServlet;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import javax.servlet.ServletOutputStream;
@WebServlet(urlPatterns = "", asyncSupported = true)
public class HugeStreamWithThreads extends HttpServlet {
private long id = 0;
private String message = "";
private final ThreadPoolExecutor pool =
new ThreadPoolExecutor(1, 1, 50000L,TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());
// it is explicitly small for demonstration purpose
private final Thread timer = new Thread(new Runnable() {
public void run()
{
try {
while(true) {
Thread.sleep(1000);
sendKeepAlive();
}
}
catch(InterruptedException e) {
// exit
}
}
});
class RunJob implements Runnable {
volatile long lastUpdate = System.nanoTime();
long id = 0;
AsyncContext ac;
RunJob(AsyncContext ac)
{
this.ac = ac;
}
public void keepAlive()
{
if(System.nanoTime() - lastUpdate > 1000000000L)
pool.submit(this);
}
String formatMessage(String msg)
{
StringBuilder sb = new StringBuilder();
sb.append("id");
sb.append(id);
for(int i=0;i<100000;i++) {
sb.append("data:");
sb.append(msg);
sb.append("\n");
}
sb.append("\n");
return sb.toString();
}
public void run()
{
String message = null;
synchronized(HugeStreamWithThreads.this) {
if(this.id != HugeStreamWithThreads.this.id) {
this.id = HugeStreamWithThreads.this.id;
message = HugeStreamWithThreads.this.message;
}
}
if(message == null)
message = ":keep-alive\n\n";
else
message = formatMessage(message);
if(!sendMessage(message))
return;
boolean once_again = false;
synchronized(HugeStreamWithThreads.this) {
if(this.id != HugeStreamWithThreads.this.id)
once_again = true;
}
if(once_again)
pool.submit(this);
}
boolean sendMessage(String message)
{
try {
ServletOutputStream out = ac.getResponse().getOutputStream();
out.print(message);
out.flush();
lastUpdate = System.nanoTime();
return true;
}
catch(IOException e) {
ac.complete();
removeContext(this);
return false;
}
}
};
private HashSet<RunJob> asyncContexts = new HashSet<RunJob>();
@Override
public void init(ServletConfig config) throws ServletException
{
super.init(config);
timer.start();
}
@Override
public void destroy()
{
for(;;){
try {
timer.interrupt();
timer.join();
break;
}
catch(InterruptedException e) {
continue;
}
}
pool.shutdown();
super.destroy();
}
protected synchronized void removeContext(RunJob ac)
{
asyncContexts.remove(ac);
}
// GET method is used to establish a stream connection
@Override
protected synchronized void doGet(HttpServletRequest request, HttpServletResponse response)
throws ServletException, IOException {
// Content-Type header
response.setContentType("text/event-stream");
response.setCharacterEncoding("utf-8");
// Access-Control-Allow-Origin header
response.setHeader("Access-Control-Allow-Origin", "*");
final AsyncContext ac = request.startAsync();
ac.setTimeout(0);
RunJob job = new RunJob(ac);
asyncContexts.add(job);
if(id!=0) {
pool.submit(job);
}
}
private synchronized void sendKeepAlive()
{
for(RunJob job : asyncContexts) {
job.keepAlive();
}
}
// POST method is used to communicate with the server
@Override
protected synchronized void doPost(HttpServletRequest request, HttpServletResponse response)
throws ServletException, IOException
{
request.setCharacterEncoding("utf-8");
id++;
message = request.getParameter("m");
for(RunJob job : asyncContexts) {
pool.submit(job);
}
}
}
上面的示例使用线程来防止阻塞..。但是,如果阻塞客户端的数量大于线程池的大小,那么它就会阻塞。
如何在不阻塞的情况下实现它?
发布于 2012-08-28 11:17:05
我发现Servlet 3.0
Asynchronous
API很难正确实现,而且有用的文档是稀疏的。经过多次尝试和尝试不同的方法之后,我找到了一个我一直很满意的健壮的解决方案。当我查看我的代码并将其与您的代码进行比较时,我注意到一个可能帮助您解决特定问题的主要区别。我使用ServletResponse
来编写数据,而不是ServletOutputStream
。
在这里,我的go-to异步Servlet类稍微适应了some_big_data
的情况:
import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import javax.servlet.AsyncContext;
import javax.servlet.AsyncEvent;
import javax.servlet.AsyncListener;
import javax.servlet.ServletConfig;
import javax.servlet.ServletException;
import javax.servlet.ServletResponse;
import javax.servlet.annotation.WebInitParam;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import javax.servlet.http.HttpSession;
import org.apache.log4j.Logger;
@javax.servlet.annotation.WebServlet(urlPatterns = { "/async" }, asyncSupported = true, initParams = { @WebInitParam(name = "threadpoolsize", value = "100") })
public class AsyncServlet extends HttpServlet {
private static final Logger logger = Logger.getLogger(AsyncServlet.class);
public static final int CALLBACK_TIMEOUT = 10000; // ms
/** executor service */
private ExecutorService exec;
@Override
public void init(ServletConfig config) throws ServletException {
super.init(config);
int size = Integer.parseInt(getInitParameter("threadpoolsize"));
exec = Executors.newFixedThreadPool(size);
}
@Override
public void service(HttpServletRequest req, HttpServletResponse res) throws ServletException, IOException {
final AsyncContext ctx = req.startAsync();
final HttpSession session = req.getSession();
// set the timeout
ctx.setTimeout(CALLBACK_TIMEOUT);
// attach listener to respond to lifecycle events of this AsyncContext
ctx.addListener(new AsyncListener() {
@Override
public void onComplete(AsyncEvent event) throws IOException {
logger.info("onComplete called");
}
@Override
public void onTimeout(AsyncEvent event) throws IOException {
logger.info("onTimeout called");
}
@Override
public void onError(AsyncEvent event) throws IOException {
logger.info("onError called: " + event.toString());
}
@Override
public void onStartAsync(AsyncEvent event) throws IOException {
logger.info("onStartAsync called");
}
});
enqueLongRunningTask(ctx, session);
}
/**
* if something goes wrong in the task, it simply causes timeout condition that causes the async context listener to be invoked (after the fact)
* <p/>
* if the {@link AsyncContext#getResponse()} is null, that means this context has already timed out (and context listener has been invoked).
*/
private void enqueLongRunningTask(final AsyncContext ctx, final HttpSession session) {
exec.execute(new Runnable() {
@Override
public void run() {
String some_big_data = getSomeBigData();
try {
ServletResponse response = ctx.getResponse();
if (response != null) {
response.getWriter().write(some_big_data);
ctx.complete();
} else {
throw new IllegalStateException(); // this is caught below
}
} catch (IllegalStateException ex) {
logger.error("Request object from context is null! (nothing to worry about.)"); // just means the context was already timeout, timeout listener already called.
} catch (Exception e) {
logger.error("ERROR IN AsyncServlet", e);
}
}
});
}
/** destroy the executor */
@Override
public void destroy() {
exec.shutdown();
}
}
发布于 2013-09-07 11:13:33
在我研究这个话题的时候,这个帖子不断出现,所以我想我在这里提到它:
Servlet3.1在ServletInputStream
和ServletOutputStream
上引入了异步操作。见ServletOutputStream.setWriteListener
。
在http://docs.oracle.com/javaee/7/tutorial/servlets013.htm上可以找到一个例子
发布于 2012-09-25 23:18:15
这可能会有帮助
http://www.oracle.com/webfolder/technetwork/tutorials/obe/java/async-servlet/async-servlets.html
https://stackoverflow.com/questions/12085235
复制相似问题