前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Spring Cloud Gateway-使用自定义过滤器通过Hystrix实现降级处理

Spring Cloud Gateway-使用自定义过滤器通过Hystrix实现降级处理

作者头像
Throwable
发布2020-06-23 16:15:46
3.7K0
发布2020-06-23 16:15:46
举报
文章被收录于专栏:Throwable's Blog

前提

在微服务架构中,下游依赖出现问题如果上游调用方不做请求降级处理,下游的异常依赖没有被隔离,很有可能出现因为一两个服务或者小到一两个接口异常导致上游所有服务不可用,甚至影响整个业务线。请求降级处理目前比较主流的依然是Netfilx出品的HystrixHystrix的工作原理是:

  • 把请求基于线程池或者信号量隔离,一旦下游服务在指定配置的超时时间内无法响应会进入预设或者默认的降级实现。
  • 每个请求的状态都会记录下来,在一个滑动窗口内处理失败的比率超过设定的阈值就会触发熔断器(Circle Breaker)开启,熔断器开启之后所有请求都会直接进入预设或者默认的降级逻辑。
  • 熔断器打开后,且距离熔断器打开的时间或上一次试探请求放行的时间超过设定值,熔断器器进入半开状态,允许放行一个试探请求。
  • 请求成功率提高后,基于统计数据确定对熔断器进行关闭,所有请求正常放行。

这里不对Hystrix的细节做更深入分析,而是接着谈谈Spring Cloud Gateway中如何使用Hystrix,主要包括内置的Hystrix过滤器和定制过滤器结合Hystrix实现我们想要的功能。除了要引入spring-cloud-starter-gateway依赖之外,还需要引入spring-cloud-starter-netflix-hystrix

代码语言:javascript
复制
    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-gateway</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-hystrix</artifactId>
        </dependency>
    </dependencies>    

使用内置的Hystrix过滤器

内置的Hystrix过滤器是HystrixGatewayFilterFactory,它支持的配置是:

代码语言:javascript
复制
public static class Config {
    // 如果下面的Setter配置为null的时候,name会作为Hystrix的HystrixCommandKey
    private String name;
    // Hystrix的Setter属性,主要用来配置命令的KEY和其他属性
    private Setter setter;
    // 降级的目标URI,必须以forward开头,URI会匹配到网关应用的控制器方法
    private URI fallbackUri;

	public String getName() {
		return name;
	}

	public Config setName(String name) {
		this.name = name;
		return this;
	}

    public Config setFallbackUri(String fallbackUri) {
	    if (fallbackUri != null) {
			setFallbackUri(URI.create(fallbackUri));
		}
		return this;
	}

    public URI getFallbackUri() {
	    return fallbackUri;
    }
    
    // 注意这个方法,配置的fallbackUri要以forward开始作为schema,否则会抛异常
    public void setFallbackUri(URI fallbackUri) {
        if (fallbackUri != null && !"forward".equals(fallbackUri.getScheme())) {
			throw new IllegalArgumentException("Hystrix Filter currently only supports 'forward' URIs, found " + fallbackUri);
		}
		this.fallbackUri = fallbackUri;
	}

	public Config setSetter(Setter setter) {
		this.setter = setter;
		return this;
	}
}

另外,(1)全局的Hystrix配置也会对HystrixGatewayFilterFactory生效;(2)HystrixGatewayFilterFactory可以作为默认过滤器(default-filters)对所有的路由配置作为兜底过滤器并发挥作用。

对于第(1)点,我们如果在application.yaml中配置如下:

代码语言:javascript
复制
// 执行超时时间为1秒,会对下面路由order_route绑定的HystrixGatewayFilterFactory生效
hystrix.command.fallbackcmd.execution.isolation.thread.timeoutInMilliseconds: 1000

