版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/qq_17655941/article/details/103362115
这几天公司要求实现 springcloud gateway 自定义协议 需要对外统一提供http/https 的接口,但是后端有很多服务提供了不同的方式,包括dubbo 协议,和dubbo 上提供的各种访问协议等(dubbo服务上协议的支持),需要从网关直接代理去访问。仔细分析后,发现gateway 只有自带的几种协议 ,http/https 和 ws 这个几种 ps:这就很难受,网上也百度了许多没有找到具体的实现方案,或许没有人这样干吧,在需求的推动下,决定自己研究一把.
在没有头绪的情况下,还是得从阅读源码开始,源码下载地址,然后配置官方提供的私服地址,因为源码引用的有些包是未发布版本,在项目下找到对应 .flattened-pom.xml 文件下,上面的地址就是私服地址。(我这个说几个需要用到的类,其他的自己看百度查看 推荐学习地址)
这个类是对gateway GatewayFilter 的实例类注入,在源代码调试过程中要在这里添加自己自定义的GatewayFilter,不然用注解注入不能起作用。
全局网关过滤器,是一个接口,自定义需要实现此接口(下面说明的类都实现此接口,并在同一个包中)
@Override @SuppressWarnings("Duplicates") public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) { URI requestUrl = exchange.getRequiredAttribute(GATEWAY_REQUEST_URL_ATTR); /** * 获取访问的url地址 */ String scheme = requestUrl.getScheme(); /** * 判断这次请求是否被处理过,请求协议是否是http/https * 如果满足则不处理 */ if (isAlreadyRouted(exchange) || (!"http".equals(scheme) && !"https".equals(scheme))) { return chain.filter(exchange); } /** * 开始处理,首先将状态标志为已处理 */ setAlreadyRouted(exchange); /** * 获取ServerHttpRequest,相当于HttpServletRequest请求,在gateway * 中使用的是webflux 做的web服务,所有处理有点变化 */ ServerHttpRequest request = exchange.getRequest(); /** * 获取 请求方式 */ final HttpMethod method = HttpMethod.valueOf(request.getMethodValue()); final String url = requestUrl.toASCIIString(); System.out.println(url); /** * 获取 HttpHeaders */ HttpHeaders filtered = filterRequest(getHeadersFilters(), exchange); final DefaultHttpHeaders httpHeaders = new DefaultHttpHeaders(); filtered.forEach(httpHeaders::set); boolean preserveHost = exchange .getAttributeOrDefault(PRESERVE_HOST_HEADER_ATTRIBUTE, false); /** * httpClient 这次采用httpclient 去代理访问要去代理的地址,相当于网关去 * 帮你请求了一次 */ Flux<HttpClientResponse> responseFlux = this.httpClient.headers(headers -> { headers.add(httpHeaders); /** * 判断请求主机是否一致,一起是访问地址和路由地址是否是一个 * ip 不是则需要替换 */ if (preserveHost) { String host = request.getHeaders().getFirst(HttpHeaders.HOST); headers.add(HttpHeaders.HOST, host); } else { // let Netty set it based on hostname headers.remove(HttpHeaders.HOST); } }).request(method).uri(url).send((req, nettyOutbound) -> { if (log.isTraceEnabled()) { nettyOutbound.withConnection(connection -> log.trace( "outbound route: " + connection.channel().id().asShortText() + ", inbound: " + exchange.getLogPrefix())); } NettyOutbound outbound = nettyOutbound.send(request.getBody() .map(dataBuffer -> ((NettyDataBuffer) dataBuffer).getNativeBuffer())); return outbound; }).responseConnection((res, connection) -> { // Defer committing the response until all route filters have run // Put client response as ServerWebExchange attribute and write // response later NettyWriteResponseFilter exchange.getAttributes().put(CLIENT_RESPONSE_ATTR, res); exchange.getAttributes().put(CLIENT_RESPONSE_CONN_ATTR, connection); /** * 异步获取 httpClient 访问的响应结果,下面就是对响应结果的封装 */ ServerHttpResponse response = exchange.getResponse(); // put headers and status so filters can modify the response HttpHeaders headers = new HttpHeaders(); res.responseHeaders() .forEach(entry -> headers.add(entry.getKey(), entry.getValue())); String contentTypeValue = headers.getFirst(HttpHeaders.CONTENT_TYPE); if (StringUtils.hasLength(contentTypeValue)) { exchange.getAttributes().put(ORIGINAL_RESPONSE_CONTENT_TYPE_ATTR, contentTypeValue); } HttpStatus status = HttpStatus.resolve(res.status().code()); if (status != null) { response.setStatusCode(status); } else if (response instanceof AbstractServerHttpResponse) { // https://jira.spring.io/browse/SPR-16748 ((AbstractServerHttpResponse) response) .setStatusCodeValue(res.status().code()); } else { // TODO: log warning here, not throw error? throw new IllegalStateException("Unable to set status code on response: " + res.status().code() + ", " + response.getClass()); } // make sure headers filters run after setting status so it is // available in response HttpHeaders filteredResponseHeaders = HttpHeadersFilter .filter(getHeadersFilters(), headers, exchange, Type.RESPONSE); if (!filteredResponseHeaders.containsKey(HttpHeaders.TRANSFER_ENCODING) && filteredResponseHeaders.containsKey(HttpHeaders.CONTENT_LENGTH)) { // It is not valid to have both the transfer-encoding header and // the content-length header. // Remove the transfer-encoding header in the response if the // content-length header is present. response.getHeaders().remove(HttpHeaders.TRANSFER_ENCODING); } exchange.getAttributes().put(CLIENT_RESPONSE_HEADER_NAMES, filteredResponseHeaders.keySet()); response.getHeaders().putAll(filteredResponseHeaders); System.out.println(res+"///////"); return Mono.just(res); }); if (properties.getResponseTimeout() != null) { responseFlux = responseFlux.timeout(properties.getResponseTimeout(), Mono.error(new TimeoutException("Response took longer than timeout: " + properties.getResponseTimeout()))) .onErrorMap(TimeoutException.class, th -> new ResponseStatusException(HttpStatus.GATEWAY_TIMEOUT, th.getMessage(), th)); } return responseFlux.then(chain.filter(exchange)); }
@Override public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) { // NOTICE: nothing in "pre" filter stage as CLIENT_RESPONSE_CONN_ATTR is not added // until the NettyRoutingFilter is run // @formatter:off return chain.filter(exchange) .doOnError(throwable -> cleanup(exchange)) .then(Mono.defer(() -> { Connection connection = exchange.getAttribute(CLIENT_RESPONSE_CONN_ATTR); if (connection == null) { return Mono.empty(); } if (log.isTraceEnabled()) { log.trace("NettyWriteResponseFilter start inbound: " + connection.channel().id().asShortText() + ", outbound: " + exchange.getLogPrefix()); } ServerHttpResponse response = exchange.getResponse(); // TODO: what if it's not netty NettyDataBufferFactory factory = (NettyDataBufferFactory) response .bufferFactory(); // TODO: needed? 对响应结果进一个解析,获取到的数据是一个NettyDataBuffer,可以百度下 // 是netty的数据 final Flux<NettyDataBuffer> body = connection .inbound() .receive() .retain() .map(factory::wrap); MediaType contentType = null; try { contentType = response.getHeaders().getContentType(); } catch (Exception e) { if (log.isTraceEnabled()) { log.trace("invalid media type", e); } } return (isStreamingMediaType(contentType) ? response.writeAndFlushWith(body.map(Flux::just)) : response.writeWith(body)); })).doOnCancel(() -> cleanup(exchange)); // @formatter:on }
上面的代码变量和编写方式都和框架使用了webflux 有关 大家可以学习一下就很容易理解了阿里JAVA架构师详解Spring5新特性之WebFlux 我也是看了这个视频学会的 2个小时,感觉讲的挺好的
package com.neo.config; import com.alibaba.fastjson.JSONObject; import io.netty.buffer.UnpooledByteBufAllocator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Value; import org.springframework.cloud.gateway.filter.GatewayFilterChain; import org.springframework.cloud.gateway.filter.GlobalFilter; import org.springframework.cloud.gateway.filter.headers.HttpHeadersFilter; import org.springframework.context.annotation.Configuration; import org.springframework.core.Ordered; import org.springframework.core.io.buffer.DataBuffer; import org.springframework.core.io.buffer.DataBufferUtils; import org.springframework.core.io.buffer.NettyDataBufferFactory; import org.springframework.http.HttpStatus; import org.springframework.http.server.reactive.ServerHttpResponse; import org.springframework.util.StringUtils; import org.springframework.web.server.ServerWebExchange; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import java.io.UnsupportedEncodingException; import java.lang.reflect.Method; import java.net.URI; import java.util.List; import java.util.concurrent.atomic.AtomicReference; import static org.springframework.cloud.gateway.support.ServerWebExchangeUtils.*; /** * @ClassName DubboResponseGlobalFilter * @Desription 协议转换的过滤器类 * @Author zhangzexu * @Date 2019/11/28 17:14 * @Version V1.0 */ @Configuration public class DubboResponseGlobalFilter implements GlobalFilter, Ordered { @Value("${plugin.calssName}") private String className; private static Logger LOGGER = LoggerFactory.getLogger(DubboResponseGlobalFilter.class); private volatile List<HttpHeadersFilter> headersFilters; @Override public int getOrder() { return Ordered.LOWEST_PRECEDENCE; } public DubboResponseGlobalFilter() { } @Override public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) { URI requestUrl = exchange.getRequiredAttribute(GATEWAY_REQUEST_URL_ATTR); final String scheme = requestUrl.getScheme(); if (isAlreadyRouted(exchange) || "http".equals(scheme) || "https".equals(scheme) || "lb".equals(scheme) || "ws".equals(scheme)) { return chain.filter(exchange); } LOGGER.info("请求的url为{},协议为{}",requestUrl,scheme); setAlreadyRouted(exchange); /** * 获取请求的url 对路径进行重新编码 */ final String url = requestUrl.toASCIIString(); Flux<DataBuffer> flux = exchange.getRequest().getBody(); AtomicReference<byte[]> atomicReference = new AtomicReference<>(); /** * 获取客户端请求的数据,body体 */ flux.subscribe(buffer -> { byte[] bytes = new byte[buffer.readableByteCount()]; buffer.read(bytes); DataBufferUtils.release(buffer); atomicReference.set(bytes); }); return chain.filter(exchange) .then(Mono.defer(() -> { ServerHttpResponse response = exchange.getResponse(); return response.writeWith(Flux.create(sink -> { NettyDataBufferFactory nettyDataBufferFactory = new NettyDataBufferFactory(new UnpooledByteBufAllocator(false)); JSONObject json = new JSONObject(); Class c = null; DataBuffer dataBuffer = null; String charset = "UTF-8"; try { /** * 初始化反射数据,将要调用的类反射获取,反射的类的名称结构, * 用 dubbo 协议举例 * 则插件的类名组合为 DubboGatewayImpl */ StringBuilder sb = new StringBuilder(className); sb.append("."); char[] name = scheme.toCharArray(); name[0] -= 32; sb.append(String.valueOf(name)); sb.append("GatewayPluginImpl"); c = Class.forName(sb.toString()); c.getMethods(); Method method = c.getMethod("send", String.class, byte[].class); Object obj = c.getConstructor().newInstance(); Object result = method.invoke(obj, url, atomicReference.get()); HttpStatus status = HttpStatus.resolve(500); /** * 判断结果是否返回,如果没有数据则直接返回 */ if (result == null) { } else { json = (JSONObject) result; status = HttpStatus.resolve(json.getInteger("code")); json.remove("code"); /** * 获取字符集编码格式 默认 utf-8 */ if (!StringUtils.isEmpty(json.getString("charset"))) { charset = json.getString("charset"); } } response.setStatusCode(status); try { dataBuffer = nettyDataBufferFactory.wrap(json.toJSONString().getBytes(charset)); } catch (UnsupportedEncodingException e) { dataBuffer = nettyDataBufferFactory.wrap(e.toString().getBytes(charset)); LOGGER.error("返回调用请求数据错误{}",e); e.printStackTrace(); } } catch (Exception e) { try { dataBuffer = nettyDataBufferFactory.wrap(e.toString().getBytes(charset)); LOGGER.error("获取远程数据错误{}",e); } catch (UnsupportedEncodingException ex) { ex.printStackTrace(); LOGGER.error("返回调用请求数据错误{}",ex); } e.printStackTrace(); } /** * 将数据进行发射到下一个过滤器 */ sink.next(dataBuffer); sink.complete(); })); })); } }
通过反射机制类完成除过gateway 自定义协议外的所有解析进行处理。
这就完了,简单把 具体协议插件实现可以下载源代码
完整项目下载 github
本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。
我来说两句