前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Gateway源码解读

Gateway源码解读

原创
作者头像
小爽只会CRUD
发布2023-05-09 16:32:55
6780
发布2023-05-09 16:32:55
举报
文章被收录于专栏:一头猪的学习一头猪的学习

Gateway是什么?

  1. Gateway是在Spring生态系统上的API网关服务,基于Spring、SpringBoot和Project等技术
  2. Gateway旨在提供一种简单而有效的方式来对API进行路由,以及提供一些强大的过滤功能,如:熔断、限流、重试等
  3. Gateway官网链接:https://cloud.spring.io/spring-cloud-static/spring-cloud-gateway/2.2.1.RELEASE/reference/html/
  4. 核心功能:鉴权、流量控制、熔断、日志监控、反向代理

Gateway基本原理?

Gateway原理图
Gateway原理图

个人理解:

  1. web请求通过一些匹配条件,从而定位到真正的服务节点/微服务模块,但是在该转发过程的前后有一些精细化的控制
  2. predicate断言就是匹配条件
  3. filter过滤,个人感觉可以理解为网关的过滤机制,有了predicate断言和filter过滤,再加上目标url就可以实现一个具体的路由
  4. route路由:网关基本模块,由id,目标url,一系列断言和过滤器组成,如果断言为true那么就匹配该路由
  5. predicate断言:对http请求中的所有内容(如请求头or请求参数)进行匹配,如果请求与断言相匹配就路由
  6. filter过滤:可以再请求被路由的前后对请求进行处理

Gateway工作机制理解 How it works

How it works
How it works

流程理解:

  1. 客户端向Gateway发出请求,在Gateway Handler Mapping中找到了与请求相匹配的路由,将其转发到Gateway Web Handler
  2. Handler通过指定的过滤器链把请求发送到实际的服务业务逻辑,然后返回
  3. 过滤器之间用虚线是因为过滤器可以在请求之前pre或者请求之后post执行业务逻辑
  4. Filter在“pre”类型的过滤器可以做参数校验、权限校验、流量控制、日志输出、协议转换等
  5. Filter在“post”类型的过滤器可以做响应内容、响应头的修改、日志输出、流量监控等
  6. 综上所述:工作机制为 路由转发 + 执行过滤器链

前置知识:jdk8谓词

谓词是一个函数式接口,可以接受一个参数并返回一个布尔值,表示该参数是否满足某个条件

两个谓词接口:

接受一个参数的,java.util.function.Predicate<T>

代码语言:java
复制
@FunctionalInterface
public interface Predicate<T> {

    /**
     * Evaluates this predicate on the given argument.
     *
     * @param t the input argument
     * @return {@code true} if the input argument matches the predicate,
     * otherwise {@code false}
     */
    boolean test(T t);

    /**
     * Returns a composed predicate that represents a short-circuiting logical
     * AND of this predicate and another.  When evaluating the composed
     * predicate, if this predicate is {@code false}, then the {@code other}
     * predicate is not evaluated.
     *
     * <p>Any exceptions thrown during evaluation of either predicate are relayed
     * to the caller; if evaluation of this predicate throws an exception, the
     * {@code other} predicate will not be evaluated.
     *
     * @param other a predicate that will be logically-ANDed with this
     *              predicate
     * @return a composed predicate that represents the short-circuiting logical
     * AND of this predicate and the {@code other} predicate
     * @throws NullPointerException if other is null
     */
    default Predicate<T> and(Predicate<? super T> other) {
        Objects.requireNonNull(other);
        return (t) -> test(t) && other.test(t);
    }

    /**
     * Returns a predicate that represents the logical negation of this
     * predicate.
     *
     * @return a predicate that represents the logical negation of this
     * predicate
     */
    default Predicate<T> negate() {
        return (t) -> !test(t);
    }

