tomcat源码解读四 tomcat中的processer

     Processor是一个接口,针对于不同协议下具有不同的具体实现类,其实现类的具体功能是处理http请求,主要是对协议进行解析,状态处理以及响应。然后起一个中间作用转发到 Adater,下面是其类的关系图

     其实现类中我们常用的http协议,所以一般是左边的部分,用红线标注

1.1 循环队列

protected static class RecycledProcessors<P extends Processor<S>, S> extends SynchronizedStack<Processor<S>> {

        private final transient AbstractConnectionHandler<S,P> handler;
        protected final AtomicInteger size = new AtomicInteger(0);

        public RecycledProcessors(AbstractConnectionHandler<S,P> handler) {
            this.handler = handler;
        }

        @SuppressWarnings("sync-override") // Size may exceed cache size a bit
        @Override
        public boolean push(Processor<S> processor) {
            //获取Processor能够缓存的大小
            int cacheSize = handler.getProtocol().getProcessorCache();
            boolean offer = cacheSize == -1 ? true : size.get() < cacheSize;
            //向栈中压入当前processor
            boolean result = false;
            if (offer) {
                result = super.push(processor);
                if (result) {
                    size.incrementAndGet();
                }
            }
            //取消当前processor实例的JMX
            if (!result) handler.unregister(processor);
            return result;
        }

        @SuppressWarnings("sync-override") // OK if size is too big briefly
        @Override
        public Processor<S> pop() {
            Processor<S> result = super.pop();
            if (result != null) {
                size.decrementAndGet();
            }
            return result;
        }

        @Override
        public synchronized void clear() {
            Processor<S> next = pop();
            while (next != null) {
                handler.unregister(next);
                next = pop();
            }
            super.clear();
            size.set(0);
        }
    }

     在讲述Processor的获取以及处理过程之前先看一个类,姑且命名为循环队列, 它主要是继承了SynchronizedStack这个栈(tomcat自己实现)里面实现了进栈出栈两种方法。