spring:
  cloud:
    gateway:
      routes:
      - id: order_route
        uri: http://localhost:9091
        predicates:
        - Path=/order/**
        filters:
        - name: Hystrix
          args:
            name: HystrixCommand
            fallbackUri: forward:/fallback

配置的hystrix.command.fallbackcmd.execution.isolation.thread.timeoutInMilliseconds会对绑定在路由order_route中的HystrixGatewayFilterFactory生效。

对于第(2)点,我们可以把HystrixGatewayFilterFactory配置为默认过滤器,这样子所有的路由都会关联此过滤器,但是非必要时建议不要这样做

代码语言:javascript
复制
spring:
  cloud:
    gateway:
      routes:
        - id: order_route
          uri: http://localhost:9091
          predicates:
            - Path=/order/**
      default-filters:
        - name: Hystrix
          args:
            name: HystrixCommand
            fallbackUri: forward:/fallback

笔者在测试的时候,发现上面提到的Setter无法配置,估计是由于HystrixSetter对象是经过多重包装,暂时没有办法设置该属性。接着我们要在网关服务加一个控制器方法用于处理重定向的/fallback请求:

代码语言:javascript
复制
@RestController
public class FallbackController {

    @RequestMapping(value = "/fallback")
    @ResponseStatus
    public Mono<Map<String, Object>> fallback(ServerWebExchange exchange, Throwable throwable) {
        Map<String, Object> result = new HashMap<>(8);
        ServerHttpRequest request = exchange.getRequest();
        result.put("path", request.getPath().pathWithinApplication().value());
        result.put("method", request.getMethodValue());
        if (null != throwable.getCause()) {
            result.put("message", throwable.getCause().getMessage());
        } else {
            result.put("message", throwable.getMessage());
        }
        return Mono.just(result);
    }
}

控制器方法入参会被Spring Cloud Gateway的内部组件处理,可以回调一些有用的类型例如ServerWebExchange实例、具体的异常实例等等。

使用Hystrix定制过滤器

HystrixGatewayFilterFactory在大多数情况下应该可以满足业务需要,但是这里也做一次定制一个整合Hystrix的过滤器,实现的功能如下:

  • 基于每个请求URL创建一个新的Hystrix命令实例进行调用。
  • 每个URL可以指定特有的线程池配置,如果不指定则使用默认的。
  • 每个URL可以配置单独的Hystrix超时时间。

也就是通过Hystrix使用线程池对每种不同的外部请求URL进行隔离。当然,这样的过滤器仅仅在外部请求的不同URL的数量有限的情况下才比较合理,否则有可能创建过多的线程池造成系统性能的下降,适得其反。改造如下:

代码语言:javascript
复制
@Component
public class CustomHystrixFilter extends AbstractGatewayFilterFactory<CustomHystrixFilter.Config> {

    private static final String FORWARD_KEY = "forward";
    private static final String NAME = "CustomHystrix";
    private static final int TIMEOUT_MS = 1000;
    private final ObjectProvider<DispatcherHandler> dispatcherHandlerProvider;
    private volatile DispatcherHandler dispatcherHandler;
    private boolean processConfig = false;

    public CustomHystrixFilter(ObjectProvider<DispatcherHandler> dispatcherHandlerProvider) {
        super(Config.class);
        this.dispatcherHandlerProvider = dispatcherHandlerProvider;
    }

    private DispatcherHandler getDispatcherHandler() {
        if (dispatcherHandler == null) {
            dispatcherHandler = dispatcherHandlerProvider.getIfAvailable();
        }

        return dispatcherHandler;
    }

    @Override
    public List<String> shortcutFieldOrder() {
        return Collections.singletonList(NAME_KEY);
    }


    @Override
    public GatewayFilter apply(Config config) {
        processConfig(config);
        return (exchange, chain) -> {
            ServerHttpRequest request = exchange.getRequest();
            String path = request.getPath().pathWithinApplication().value();
            int timeout = config.getTimeout().getOrDefault(path, TIMEOUT_MS);
            CustomHystrixCommand command = new CustomHystrixCommand(config.getFallbackUri(), exchange, chain, timeout, path);
            return Mono.create(s -> {
                Subscription sub = command.toObservable().subscribe(s::success, s::error, s::success);
                s.onCancel(sub::unsubscribe);
            }).onErrorResume((Function<Throwable, Mono<Void>>) throwable -> {
                if (throwable instanceof HystrixRuntimeException) {
                    HystrixRuntimeException e = (HystrixRuntimeException) throwable;
                    HystrixRuntimeException.FailureType failureType = e.getFailureType();
                    switch (failureType) {
                        case TIMEOUT:
                            return Mono.error(new TimeoutException());
                        case COMMAND_EXCEPTION: {
                            Throwable cause = e.getCause();
                            if (cause instanceof ResponseStatusException || AnnotatedElementUtils
                                    .findMergedAnnotation(cause.getClass(), ResponseStatus.class) != null) {
                                return Mono.error(cause);
                            }
                        }
                        default:
                            break;
                    }
                }
                return Mono.error(throwable);
            }).then();
        };
    }

    /**
     * YAML解析的时候MAP的KEY不支持'/',这里只能用'-'替代
     *
     * @param config config
     */
    private void processConfig(Config config) {
        if (!processConfig) {
            processConfig = true;
            if (null != config.getTimeout()) {
                Map<String, Integer> timeout = new HashMap<>(8);
                config.getTimeout().forEach((k, v) -> {
                    String key = k.replace("-", "/");
                    if (!key.startsWith("/")) {
                        key = "/" + key;
                    }
                    timeout.put(key, v);
                });
                config.setTimeout(timeout);
            }
        }
    }

    @Override
    public String name() {
        return NAME;
    }

    private class CustomHystrixCommand extends HystrixObservableCommand<Void> {

        private final URI fallbackUri;
        private final ServerWebExchange exchange;
        private final GatewayFilterChain chain;

        public CustomHystrixCommand(URI fallbackUri,
                                    ServerWebExchange exchange,
                                    GatewayFilterChain chain,
                                    int timeout,
                                    String key) {
            super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey(key))
                    .andCommandKey(HystrixCommandKey.Factory.asKey(key))
                    .andCommandPropertiesDefaults(HystrixCommandProperties.Setter().withExecutionTimeoutInMilliseconds(timeout)));
            this.fallbackUri = fallbackUri;
            this.exchange = exchange;
            this.chain = chain;
        }

        @Override
        protected Observable<Void> construct() {
            return RxReactiveStreams.toObservable(this.chain.filter(exchange));
        }

        @Override
        protected Observable<Void> resumeWithFallback() {
            if (null == fallbackUri) {
                return super.resumeWithFallback();
            }
            URI uri = exchange.getRequest().getURI();
            boolean encoded = containsEncodedParts(uri);
            URI requestUrl = UriComponentsBuilder.fromUri(uri)
                    .host(null)
                    .port(null)
                    .uri(this.fallbackUri)
                    .build(encoded)
                    .toUri();
            exchange.getAttributes().put(GATEWAY_REQUEST_URL_ATTR, requestUrl);
            ServerHttpRequest request = this.exchange.getRequest().mutate().uri(requestUrl).build();
            ServerWebExchange mutated = exchange.mutate().request(request).build();
            return RxReactiveStreams.toObservable(getDispatcherHandler().handle(mutated));
        }
    }

    public static class Config {

        private String id;
        private URI fallbackUri;
        /**
         * url -> timeout ms
         */
        private Map<String, Integer> timeout;

        public String getId() {
            return id;
        }

        public Config setId(String id) {
            this.id = id;
            return this;
        }

        public URI getFallbackUri() {
            return fallbackUri;
        }

        public Config setFallbackUri(URI fallbackUri) {
            if (fallbackUri != null && !FORWARD_KEY.equals(fallbackUri.getScheme())) {
                throw new IllegalArgumentException("Hystrix Filter currently only supports 'forward' URIs, found " + fallbackUri);
            }
            this.fallbackUri = fallbackUri;
            return this;
        }

        public Map<String, Integer> getTimeout() {
            return timeout;
        }

        public Config setTimeout(Map<String, Integer> timeout) {
            this.timeout = timeout;
            return this;
        }
    }
}