    /**
     * Returns a composed predicate that represents a short-circuiting logical
     * OR of this predicate and another.  When evaluating the composed
     * predicate, if this predicate is {@code true}, then the {@code other}
     * predicate is not evaluated.
     *
     * <p>Any exceptions thrown during evaluation of either predicate are relayed
     * to the caller; if evaluation of this predicate throws an exception, the
     * {@code other} predicate will not be evaluated.
     *
     * @param other a predicate that will be logically-ORed with this
     *              predicate
     * @return a composed predicate that represents the short-circuiting logical
     * OR of this predicate and the {@code other} predicate
     * @throws NullPointerException if other is null
     */
    default Predicate<T> or(Predicate<? super T> other) {
        Objects.requireNonNull(other);
        return (t) -> test(t) || other.test(t);
    }

    /**
     * Returns a predicate that tests if two arguments are equal according
     * to {@link Objects#equals(Object, Object)}.
     *
     * @param <T> the type of arguments to the predicate
     * @param targetRef the object reference with which to compare for equality,
     *               which may be {@code null}
     * @return a predicate that tests if two arguments are equal according
     * to {@link Objects#equals(Object, Object)}
     */
    static <T> Predicate<T> isEqual(Object targetRef) {
        return (null == targetRef)
                ? Objects::isNull
                : object -> targetRef.equals(object);
    }
}

接受两个参数的,java.util.function.BiPredicate<T,U>

代码语言:java
复制
@FunctionalInterface
public interface BiPredicate<T, U> {

    /**
     * Evaluates this predicate on the given arguments.
     *
     * @param t the first input argument
     * @param u the second input argument
     * @return {@code true} if the input arguments match the predicate,
     * otherwise {@code false}
     */
    boolean test(T t, U u);

    /**
     * Returns a composed predicate that represents a short-circuiting logical
     * AND of this predicate and another.  When evaluating the composed
     * predicate, if this predicate is {@code false}, then the {@code other}
     * predicate is not evaluated.
     *
     * <p>Any exceptions thrown during evaluation of either predicate are relayed
     * to the caller; if evaluation of this predicate throws an exception, the
     * {@code other} predicate will not be evaluated.
     *
     * @param other a predicate that will be logically-ANDed with this
     *              predicate
     * @return a composed predicate that represents the short-circuiting logical
     * AND of this predicate and the {@code other} predicate
     * @throws NullPointerException if other is null
     */
    default BiPredicate<T, U> and(BiPredicate<? super T, ? super U> other) {
        Objects.requireNonNull(other);
        return (T t, U u) -> test(t, u) && other.test(t, u);
    }

    /**
     * Returns a predicate that represents the logical negation of this
     * predicate.
     *
     * @return a predicate that represents the logical negation of this
     * predicate
     */
    default BiPredicate<T, U> negate() {
        return (T t, U u) -> !test(t, u);
    }

    /**
     * Returns a composed predicate that represents a short-circuiting logical
     * OR of this predicate and another.  When evaluating the composed
     * predicate, if this predicate is {@code true}, then the {@code other}
     * predicate is not evaluated.
     *
     * <p>Any exceptions thrown during evaluation of either predicate are relayed
     * to the caller; if evaluation of this predicate throws an exception, the
     * {@code other} predicate will not be evaluated.
     *
     * @param other a predicate that will be logically-ORed with this
     *              predicate
     * @return a composed predicate that represents the short-circuiting logical
     * OR of this predicate and the {@code other} predicate
     * @throws NullPointerException if other is null
     */
    default BiPredicate<T, U> or(BiPredicate<? super T, ? super U> other) {
        Objects.requireNonNull(other);
        return (T t, U u) -> test(t, u) || other.test(t, u);
    }
}

内置谓词工厂

如PathRoutePredicateFactory:根据请求的路径是否匹配给定的路径模式进行匹配

需求:将request作为参数传入谓词对象,取出request.getURI(),与Path中配置的regex判断

