聊聊hystrix的BucketedCounterStream

本文主要研究一下hystrix的BucketedCounterStream

BucketedCounterStream

hystrix-core-1.5.12-sources.jar!/com/netflix/hystrix/metric/consumer/BucketedCounterStream.java

/**
 * Abstract class that imposes a bucketing structure and provides streams of buckets
 *
 * @param <Event> type of raw data that needs to get summarized into a bucket
 * @param <Bucket> type of data contained in each bucket
 * @param <Output> type of data emitted to stream subscribers (often is the same as A but does not have to be)
 */
public abstract class BucketedCounterStream<Event extends HystrixEvent, Bucket, Output> {
    protected final int numBuckets;
    protected final Observable<Bucket> bucketedStream;
    protected final AtomicReference<Subscription> subscription = new AtomicReference<Subscription>(null);

    private final Func1<Observable<Event>, Observable<Bucket>> reduceBucketToSummary;

    private final BehaviorSubject<Output> counterSubject = BehaviorSubject.create(getEmptyOutputValue());

    protected BucketedCounterStream(final HystrixEventStream<Event> inputEventStream, final int numBuckets, final int bucketSizeInMs,
                                    final Func2<Bucket, Event, Bucket> appendRawEventToBucket) {
        this.numBuckets = numBuckets;
        this.reduceBucketToSummary = new Func1<Observable<Event>, Observable<Bucket>>() {
            @Override
            public Observable<Bucket> call(Observable<Event> eventBucket) {
                return eventBucket.reduce(getEmptyBucketSummary(), appendRawEventToBucket);
            }
        };

        final List<Bucket> emptyEventCountsToStart = new ArrayList<Bucket>();
        for (int i = 0; i < numBuckets; i++) {
            emptyEventCountsToStart.add(getEmptyBucketSummary());
        }

        this.bucketedStream = Observable.defer(new Func0<Observable<Bucket>>() {
            @Override
            public Observable<Bucket> call() {
                return inputEventStream
                        .observe()
                        .window(bucketSizeInMs, TimeUnit.MILLISECONDS) //bucket it by the counter window so we can emit to the next operator in time chunks, not on every OnNext
                        .flatMap(reduceBucketToSummary)                //for a given bucket, turn it into a long array containing counts of event types
                        .startWith(emptyEventCountsToStart);           //start it with empty arrays to make consumer logic as generic as possible (windows are always full)
            }
        });
    }

    abstract Bucket getEmptyBucketSummary();

    abstract Output getEmptyOutputValue();

    /**
     * Return the stream of buckets
     * @return stream of buckets
     */
    public abstract Observable<Output> observe();

    public void startCachingStreamValuesIfUnstarted() {
        if (subscription.get() == null) {
            //the stream is not yet started
            Subscription candidateSubscription = observe().subscribe(counterSubject);
            if (subscription.compareAndSet(null, candidateSubscription)) {
                //won the race to set the subscription
            } else {
                //lost the race to set the subscription, so we need to cancel this one
                candidateSubscription.unsubscribe();
            }
        }
    }

    /**
     * Synchronous call to retrieve the last calculated bucket without waiting for any emissions
     * @return last calculated bucket
     */
    public Output getLatest() {
        startCachingStreamValuesIfUnstarted();
        if (counterSubject.hasValue()) {
            return counterSubject.getValue();
        } else {
            return getEmptyOutputValue();
        }
    }

    public void unsubscribe() {
        Subscription s = subscription.get();
        if (s != null) {
            s.unsubscribe();
            subscription.compareAndSet(s, null);
        }
    }
}
  • 这里的构造器主要初始化bucketedStream,主要是对HystrixEventStream进行observe,然后进行window操作,在进行flatMap
  • window操作的timespan参数为bucketSizeInMs,其计算公式如下 final int counterMetricWindow = properties.metricsRollingStatisticalWindowInMilliseconds().get(); final int numCounterBuckets = properties.metricsRollingStatisticalWindowBuckets().get(); final int counterBucketSizeInMs = counterMetricWindow / numCounterBuckets;
  • BucketedCounterStream有两个直接的子类,也是抽象类,分别是BucketedRollingCounterStream及BucketedCumulativeCounterStream

