前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >聊聊debezium的ElapsedTimeStrategy

聊聊debezium的ElapsedTimeStrategy

原创
作者头像
code4it
修改2020-05-25 10:22:04
5290
修改2020-05-25 10:22:04
举报
文章被收录于专栏:码匠的流水账码匠的流水账

本文主要研究一下debezium的ElapsedTimeStrategy

ElapsedTimeStrategy

debezium-v1.1.1.Final/debezium-core/src/main/java/io/debezium/util/ElapsedTimeStrategy.java

代码语言:javascript
复制
@FunctionalInterface
public interface ElapsedTimeStrategy {
​
    /**
     * Determine if the time period has elapsed since this method was last called.
     *
     * @return {@code true} if this invocation caused the thread to sleep, or {@code false} if this method did not sleep
     */
    boolean hasElapsed();
​
}
  • ElapsedTimeStrategy定义了hasElapsed方法

none

debezium-v1.1.1.Final/debezium-core/src/main/java/io/debezium/util/ElapsedTimeStrategy.java

代码语言:javascript
复制
    public static ElapsedTimeStrategy none() {
        return () -> true;
    }
  • none方法针对hasElapsed始终返回true

constant

debezium-v1.1.1.Final/debezium-core/src/main/java/io/debezium/util/ElapsedTimeStrategy.java

代码语言:javascript
复制
    public static ElapsedTimeStrategy constant(Clock clock, long delayInMilliseconds) {
        if (delayInMilliseconds <= 0) {
            throw new IllegalArgumentException("Initial delay must be positive");
        }
        return new ElapsedTimeStrategy() {
            private long nextTimestamp = 0L;
​
            @Override
            public boolean hasElapsed() {
                if (nextTimestamp == 0L) {
                    // Initialize ...
                    nextTimestamp = clock.currentTimeInMillis() + delayInMilliseconds;
                    return true;
                }
                long current = clock.currentTimeInMillis();
                if (current >= nextTimestamp) {
                    do {
                        long multiple = 1 + (current - nextTimestamp) / delayInMilliseconds;
                        nextTimestamp += multiple * delayInMilliseconds;
                    } while (current > nextTimestamp);
                    return true;
                }
                return false;
            }
        };
    }
  • constant接收clock及delayInMilliseconds参数,其hasElapsed方法通过delayInMilliseconds计算nextTimestamp,在current小于nextTimestamp时返回false,否则更新nextTimestamp并返回true

step

debezium-v1.1.1.Final/debezium-core/src/main/java/io/debezium/util/ElapsedTimeStrategy.java

代码语言:javascript
复制
    public static ElapsedTimeStrategy step(Clock clock,
                                           long preStepDelayInMilliseconds,
                                           BooleanSupplier stepFunction,
                                           long postStepDelayInMilliseconds) {
        if (preStepDelayInMilliseconds <= 0) {
            throw new IllegalArgumentException("Pre-step delay must be positive");
        }
        if (postStepDelayInMilliseconds <= 0) {
            throw new IllegalArgumentException("Post-step delay must be positive");
        }
        return new ElapsedTimeStrategy() {
            private long nextTimestamp = 0L;
            private boolean elapsed = false;
            private long delta = 0L;
​
            @Override
            public boolean hasElapsed() {
                if (nextTimestamp == 0L) {
                    // Initialize ...
                    elapsed = stepFunction.getAsBoolean();
                    delta = elapsed ? postStepDelayInMilliseconds : preStepDelayInMilliseconds;
                    nextTimestamp = clock.currentTimeInMillis() + delta;
                    return true;
                }
                if (!elapsed) {
                    elapsed = stepFunction.getAsBoolean();
                    if (elapsed) {
                        delta = postStepDelayInMilliseconds;
                    }
                }
                long current = clock.currentTimeInMillis();
                if (current >= nextTimestamp) {
                    do {
                        assert delta > 0;
                        long multiple = 1 + (current - nextTimestamp) / delta;
                        nextTimestamp += multiple * delta;
                    } while (nextTimestamp <= current);
                    return true;
                }
                return false;
            }
        };
    }
  • step接收stepFunction方法,其hasElapsed方法的初始nextTimestamp为clock.currentTimeInMillis() + delta,在elapsed为false时通过stepFunction设置elapsed,如果为true则更新delta为postStepDelayInMilliseconds,之后在current小于nextTimestamp返回false,否则更新nextTimestamp,返回true