代码语言:java
复制
public class PathRoutePredicateFactory extends AbstractRoutePredicateFactory<PathRoutePredicateFactory.Config> {
    private static final Log log = LogFactory.getLog(PathRoutePredicateFactory.class);
    private static final String MATCH_TRAILING_SLASH = "matchTrailingSlash";
    private PathPatternParser pathPatternParser = new PathPatternParser();


    public Predicate<ServerWebExchange> apply(PathRoutePredicateFactory.Config config) {
        final ArrayList<PathPattern> pathPatterns = new ArrayList();
        synchronized(this.pathPatternParser) {
            this.pathPatternParser.setMatchOptionalTrailingSeparator(config.isMatchTrailingSlash());
            config.getPatterns().forEach((pattern) -> {
                PathPattern pathPattern = this.pathPatternParser.parse(pattern);
                pathPatterns.add(pathPattern);
            });
        }

        return new GatewayPredicate() {
            // 一个包装对象
            public boolean test(ServerWebExchange exchange) {
                // 取出该对象的uri
                PathContainer path = PathContainer.parsePath(exchange.getRequest().getURI().getRawPath());
                Optional<PathPattern> optionalPathPattern = pathPatterns.stream().filter((pattern) -> {
                    return pattern.matches(path);
                }).findFirst();
                if (optionalPathPattern.isPresent()) {
                    PathPattern pathPattern = (PathPattern)optionalPathPattern.get();
                    PathRoutePredicateFactory.traceMatch("Pattern", pathPattern.getPatternString(), path, true);
                    PathMatchInfo pathMatchInfo = pathPattern.matchAndExtract(path);
                    ServerWebExchangeUtils.putUriTemplateVariables(exchange, pathMatchInfo.getUriVariables());
                    return true;
                } else {
                    PathRoutePredicateFactory.traceMatch("Pattern", config.getPatterns(), path, false);
                    return false;
                }
            }

            public String toString() {
                return String.format("Paths: %s, match trailing slash: %b", config.getPatterns(), config.isMatchTrailingSlash());
            }
        };
    }
}

这里同时也涉及到了工厂设计模式的一些概念

简单工厂模式:1个工厂产出n个产品,创建逻辑都写在一个方法里,无法根据不同的参数来创建产品

抽象工厂模式:1个工厂产出1个产品

当Gateway读取配置文件,读到断言的Path就知道去PathRoutePredicateFactory工厂产出该工厂的断言

工厂方法模式,工厂是一个抽象,产品是一个抽象,工厂实现与产品实现一一对应

Gateway源码

Gateway的自动配置:SpringBoot 在引入一个新的组件时,一般都会有对应的XxxAutoConfiguration类来对该组件进行配置,Gateway也不例外,在引入了以下配置后,就会生成对应的GatewayAutoConfiguration自动配置类,最核心是响应式的Web框

代码语言:html
复制
<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-starter-gateway</artifactId>
</dependency>

GatewayAutoConfiguration自动配置类

代码语言:java
复制
@Configuration(proxyBeanMethods = false)
// spring.cloud.gateway.enabled配置项必须为true,自动配置才生效,默认为true
@ConditionalOnProperty(name = "spring.cloud.gateway.enabled", matchIfMissing = true)
@EnableConfigurationProperties
// 自动配置前置条件:引入了WebFlux 和 HttpHandler 组件
@AutoConfigureBefore({ HttpHandlerAutoConfiguration.class,
                      WebFluxAutoConfiguration.class })
// 自动配置后置组件:负载均衡组件
@AutoConfigureAfter({ GatewayLoadBalancerClientAutoConfiguration.class,
                     GatewayClassPathWarningAutoConfiguration.class })
@ConditionalOnClass(DispatcherHandler.class)
public class GatewayAutoConfiguration {
    ..... // 初始化各种bean
}