BucketedRollingCounterStream

hystrix-core-1.5.12-sources.jar!/com/netflix/hystrix/metric/consumer/BucketedRollingCounterStream.java

/**
 * Refinement of {@link BucketedCounterStream} which reduces numBuckets at a time.
 *
 * @param <Event> type of raw data that needs to get summarized into a bucket
 * @param <Bucket> type of data contained in each bucket
 * @param <Output> type of data emitted to stream subscribers (often is the same as A but does not have to be)
 */
public abstract class BucketedRollingCounterStream<Event extends HystrixEvent, Bucket, Output> extends BucketedCounterStream<Event, Bucket, Output> {
    private Observable<Output> sourceStream;
    private final AtomicBoolean isSourceCurrentlySubscribed = new AtomicBoolean(false);

    protected BucketedRollingCounterStream(HystrixEventStream<Event> stream, final int numBuckets, int bucketSizeInMs,
                                           final Func2<Bucket, Event, Bucket> appendRawEventToBucket,
                                           final Func2<Output, Bucket, Output> reduceBucket) {
        super(stream, numBuckets, bucketSizeInMs, appendRawEventToBucket);
        Func1<Observable<Bucket>, Observable<Output>> reduceWindowToSummary = new Func1<Observable<Bucket>, Observable<Output>>() {
            @Override
            public Observable<Output> call(Observable<Bucket> window) {
                return window.scan(getEmptyOutputValue(), reduceBucket).skip(numBuckets);
            }
        };
        this.sourceStream = bucketedStream      //stream broken up into buckets
                .window(numBuckets, 1)          //emit overlapping windows of buckets
                .flatMap(reduceWindowToSummary) //convert a window of bucket-summaries into a single summary
                .doOnSubscribe(new Action0() {
                    @Override
                    public void call() {
                        isSourceCurrentlySubscribed.set(true);
                    }
                })
                .doOnUnsubscribe(new Action0() {
                    @Override
                    public void call() {
                        isSourceCurrentlySubscribed.set(false);
                    }
                })
                .share()                        //multiple subscribers should get same data
                .onBackpressureDrop();          //if there are slow consumers, data should not buffer
    }

    @Override
    public Observable<Output> observe() {
        return sourceStream;
    }

    /* package-private */ boolean isSourceCurrentlySubscribed() {
        return isSourceCurrentlySubscribed.get();
    }
}
  • 基于父类的bucketedStream定义了用于observe的sourceStream,对bucketedStream进行了window及flatMap处理
  • window操作采用的是count及skip参数,count参数值为numBuckets,skip参数值为1

BucketedCumulativeCounterStream

hystrix-core-1.5.12-sources.jar!/com/netflix/hystrix/metric/consumer/BucketedCumulativeCounterStream.java

/**
 * Refinement of {@link BucketedCounterStream} which accumulates counters infinitely in the bucket-reduction step
 *
 * @param <Event> type of raw data that needs to get summarized into a bucket
 * @param <Bucket> type of data contained in each bucket
 * @param <Output> type of data emitted to stream subscribers (often is the same as A but does not have to be)
 */
public abstract class BucketedCumulativeCounterStream<Event extends HystrixEvent, Bucket, Output> extends BucketedCounterStream<Event, Bucket, Output> {
    private Observable<Output> sourceStream;
    private final AtomicBoolean isSourceCurrentlySubscribed = new AtomicBoolean(false);