linear

debezium-v1.1.1.Final/debezium-core/src/main/java/io/debezium/util/ElapsedTimeStrategy.java

代码语言:javascript
复制
    public static ElapsedTimeStrategy linear(Clock clock, long delayInMilliseconds) {
        if (delayInMilliseconds <= 0) {
            throw new IllegalArgumentException("Initial delay must be positive");
        }
        return new ElapsedTimeStrategy() {
            private long nextTimestamp = 0L;
            private long counter = 1L;
​
            @Override
            public boolean hasElapsed() {
                if (nextTimestamp == 0L) {
                    // Initialize ...
                    nextTimestamp = clock.currentTimeInMillis() + delayInMilliseconds;
                    counter = 1L;
                    return true;
                }
                long current = clock.currentTimeInMillis();
                if (current >= nextTimestamp) {
                    do {
                        if (counter < Long.MAX_VALUE) {
                            ++counter;
                        }
                        nextTimestamp += (delayInMilliseconds * counter);
                    } while (nextTimestamp <= current);
                    return true;
                }
                return false;
            }
        };
    }
  • linear通过delayInMilliseconds * counter来递增nextTimestamp

exponential

debezium-v1.1.1.Final/debezium-core/src/main/java/io/debezium/util/ElapsedTimeStrategy.java

代码语言:javascript
复制
    public static ElapsedTimeStrategy exponential(Clock clock,
                                                  long initialDelayInMilliseconds,
                                                  long maxDelayInMilliseconds,
                                                  double multiplier) {
        if (multiplier <= 1.0) {
            throw new IllegalArgumentException("Multiplier must be greater than 1");
        }
        if (initialDelayInMilliseconds <= 0) {
            throw new IllegalArgumentException("Initial delay must be positive");
        }
        if (initialDelayInMilliseconds >= maxDelayInMilliseconds) {
            throw new IllegalArgumentException("Maximum delay must be greater than initial delay");
        }
        return new ElapsedTimeStrategy() {
            private long nextTimestamp = 0L;
            private long previousDelay = 0L;
​
            @Override
            public boolean hasElapsed() {
                if (nextTimestamp == 0L) {
                    // Initialize ...
                    nextTimestamp = clock.currentTimeInMillis() + initialDelayInMilliseconds;
                    previousDelay = initialDelayInMilliseconds;
                    return true;
                }
                long current = clock.currentTimeInMillis();
                if (current >= nextTimestamp) {
                    do {
                        // Compute how long to delay ...
                        long nextDelay = (long) (previousDelay * multiplier);
                        if (nextDelay >= maxDelayInMilliseconds) {
                            previousDelay = maxDelayInMilliseconds;
                            // If we're not there yet, then we know the increment is linear from here ...
                            if (nextTimestamp < current) {
                                long multiple = 1 + (current - nextTimestamp) / maxDelayInMilliseconds;
                                nextTimestamp += multiple * maxDelayInMilliseconds;
                            }
                        }
                        else {
                            previousDelay = nextDelay;
                        }
                        nextTimestamp += previousDelay;
                    } while (nextTimestamp <= current);
                    return true;
                }
                return false;
            }
        };
    }
  • exponential通过multiple * maxDelayInMilliseconds来递增nextTimestamp

小结

ElapsedTimeStrategy定义了hasElapsed方法,它提供了none、constant、step、linear、exponential这几种实现

doc

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • ElapsedTimeStrategy
  • none
  • constant
  • step
  • linear
  • exponential
  • 小结
  • doc
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档