前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >ClickHouse系列--BalancedClickhouseDataSource实现

ClickHouse系列--BalancedClickhouseDataSource实现

作者头像
IT云清
发布2021-12-06 14:22:03
1.5K0
发布2021-12-06 14:22:03
举报
文章被收录于专栏:IT云清

clickhouse-jdbc中负载均衡数据源的实现。

基本逻辑如下:
  • 1.通过配置的url串,来切分构造url列表;
  • 2.通过一个定时线程任务,不断地ping url列表,以更新可用的url列表;
  • 3.在可用列表中随机返回一个可用url;
代码语言:java
复制
/**
 * A {@link DataSource} that spreads connections over several ClickHouse hosts.
 *
 * <p>The JDBC url may name several comma-separated hosts, e.g.
 * {@code jdbc:clickhouse://host1:8123,host2:8123/db}; it is split into one url per host.
 * {@link #actualize()} pings every url and publishes the ones that answered as the
 * "enabled" list, and {@link #getConnection()} connects to a randomly chosen enabled url.
 *
 * <p>The enabled list is held in a {@code volatile} field and is only ever replaced by a
 * fresh unmodifiable list, so readers work on a consistent snapshot.
 */
public class BalancedClickhouseDataSource implements DataSource {
    private static final Logger log = LoggerFactory.getLogger(BalancedClickhouseDataSource.class);
    /** Accepted url shape: group 1 is the comma-separated host list, group 2 the optional "/database?query" tail. */
    private static final Pattern URL_TEMPLATE = Pattern.compile("jdbc:clickhouse://([a-zA-Z0-9_:,.-]+)(/[a-zA-Z0-9_]+([?][a-zA-Z0-9_]+[=][a-zA-Z0-9_]+([&][a-zA-Z0-9_]+[=][a-zA-Z0-9_]+)*)?)?");
    private PrintWriter printWriter;
    private int loginTimeoutSeconds;
    /** Per-thread random generator used to pick a url. */
    private final ThreadLocal<Random> randomThreadLocal;
    /** Every url accepted by the driver at construction time; never changes afterwards. */
    private final List<String> allUrls;
    /** Urls that answered the most recent ping round; initially equal to {@link #allUrls}. */
    private volatile List<String> enabledUrls;
    private final ClickHouseProperties properties;
    private final ClickHouseDriver driver;

    /**
     * Creates a data source from a (possibly multi-host) url, taking properties from its query part.
     *
     * @param url jdbc url matching {@code jdbc:clickhouse://host[,host...][/db[?k=v[&k=v...]]]}
     * @throws IllegalArgumentException if the url is malformed or no host url is accepted by the driver
     */
    public BalancedClickhouseDataSource(String url) {
        this(splitUrl(url), getFromUrl(url));
    }

    /**
     * Creates a data source from a (possibly multi-host) url and plain JDBC properties.
     *
     * @param url        jdbc url, see {@link #BalancedClickhouseDataSource(String)}
     * @param properties connection properties wrapped into {@link ClickHouseProperties}
     * @throws IllegalArgumentException if the url is malformed or no host url is accepted by the driver
     */
    public BalancedClickhouseDataSource(String url, Properties properties) {
        this(splitUrl(url), new ClickHouseProperties(properties));
    }

    /**
     * Creates a data source from a (possibly multi-host) url and ClickHouse properties,
     * merged with whatever the url's query part specifies.
     *
     * @param url        jdbc url, see {@link #BalancedClickhouseDataSource(String)}
     * @param properties base properties; the url's query parameters are merged into them
     * @throws IllegalArgumentException if the url is malformed or no host url is accepted by the driver
     */
    public BalancedClickhouseDataSource(String url, ClickHouseProperties properties) {
        this(splitUrl(url), properties.merge(getFromUrlWithoutDefault(url)));
    }

    private BalancedClickhouseDataSource(List<String> urls) {
        this(urls, new ClickHouseProperties());
    }

    private BalancedClickhouseDataSource(List<String> urls, Properties info) {
        this(urls, new ClickHouseProperties(info));
    }