GatewayClassPathWarningAutoConfiguration:Gateway基于Spring WebFlux实现,此配置类用于检查项目是否正确导入spring-boot-starter-webflux依赖,而不是错误导入spring-boot-starter-web依赖GatewayLoadBalancerClientAutoConfiguration:初始化LoadBalancerClientFilter

重点在GatewayAutoConfiguration这个配置类上:

  • NettyConfiguration:使用HttpCliemt Bean,创建一个类型为org.springframework.cloud.NettyRoutingFilter的Bean对象,使用基于Netty实现的HttpClient请求后端http服务
  • GlobalFilter:初始全局的GlobalFilter
  • GateWayProperties:读取配置文件的RouteDefinition/FilterDefinition
  • FilteringWebHandler:它的handle方法,形成过滤器链并顺序调用
代码语言:java
复制
@Override
public Mono<Void> handle(ServerWebExchange exchange) {
	// 拿到对应的route
	Route route = exchange.getRequiredAttribute(GATEWAY_ROUTE_ATTR);
	// 取出此路由自定义的过滤器
	List<GatewayFilter> gatewayFilters = route.getFilters();

	List<GatewayFilter> combined = new ArrayList<>(this.globalFilters);
	
	// 添加全局过滤器,合并所有的过滤器,形成过滤器链
	combined.addAll(gatewayFilters);
	// TODO: needed or cached?
	// 排序
	AnnotationAwareOrderComparator.sort(combined);

	if (logger.isDebugEnabled()) {
		logger.debug("Sorted gatewayFilterFactories: " + combined);
	}
	// 基于一个请求得到过滤器链
	// 责任链模式
	return new DefaultGatewayFilterChain(combined).filter(exchange);
}
  • PrefixPathGatewayFilterFactory:前面提及的工厂模式,很多个GatewayFilterFactory的创建
  • RoutePredicateFactory:很多个RoutePredicateFactory的创建
  • RoutePredicateHandlerMapping:getHandlerInternal() -> lookupRoute()很重要,可以看到前面提到的谓词调用,通过谓词调用来知道Route是谁了
代码语言:java
复制
/**
	 *
	 * @param webHandler 上边装配的FilteringWebHandler
	 * @param routeLocator 上边装配的CachingRouteLocator
	 */
	@Bean
	public RoutePredicateHandlerMapping routePredicateHandlerMapping(
			FilteringWebHandler webHandler, RouteLocator routeLocator,
			GlobalCorsProperties globalCorsProperties, Environment environment) {
		return new RoutePredicateHandlerMapping(webHandler, routeLocator,
				globalCorsProperties, environment);
	}
代码语言:java
复制
package org.springframework.cloud.gateway.handler;

import java.util.function.Function;

import reactor.core.publisher.Mono;

import org.springframework.cloud.gateway.config.GlobalCorsProperties;
import org.springframework.cloud.gateway.route.Route;
import org.springframework.cloud.gateway.route.RouteLocator;
import org.springframework.core.env.Environment;
import org.springframework.web.cors.CorsConfiguration;
import org.springframework.web.reactive.handler.AbstractHandlerMapping;
import org.springframework.web.server.ServerWebExchange;

import static org.springframework.cloud.gateway.handler.RoutePredicateHandlerMapping.ManagementPortType.DIFFERENT;
import static org.springframework.cloud.gateway.handler.RoutePredicateHandlerMapping.ManagementPortType.DISABLED;
import static org.springframework.cloud.gateway.handler.RoutePredicateHandlerMapping.ManagementPortType.SAME;
import static org.springframework.cloud.gateway.support.ServerWebExchangeUtils.GATEWAY_HANDLER_MAPPER_ATTR;
import static org.springframework.cloud.gateway.support.ServerWebExchangeUtils.GATEWAY_PREDICATE_ROUTE_ATTR;
import static org.springframework.cloud.gateway.support.ServerWebExchangeUtils.GATEWAY_ROUTE_ATTR;

