<!-- Clickhouse -->
<dependency>
<groupId>ru.yandex.clickhouse</groupId>
<artifactId>clickhouse-jdbc</artifactId>
<version>0.2.2</version>
</dependency>
在java程序包目录创建
包名 | 说明 |
---|---|
cn.it.clickhouse | 代码所在的包目录 |
package cn.it.demo;
import java.sql.*;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* 使用JDBC操作ClickHouse
*/
public class ClickHouseJDBC {
public static void main(String[] args) {
String sqlDB = "show databases";//查询数据库
String sqlTab = "show tables";//查看表
String sqlCount = "select count(*) count from ontime";//查询ontime数据量
exeSql(sqlDB);
exeSql(sqlTab);
exeSql(sqlCount);
}
public static void exeSql(String sql){
String address = "jdbc:clickhouse://node2:8123/default";
Connection connection = null;
Statement statement = null;
ResultSet results = null;
try {
Class.forName("ru.yandex.clickhouse.ClickHouseDriver");
connection = DriverManager.getConnection(address);
statement = connection.createStatement();
long begin = System.currentTimeMillis();
results = statement.executeQuery(sql);
long end = System.currentTimeMillis();
System.out.println("执行("+sql+")耗时:"+(end-begin)+"ms");
ResultSetMetaData rsmd = results.getMetaData();
List<Map> list = new ArrayList();
while(results.next()){
Map map = new HashMap();
for(int i = 1;i<=rsmd.getColumnCount();i++){
map.put(rsmd.getColumnName(i),results.getString(rsmd.getColumnName(i)));
}
list.add(map);
}
for(Map map : list){
System.err.println(map);
}
} catch (Exception e) {
e.printStackTrace();
}finally {//关闭连接
try {
if(results!=null){
results.close();
}
if(statement!=null){
statement.close();
}
if(connection!=null){
connection.close();
}
} catch (SQLException e) {
e.printStackTrace();
}
}
}
}
<repositories>
<repository>
<id>mvnrepository</id>
<url>https://mvnrepository.com/</url>
<layout>default</layout>
</repository>
<repository>
<id>cloudera</id>
<url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
</repository>
<repository>
<id>elastic.co</id>
<url>https://artifacts.elastic.co/maven</url>
</repository>
</repositories>
<properties>
<scala.version>2.11</scala.version>
<!-- Spark -->
<spark.version>2.4.0-cdh6.2.1</spark.version>
<!-- Parquet -->
<parquet.version>1.9.0-cdh6.2.1</parquet.version>
<!-- ClickHouse -->
<clickhouse.version>0.2.2</clickhouse.version>
<jtuple.version>1.2</jtuple.version>
</properties>
<dependencies>
<!-- Spark -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql-kafka-0-10_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-common</artifactId>
<version>${parquet.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-graphx_${scala.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>net.jpountz.lz4</groupId>
<artifactId>lz4</artifactId>
<version>1.3.0</version>
</dependency>
<dependency>
<groupId>org.javatuples</groupId>
<artifactId>javatuples</artifactId>
<version>${jtuple.version}</version>
</dependency>
<!-- Clickhouse -->
<dependency>
<groupId>ru.yandex.clickhouse</groupId>
<artifactId>clickhouse-jdbc</artifactId>
<version>${clickhouse.version}</version>
<exclusions>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</exclusion>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
在scala程序包目录创建
包名 | 说明 |
---|---|
cn.it.clickhouse | 代码所在的包目录 |
实现步骤:
代码实现
package cn.it.demo
import cn.it.demo.utils.ClickHouseUtils
import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SparkSession}
/**
* 使用jdbc方式操作clickhouse表
*/
object ClickHouseJDBCDemo {
def main(args: Array[String]): Unit = {
//创建上下文环境配置对象
val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkSQL01_Demo")
//创建SparkSession对象
val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()
//读取json文件 创建DataFrame
val df: DataFrame = spark.read.json("E:\\input\\order.json")
df.show()
spark.stop()
}
}
3.1、创建表
实现步骤:
实现方法:
package cn.it.demo.utils
/**
* ClickHouse的工具类
*/
class ClickHouseUtils extends Serializable {
}
/**
* 创建clickhouse的连接实例,返回连接对象
* @return
*/
def createConnection = {
//定义ClickHouse的服务器地址
val host = "node2"
//定义ClickHouse的连接端口号
val port = "8123"
val properties = new ClickHouseProperties()
val dataSource = new ClickHouseDataSource(s"jdbc:clickhouse://${host}:${port}", properties)
dataSource.getConnection()
}
/**
* 创建clickhouse的表,返回创建表的sql字符串
* @param table
* @param schema
* @return
*/
def createTable(table: String, schema: StructType, primaryKeyField:String = "id"): String = {
//生成表的列集合字符串
val tableFieldsStr: String = schema.map { f =>
val fName = f.name
val fType = f.dataType match {
case org.apache.spark.sql.types.DataTypes.StringType => "String"
case org.apache.spark.sql.types.DataTypes.IntegerType => "Int32"
case org.apache.spark.sql.types.DataTypes.FloatType => "Float32"
case org.apache.spark.sql.types.DataTypes.LongType => "Int64"
case org.apache.spark.sql.types.DataTypes.BooleanType => "UInt8"
case org.apache.spark.sql.types.DataTypes.DoubleType => "Float64"
case org.apache.spark.sql.types.DataTypes.DateType => "DateTime"
case org.apache.spark.sql.types.DataTypes.TimestampType => "DateTime"
case x => throw new Exception(s"Unsupported type: ${x.toString}")
}
s"$fName $fType"
}.mkString(", ")
//返回创建表的sql字符串
s"CREATE TABLE IF NOT EXISTS $table(${tableFieldsStr},sign Int8,version UInt64) ENGINE=VersionedCollapsingMergeTree(sign, version) ORDER BY ${primaryKeyField}"
}
/**
* 执行更新操作
* @param sql
* @return
*/
def executeUpdate(sql: String) = {
//获取clickhouse的连接字符串
val connection: ClickHouseConnection = createConnection
println(sql)
val statement: ClickHouseStatement = connection.createStatement()
statement.executeUpdate(sql)
}
//创建clickhouse工具实例对象
val clickHouseUtils: ClickHouseUtils = new ClickHouseUtils
//创建表
val strCreateTable: String = clickHouseUtils.createTable("order", df.schema)
clickHouseUtils.executeUpdate(strCreateTable);
3.2、插入数据
实现步骤:
实现方法:
/**
* 生成插入表数据的sql字符串
* @param tableName
* @param schema
* @return
*/
private def createInsertStatmentSql(tableName: String)(schema: org.apache.spark.sql.types.StructType) = {
val columns = schema.map(f => f.name).toList
val vals = 1 to (columns.length) map (i => "?")
s"INSERT INTO $tableName (${columns.mkString(",")}) VALUES (${vals.mkString(",")})"
}
/**
* 为sql赋值默认值
* @param sparkType
* @param v
* @return
*/
private def defaultNullValue(sparkType: org.apache.spark.sql.types.DataType, v: Any) = sparkType match {
case DoubleType => 0
case LongType => 0
case FloatType => 0
case IntegerType => 0
case StringType => null
case BooleanType => false
case _ => null
}
/**
* 将数据插入到clickhouse中
* @param tableName
* @param df
*/
def insertToCkWithStatement(tableName :String, df:DataFrame): Unit = {
//生成插入sql字符串
val insertSql: String = createInsertStatmentSql(tableName)(df.schema)
df.foreachPartition(rows => {
var connection: ClickHouseConnection = null
var pstmt: PreparedStatement = null
var count = 0;
try {
connection = createConnection
pstmt = connection.prepareStatement(insertSql)
rows.foreach(line => {
count += 1
line.schema.fields.foreach{ f =>
val fieldName = f.name
val fieldIdx = line.schema.fieldIndex(fieldName)
val fieldVal = line.get(fieldIdx)
if(fieldVal != null)
pstmt.setObject(fieldIdx+1, fieldVal)
else{
val defVal = defaultNullValue(f.dataType, fieldVal)
pstmt.setObject(fieldIdx+1, defVal)
}
}
// 批量写入
pstmt.addBatch()
if (count >= 100000) {
pstmt.executeBatch()
count = 0
}
})
pstmt.executeBatch()
} catch {
case ex: Exception =>
println(ex)
} finally {
if (connection != null)
connection.close()
}
})
}
//插入数据
clickHouseUtils.insertToCkWithStatement("order", df)
3.3、修改数据
实现步骤:
实现方法:
/**
* 根据指定字段获取该字段的值
* @param fieldName
* @param schema
* @param data
* @return
*/
private def getFieldValue(fieldName: String, schema: StructType, data: Row): Any = {
var flag = true
var fieldValue: String = null
val fields = schema.fields
for (i <- 0 until fields.length if flag) {
val field = fields(i)
if (fieldName == field.name) {
fieldValue = field.dataType match {
case DataTypes.DoubleType => if (data.isNullAt(i)) "NULL" else s"${data.getDouble(i)}"
case DataTypes.FloatType => if (data.isNullAt(i)) "NULL" else s"${data.getFloat(i)}"
case DataTypes.IntegerType => if (data.isNullAt(i)) "NULL" else s"${data.getInt(i)}"
case DataTypes.LongType => if (data.isNullAt(i)) "NULL" else s"${data.getLong(i)}"
case DataTypes.StringType => if (data.isNullAt(i)) "NULL" else s"${data.getString(i).toString.trim}"
case DataTypes.DateType => if (data.isNullAt(i)) "NULL" else s"'${new SimpleDateFormat("yyyy-MM-dd").format(new Date(data.get(i).asInstanceOf[Date].getTime / 1000))}'"
case DataTypes.TimestampType => if (data.isNullAt(i)) "NULL" else s"${new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date(data.getLong(i) / 1000))}"
case DataTypes.NullType => "NULL"
}
flag = false
}
}
fieldValue
}
/**
* 生成修改表数据的sql字符串
* @param tableName
* @param schema
* @return
*/
private def createUpdateStatmentSql(tableName: String, row:Row, primaryKeyField:String = "id")(schema: org.apache.spark.sql.types.StructType) = {
val primaryKeyValue = getFieldValue(primaryKeyField, schema, row)
val noPrimaryKeyFields = schema.fields.filter(field => field.name != primaryKeyField)
var sets = ArrayBuffer[String]()
for (i <- 0 until noPrimaryKeyFields.length) {
val noPrimaryKeyField = noPrimaryKeyFields(i)
val set = noPrimaryKeyField.name + s"='${getFieldValue(noPrimaryKeyField.name, schema, row).toString}'"
sets += set
}
s"ALTER TABLE $tableName UPDATE ${sets.mkString(", ")} WHERE ${primaryKeyField}=$primaryKeyValue"
}
def updateToCkWithStatement(tableName :String, df:DataFrame, primaryKeyField:String = "id")= {
df.foreachPartition(rows => {
var connection: ClickHouseConnection = null
var pstmt: ClickHouseStatement = null
try {
connection = createConnection
pstmt = connection.createStatement()
rows.foreach(line => {
//生成修改sql字符串
val updateSql: String = createUpdateStatmentSql(tableName, line)(line.schema)
pstmt.executeUpdate(updateSql)
})
} catch {
case ex: Exception =>
println(ex)
}
})
}
//更新数据
clickHouseUtils.updateToCkWithStatement("order", df)
3.4、删除数据
实现步骤:
实现方法:
/**
* 生成删除表数据的sql字符串
* @param tableName
* @param schema
* @return
*/
private def createDeleteStatmentSql(tableName: String, row:Row, primaryKeyField:String = "id")(schema: org.apache.spark.sql.types.StructType) = {
val primaryKeyValue = getFieldValue(primaryKeyField, schema, row)
s"ALTER TABLE $tableName DELETE WHERE ${primaryKeyField} = $primaryKeyValue"
}
/**
* 将数据从clickhouse中删除
* @param tableName
* @param df
*/
def deleteToCkWithStatement(tableName :String, df:DataFrame, primaryKeyField:String = "id")= {
df.foreachPartition(rows => {
var connection: ClickHouseConnection = null
var pstmt: ClickHouseStatement = null
try {
connection = createConnection
pstmt = connection.createStatement()
rows.foreach(line => {
//生成删除sql字符串
val deleteSql: String = createDeleteStatmentSql(tableName, line)(line.schema)
println(deleteSql)
pstmt.executeUpdate(deleteSql)
})
} catch {
case ex: Exception =>
println(ex)
}
})
}
//删除数据
clickHouseUtils.deleteToCkWithStatement("order", df)
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。