前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >客快物流大数据项目(一百):ClickHouse的使用

客快物流大数据项目(一百):ClickHouse的使用

原创
作者头像
Lansonli
发布2022-12-29 15:12:42
1.2K0
发布2022-12-29 15:12:42
举报
文章被收录于专栏:Lansonli技术博客

​ClickHouse的使用

一、使用Java操作ClickHouse

1、构建maven工程

2、​​​​​​​导入依赖

代码语言:javascript
复制
<!-- Clickhouse -->
<dependency>
    <groupId>ru.yandex.clickhouse</groupId>
    <artifactId>clickhouse-jdbc</artifactId>
    <version>0.2.2</version>
</dependency>

3、​​​​​​​​​​​​​​创建包结构

java程序包目录创建

包名

说明

cn.it.clickhouse

代码所在的包目录

4、代码案例

代码语言:javascript
复制
package cn.it.demo;

import java.sql.*;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * 使用JDBC操作ClickHouse
 */
public class ClickHouseJDBC {
    public static void main(String[] args) {
        String sqlDB = "show databases";//查询数据库
        String sqlTab = "show tables";//查看表
        String sqlCount = "select count(*) count from ontime";//查询ontime数据量
        exeSql(sqlDB);
        exeSql(sqlTab);
        exeSql(sqlCount);
    }