/**
 * @author Spencer Gibb
 */
public class RoutePredicateHandlerMapping extends AbstractHandlerMapping {

	private final FilteringWebHandler webHandler;

	private final RouteLocator routeLocator;

	private final Integer managementPort;

	private final ManagementPortType managementPortType;

	public RoutePredicateHandlerMapping(FilteringWebHandler webHandler,
			RouteLocator routeLocator, GlobalCorsProperties globalCorsProperties,
			Environment environment) {
		this.webHandler = webHandler;
		this.routeLocator = routeLocator;

		this.managementPort = getPortProperty(environment, "management.server.");
		this.managementPortType = getManagementPortType(environment);
		setOrder(1);
		setCorsConfigurations(globalCorsProperties.getCorsConfigurations());
	}

	private ManagementPortType getManagementPortType(Environment environment) {
		Integer serverPort = getPortProperty(environment, "server.");
		if (this.managementPort != null && this.managementPort < 0) {
			return DISABLED;
		}
		return ((this.managementPort == null
				|| (serverPort == null && this.managementPort.equals(8080))
				|| (this.managementPort != 0 && this.managementPort.equals(serverPort)))
						? SAME : DIFFERENT);
	}

	private static Integer getPortProperty(Environment environment, String prefix) {
		return environment.getProperty(prefix + "port", Integer.class);
	}

	@Override
	protected Mono<?> getHandlerInternal(ServerWebExchange exchange) {
		// don't handle requests on management port if set and different than server port
		if (this.managementPortType == DIFFERENT && this.managementPort != null
				&& exchange.getRequest().getURI().getPort() == this.managementPort) {
			return Mono.empty();
		}
		exchange.getAttributes().put(GATEWAY_HANDLER_MAPPER_ATTR, getSimpleName());

        // 查找路由
		return lookupRoute(exchange)
				// .log("route-predicate-handler-mapping", Level.FINER) //name this
				.flatMap((Function<Route, Mono<?>>) r -> {
					exchange.getAttributes().remove(GATEWAY_PREDICATE_ROUTE_ATTR);
					if (logger.isDebugEnabled()) {
						logger.debug(
								"Mapping [" + getExchangeDesc(exchange) + "] to " + r);
					}

					exchange.getAttributes().put(GATEWAY_ROUTE_ATTR, r);
                    // 找到该路由对应的webHandler
					return Mono.just(webHandler);
				}).switchIfEmpty(Mono.empty().then(Mono.fromRunnable(() -> {
					exchange.getAttributes().remove(GATEWAY_PREDICATE_ROUTE_ATTR);
					if (logger.isTraceEnabled()) {
						logger.trace("No RouteDefinition found for ["
								+ getExchangeDesc(exchange) + "]");
					}
				})));
	}

	@Override
	protected CorsConfiguration getCorsConfiguration(Object handler,
			ServerWebExchange exchange) {
		// TODO: support cors configuration via properties on a route see gh-229
		// see RequestMappingHandlerMapping.initCorsConfiguration()
		// also see
		// https://github.com/spring-projects/spring-framework/blob/master/spring-web/src/test/java/org/springframework/web/cors/reactive/CorsWebFilterTests.java
		return super.getCorsConfiguration(handler, exchange);
	}

	// TODO: get desc from factory?
	private String getExchangeDesc(ServerWebExchange exchange) {
		StringBuilder out = new StringBuilder();
		out.append("Exchange: ");
		out.append(exchange.getRequest().getMethod());
		out.append(" ");
		out.append(exchange.getRequest().getURI());
		return out.toString();
	}

