前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >聊聊sentinel的DataSource

聊聊sentinel的DataSource

作者头像
code4it
发布2018-09-17 17:21:58
9900
发布2018-09-17 17:21:58
举报
文章被收录于专栏:码匠的流水账码匠的流水账

本文主要研究一下sentinel的DataSource

DataSource

sentinel-datasource-extension-0.1.1-sources.jar!/com/alibaba/csp/sentinel/datasource/DataSource.java

代码语言:javascript
复制
public interface DataSource<S, T> {

    /**
     * Load data data source as the target type.
     *
     * @return the target data.
     * @throws Exception
     */
    T loadConfig() throws Exception;

    /**
     * Read original data from the data source.
     *
     * @return the original data.
     * @throws Exception
     */
    S readSource() throws Exception;

    /**
     * Get {@link SentinelProperty} of the data source.
     *
     * @return the property.
     */
    SentinelProperty<T> getProperty();

    /**
     * Write the {@code values} to the data source.
     *
     * @param values
     * @throws Exception
     */
    void writeDataSource(T values) throws Exception;

    /**
     * Close the data source.
     *
     * @throws Exception
     */
    void close() throws Exception;
}
  • 定义了loadConfig、readSource、writeDataSource等方法
  • 有个抽象子类AbstractDataSource

AbstractDataSource

sentinel-datasource-extension-0.1.1-sources.jar!/com/alibaba/csp/sentinel/datasource/AbstractDataSource.java

代码语言:javascript
复制
public abstract class AbstractDataSource<S, T> implements DataSource<S, T> {

    protected final ConfigParser<S, T> parser;
    protected final SentinelProperty<T> property;

    public AbstractDataSource(ConfigParser<S, T> parser) {
        if (parser == null) {
            throw new IllegalArgumentException("parser can't be null");
        }
        this.parser = parser;
        this.property = new DynamicSentinelProperty<T>();
    }

    @Override
    public T loadConfig() throws Exception {
        S readValue = readSource();
        T value = parser.parse(readValue);
        return value;
    }

    public T loadConfig(S conf) throws Exception {
        T value = parser.parse(conf);
        return value;
    }

    @Override
    public SentinelProperty<T> getProperty() {
        return property;
    }

    @Override
    public void writeDataSource(T values) throws Exception {
        throw new UnsupportedOperationException();
    }

}
  • 定义了ConfigParser属性,使用它来解析数据源
  • 它有一个抽象子类为AutoRefreshDataSource

AutoRefreshDataSource

sentinel-datasource-extension-0.1.1-sources.jar!/com/alibaba/csp/sentinel/datasource/AutoRefreshDataSource.java

代码语言:javascript
复制
/**
 * A {@link DataSource} automatically fetches the backend data.
 *
 * @param <S> source data type
 * @param <T> target data type
 * @author Carpenter Lee
 */
public abstract class AutoRefreshDataSource<S, T> extends AbstractDataSource<S, T> {

    private ScheduledExecutorService service;
    protected long recommendRefreshMs = 3000;

    public AutoRefreshDataSource(ConfigParser<S, T> configParser) {
        super(configParser);
        startTimerService();
    }

    public AutoRefreshDataSource(ConfigParser<S, T> configParser, final long recommendRefreshMs) {
        super(configParser);
        if (recommendRefreshMs <= 0) {
            throw new IllegalArgumentException("recommendRefreshMs must > 0, but " + recommendRefreshMs + " get");
        }
        this.recommendRefreshMs = recommendRefreshMs;
        startTimerService();
    }

    private void startTimerService() {
        service = Executors.newScheduledThreadPool(1,
            new NamedThreadFactory("sentinel-datasource-auto-refresh-task", true));
        service.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    T newValue = loadConfig();

                    getProperty().updateValue(newValue);
                } catch (Throwable e) {
                    RecordLog.info("loadConfig exception", e);
                }
            }
        }, recommendRefreshMs, recommendRefreshMs, TimeUnit.MILLISECONDS);
    }

    @Override
    public void close() throws Exception {
        if (service != null) {
            service.shutdownNow();
            service = null;
        }
    }

}
  • 创建了ScheduledExecutorService,然后定时调度执行loadConfig方法,然后将获取的数据更新到property
  • 它有一个子类为FileRefreshableDataSource

FileRefreshableDataSource

sentinel-datasource-extension-0.1.1-sources.jar!/com/alibaba/csp/sentinel/datasource/FileRefreshableDataSource.java

