前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >facebook/swift:构建thrift http server(4)--ThriftXHRDecoder,ThriftXHREncoder

facebook/swift:构建thrift http server(4)--ThriftXHRDecoder,ThriftXHREncoder

作者头像
10km
发布2019-05-25 20:37:59
4670
发布2019-05-25 20:37:59
举报
文章被收录于专栏:10km的专栏10km的专栏

版权声明:本文为博主原创文章,转载请注明源地址。 https://cloud.tencent.com/developer/article/1433452

《facebook/swift:构建thrift http server(1)》

《facebook/swift:构建thrift http server(2)–HttpServerCodec》

《facebook/swift:构建thrift http server(3)–CORS跨域》

在上一篇博客中解决了thrift http sever的CORS跨域问题,但前端依然没有服务端的正常响应。看来还存在问题。

继续研究Netty的代码。

ThriftMessage

通过跟踪服务端收到的HTTP POST请求在管道(ChannelPipeline)中的传递流程找到了问题:

如下是实现将网络请求分发到thrift服务实例(NiftyProcessor)的ChannelHandler实例com.facebook.nifty.core.NiftyDispatchermessageReceived方法实现代码:

代码语言:javascript
复制
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
            throws Exception
    {    	
        if (e.getMessage() instanceof ThriftMessage) {
            ThriftMessage message = (ThriftMessage) e.getMessage();
            if (taskTimeoutMillis > 0) {
                message.setProcessStartTimeMillis(System.currentTimeMillis());
            }
            checkResponseOrderingRequirements(ctx, message);

            TNiftyTransport messageTransport = new TNiftyTransport(ctx.getChannel(), message);
            TTransportPair transportPair = TTransportPair.fromSingleTransport(messageTransport);
            TProtocolPair protocolPair = duplexProtocolFactory.getProtocolPair(transportPair);
            TProtocol inProtocol = protocolPair.getInputProtocol();
            TProtocol outProtocol = protocolPair.getOutputProtocol();

            processRequest(ctx, message, messageTransport, inProtocol, outProtocol);
        }
        else {
        	// 如果不是message是ThriftMessage实例则继续向下传递
            ctx.sendUpstream(e);
        }
    }

请注意第一行的判断语句,只有收到的消息是ThriftMessage实例,才会响应请求。

HttpServerCodec只会将收到的网络请求解析为DefaultHttpRequest

参见 org.jboss.netty.handler.codec.http.HttpRequestDecoder

所以问题搞清楚了。我们需要一个ChannelUpstreamHandlerDefaultHttpRequest转换为ThriftMessage向后传递给NiftyDispatcher,同时也需要一个ChannelDownstreamHandlerThriftMessage转为DefaultHttpResponse向前传递给HttpServerCodecHttpResponseEncoder进一步封装成HTTP响应数据最后发送给前端。

ThriftXHRDecoder

HTTP request解码器ThriftXHRDecoder实现如下:

代码语言:javascript
复制
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
import org.jboss.netty.channel.UpstreamMessageEvent;
import org.jboss.netty.handler.codec.http.HttpMethod;
import org.jboss.netty.handler.codec.http.HttpRequest;

import com.facebook.nifty.core.ThriftMessage;
import com.facebook.nifty.core.ThriftTransportType;

/**
 * XHR(XML Http Request)解码器<br>
 * 将{@link HttpRequest}请求的内容数据(content)转为{@link ThriftMessage},
 * 提供给{@link com.facebook.nifty.core.NiftyDispatcher}
 * @author guyadong
 *
 */
public class ThriftXHRDecoder extends SimpleChannelUpstreamHandler {
	
	public ThriftXHRDecoder() {
	}

	@Override
	public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
		if(e.getMessage() instanceof HttpRequest){
			HttpRequest request = (HttpRequest)e.getMessage();
			if(request.getContent().readable() && HttpMethod.POST.equals(request.getMethod())){
				// 非FRAME数据
				ThriftMessage thriftMessage = new ThriftMessage(request.getContent(),ThriftTransportType.UNFRAMED);
				ctx.sendUpstream(new UpstreamMessageEvent(ctx.getChannel(), thriftMessage, e.getRemoteAddress()));
				return;
			}
		}
		super.messageReceived(ctx, e);
	}
}

ThriftXHREncoder

ThriftMessage编码器ThriftXHREncoder实现如下:

代码语言:javascript
复制
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.DownstreamMessageEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelDownstreamHandler;
import org.jboss.netty.handler.codec.http.DefaultHttpResponse;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;

import com.facebook.nifty.core.ThriftMessage;
import static org.jboss.netty.handler.codec.http.HttpVersion.HTTP_1_1;