    /**
     * Common constructor: derives the shared properties from the first url and keeps
     * only the urls the driver accepts.
     *
     * @param urls       single-host jdbc urls, must not be empty
     * @param properties properties applied to every connection
     * @throws IllegalArgumentException if {@code urls} is empty, the first url cannot be parsed,
     *                                  checking a url fails, or the driver accepts none of them
     */
    private BalancedClickhouseDataSource(List<String> urls, ClickHouseProperties properties) {
        this.loginTimeoutSeconds = 0;
        this.randomThreadLocal = new ThreadLocal<>();
        this.driver = new ClickHouseDriver();
        if (urls.isEmpty()) {
            throw new IllegalArgumentException("Incorrect ClickHouse jdbc url list. It must be not empty");
        }

        try {
            ClickHouseProperties localProperties = ClickhouseJdbcUrlParser.parse(urls.get(0), properties.asProperties());
            // Host and port differ per url, so they are blanked out of the shared properties.
            localProperties.setHost((String) null);
            localProperties.setPort(-1);
            this.properties = localProperties;
        } catch (URISyntaxException e) {
            throw new IllegalArgumentException(e);
        }

        List<String> acceptedUrls = new ArrayList<>(urls.size());
        for (String url : urls) {
            try {
                if (this.driver.acceptsURL(url)) {
                    acceptedUrls.add(url);
                } else {
                    // A rejected url is reported and skipped rather than failing construction.
                    log.error("that url is has not correct format: {}", url);
                }
            } catch (SQLException e) {
                throw new IllegalArgumentException("error while checking url: " + url, e);
            }
        }

        if (acceptedUrls.isEmpty()) {
            throw new IllegalArgumentException("there are no correct urls");
        }
        this.allUrls = Collections.unmodifiableList(acceptedUrls);
        // Until the first actualize() run every accepted url is considered enabled.
        this.enabledUrls = this.allUrls;
    }

    /**
     * Splits a multi-host jdbc url into one url per host, each carrying the same
     * "/database?query" tail.
     *
     * @param url jdbc url to split
     * @return single-host urls in the order the hosts appear in {@code url}
     * @throws IllegalArgumentException if {@code url} does not match the expected template
     */
    static List<String> splitUrl(String url) {
        Matcher matcher = URL_TEMPLATE.matcher(url);
        if (!matcher.matches()) {
            throw new IllegalArgumentException("Incorrect url");
        }

        String tail = matcher.group(2);
        if (tail == null) {
            tail = "";
        }

        String[] hosts = matcher.group(1).split(",");
        List<String> result = new ArrayList<>(hosts.length);
        for (String host : hosts) {
            result.add("jdbc:clickhouse://" + host + tail);
        }
        return result;
    }

    /**
     * Checks whether a url is reachable by connecting and running {@code SELECT 1}.
     *
     * <p>The probe connection and statement are closed before returning so that repeated
     * health checks do not accumulate open connections.
     *
     * @param url single-host jdbc url to probe
     * @return {@code true} if the query ran without throwing, {@code false} on any failure
     */
    private boolean ping(String url) {
        try (ClickHouseConnection connection = this.driver.connect(url, this.properties);
             java.sql.Statement statement = connection.createStatement()) {
            statement.execute("SELECT 1");
            return true;
        } catch (Exception e) {
            // Any failure (including a null connection) just means "not available right now".
            log.debug("Ping failed for url: {}", url, e);
            return false;
        }
    }

    /**
     * Pings every known url and replaces the enabled list with those that answered.
     *
     * @return number of urls that are enabled after this round
     */
    public synchronized int actualize() {
        List<String> alive = new ArrayList<>(this.allUrls.size());
        for (String url : this.allUrls) {
            log.debug("Pinging disabled url: {}", url);
            if (this.ping(url)) {
                log.debug("Url is alive now: {}", url);
                alive.add(url);
            } else {
                log.debug("Url is dead now: {}", url);
            }
        }

        // Publish the new list in one volatile write; readers never see a half-built list.
        this.enabledUrls = Collections.unmodifiableList(alive);
        return alive.size();
    }

    /**
     * Picks one of the currently enabled urls at random.
     *
     * @return an enabled url
     * @throws SQLException if no url is currently enabled
     */
    private String getAnyUrl() throws SQLException {
        // Read the volatile field once so size() and get() refer to the same list.
        List<String> candidates = this.enabledUrls;
        if (candidates.isEmpty()) {
            throw new SQLException("Unable to get connection: there are no enabled urls");
        }

        Random random = this.randomThreadLocal.get();
        if (random == null) {
            random = new Random();
            this.randomThreadLocal.set(random);
        }
        return candidates.get(random.nextInt(candidates.size()));
    }

    /**
     * Opens a connection to a randomly chosen enabled url using the shared properties.
     *
     * @throws SQLException if no url is enabled or the driver fails to connect
     */
    @Override
    public ClickHouseConnection getConnection() throws SQLException {
        return this.driver.connect(this.getAnyUrl(), this.properties);
    }

    /**
     * Opens a connection to a randomly chosen enabled url with the given credentials
     * applied on top of the shared properties.
     *
     * @throws SQLException if no url is enabled or the driver fails to connect
     */
    @Override
    public ClickHouseConnection getConnection(String username, String password) throws SQLException {
        return this.driver.connect(this.getAnyUrl(), this.properties.withCredentials(username, password));
    }

    /**
     * Returns this object cast to {@code iface} when this class implements or extends it.
     *
     * @throws SQLException if this object cannot be viewed as {@code iface}
     */
    @Override
    public <T> T unwrap(Class<T> iface) throws SQLException {
        if (!iface.isAssignableFrom(this.getClass())) {
            throw new SQLException("Cannot unwrap to " + iface.getName());
        }
        return iface.cast(this);
    }