    protected BucketedCumulativeCounterStream(HystrixEventStream<Event> stream, int numBuckets, int bucketSizeInMs,
                                              Func2<Bucket, Event, Bucket> reduceCommandCompletion,
                                              Func2<Output, Bucket, Output> reduceBucket) {
        super(stream, numBuckets, bucketSizeInMs, reduceCommandCompletion);

        this.sourceStream = bucketedStream
                .scan(getEmptyOutputValue(), reduceBucket)
                .skip(numBuckets)
                .doOnSubscribe(new Action0() {
                    @Override
                    public void call() {
                        isSourceCurrentlySubscribed.set(true);
                    }
                })
                .doOnUnsubscribe(new Action0() {
                    @Override
                    public void call() {
                        isSourceCurrentlySubscribed.set(false);
                    }
                })
                .share()                        //multiple subscribers should get same data
                .onBackpressureDrop();          //if there are slow consumers, data should not buffer
    }

    @Override
    public Observable<Output> observe() {
        return sourceStream;
    }
}
  • 基于父类的bucketedStream定义了用于observe的sourceStream,对bucketedStream进行了scan及skip操作
  • scan与reduce的区别在于scan每操作完一次就会通知消费者,reduce是一口气操作完再通知消费者
  • 这里scan参数为getEmptyOutputValue(),为空数组用于累加,skip值为numBuckets

小结

  • hystrix的BucketedCounterStream有两个直接的子类,BucketedRollingCounterStream及BucketedCumulativeCounterStream
  • BucketedRollingCounterStream,采取的是window及flatMap操作,这里通过window来达到rolling的效果,其skip参数表示对原生数列,其开始的元素间隔是多少,比如skip为3,window的count为5,那么第一批window就是[1,2,3,4,5],第二批window就是[4,5,6,7,8]
  • BucketedCumulativeCounterStream,采取的是scan及skip操作,其cumulative的效果是通过scan函数来实现的,然后通过skip操作丢弃掉最开始的numBuckets个数据。

rolling及cumulative使用的是rxjava的window及scan操作来实现,看起来比较简洁。

doc

  • rxdocs-scan
  • rxdocs-skip
  • rxjava scan 与reduce区别

原文发布于微信公众号 - 码匠的流水账(geek_luandun)

原文发表时间:2018-07-06

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏Android机动车

RxJava从入门到不离不弃(四)——过滤操作符

RxJava中的过滤操作符,顾名思义,这类操作符主要用于对事件数据的筛选过滤,只返回满足我们条件的数据。

18430
来自专栏伪君子的梦呓

题解 ~ 输出所有的水仙花数

打印所有的"水仙花数",所谓"水仙花数"是指一个三位数,其各位数字立方和等于该本身。 例如:153 是一个水仙花数,因为

19330
来自专栏deed博客

数学速算法

16820
来自专栏码匠的流水账

聊聊sentinel的AuthoritySlot

com/alibaba/csp/sentinel/slots/block/authority/AuthoritySlot.java

13110
来自专栏calmound

搜索专题

POJ  Best Sequence http://poj.org/problem?id=1699 题意:给你n个字符窜,求其所能拼接的最短长度。 分析:预处理...

29150
来自专栏用户画像

5.2.4 邻接多重表

在邻接表中,容易求得顶点和边的各种信息,但在邻接表中求两个顶点之间是否存在边,或需要对边执行删除等操作时,需要分别在两个顶点的边表中遍历,效率较低。

11910
来自专栏三流程序员的挣扎

RxJava 变换操作符

按照规定大小缓存,每次取 count 个数,取完一次跳过 skip 个数,将每次取的数据合并到一个列表里。

71750
来自专栏码匠的流水账

聊聊springboot的HeapDumpWebEndpoint

spring-boot-actuator-autoconfigure-2.0.1.RELEASE-sources.jar!/org/springframewor...

16610
来自专栏码匠的流水账

聊聊flink的CsvReader

flink-java-1.6.2-sources.jar!/org/apache/flink/api/java/ExecutionEnvironment.jav...

16820
来自专栏三流程序员的挣扎

RxJava 组合操作符

现在看上面的最后一个方法,最后一个参数是多个 Observable,第二个参数 bufferSize 是内部缓冲队列的大小。

30830

扫码关注云+社区

领取腾讯云代金券