/**
 * XHR(XML Http Request)编码器<br>
 * 将{@link com.facebook.nifty.core.NiftyDispatcher}输出的
 * {@link ThriftMessage}响应数据转为{@link DownstreamMessageEvent},
 * 
 * @author guyadong
 *
 */
public class ThriftXHREncoder extends SimpleChannelDownstreamHandler {

	@Override
	public void writeRequested(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
		if(e.getMessage() instanceof ThriftMessage){
			ThriftMessage thriftMessage = (ThriftMessage)e.getMessage();
			if(thriftMessage.getBuffer().readable()){
				switch (thriftMessage.getTransportType()) {
				case UNFRAMED:
                    DefaultHttpResponse response = new DefaultHttpResponse(HTTP_1_1, HttpResponseStatus.OK);
                    response.setContent(thriftMessage.getBuffer());
					ctx.sendDownstream(new DownstreamMessageEvent(ctx.getChannel(), 
							Channels.future(ctx.getChannel()), response, e.getRemoteAddress()));
					return;
				default:
	                throw new UnsupportedOperationException(
	                		thriftMessage.getTransportType().name() +" transport is not supported");
				}
			}
		}
		super.writeRequested(ctx, e);
	}

}

修改ChannelPipeline

有了ThriftXHRDecoderThriftXHREncoder就可以将它们添加到frameCodec之后,dispatcher之前,基于上一篇中addCorsHandlerIfHttp方法代码修改如下,

代码语言:javascript
复制
	/**
	 * 添加CORS Handler和XHR编解码器
	 */
	protected void addCorsHandlerIfHttp(){
		if(HTTP_TRANSPORT.equals(thriftServerConfig.getTransportName())){
			try {
				// 反射获取私有的成员NettyServerTransport
				final NettyServerTransport nettyServerTransport = ReflectionUtils.valueOfField(thriftServer, "transport");
				// 反射获取私有的成员ChannelPipelineFactory
				Field pipelineFactory = NettyServerTransport.class.getDeclaredField("pipelineFactory");
				{
					Field modifiersField = Field.class.getDeclaredField("modifiers");
					modifiersField.setAccessible(true); //Field 的 modifiers 是私有的
					modifiersField.setInt(pipelineFactory, pipelineFactory.getModifiers() & ~Modifier.FINAL);
				}
				pipelineFactory.setAccessible(true);
				final ChannelPipelineFactory channelPipelineFactory = (ChannelPipelineFactory) pipelineFactory.get(nettyServerTransport);
				final Netty3CorsConfig corsConfig = Netty3CorsConfigBuilder.forAnyOrigin()
					.allowedRequestMethods(POST,GET,OPTIONS)
					.allowedRequestHeaders("Origin","Content-Type","Accept","application","x-requested-with")
					.build();
				ChannelPipelineFactory factoryWithCORS = new ChannelPipelineFactory(){

					@Override
					public ChannelPipeline getPipeline() throws Exception {
						// 修改 ChannelPipeline,在frameCodec后(顺序)增加CORS handler,XHR编解码器
						ChannelPipeline cp = channelPipelineFactory.getPipeline();
						cp.addAfter("frameCodec", "thriftXHRDecoder", new ThriftXHRDecoder());
						cp.addAfter("frameCodec", "thriftXHREncoder", new ThriftXHREncoder());
						cp.addAfter("frameCodec", "cors", new Netty3CorsHandler(corsConfig));
						return cp;
					}};
				// 修改nettyServerTransport的私有常量pipelineFactory
				pipelineFactory.set(nettyServerTransport, factoryWithCORS);
			} catch (Exception e) {
				Throwables.throwIfUnchecked(e);
				throw new RuntimeException(e);
			}
		}
	}

修改完毕后再执行test_js.html,thrift http server终于有响应了。

而且数据结果正常

遗留问题

至此,基于facebook/swift构建的thrift http server已经基本可以正常工作,但还是存在一个小问题。HTTP响应只会在前端空闲超时后才会发送响应数据到前端,所以ThriftServerConfig的IdleConnectionTimeout如果设置过大(比如默认值60s),那么前端要在发送请求60秒后才会收到服务端的响应,这个问题一直到目前还没有找到。所以目前的办法是将这个值设置在10ms,就基本不会影响前端的响应延迟。

零零散散的写了好几篇文章,贴出的代码都比较零碎,完整的代码参见码云仓库

https://gitee.com/l0km/common-java/tree/master/common-thrift/src/main/java/com/facebook/swift/service

重要的修改在ThriftServerService

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2019年05月04日,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • ThriftMessage
  • ThriftXHRDecoder
  • ThriftXHREncoder
  • 修改ChannelPipeline
  • 遗留问题
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档