    /** Tells whether {@link #unwrap(Class)} would succeed for {@code iface}. */
    @Override
    public boolean isWrapperFor(Class<?> iface) throws SQLException {
        return iface.isAssignableFrom(this.getClass());
    }

    @Override
    public PrintWriter getLogWriter() throws SQLException {
        return this.printWriter;
    }

    @Override
    public void setLogWriter(PrintWriter printWriter) throws SQLException {
        this.printWriter = printWriter;
    }

    /** Stores the value only; nothing in this class reads it when connecting. */
    @Override
    public void setLoginTimeout(int seconds) throws SQLException {
        this.loginTimeoutSeconds = seconds;
    }

    @Override
    public int getLoginTimeout() throws SQLException {
        return this.loginTimeoutSeconds;
    }

    /**
     * Not supported by this data source.
     *
     * @throws SQLFeatureNotSupportedException always
     */
    @Override
    public java.util.logging.Logger getParentLogger() throws SQLFeatureNotSupportedException {
        throw new SQLFeatureNotSupportedException();
    }

    /**
     * Asks the underlying driver to schedule its periodic connection cleaning.
     *
     * @param rate     period between cleaning runs, passed through to the driver
     * @param timeUnit unit of {@code rate}
     * @return this data source, for call chaining
     */
    public BalancedClickhouseDataSource withConnectionsCleaning(int rate, TimeUnit timeUnit) {
        this.driver.scheduleConnectionsCleaning(rate, timeUnit);
        return this;
    }

    /**
     * Schedules {@link #actualize()} to run repeatedly on the shared
     * {@code ScheduledConnectionCleaner} executor, starting immediately, with
     * {@code delay} between the end of one run and the start of the next.
     *
     * @param delay    pause between consecutive runs
     * @param timeUnit unit of {@code delay}
     * @return this data source, for call chaining
     */
    public BalancedClickhouseDataSource scheduleActualization(int delay, TimeUnit timeUnit) {
        ScheduledConnectionCleaner.INSTANCE.scheduleWithFixedDelay(new Runnable() {
            @Override
            public void run() {
                try {
                    BalancedClickhouseDataSource.this.actualize();
                } catch (Exception e) {
                    // An exception escaping run() would cancel all further executions, so log and continue.
                    BalancedClickhouseDataSource.log.error("Unable to actualize urls", e);
                }
            }
        }, 0L, (long) delay, timeUnit);
        return this;
    }

    /** Returns every url accepted at construction time, as an unmodifiable list. */
    public List<String> getAllClickhouseUrls() {
        return this.allUrls;
    }

    /** Returns the current enabled-url snapshot, as an unmodifiable list. */
    public List<String> getEnabledClickHouseUrls() {
        return this.enabledUrls;
    }

    /**
     * Returns the urls that are known but not currently enabled.
     *
     * @return all urls minus the enabled ones; an empty list when the two lists have the same size
     */
    public List<String> getDisabledUrls() {
        // Use a single snapshot for both the size check and the difference, so a concurrent
        // actualize() cannot make the two steps disagree.
        List<String> enabledSnapshot = this.enabledUrls;
        if (this.allUrls.size() == enabledSnapshot.size()) {
            return Collections.emptyList();
        }
        List<String> disabledUrls = new ArrayList<>(this.allUrls);
        disabledUrls.removeAll(enabledSnapshot);
        return disabledUrls;
    }

    /** Tells whether fewer urls are enabled than are known. */
    public boolean hasDisabledUrls() {
        return this.allUrls.size() != this.enabledUrls.size();
    }

    /** Returns the properties shared by all connections (host and port are blanked out). */
    public ClickHouseProperties getProperties() {
        return this.properties;
    }

    /** Builds {@link ClickHouseProperties} from the query part of {@code url}. */
    private static ClickHouseProperties getFromUrl(String url) {
        return new ClickHouseProperties(getFromUrlWithoutDefault(url));
    }

    /**
     * Extracts the query parameters of {@code url} as plain properties.
     *
     * @param url jdbc url, may be blank
     * @return parsed query parameters, or empty properties when the url is blank or has no {@code ?}
     */
    private static Properties getFromUrlWithoutDefault(String url) {
        if (StringUtils.isBlank(url)) {
            return new Properties();
        }
        int queryStart = url.indexOf("?");
        if (queryStart == -1) {
            return new Properties();
        }
        return ClickhouseJdbcUrlParser.parseUriQueryPart(url.substring(queryStart + 1), new Properties());
    }
}
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2021/07/08 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • clickhouse-jdbc中负载均衡数据源的实现。
    • 基本逻辑如下:
    相关产品与服务
    负载均衡
    负载均衡(Cloud Load Balancer,CLB)提供安全快捷的流量分发服务,访问流量经由 CLB 可以自动分配到云中的多台后端服务器上,扩展系统的服务能力并消除单点故障。负载均衡支持亿级连接和千万级并发,可轻松应对大流量访问,满足业务需求。
    领券
    问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档