1.1 Processor的创建

     根据栈中执行的流程可以看出调用的是协议句柄的抽象类中的process方法,所以针对于四种模式其实现过程大致相同,具体代码如下:

 public SocketState process(SocketWrapper<S> wrapper,
                SocketStatus status) {
            //如果socketWrapper为空则证明不存在socket则直接将状态设置为CLOSED
            if (wrapper == null) {
                return SocketState.CLOSED;
            }
            //获取当前SocketWrapper实例对应的NIO通道
            S socket = wrapper.getSocket();
            if (socket == null) {
                //什么也不做 socket已经关闭
                return SocketState.CLOSED;
            }
            /**
             * 从connections中根据socket获取Processor,如果没有则在下面创建 connections句柄类型Map<S,Processor<S>>
             * 在以下情况下connections中存在值
             * 1.websocket中
             * 2.异步servlet
             * 3.发送文件
             * */
            Processor<S> processor = connections.get(socket);

            if (status == SocketStatus.DISCONNECT && processor == null) {
                // Nothing to do. Endpoint requested a close and there is no
                // longer a processor associated with this socket.
                return SocketState.CLOSED;
            }

            wrapper.setAsync(false);
            //标记当前线程是否是容器线程 set则是容器线程
            ContainerThreadMarker.set();
            /**
             *
             * 创建一个Http11NioProcessor 实例里面构造了request 和response成员变量
             * 各封装了一个InternalNioInputBuffer实例
             * 其中request中封装了成员属性名inputBuffer
             *    response中封装了成员属性名outputBuffer
             * */
            try {
                if (processor == null) {
                    processor = recycledProcessors.pop();
                }
                if (processor == null) {
                    processor = createProcessor();
                }

                initSsl(wrapper, processor);

                SocketState state = SocketState.CLOSED;

                Iterator<DispatchType> dispatches = null;
                do {
                    if (dispatches != null) {
                        // Associate the processor with the connection as
                        // these calls may result in a nested call to process()
                        connections.put(socket, processor);
                        DispatchType nextDispatch = dispatches.next();
                        if (processor.isUpgrade()) {
                            state = processor.upgradeDispatch(
                                    nextDispatch.getSocketStatus());
                        } else {
                            state = processor.asyncDispatch(
                                    nextDispatch.getSocketStatus());
                        }
                    } else if (processor.isComet()) {
                        state = processor.event(status);
                    } else if (processor.isUpgrade()) {
                        state = processor.upgradeDispatch(status);
                    } else if (status == SocketStatus.DISCONNECT) {
                        // Comet and upgrade need to see DISCONNECT but the
                        // others don't. NO-OP and let socket close.
                    } else if (processor.isAsync() || state == SocketState.ASYNC_END) {
                        state = processor.asyncDispatch(status);
                        if (state == SocketState.OPEN) {
                            // release() won't get called so in case this request
                            // takes a long time to process, remove the socket from
                            // the waiting requests now else the async timeout will
                            // fire
                            getProtocol().endpoint.removeWaitingRequest(wrapper);
                            // There may be pipe-lined data to read. If the data
                            // isn't processed now, execution will exit this
                            // loop and call release() which will recycle the
                            // processor (and input buffer) deleting any
                            // pipe-lined data. To avoid this, process it now.
                            state = processor.process(wrapper);
                        }
                    } else if (status == SocketStatus.OPEN_WRITE) {
                        // Extra write event likely after async, ignore
                        state = SocketState.LONG;
                    } else {
                        //这个是在第一次请求的时候执行
                        state = processor.process(wrapper);
                    }

                    //根据异步asyncStateMachine的状态设置Socket的状态
                    if (state != SocketState.CLOSED && processor.isAsync()) {
                        state = processor.asyncPostProcess();
                    }

                    if (state == SocketState.UPGRADING) {
                        // Get the HTTP upgrade handler
                        UpgradeToken upgradeToken = processor.getUpgradeToken();
                        HttpUpgradeHandler httpUpgradeHandler = upgradeToken.getHttpUpgradeHandler();
                        // Retrieve leftover input
                        ByteBuffer leftoverInput = processor.getLeftoverInput();
                        // Release the Http11 processor to be re-used
                        release(wrapper, processor, false, false);
                        // Create the upgrade processor
                        processor = createUpgradeProcessor(
                                wrapper, leftoverInput, upgradeToken);
                        // Mark the connection as upgraded
                        wrapper.setUpgraded(true);
                        // Associate with the processor with the connection
                        connections.put(socket, processor);
                        // Initialise the upgrade handler (which may trigger
                        // some IO using the new protocol which is why the lines
                        // above are necessary)
                        // This cast should be safe. If it fails the error
                        // handling for the surrounding try/catch will deal with
                        // it.
                        if (upgradeToken.getInstanceManager() == null) {
                            httpUpgradeHandler.init((WebConnection) processor);
                        } else {
                            ClassLoader oldCL = upgradeToken.getContextBind().bind(false, null);
                            try {
                                httpUpgradeHandler.init((WebConnection) processor);
                            } finally {
                                upgradeToken.getContextBind().unbind(false, oldCL);
                            }
                        }
                    }
                    if (getLog().isDebugEnabled()) {
                        getLog().debug("Socket: [" + wrapper +
                                "], Status in: [" + status +
                                "], State out: [" + state + "]");
                    }
                    if (dispatches == null || !dispatches.hasNext()) {
                        // Only returns non-null iterator if there are
                        // dispatches to process.
                        dispatches = wrapper.getIteratorAndClearDispatches();
                    }
                } while (state == SocketState.ASYNC_END ||
                        state == SocketState.UPGRADING ||
                        dispatches != null && state != SocketState.CLOSED);

                if (state == SocketState.LONG) {
                    // In the middle of processing a request/response. Keep the
                    // socket associated with the processor. Exact requirements
                    // depend on type of long poll
                    //异步在第一次处理的时候会将其设置到当前connections中去
                    connections.put(socket, processor);
                    longPoll(wrapper, processor);
                } else if (state == SocketState.OPEN) {
                    // In keep-alive but between requests. OK to recycle
                    // processor. Continue to poll for the next request.
                    connections.remove(socket);
                    release(wrapper, processor, false, true);
                } else if (state == SocketState.SENDFILE) {
                    // Sendfile in progress. If it fails, the socket will be
                    // closed. If it works, the socket either be added to the
                    // poller (or equivalent) to await more data or processed
                    // if there are any pipe-lined requests remaining.
                    connections.put(socket, processor);
                } else if (state == SocketState.UPGRADED) {
                    // Don't add sockets back to the poller if this was a
                    // non-blocking write otherwise the poller may trigger
                    // multiple read events which may lead to thread starvation
                    // in the connector. The write() method will add this socket
                    // to the poller if necessary.
                    if (status != SocketStatus.OPEN_WRITE) {
                        longPoll(wrapper, processor);
                    }
                } else {
                    // Connection closed. OK to recycle the processor. Upgrade
                    // processors are not recycled.
                    connections.remove(socket);
                    if (processor.isUpgrade()) {
                        UpgradeToken upgradeToken = processor.getUpgradeToken();
                        HttpUpgradeHandler httpUpgradeHandler = upgradeToken.getHttpUpgradeHandler();
                        InstanceManager instanceManager = upgradeToken.getInstanceManager();
                        if (instanceManager == null) {
                            httpUpgradeHandler.destroy();
                        } else {
                            ClassLoader oldCL = upgradeToken.getContextBind().bind(false, null);
                            try {
                                httpUpgradeHandler.destroy();
                            } finally {
                                try {
                                    instanceManager.destroyInstance(httpUpgradeHandler);
                                } catch (Throwable e) {
                                    ExceptionUtils.handleThrowable(e);
                                    getLog().error(sm.getString("abstractConnectionHandler.error"), e);
                                }
                                upgradeToken.getContextBind().unbind(false, oldCL);
                            }
                        }
                    } else {
                        release(wrapper, processor, true, false);
                    }
                }
                return state;
            } catch(java.net.SocketException e) {
                // SocketExceptions are normal
                getLog().debug(sm.getString(
                        "abstractConnectionHandler.socketexception.debug"), e);
            } catch (java.io.IOException e) {
                // IOExceptions are normal
                getLog().debug(sm.getString(
                        "abstractConnectionHandler.ioexception.debug"), e);
            }
            // Future developers: if you discover any other
            // rare-but-nonfatal exceptions, catch them here, and log as
            // above.
            catch (Throwable e) {
                ExceptionUtils.handleThrowable(e);
                // any other exception or error is odd. Here we log it
                // with "ERROR" level, so it will show up even on
                // less-than-verbose logs.
                getLog().error(
                        sm.getString("abstractConnectionHandler.error"), e);
            } finally {
                ContainerThreadMarker.clear();
            }

            // Make sure socket/processor is removed from the list of current
            // connections
            connections.remove(socket);
            // Don't try to add upgrade processors back into the pool
            if (processor !=null && !processor.isUpgrade()) {
                release(wrapper, processor, true, false);
            }
            return SocketState.CLOSED;
        }

     从代码中可以看出获取Processor共经过三种途径,首先在connections这个map根据socket找到对应的Processor实例,也许你会有疑惑socket为什么会相同,目前我知道的有基于长连接和Upgrade来实现的socket,这样就有效的保留其中的协议状态,以及部分请求数据。如果从其中并没有获取则在循环队列中获取(下文讲述循环队列),这相当于从栈中获取元素,这是因为当一个实例化后的Processor处理完之后,并不会回收,而是释放存入栈中供下次来可以直接进行使用,如果栈中不存在则自己再实例化一个。由这种方式可以看出其实例化跟浏览器的请求没有多大关系,在一次会话中可能使用不同的,在不同会话中也可能使用相同的Processor