其实大部分代码和内置的Hystrix过滤器差不多,只是改了命令改造函数部分和配置加载处理的部分。配置文件如下:

代码语言:javascript
复制
spring:
  cloud:
    gateway:
      routes:
        - id: hystrix_route
          uri: http://localhost:9091
          predicates:
            - Host=localhost:9090
          filters:
            - name: CustomHystrix
              args:
                id: CustomHystrix
                fallbackUri: forward:/fallback
                timeout:
                  # 这里暂时用-分隔URL,因为/不支持
                  order-remote: 2000
  application:
    name: route-server
server:
  port: 9090

网关添加一个/fallback处理控制器如下:

代码语言:javascript
复制
@RestController
public class FallbackController {

    @RequestMapping(value = "/fallback")
    @ResponseStatus
    public Mono<Map<String, Object>> fallback(ServerWebExchange exchange, Throwable throwable) {
        Map<String, Object> result = new HashMap<>(8);
        ServerHttpRequest request = exchange.getRequest();
        result.put("path", request.getPath().pathWithinApplication().value());
        result.put("method", request.getMethodValue());
        if (null != throwable.getCause()) {
            result.put("message", throwable.getCause().getMessage());
        } else {
            result.put("message", throwable.getMessage());
        }
        return Mono.just(result);
    }
}

故意在下游服务打断点:

代码语言:javascript
复制
curl http://localhost:9090/order/remote

响应结果:
{
    "path": "/fallback",
    "method": "GET",
    "message": null   # <== 这里由于是超时异常,message就是null
}

刚好符合预期结果。

小结

这篇文章仅仅是对Hystrix和过滤器应用提供一个可用的例子和解决问题的思路,具体如何使用还是需要针对真实的场景。

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2019年5月25日,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 前提
  • 使用内置的Hystrix过滤器
  • 使用Hystrix定制过滤器
  • 小结
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档