代码语言:javascript
复制
/**
 * <p>
 * A {@link DataSource} based on file. This class will automatically fetches the backend file every 3 seconds.
 * </p>
 * <p>
 * Limitations: default read buffer size is 1MB, if file size is greater than buffer size, exceeding bytes will
 * be ignored. Default charset is UTF8.
 * </p>
 *
 * @author Carpenter Lee
 */
public class FileRefreshableDataSource<T> extends AutoRefreshDataSource<String, T> {

    private static final int MAX_SIZE = 1024 * 1024 * 4;
    private static final long DEFAULT_REFRESH_MS = 3000;
    private static final int DEFAULT_BUF_SIZE = 1024 * 1024;
    private static final Charset DEFAULT_CHAR_SET = Charset.forName("utf-8");

    private byte[] buf;
    private Charset charset;
    private File file;

    /**
     * Create a file based {@link DataSource} whose read buffer size is 1MB, charset is UTF8,
     * and read interval is 3 seconds.
     *
     * @param file         the file to read.
     * @param configParser the config parser.
     */
    public FileRefreshableDataSource(File file, ConfigParser<String, T> configParser) throws FileNotFoundException {
        this(file, configParser, DEFAULT_REFRESH_MS, DEFAULT_BUF_SIZE, DEFAULT_CHAR_SET);
    }

    public FileRefreshableDataSource(String fileName, ConfigParser<String, T> configParser)
        throws FileNotFoundException {
        this(new File(fileName), configParser, DEFAULT_REFRESH_MS, DEFAULT_BUF_SIZE, DEFAULT_CHAR_SET);
        //System.out.println(file.getAbsoluteFile());
    }

    public FileRefreshableDataSource(File file, ConfigParser<String, T> configParser, int bufSize)
        throws FileNotFoundException {
        this(file, configParser, DEFAULT_REFRESH_MS, bufSize, DEFAULT_CHAR_SET);
    }

    public FileRefreshableDataSource(File file, ConfigParser<String, T> configParser, Charset charset)
        throws FileNotFoundException {
        this(file, configParser, DEFAULT_REFRESH_MS, DEFAULT_BUF_SIZE, charset);
    }

    public FileRefreshableDataSource(File file, ConfigParser<String, T> configParser, long recommendRefreshMs,
                                     int bufSize, Charset charset) throws FileNotFoundException {
        super(configParser, recommendRefreshMs);
        if (bufSize <= 0 || bufSize > MAX_SIZE) {
            throw new IllegalArgumentException("bufSize must between (0, " + MAX_SIZE + "], but " + bufSize + " get");
        }
        if (file == null) {
            throw new IllegalArgumentException("file can't be null");
        }
        if (charset == null) {
            throw new IllegalArgumentException("charset can't be null");
        }
        this.buf = new byte[bufSize];
        this.file = file;
        this.charset = charset;
        firstLoad();
    }

    private void firstLoad() {
        try {
            T newValue = loadConfig();
            getProperty().updateValue(newValue);
        } catch (Throwable e) {
            RecordLog.info("loadConfig exception", e);
        }
    }

    @Override
    public String readSource() throws Exception {
        FileInputStream inputStream = null;
        try {
            inputStream = new FileInputStream(file);
            FileChannel channel = inputStream.getChannel();
            if (channel.size() > buf.length) {
                throw new RuntimeException(file.getAbsolutePath() + " file size=" + channel.size()
                    + ", is bigger than bufSize=" + buf.length + ". Can't read");
            }
            int len = inputStream.read(buf);
            return new String(buf, 0, len, charset);
        } finally {
            if (inputStream != null) {
                try {
                    inputStream.close();
                } catch (Exception ignore) {
                }
            }
        }
    }

    @Override
    public void close() throws Exception {
        super.close();
        buf = null;
    }

    @Override
    public void writeDataSource(T values) throws Exception {
        throw new UnsupportedOperationException();
    }
}
  • 从文件读取数据,但是writeDataSource目前还不支持

小结

  • sentinel-datasource-extension默认提供FileRefreshableDataSource,另外有zookeeper、nacos、appllo的扩展实现。
  • 如果要自己扩展的话,使用拉模式直接继承AutoRefreshDataSource实现readSource();推模式的话直接继承AbstractDataSource,自己构造监听方法实现readSource()

doc

  • DataSource
  • sentinel-extension
本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2018-08-17,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 码匠的流水账 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • DataSource
  • AbstractDataSource
  • AutoRefreshDataSource
  • FileRefreshableDataSource
  • 小结
  • doc
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档