/**
* 提供负载均衡能力的datasource实现
*/
public class BalancedClickhouseDataSource implements DataSource {
private static final Logger log = LoggerFactory.getLogger(BalancedClickhouseDataSource.class);
private static final Pattern URL_TEMPLATE = Pattern.compile("jdbc:clickhouse://([a-zA-Z0-9_:,.-]+)(/[a-zA-Z0-9_]+([?][a-zA-Z0-9_]+[=][a-zA-Z0-9_]+([&][a-zA-Z0-9_]+[=][a-zA-Z0-9_]+)*)?)?");
private PrintWriter printWriter;
private int loginTimeoutSeconds;
//随机数
private final ThreadLocal<Random> randomThreadLocal;
//所有的url
private final List<String> allUrls;
//可用的url
private volatile List<String> enabledUrls;
private final ClickHouseProperties properties;
private final ClickHouseDriver driver;
public BalancedClickhouseDataSource(String url) {
this(splitUrl(url), getFromUrl(url));
}
public BalancedClickhouseDataSource(String url, Properties properties) {
this(splitUrl(url), new ClickHouseProperties(properties));
}
public BalancedClickhouseDataSource(String url, ClickHouseProperties properties) {
this(splitUrl(url), properties.merge(getFromUrlWithoutDefault(url)));
}
private BalancedClickhouseDataSource(List<String> urls) {
this(urls, new ClickHouseProperties());
}
private BalancedClickhouseDataSource(List<String> urls, Properties info) {
this(urls, new ClickHouseProperties(info));
}
private BalancedClickhouseDataSource(List<String> urls, ClickHouseProperties properties) {
this.loginTimeoutSeconds = 0;
this.randomThreadLocal = new ThreadLocal();
this.driver = new ClickHouseDriver();
if (urls.isEmpty()) {
throw new IllegalArgumentException("Incorrect ClickHouse jdbc url list. It must be not empty");
} else {
try {
//解析配置文件
ClickHouseProperties localProperties = ClickhouseJdbcUrlParser.parse((String)urls.get(0), properties.asProperties());
localProperties.setHost((String)null);
localProperties.setPort(-1);
this.properties = localProperties;
} catch (URISyntaxException var8) {
throw new IllegalArgumentException(var8);
}
List<String> allUrls = new ArrayList(urls.size());
Iterator var4 = urls.iterator();
while(var4.hasNext()) {
String url = (String)var4.next();
try {
//如果合法url
if (this.driver.acceptsURL(url)) {
//添加到所有的url列表
allUrls.add(url);
} else {
log.error("that url is has not correct format: {}", url);
}
} catch (SQLException var7) {
throw new IllegalArgumentException("error while checking url: " + url, var7);
}
}
if (allUrls.isEmpty()) {
throw new IllegalArgumentException("there are no correct urls");
} else {
//所有url
this.allUrls = Collections.unmodifiableList(allUrls);
//可用url
this.enabledUrls = this.allUrls;
}
}
}
/**
* 切割url
* @param url
* @return
*/
static List<String> splitUrl(String url) {
//校验url合法性
Matcher m = URL_TEMPLATE.matcher(url);
if (!m.matches()) {
throw new IllegalArgumentException("Incorrect url");
} else {
String database = m.group(2);
if (database == null) {
database = "";
}
//切割url串
String[] hosts = m.group(1).split(",");
List<String> result = new ArrayList(hosts.length);
String[] var5 = hosts;
int var6 = hosts.length;
//遍历,添加切割后的url
for(int var7 = 0; var7 < var6; ++var7) {
String host = var5[var7];
result.add("jdbc:clickhouse://" + host + database);
}
return result;
}
}
/**
* ping url看是否可用
* @param url
* @return
*/
private boolean ping(String url) {
try {
//执行简单sql测试url链接可用性
this.driver.connect(url, this.properties).createStatement().execute("SELECT 1");
return true;
} catch (Exception var3) {
return false;
}
}
/**
* 遍历所有url,通过ping的方式,选择出可用的url
* @return
*/
public synchronized int actualize() {
//新建可用url列表
List<String> enabledUrls = new ArrayList(this.allUrls.size());
Iterator var2 = this.allUrls.iterator();
while(var2.hasNext()) {
String url = (String)var2.next();
log.debug("Pinging disabled url: {}", url);
if (this.ping(url)) {
log.debug("Url is alive now: {}", url);
//ping通的才添加进可用的
enabledUrls.add(url);
} else {
log.debug("Url is dead now: {}", url);
}
}
//重置可用url列表
this.enabledUrls = Collections.unmodifiableList(enabledUrls);
return enabledUrls.size();
}
/**
* 随机获取可用url返回
* @return
* @throws SQLException
*/
private String getAnyUrl() throws SQLException {
//可用url列表
List<String> localEnabledUrls = this.enabledUrls;
if (localEnabledUrls.isEmpty()) {
throw new SQLException("Unable to get connection: there are no enabled urls");
} else {
Random random = (Random)this.randomThreadLocal.get();
if (random == null) {
this.randomThreadLocal.set(new Random());
//产生一个随机数
random = (Random)this.randomThreadLocal.get();
}
int index = random.nextInt(localEnabledUrls.size());
//用随机数选择一个可用的url返回
return (String)localEnabledUrls.get(index);
}
}
public ClickHouseConnection getConnection() throws SQLException {
return this.driver.connect(this.getAnyUrl(), this.properties);
}
public ClickHouseConnection getConnection(String username, String password) throws SQLException {
return this.driver.connect(this.getAnyUrl(), this.properties.withCredentials(username, password));
}
public <T> T unwrap(Class<T> iface) throws SQLException {
if (iface.isAssignableFrom(this.getClass())) {
return iface.cast(this);
} else {
throw new SQLException("Cannot unwrap to " + iface.getName());
}
}
public boolean isWrapperFor(Class<?> iface) throws SQLException {
return iface.isAssignableFrom(this.getClass());
}
public PrintWriter getLogWriter() throws SQLException {
return this.printWriter;
}
public void setLogWriter(PrintWriter printWriter) throws SQLException {
this.printWriter = printWriter;
}
public void setLoginTimeout(int seconds) throws SQLException {
this.loginTimeoutSeconds = seconds;
}
public int getLoginTimeout() throws SQLException {
return this.loginTimeoutSeconds;
}
public java.util.logging.Logger getParentLogger() throws SQLFeatureNotSupportedException {
throw new SQLFeatureNotSupportedException();
}
/**
* 定期清理无用url链接
* @param rate
* @param timeUnit
* @return
*/
public BalancedClickhouseDataSource withConnectionsCleaning(int rate, TimeUnit timeUnit) {
this.driver.scheduleConnectionsCleaning(rate, timeUnit);
return this;
}
/**
* 定期确认url,通过定时任务实现,以定时更新可用url列表
* @param delay
* @param timeUnit
* @return
*/
public BalancedClickhouseDataSource scheduleActualization(int delay, TimeUnit timeUnit) {
ScheduledConnectionCleaner.INSTANCE.scheduleWithFixedDelay(new Runnable() {
public void run() {
try {
BalancedClickhouseDataSource.this.actualize();
} catch (Exception var2) {
BalancedClickhouseDataSource.log.error("Unable to actualize urls", var2);
}
}
}, 0L, (long)delay, timeUnit);
return this;
}
public List<String> getAllClickhouseUrls() {
return this.allUrls;
}
public List<String> getEnabledClickHouseUrls() {
return this.enabledUrls;
}
/**
* 返回不可用url集合
* 通过all 和 enable的差值来找
*
* @return
*/
public List<String> getDisabledUrls() {
List<String> enabledUrls = this.enabledUrls;
if (!this.hasDisabledUrls()) {
return Collections.emptyList();
} else {
List<String> disabledUrls = new ArrayList(this.allUrls);
disabledUrls.removeAll(enabledUrls);
return disabledUrls;
}
}
public boolean hasDisabledUrls() {
return this.allUrls.size() != this.enabledUrls.size();
}
public ClickHouseProperties getProperties() {
return this.properties;
}
private static ClickHouseProperties getFromUrl(String url) {
return new ClickHouseProperties(getFromUrlWithoutDefault(url));
}
private static Properties getFromUrlWithoutDefault(String url) {
if (StringUtils.isBlank(url)) {
return new Properties();
} else {
int index = url.indexOf("?");
return index == -1 ? new Properties() : ClickhouseJdbcUrlParser.parseUriQueryPart(url.substring(index + 1), new Properties());
}
}
}