	protected Mono<Route> lookupRoute(ServerWebExchange exchange) {
		return this.routeLocator.getRoutes()
				// individually filter routes so that filterWhen error delaying is not a
				// problem
                // 循环取出每个route根据谓词进行判断
				.concatMap(route -> Mono.just(route).filterWhen(r -> {
					// add the current route we are testing
					exchange.getAttributes().put(GATEWAY_PREDICATE_ROUTE_ATTR, r.getId());
                    // apply()其实就是谓词判断!!
                    // important!!!
					return r.getPredicate().apply(exchange);
				})
						// instead of immediately stopping main flux due to error, log and
						// swallow it
						.doOnError(e -> logger.error(
								"Error applying predicate for route: " + route.getId(),
								e))
						.onErrorResume(e -> Mono.empty()))
				// .defaultIfEmpty() put a static Route not found
				// or .switchIfEmpty()
				// .switchIfEmpty(Mono.<Route>empty().log("noroute"))
				.next()
				// TODO: error handling
				.map(route -> {
					if (logger.isDebugEnabled()) {
						logger.debug("Route matched: " + route.getId());
					}
					validateRoute(route, exchange);
					return route;
				});

		/*
		 * TODO: trace logging if (logger.isTraceEnabled()) {
		 * logger.trace("RouteDefinition did not match: " + routeDefinition.getId()); }
		 */
	}

	/**
	 * Validate the given handler against the current request.
	 * <p>
	 * The default implementation is empty. Can be overridden in subclasses, for example
	 * to enforce specific preconditions expressed in URL mappings.
	 * @param route the Route object to validate
	 * @param exchange current exchange
	 * @throws Exception if validation failed
	 */
	@SuppressWarnings("UnusedParameters")
	protected void validateRoute(Route route, ServerWebExchange exchange) {
	}

	protected String getSimpleName() {
		return "RoutePredicateHandlerMapping";
	}

	public enum ManagementPortType {

		/**
		 * The management port has been disabled.
		 */
		DISABLED,

		/**
		 * The management port is the same as the server port.
		 */
		SAME,

		/**
		 * The management port and server port are different.
		 */
		DIFFERENT;

	}

}

组件关系图:

组件关系图
组件关系图

整个调用过程为:

GatewayAutoConfiguration中通过@ConditionalOnClass(DispatherHandler.class)确认加载了DispathcherHandler类,这个类是请求分发处理器,同时也是Spring WebFlux的访问入口,这个类和SPringMVC的兄弟DispatcherServlet功能相同(请求分发处理)。这个DispathcherHandler类的handle(ServerWebExchange exchange)根据请求最终获取WebHandler,也就说获得了FilterWebHander(上图2处),可以创建DefaultGatewayFilterChain并调用filter方法形成过滤链

代码语言:java
复制
	@Override
	public Mono<Void> handle(ServerWebExchange exchange) {
		if (this.handlerMappings == null) {
			return createNotFoundError();
		}
		if (CorsUtils.isPreFlightRequest(exchange.getRequest())) {
			return handlePreFlight(exchange);
		}
		return Flux
				// 1.遍历所有的 handlerMapping
				.fromIterable(this.handlerMappings) 
				// 2.得到Mapping,根据Mapping查找到webHandler
				.concatMap(mapping -> mapping.getHandler(exchange))
				.next()
				.switchIfEmpty(createNotFoundError())
				// 3.调用对应的处理器
				.flatMap(handler -> invokeHandler(exchange, handler))
				// 4.返回处理结果
				.flatMap(result -> handleResult(exchange, result));
	}

以上内容仅为本人学习所用,如有侵权请联系!

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Gateway是什么?
  • Gateway基本原理?
  • Gateway工作机制理解 How it works
  • 前置知识:jdk8谓词
  • 内置谓词工厂
  • Gateway源码
    • 组件关系图:
    相关产品与服务
    负载均衡
    负载均衡(Cloud Load Balancer,CLB)提供安全快捷的流量分发服务,访问流量经由 CLB 可以自动分配到云中的多台后端服务器上,扩展系统的服务能力并消除单点故障。负载均衡支持亿级连接和千万级并发,可轻松应对大流量访问,满足业务需求。
    领券
    问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档