    public static void exeSql(String sql){
        String address = "jdbc:clickhouse://node2:8123/default";
        Connection connection = null;
        Statement statement = null;
        ResultSet results = null;
        try {
            Class.forName("ru.yandex.clickhouse.ClickHouseDriver");
            connection = DriverManager.getConnection(address);
            statement = connection.createStatement();
            long begin = System.currentTimeMillis();
            results = statement.executeQuery(sql);
            long end = System.currentTimeMillis();
            System.out.println("执行("+sql+")耗时:"+(end-begin)+"ms");
            ResultSetMetaData rsmd = results.getMetaData();
            List<Map> list = new ArrayList();
            while(results.next()){
                Map map = new HashMap();
                for(int i = 1;i<=rsmd.getColumnCount();i++){
                    map.put(rsmd.getColumnName(i),results.getString(rsmd.getColumnName(i)));
                }
                list.add(map);
            }
            for(Map map : list){
                System.err.println(map);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }finally {//关闭连接
            try {
                if(results!=null){
                    results.close();
                }
                if(statement!=null){
                    statement.close();
                }
                if(connection!=null){
                    connection.close();
                }
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
    }
}

二、使用Spark操作ClickHouse

1、导入依赖

代码语言:javascript
复制
<repositories>
    <repository>
        <id>mvnrepository</id>
        <url>https://mvnrepository.com/</url>
        <layout>default</layout>
    </repository>
    <repository>
        <id>cloudera</id>
        <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
    </repository>
    <repository>
        <id>elastic.co</id>
        <url>https://artifacts.elastic.co/maven</url>
    </repository>
</repositories>

<properties>
    <scala.version>2.11</scala.version>
    <!-- Spark -->
    <spark.version>2.4.0-cdh6.2.1</spark.version>
    <!-- Parquet -->
    <parquet.version>1.9.0-cdh6.2.1</parquet.version>
    <!-- ClickHouse -->
    <clickhouse.version>0.2.2</clickhouse.version>
    <jtuple.version>1.2</jtuple.version>
</properties>

<dependencies>
    <!-- Spark -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_${scala.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql-kafka-0-10_2.11</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.parquet</groupId>
        <artifactId>parquet-common</artifactId>
        <version>${parquet.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-graphx_${scala.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>net.jpountz.lz4</groupId>
        <artifactId>lz4</artifactId>
        <version>1.3.0</version>
    </dependency>
    <dependency>
        <groupId>org.javatuples</groupId>
        <artifactId>javatuples</artifactId>
        <version>${jtuple.version}</version>
    </dependency>
    <!-- Clickhouse -->
    <dependency>
        <groupId>ru.yandex.clickhouse</groupId>
        <artifactId>clickhouse-jdbc</artifactId>
        <version>${clickhouse.version}</version>
        <exclusions>
            <exclusion>
                <groupId>com.fasterxml.jackson.core</groupId>
                <artifactId>jackson-databind</artifactId>
            </exclusion>
            <exclusion>
                <groupId>com.fasterxml.jackson.core</groupId>
                <artifactId>jackson-core</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
</dependencies>

2、​​​​​​​​​​​​​​创建包结构

scala程序包目录创建

包名

说明

cn.it.clickhouse

代码所在的包目录

3、案例开发

实现步骤:

  • 创建ClickHouseJDBCDemo单例对象
  • 初始化spark运行环境
  • 加载外部数据源(资料\order.json)生成DataFrame对象

代码实现

代码语言:javascript
复制
package cn.it.demo

import cn.it.demo.utils.ClickHouseUtils
import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SparkSession}

/**
 * 使用jdbc方式操作clickhouse表
 */
object ClickHouseJDBCDemo {
  def main(args: Array[String]): Unit = {
    //创建上下文环境配置对象
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkSQL01_Demo")

    //创建SparkSession对象
    val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()

    //读取json文件 创建DataFrame
    val df: DataFrame = spark.read.json("E:\\input\\order.json")
    df.show()

    spark.stop()
  }
}

3.1、创建表

实现步骤:

  • 创建ClickHouseUtils工具类
  • 创建方法:clickhouse的连接实例,返回连接对象
  • 创建方法:生成表的sql字符串
  • 创建方法:执行更新操作
  • ClickHouseJDBCDemo单例对象中调用创建表

实现方法:

  • 创建ClickHouseUtils工具类
代码语言:javascript
复制
package cn.it.demo.utils

/**
 * ClickHouse的工具类
 */
class ClickHouseUtils extends Serializable {
}

  • 创建方法:clickhouse的连接实例,返回连接对象
代码语言:javascript
复制
/**
* 创建clickhouse的连接实例,返回连接对象
* @return
*/
def createConnection = {
  //定义ClickHouse的服务器地址
  val host = "node2"
  //定义ClickHouse的连接端口号
  val port = "8123"
  val properties = new ClickHouseProperties()
  val dataSource = new ClickHouseDataSource(s"jdbc:clickhouse://${host}:${port}", properties)
  dataSource.getConnection()
}

  • 创建方法:生成表的sql字符串
代码语言:javascript
复制
/**
 * 创建clickhouse的表,返回创建表的sql字符串
 * @param table
 * @param schema
 * @return
 */
def createTable(table: String, schema: StructType, primaryKeyField:String = "id"): String = {
  //生成表的列集合字符串
  val tableFieldsStr: String = schema.map { f =>
    val fName = f.name
    val fType = f.dataType match {
      case org.apache.spark.sql.types.DataTypes.StringType => "String"
      case org.apache.spark.sql.types.DataTypes.IntegerType => "Int32"
      case org.apache.spark.sql.types.DataTypes.FloatType => "Float32"
      case org.apache.spark.sql.types.DataTypes.LongType => "Int64"
      case org.apache.spark.sql.types.DataTypes.BooleanType => "UInt8"
      case org.apache.spark.sql.types.DataTypes.DoubleType => "Float64"
      case org.apache.spark.sql.types.DataTypes.DateType => "DateTime"
      case org.apache.spark.sql.types.DataTypes.TimestampType => "DateTime"
      case x => throw new Exception(s"Unsupported type: ${x.toString}")
    }
    s"$fName $fType"
  }.mkString(", ")

  //返回创建表的sql字符串
  s"CREATE TABLE IF NOT EXISTS $table(${tableFieldsStr},sign Int8,version UInt64) ENGINE=VersionedCollapsingMergeTree(sign, version) ORDER BY ${primaryKeyField}"
}

  • 创建方法:执行更新操作
代码语言:javascript
复制
/**
 * 执行更新操作
 * @param sql
 * @return
 */
def executeUpdate(sql: String) = {
  //获取clickhouse的连接字符串
  val connection: ClickHouseConnection = createConnection
  println(sql)
  val statement: ClickHouseStatement = connection.createStatement()
  statement.executeUpdate(sql)
}

  • ClickHouseJDBCDemo单例对象中调用创建表
代码语言:javascript
复制
//创建clickhouse工具实例对象
val clickHouseUtils: ClickHouseUtils = new ClickHouseUtils

//创建表
val strCreateTable: String = clickHouseUtils.createTable("order", df.schema)
clickHouseUtils.executeUpdate(strCreateTable);

3.2、​​​​​​​​​​​​​​插入数据

实现步骤:

  • 打开ClickHouseUtils工具类
  • 创建方法:生成插入表数据的sql字符串
  • 创建方法:根据字段类型为字段赋值默认值
  • 创建方法:将数据插入到clickhouse中
  • ClickHouseJDBCDemo单例对象中调用插入数据

实现方法:

  • 创建方法:生成插入表数据的sql字符串
代码语言:javascript
复制
/**
 * 生成插入表数据的sql字符串
 * @param tableName
 * @param schema
 * @return
 */
private def createInsertStatmentSql(tableName: String)(schema: org.apache.spark.sql.types.StructType) = {
  val columns = schema.map(f => f.name).toList
  val vals = 1 to (columns.length) map (i => "?")
  s"INSERT INTO $tableName (${columns.mkString(",")}) VALUES (${vals.mkString(",")})"
}

  • 创建方法:根据字段类型为字段赋值默认值
代码语言:javascript
复制
/**
 * 为sql赋值默认值
 * @param sparkType
 * @param v
 * @return
 */
private def defaultNullValue(sparkType: org.apache.spark.sql.types.DataType, v: Any) = sparkType match {
  case DoubleType => 0
  case LongType => 0
  case FloatType => 0
  case IntegerType => 0
  case StringType => null
  case BooleanType => false
  case _ => null
}

  • 创建方法:将数据插入到clickhouse中
代码语言:javascript
复制
/**
 * 将数据插入到clickhouse中
 * @param tableName
 * @param df
 */
def insertToCkWithStatement(tableName :String, df:DataFrame): Unit = {
  //生成插入sql字符串
  val insertSql: String = createInsertStatmentSql(tableName)(df.schema)
  df.foreachPartition(rows => {
    var connection: ClickHouseConnection = null
    var pstmt: PreparedStatement = null
    var count = 0;
    try {
      connection = createConnection
      pstmt = connection.prepareStatement(insertSql)
      rows.foreach(line => {
        count += 1
        line.schema.fields.foreach{ f =>
          val fieldName = f.name
          val fieldIdx = line.schema.fieldIndex(fieldName)
          val fieldVal = line.get(fieldIdx)
          if(fieldVal != null)
            pstmt.setObject(fieldIdx+1, fieldVal)
          else{
            val defVal = defaultNullValue(f.dataType, fieldVal)
            pstmt.setObject(fieldIdx+1, defVal)
          }
        }
        // 批量写入
        pstmt.addBatch()
        if (count >= 100000) {
          pstmt.executeBatch()
          count = 0
        }
      })
      pstmt.executeBatch()
    } catch {
      case ex: Exception =>
        println(ex)
    } finally {
      if (connection != null)
        connection.close()
    }
  })
}

  • ClickHouseJDBCDemo单例对象中调用插入数据
代码语言:javascript
复制
//插入数据
clickHouseUtils.insertToCkWithStatement("order", df)

3.3、​​​​​​​​​​​​​​修改数据

实现步骤:

  • 打开ClickHouseUtils工具类
  • 创建方法:根据指定的字段名称获取字段对应的值
  • 创建方法:生成修改表数据的sql字符串
  • 创建方法:将数据更新到clickhouse中
  • ClickHouseJDBCDemo单例对象中调用更新数据

实现方法:

  • 创建方法:根据指定的字段名称获取字段对应的值
代码语言:javascript
复制
/**
 * 根据指定字段获取该字段的值
 * @param fieldName
 * @param schema
 * @param data
 * @return
 */
private def getFieldValue(fieldName: String, schema: StructType, data: Row): Any = {
  var flag = true
  var fieldValue: String = null
  val fields = schema.fields
  for (i <- 0 until fields.length if flag) {
    val field = fields(i)
    if (fieldName == field.name) {
      fieldValue = field.dataType match {
        case DataTypes.DoubleType => if (data.isNullAt(i)) "NULL" else s"${data.getDouble(i)}"
        case DataTypes.FloatType => if (data.isNullAt(i)) "NULL" else s"${data.getFloat(i)}"
        case DataTypes.IntegerType => if (data.isNullAt(i)) "NULL" else s"${data.getInt(i)}"
        case DataTypes.LongType => if (data.isNullAt(i)) "NULL" else s"${data.getLong(i)}"
        case DataTypes.StringType => if (data.isNullAt(i)) "NULL" else s"${data.getString(i).toString.trim}"
        case DataTypes.DateType => if (data.isNullAt(i)) "NULL" else s"'${new SimpleDateFormat("yyyy-MM-dd").format(new Date(data.get(i).asInstanceOf[Date].getTime / 1000))}'"
        case DataTypes.TimestampType => if (data.isNullAt(i)) "NULL" else s"${new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date(data.getLong(i) / 1000))}"
        case DataTypes.NullType => "NULL"
      }
      flag = false
    }
  }
  fieldValue
}

  • 创建方法:生成修改表数据的sql字符串
代码语言:javascript
复制
/**
 * 生成修改表数据的sql字符串
 * @param tableName
 * @param schema
 * @return
 */
private def createUpdateStatmentSql(tableName: String, row:Row,  primaryKeyField:String = "id")(schema: org.apache.spark.sql.types.StructType) = {
  val primaryKeyValue = getFieldValue(primaryKeyField, schema, row)
  val noPrimaryKeyFields = schema.fields.filter(field => field.name != primaryKeyField)
  var sets = ArrayBuffer[String]()
  for (i <- 0 until noPrimaryKeyFields.length) {
    val noPrimaryKeyField = noPrimaryKeyFields(i)
    val set = noPrimaryKeyField.name + s"='${getFieldValue(noPrimaryKeyField.name, schema, row).toString}'"
    sets += set
  }
  s"ALTER TABLE $tableName UPDATE ${sets.mkString(", ")} WHERE ${primaryKeyField}=$primaryKeyValue"
}

  • 创建方法:将数据更新到clickhouse中
代码语言:javascript
复制
def updateToCkWithStatement(tableName :String, df:DataFrame, primaryKeyField:String = "id")= {
  df.foreachPartition(rows => {
    var connection: ClickHouseConnection = null
    var pstmt: ClickHouseStatement = null
    try {
      connection = createConnection
      pstmt = connection.createStatement()
      rows.foreach(line => {
        //生成修改sql字符串
        val updateSql: String = createUpdateStatmentSql(tableName, line)(line.schema)
        pstmt.executeUpdate(updateSql)
      })
    } catch {
      case ex: Exception =>
        println(ex)
    }
  })
}

  • ClickHouseJDBCDemo单例对象中调用更新数据
代码语言:javascript
复制
//更新数据
clickHouseUtils.updateToCkWithStatement("order", df)

3.4、​​​​​​​​​​​​​​删除数据

实现步骤:

  • 打开ClickHouseUtils工具类
  • 创建方法:生成删除表数据的sql字符串
  • 创建方法:将数据从clickhouse中删除
  • ClickHouseJDBCDemo单例对象中调用删除数据

实现方法:

  • 创建方法:生成删除表数据的sql字符串
代码语言:javascript
复制
/**
 * 生成删除表数据的sql字符串
 * @param tableName
 * @param schema
 * @return
 */
private def createDeleteStatmentSql(tableName: String, row:Row,  primaryKeyField:String = "id")(schema: org.apache.spark.sql.types.StructType) = {
  val primaryKeyValue = getFieldValue(primaryKeyField, schema, row)
  s"ALTER TABLE $tableName DELETE WHERE ${primaryKeyField} = $primaryKeyValue"
}

  • 创建方法:将数据从clickhouse中删除
代码语言:javascript
复制
/**
 * 将数据从clickhouse中删除
 * @param tableName
 * @param df
 */
def deleteToCkWithStatement(tableName :String, df:DataFrame, primaryKeyField:String = "id")= {
  df.foreachPartition(rows => {
    var connection: ClickHouseConnection = null
    var pstmt: ClickHouseStatement = null
    try {
      connection = createConnection
      pstmt = connection.createStatement()
      rows.foreach(line => {
        //生成删除sql字符串
        val deleteSql: String = createDeleteStatmentSql(tableName, line)(line.schema)
        println(deleteSql)
        pstmt.executeUpdate(deleteSql)
      })
    } catch {
      case ex: Exception =>
        println(ex)
    }
  })
}

  • ClickHouseJDBCDemo单例对象中调用删除数据
代码语言:javascript
复制
//删除数据
clickHouseUtils.deleteToCkWithStatement("order", df)

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • ​ClickHouse的使用
    • 一、使用Java操作ClickHouse
      • 1、构建maven工程
      • 2、​​​​​​​导入依赖
      • 3、​​​​​​​​​​​​​​创建包结构
      • 4、代码案例
    • 二、使用Spark操作ClickHouse
      • 1、导入依赖
      • 2、​​​​​​​​​​​​​​创建包结构
      • 3、案例开发
相关产品与服务
云服务器
云服务器(Cloud Virtual Machine,CVM)提供安全可靠的弹性计算服务。 您可以实时扩展或缩减计算资源,适应变化的业务需求,并只需按实际使用的资源计费。使用 CVM 可以极大降低您的软硬件采购成本,简化 IT 运维工作。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档