之前的文章有说clickhouse的分布式集群做数据插入有两种方式,一种是随机选个节点插入数据,另外是直接插入分布式表。如果我们直接插入分布式表,分布式表会经历过把数据同步到其他节点的过程,会造成批量插入的时候性能出现瓶颈。我们一般实现都通过随机选节点插入。
这里我主要讲一下go客户端如果实现随机做插入。 首先我们来看一段代码,下面的代码如果设置多个host,则会进行随机平均选择节点进行插入。
package main
import (
"fmt"
_ "github.com/ClickHouse/clickhouse-go"
"github.com/jmoiron/sqlx"
"log"
"time"
)
var allclient *sqlx.DB
func main() {
host1 := "192.168.0.001:9000"
otherHost := "192.168.0.002:9000"
user := "admin"
password := "a123456789"
database := "app1"
tcpInfo := "tcp://%s?username=%s&password=%s&database=%s&read_timeout=5&write_timeout=5&debug=true&compress=true&alt_hosts=%s"
tcpInfo = fmt.Sprintf(tcpInfo, host1, user, password, database, otherHost)
fmt.Println("tcpInfo", tcpInfo)
var err error
//创建全局对象
allclient, err = sqlx.Open("clickhouse", tcpInfo)
if err != nil {
log.Fatal("err", err)
}
allclient.SetMaxOpenConns(5)
allclient.SetMaxIdleConns(5)
if err != nil {
log.Fatal("1", err)
}
for i := 0; i < 4; i++ {
go a(i)
}
time.Sleep(2 * time.Hour)
}
func a(i int) {
log.Println("start")
//这里是开始创建连接,如果是同一个进程,则会复用连接池保存的进程。
tx, err := allclient.Begin()
if err != nil {
log.Println("xxx", err)
}
log.Println("start Prepare")
stmt, err := tx.Prepare("insert into app1.test1 values(?,?,?)")
if err != nil {
log.Println("xxx2", err)
}
if _, err := stmt.Exec(
"我的",
10000,
time.Now(),
); err != nil {
log.Fatal("2", err)
}
if err := tx.Commit(); err != nil {
log.Fatal("3", err)
}
time.Sleep(10 * time.Second)
fmt.Printf("i:%d\n", i)
}
看完上面的代码,我们来分析为什么我们能平均插入多个节点,我们首先看一下alt_hosts这个参数,官方文档说明如下:
alt_hosts - comma separated list of single address host for load-balancing
其实实现起来插入是比较简单的,就是配置多个host即可。就如上面的代码,则可以实现随机选择一个节点插入本地表。
下面我们简单看一下为什么能这么实现? 在github.com/ClickHouse/clickhouse-go/bootstrap.go的代码中,有个open(dsn string)的方法实现了连接的默认选择服务的方式是:connOpenStrategy=connOpenRandom随机。关于选择的hosts是通过url.Host加上alt_hosts组合起来生成的hosts。源代码如下:
if altHosts := strings.Split(query.Get("alt_hosts"), ","); len(altHosts) != 0 {
for _, host := range altHosts {
if len(host) != 0 {
hosts = append(hosts, host)
}
}
}
open函数执行是在clickhouse的连接的时候,同时在这个时候也会操作github.com/ClickHouse/clickhouse-go/connect.go里面的dial函数,dial函数这里会通过刚才的配置策略,返回需要操作的连接,当然如果是之前存在的连接则会复用。关于host的选择的代码主要如下:
var num int
switch options.openStrategy {
case connOpenInOrder:
num = i
case connOpenRandom://host默认选择方式
num = (ident + i) % len(options.hosts)
case connOpenTimeRandom:
// select host based on milliseconds
num = int((time.Now().UnixNano()/1000)%1000) % len(options.hosts)
for _, ok := checkedHosts[num]; ok; _, ok = checkedHosts[num] {
num = int(time.Now().UnixNano()) % len(options.hosts)
}
checkedHosts[num] = struct{}{}
}
总结:clickhouse的go的客户端插入分布式集群单一节点的方式直接配置alt_hosts即可。