1.3 Processor的释放

     在当前socket处理完之后,会将Processor给释放,在这里将其部分句柄给重置之后,然后就压入循环队列供下次使用,其具体处理过程在BIO NIO 和AIO中有所出入

 protected abstract void release(SocketWrapper<S> socket,
                Processor<S> processor, boolean socketClosing,
                boolean addToPoller);

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏Java帮帮-微信公众号-技术文章全总结

24(02)多线程锁,线程通讯,线程组,线程池,多线程三种方式,匿名内部类,定时器,设计模式,单例模式,Runtime

(6)多线程实现的第三种方案 package cn.itcast_09;(1) import java.util.concurrent.Callable; //...

3514
来自专栏MasiMaro 的技术博文

驱动程序的同步处理

驱动程序运行在系统的内核地址空间,而所有进程共享这2GB的虚拟地址空间,所以绝大多数驱动程序是运行在多线程环境中,有的时候需要对程序进行同步处理,使某些操作是严...

531
来自专栏IT开发技术与工作效率

Hibernate一对多保存思考题 Java框架

713
来自专栏Android开发指南

用最简单的例子说明设计模式(一)之单例模式、工厂模式、装饰模式、外观模式

3695
来自专栏猿天地

高性能NIO框架Netty-对象传输

上篇文章高性能NIO框架Netty入门篇我们对Netty做了一个简单的介绍,并且写了一个入门的Demo,客户端往服务端发送一个字符串的消息,服务端回复一个字符串...

2848
来自专栏Spark生态圈

[spark] 内存管理 MemoryManager 解析

spark的内存管理有两套方案,新旧方案分别对应的类是UnifiedMemoryManager和StaticMemoryManager。

972
来自专栏jeremy的技术点滴

Java开发小技巧_02

3414
来自专栏菩提树下的杨过

java学习:weblogic下JNDI及JDBC连接测试(weblogic环境)

JNDI的专业解释,大家自行去网络搜索吧,这里就不啰嗦了。 单纯从使用角度看,可以简称把它看成一个key-value的“哈希资源”容器。给定一个string类型...

2139
来自专栏nice_每一天

转载 Java设计模式

设计模式; 一个程序员对设计模式的理解: “不懂”为什么要把很简单的东西搞得那么复杂。后来随着软件开发经验的增加才开始明白我所看到的“复杂”恰恰就是设计模式的精...

582
来自专栏nnngu

记录某公司(简称SMKJ) 的一次面试

昨天去了一家公司面试 Java 开发岗位,这篇文章主要是做一个面试的记录以及总结。 这家公司的规模大概100-200人,环境还可以,在一栋大厦租了两层办公室(3...

3006

扫描关注云+社区