To analyze logistics metric data with ClickHouse, the data must first be stored in ClickHouse.
Business flow:
Spark 1.3 introduced the Data Source API (V1). With it we can easily read data from a variety of sources, and Spark SQL's optimizer can push optimizations such as column pruning and filter pushdown down into the data source.
This version of the API is easy to work with, but it also has some problems: it is coupled to the upper-level APIs (SQLContext, DataFrame, RDD), and its extensibility is limited.
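For comparison, here is a minimal sketch of the V1 read path for a hypothetical in-memory source (not part of this project); the requiredColumns and filters parameters are how V1 exposes column pruning and filter pushdown:
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.sources.{BaseRelation, Filter, PrunedFilteredScan, RelationProvider}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

class DefaultSource extends RelationProvider {
  override def createRelation(sqlContext: SQLContext, parameters: Map[String, String]): BaseRelation =
    new DemoRelation(sqlContext)
}

class DemoRelation(val sqlContext: SQLContext) extends BaseRelation with PrunedFilteredScan {
  override def schema: StructType = StructType(Seq(StructField("name", StringType)))
  // Spark hands down the pruned column list and the pushed filters; a real source would use them
  // to read less data, which is exactly the optimization mentioned above.
  override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] =
    sqlContext.sparkContext.parallelize(Seq(Row("demo")))
}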
To address these issues, the community introduced the Data Source API V2 starting with Apache Spark 2.3.0. It keeps the original functionality while fixing the problems of V1: it no longer depends on the upper-level APIs, and it is much easier to extend.
The examples below use the V2 functionality introduced in Spark 2.3; the interface names and method signatures shown are the Spark 2.4.x versions of that API.
To use the Data Source API V2 we need the classes from the V2 packages. For the read path we only need to implement the ReadSupport-related interfaces, as shown below.
Code implementation:
import java.util.Optional
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, ReadSupport, WriteSupport}
import org.apache.spark.sql.sources.v2.reader.DataSourceReader
import org.apache.spark.sql.sources.v2.writer.DataSourceWriter
import org.apache.spark.sql.types.StructType
/**
 * A custom Spark SQL data source based on the DataSourceV2 interfaces:
 * 1. extend DataSourceV2 to register the data source with Spark
 * 2. mix in ReadSupport to support reading data
 * 3. mix in WriteSupport to support writing data
 */
class CustomDataSourceV2 extends DataSourceV2 with ReadSupport with WriteSupport {
/**
 * Create the reader
 *
 * @param options user-supplied options
 * @return the custom DataSourceReader
 */
override def createReader(options: DataSourceOptions): DataSourceReader = ???
/**
 * Create the writer
 *
 * @param jobId unique id of the write job
 * @param schema schema of the data to be written
 * @param mode save mode
 * @param options user-supplied options
 * @return Optional[custom DataSourceWriter]
 */
override def createWriter(jobId: String, schema: StructType, mode: SaveMode, options: DataSourceOptions): Optional[DataSourceWriter] = ???
}
Above we implemented the ReadSupport interface and overrode the createReader method. Next we need to implement the DataSourceReader interface, as follows:
import java.util
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, InputPartition}
import org.apache.spark.sql.types.StructType
/**
 * The custom DataSourceReader:
 * extends DataSourceReader,
 * overrides readSchema to produce the schema,
 * overrides planInputPartitions to define how the data is split into partitions and read.
 * @param options options
 */
case class CustomDataSourceV2Reader(options: Map[String, String]) extends DataSourceReader {
/**
 * Schema of the columns to read
 * @return
 */
override def readSchema(): StructType = ???
/**
 * Split/read logic for each partition
 * @return
 */
override def planInputPartitions(): util.List[InputPartition[InternalRow]] = ???
}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.sources.v2.DataSourceOptions
import org.apache.spark.sql.sources.v2.writer.{DataSourceWriter, DataWriterFactory, WriterCommitMessage}
/**
 * The custom DataSourceWriter:
 * extends DataSourceWriter,
 * overrides createWriterFactory to create the DataWriter factory,
 * overrides commit, which receives the commit messages of all partitions,
 * overrides abort, which is called when a write fails and is used to roll back the transaction.
 * @param dataSourceOptions options
 */
class CustomDataSourceWriter(dataSourceOptions: DataSourceOptions) extends DataSourceWriter {
/**
 * Create the DataWriter factory
 * @return DataWriterFactory
 */
override def createWriterFactory(): DataWriterFactory[InternalRow] = ???
/**
 * commit
 * @param writerCommitMessages commit messages from all partitions
 * Called once per successful write job
 */
override def commit(writerCommitMessages: Array[WriterCommitMessage]): Unit = ???
/**
 * abort
 * @param writerCommitMessages called when a write fails; used to roll back the transaction
 */
override def abort(writerCommitMessages: Array[WriterCommitMessage]): Unit = ???
}
The last thing we need to implement is partitioned reading. DataSource V1 lacks partition support, while DataSource V2 handles partitioning fully through the planInputPartitions method shown above.
That is where we decide how many partitions are used to read the source. For example, with a TextInputFormat we could obtain the number of splits of the input file and turn each split into one partition, read by a single task. For simplicity this example uses only one partition, i.e. List[InputPartition[InternalRow]].asJava; a multi-partition sketch follows below.
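Purely for illustration, here is a minimal sketch of a range-based planInputPartitions; the RangePartition class, the id column and the split count are assumptions, not part of this project:
import java.util
import scala.collection.JavaConverters._
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.sources.v2.reader.{InputPartition, InputPartitionReader}

// Hypothetical partition that reads only the rows whose id falls in [lower, upper).
class RangePartition(lower: Long, upper: Long) extends InputPartition[InternalRow] {
  // A real reader would append "WHERE id >= lower AND id < upper" to its SELECT statement.
  override def createPartitionReader(): InputPartitionReader[InternalRow] = ???
}

// One InputPartition per id range, so Spark schedules one read task per range.
def planInputPartitionsSketch(minId: Long, maxId: Long, numSplits: Int): util.List[InputPartition[InternalRow]] = {
  val step = math.max(1L, (maxId - minId) / numSplits)
  val partitions: Seq[InputPartition[InternalRow]] = (0 until numSplits).map { i =>
    val lower = minId + i * step
    val upper = if (i == numSplits - 1) maxId + 1 else lower + step
    new RangePartition(lower, upper)
  }
  partitions.asJava
}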
Implementing DataSourceV2 for Spark SQL follows the same idea as a custom Structured Streaming data source: the concept is identical, only the concrete implementation differs.
The main implementation steps are shown below, each with its code.
First, declare the data source class:
package cn.it.logistics.etl.realtime.ext.clickhouse
/**
 * @ClassName ClickHouseDataSourceV2
 * @Description ClickHouse data source implementation that extends Spark SQL DataSourceV2
 */
class ClickHouseDataSourceV2 {
}
Then mix in the interfaces for batch reads (ReadSupport), batch writes (WriteSupport) and streaming writes (StreamWriteSupport):
/**
 * @ClassName ClickHouseDataSourceV2
 * @Description ClickHouse data source implementation that extends Spark SQL DataSourceV2
 */
class ClickHouseDataSourceV2 extends DataSourceV2 with ReadSupport with WriteSupport with StreamWriteSupport {
}
Finally, add the method skeletons required by each interface:
/**
 * @ClassName ClickHouseDataSourceV2
 * @Description ClickHouse data source implementation that extends Spark SQL DataSourceV2
 */
class ClickHouseDataSourceV2 extends DataSourceV2 with ReadSupport with WriteSupport with StreamWriteSupport {
/** Data reading in batch mode */
override def createReader(options: DataSourceOptions): DataSourceReader = ???
/** Data writing in batch mode */
override def createWriter(writeUUID: String, schema: StructType, mode: SaveMode, options: DataSourceOptions): Optional[DataSourceWriter] = ???
/** Data writing in streaming mode */
override def createStreamWriter(queryId: String, schema: StructType, mode: OutputMode, options: DataSourceOptions): StreamWriter = ???
}
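A side note that the original text does not cover: by default a V2 source is referenced by its fully qualified class name in format(...). If a short name is wanted, one possible sketch (the short name and the service-loader file are assumptions, not part of this project) is to also mix in DataSourceRegister and list the class in META-INF/services/org.apache.spark.sql.sources.DataSourceRegister:
import org.apache.spark.sql.sources.DataSourceRegister

class ClickHouseDataSourceV2 extends DataSourceV2 with ReadSupport with WriteSupport with StreamWriteSupport
  with DataSourceRegister {
  // Enables spark.read.format("clickhouse-ext"); the name is an example only.
  override def shortName(): String = "clickhouse-ext"
  override def createReader(options: DataSourceOptions): DataSourceReader = ???
  override def createWriter(writeUUID: String, schema: StructType, mode: SaveMode, options: DataSourceOptions): Optional[DataSourceWriter] = ???
  override def createStreamWriter(queryId: String, schema: StructType, mode: OutputMode, options: DataSourceOptions): StreamWriter = ???
}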
Next, implement the batch read path:
/**
 * Reads data from the ClickHouse database in batch mode
 */
class ClickHouseDataSourceReader(options: ClickHouseOptions) extends DataSourceReader {
}
/**
 * Reads data from the ClickHouse database in batch mode
 */
class ClickHouseDataSourceReader(options: ClickHouseOptions) extends DataSourceReader {
//Instantiate the ClickHouseHelper utility class
val ckHelper = new ClickHouseHelper(options)
private val schema: StructType = ckHelper.getSparkTableSchema
/**
 * Reading data must produce a DataFrame (RDD + schema),
 * so first return the table's structure information (schema)
 * @return
 */
override def readSchema(): StructType = schema
/**
 * Split/read logic for each partition (here a single partition returns all of the data)
 * @return
 */
override def planInputPartitions(): util.List[InputPartition[InternalRow]] = util.Arrays.asList(new ClickHouseInputPartition(schema, options))
}
package cn.it.logistics.etl.realtime.ext
import java.sql.{Connection, Date, PreparedStatement, ResultSet, Statement}
import java.text.SimpleDateFormat
import java.util
import org.apache.commons.lang3.StringUtils
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.InternalRow
import org.javatuples.Triplet
import ru.yandex.clickhouse.domain.ClickHouseDataType
import ru.yandex.clickhouse.response.{ClickHouseResultSet, ClickHouseResultSetMetaData}
import ru.yandex.clickhouse.{ClickHouseConnection, ClickHouseDataSource, ClickHouseStatement}
import ru.yandex.clickhouse.settings.ClickHouseProperties
import org.apache.spark.sql.types.{BooleanType, DataType, DataTypes, DateType, DoubleType, FloatType, IntegerType, LongType, StringType, StructField, StructType}
import scala.collection.mutable.ArrayBuffer
/**
 * Utility class for ClickHouse operations
 */
class ClickHouseHelper(options: ClickHouseOptions) extends Logging {
private val opType: String = options.getOpTypeField
private var connection: ClickHouseConnection = getConnection
private val id: String = options.getPrimaryKey
/**
 * Obtain a ClickHouse connection object
 */
def getConnection = {
//Get the ClickHouse connection string
val url: String = options.getURL
//Create the ClickHouseDataSource object
val clickHouseDataSource: ClickHouseDataSource = new ClickHouseDataSource(url, new ClickHouseProperties())
//Return the ClickHouse connection object
clickHouseDataSource.getConnection
}
/**
 * Return the schema information of the specified table
 * @return StructType: the schema of the Spark DataFrame
 */
def getSparkTableSchema: StructType = {
import collection.JavaConversions._
val clickHouseTableSchema: util.LinkedList[Triplet[String, String, String]] = getClickHouseTableSchema
//println(clickHouseTableSchema)
val fields = ArrayBuffer[StructField]()
//Build the schema from the ClickHouse table's columns and column types
for (trp <- clickHouseTableSchema) {
fields += StructField(trp.getValue0, getSparkSqlType(trp.getValue1))
}
//Return the StructType, which is the schema
StructType(fields)
}
/**
 * Return the columns and column types of the ClickHouse table
 */
def getClickHouseTableSchema = {
//Collection of columns
val fields: util.LinkedList[Triplet[String, String, String]] = new util.LinkedList[Triplet[String, String, String]]()
//Query the table and use the result set's metadata to obtain the column information
//ClickHouse connection objects
var connection: ClickHouseConnection = null
var statement: ClickHouseStatement = null
var resultSet: ClickHouseResultSet = null
var metaData: ClickHouseResultSetMetaData = null
try {
//Obtain the connection object
connection = getConnection
statement = connection.createStatement()
//We only need the columns and their types, not the data, so use a condition that never matches
val sql: String = s"SELECT * FROM ${options.getFullTable} WHERE 1=0"
resultSet = statement.executeQuery(sql).asInstanceOf[ClickHouseResultSet]
//Metadata of the specified table
metaData = resultSet.getMetaData.asInstanceOf[ClickHouseResultSetMetaData]
val columnCount: Int = metaData.getColumnCount
for (i <- 1 to columnCount) {
val columnName: String = metaData.getColumnName(i)
val columnTypeName: String = metaData.getColumnTypeName(i)
val javaTypeName: String = ClickHouseDataType.fromTypeString(columnTypeName).getJavaClass.getSimpleName
println("columnTypeName:" + columnTypeName)
println("javaTypeName:" + javaTypeName)
fields.add(new Triplet(columnName, columnTypeName, javaTypeName))
}
} catch {
case ex: Exception => ex.printStackTrace()
} finally {
if (resultSet != null) resultSet.close()
if (statement != null) statement.close()
if (connection != null) connection.close()
}
fields
}
def closeAll(connection: Connection = null, st: Statement = null, ps: PreparedStatement = null, rs: ResultSet = null): Unit = {
try {
if (rs != null && !rs.isClosed) rs.close()
if (ps != null && !ps.isClosed) ps.close()
if (st != null && !st.isClosed) st.close()
if (connection != null && !connection.isClosed) connection.close()
} catch {
case e: Exception => e.printStackTrace()
}
}
}
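For orientation, here is a hypothetical example of what getSparkTableSchema produces; the table and columns are invented, and the type mapping follows the getSparkSqlType method shown later in this article:
// Hypothetical ClickHouse table:
//   CREATE TABLE logistics.tbl_demo (id Int64, name String, amount Float64, create_time DateTime)
//   ENGINE = MergeTree ORDER BY id
// Schema returned by getSparkTableSchema for that table:
//   StructType(
//     StructField(id, LongType), StructField(name, StringType),
//     StructField(amount, DoubleType), StructField(create_time, TimestampType))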
package cn.it.logistics.etl.realtime.ext
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.sources.v2.reader.{InputPartition, InputPartitionReader}
import org.apache.spark.sql.types.StructType
/**
 * @ClassName ClickHouseInputPartition
 * @Description ClickHouse partition implementation for the batch read path
 */
class ClickHouseInputPartition(schema: StructType, options: ClickHouseOptions) extends InputPartition[InternalRow] {
/**
 * Create the reader for this partition's data
 * @return
 */
override def createPartitionReader(): InputPartitionReader[InternalRow] = new ClickHouseInputPartitionReader(schema, options)
}
package cn.it.logistics.etl.realtime.ext
import java.io.Serializable
import java.sql.{ResultSet, SQLException}
import org.apache.commons.lang3.StringUtils
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.sources.v2.reader.InputPartitionReader
import org.apache.spark.sql.types.{DataTypes, StructType}
import org.apache.spark.unsafe.types.UTF8String
import ru.yandex.clickhouse.{ClickHouseConnection, ClickHouseStatement}
/**
 * @ClassName ClickHouseInputPartitionReader
 * @Description Reads the data of one ClickHouse partition in the batch read path
 */
class ClickHouseInputPartitionReader(schema: StructType, options: ClickHouseOptions) extends InputPartitionReader[InternalRow] with Logging with Serializable{
val helper = new ClickHouseHelper(options)
var connection: ClickHouseConnection = null
var st: ClickHouseStatement = null
var rs: ResultSet = null
/**
 * Whether there is another row to read
 * @return boolean
 */
override def next(): Boolean = {
if (null == connection || connection.isClosed || null == st || st.isClosed || null == rs || rs.isClosed) {
connection = helper.getConnection
st = connection.createStatement()
rs = st.executeQuery(helper.getSelectStatement(schema))
println(/**logInfo**/"Initialized the ClickHouse connection.")
}
if (null != rs && !rs.isClosed) rs.next() else false
}
/**
 * Fetch the current row
 * get() is called after next() has returned true
 * @return InternalRow
 */
override def get(): InternalRow = {
val fields = schema.fields
val length = fields.length
val record = new Array[Any](length)
for (i <- 0 until length) {
val field = fields(i)
val name = field.name
val dataType = field.dataType
try {
dataType match {
case DataTypes.BooleanType => record(i) = rs.getBoolean(name)
case DataTypes.DateType => record(i) = DateTimeUtils.fromJavaDate(rs.getDate(name))
case DataTypes.DoubleType => record(i) = rs.getDouble(name)
case DataTypes.FloatType => record(i) = rs.getFloat(name)
case DataTypes.IntegerType => record(i) = rs.getInt(name)
case DataTypes.LongType => record(i) = rs.getLong(name)
case DataTypes.ShortType => record(i) = rs.getShort(name)
case DataTypes.StringType => record(i) = UTF8String.fromString(rs.getString(name))
case DataTypes.TimestampType => record(i) = DateTimeUtils.fromJavaTimestamp(rs.getTimestamp(name))
case DataTypes.BinaryType => record(i) = rs.getBytes(name)
case DataTypes.NullType => record(i) = StringUtils.EMPTY
}
} catch {
case e: SQLException => logError(e.getStackTrace.mkString("", scala.util.Properties.lineSeparator, scala.util.Properties.lineSeparator))
}
}
new GenericInternalRow(record)
}
/**
 * Release the resources
 */
override def close(): Unit = {helper.closeAll(connection, st, null, rs)}
}
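Conceptually, Spark drives each InputPartitionReader with a simple iterate/close protocol. The sketch below is illustrative only; Spark performs this internally, and the helper function here is not project code:
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.sources.v2.reader.InputPartition

// Illustrative only: how one partition's rows are pulled through next()/get()/close().
def consumePartition(partition: InputPartition[InternalRow]): Unit = {
  val reader = partition.createPartitionReader()
  try {
    while (reader.next()) {
      val row: InternalRow = reader.get() // each call maps the current JDBC row to an InternalRow
    }
  } finally {
    reader.close() // closes the ClickHouse statement, result set and connection
  }
}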
The getSelectStatement method used above lives in ClickHouseHelper and simply builds the full-table SELECT:
def getSelectStatement(schema: StructType): String = {
s"SELECT ${schema.fieldNames.mkString(",")} FROM ${options.getFullTable}"
}
With the reader classes in place, ClickHouseDataSourceV2.createReader can return the new reader:
/**
 * Read data in batch mode.
 * options: the options that were supplied when the data was loaded
 */
override def createReader(options: DataSourceOptions): DataSourceReader = new ClickHouseDataSourceReader(new ClickHouseOptions(options.asMap()))
Next, implement the write path (batch and streaming):
package cn.it.logistics.etl.realtime.ext
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.sources.v2.writer.{DataWriterFactory, WriterCommitMessage}
import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types.StructType
/**
 * @ClassName ClickHouseWriter
 * @Description Write implementation that supports both batch and streaming
 */
class ClickHouseWriter(writeUuidOrQueryId: String, schema: StructType, batchMode: SaveMode, streamMode: OutputMode, options: ClickHouseOptions) extends StreamWriter {
/**
 * Override createWriterFactory to return the custom DataWriterFactory
 * @return
 */
override def createWriterFactory(): DataWriterFactory[InternalRow] = new ClickHouseDataWriterFactory(writeUuidOrQueryId, schema, batchMode, streamMode, options)
/** Batch writer commit */
override def commit(messages: Array[WriterCommitMessage]): Unit = {}
/** Batch writer abort */
override def abort(messages: Array[WriterCommitMessage]): Unit = {}
/** Streaming writer commit */
override def commit(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {}
/** Streaming writer abort */
override def abort(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {}
}
package cn.it.logistics.etl.realtime.ext
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.sources.v2.writer.{DataWriter, DataWriterFactory}
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types.StructType
/**
 * @ClassName ClickHouseDataWriterFactory
 * @Description Write factory that instantiates a ClickHouseDataWriter on each task
 */
class ClickHouseDataWriterFactory(writeUUID: String, schema: StructType, batchMode: SaveMode, streamMode: OutputMode, options: ClickHouseOptions) extends DataWriterFactory[InternalRow] {
override def createDataWriter(partitionId: Int, taskId: Long, epochId: Long): DataWriter[InternalRow] = new ClickHouseDataWriter(writeUUID, schema, batchMode, streamMode, options)
}
package cn.it.logistics.etl.realtime.ext
import java.io.Serializable
import org.apache.commons.lang3.StringUtils
import org.apache.spark.internal.Logging
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.sources.v2.writer.{DataWriter, WriterCommitMessage}
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types.StructType
import scala.collection.mutable.ArrayBuffer
/**
 * @ClassName ClickHouseDataWriter
 * @Description ClickHouse data writer implementation
 */
class ClickHouseDataWriter(writeUUID: String, schema: StructType, batchMode: SaveMode, streamMode: OutputMode, options: ClickHouseOptions) extends DataWriter[InternalRow] with Logging with Serializable {
val helper = new ClickHouseHelper(options)
val opType = options.getOpTypeField
private val sqls = ArrayBuffer[String]()
private val autoCreateTable: Boolean = options.autoCreateTable
private val init = if (autoCreateTable) {
val createSQL: String = helper.createTable(options.getFullTable, schema)
println(s"==== Table initialization SQL: $createSQL")
helper.executeUpdate(createSQL)
}
val fields = schema.fields
override def commit(): WriterCommitMessage = {
println("executeUpdateBatch:" + sqls.length)
if (sqls.length > 0) {
helper.executeUpdateBatch(sqls)
}
val batchSQL = sqls.mkString("\n")
// logDebug(batchSQL)
println(batchSQL)
new WriterCommitMessage {
override def toString: String = s"Batch insert SQL: $batchSQL"
}
}
override def write(record: InternalRow): Unit = {
if (StringUtils.isEmpty(opType)) {
throw new RuntimeException("The opTypeField option was not provided, so the persistence type (insert/update/delete) cannot be determined!")
}
var sqlStr: String = helper.getStatement(options.getFullTable, schema, record)
println(sqlStr)
logDebug(s"==== Generated SQL statement: $sqlStr")
try {
if (StringUtils.isEmpty(sqlStr)) {
val msg = "==== Failed to build the SQL statement: it is NULL or EMPTY!"
logError(msg)
throw new RuntimeException(msg)
}
//Thread.sleep(options.getInterval())
// Streaming mode
if (null == batchMode) {
if (streamMode == OutputMode.Append) {
sqls += sqlStr
// val state = helper.executeUpdate(sqlStr)
// println(s"==== Executed under OutputMode.Append: $sqlStr\nstate: $state")
}
else if (streamMode == OutputMode.Complete) {logError("==== Writing under OutputMode.Complete is not implemented; add it in ClickHouseDataWriter.write!")}
else if (streamMode == OutputMode.Update) {logError("==== Writing under OutputMode.Update is not implemented; add it in ClickHouseDataWriter.write!")}
else {logError(s"==== Writing under an unknown output mode is not implemented; add it in ClickHouseDataWriter.write!")}
// Batch mode
} else {
if (batchMode == SaveMode.Append) {
sqls += sqlStr
//val state = helper.executeUpdate(sqlStr)
//println(s"==== Executed under SaveMode.Append: $sqlStr\nstate: $state")
}
else if (batchMode == SaveMode.Overwrite) {logError("==== Writing under SaveMode.Overwrite is not implemented; add it in ClickHouseDataWriter.write!")}
else if (batchMode == SaveMode.ErrorIfExists) {logError("==== Writing under SaveMode.ErrorIfExists is not implemented; add it in ClickHouseDataWriter.write!")}
else if (batchMode == SaveMode.Ignore) {logError("==== Writing under SaveMode.Ignore is not implemented; add it in ClickHouseDataWriter.write!")}
else {logError(s"==== Writing under an unknown save mode is not implemented; add it in ClickHouseDataWriter.write!")}
}
} catch {
case e: Exception => logError(e.getMessage)
}
}
override def abort(): Unit = {}
}
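For orientation only, this is roughly the per-task protocol Spark follows with the factory and writer defined above; the helper function and its arguments are illustrative, not project code:
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.sources.v2.writer.{DataWriterFactory, WriterCommitMessage}

// Illustrative sketch of the per-task write protocol driven by Spark.
def runTask(factory: DataWriterFactory[InternalRow], rows: Iterator[InternalRow],
            partitionId: Int, taskId: Long, epochId: Long): WriterCommitMessage = {
  val writer = factory.createDataWriter(partitionId, taskId, epochId)
  try {
    rows.foreach(writer.write) // each row becomes an INSERT/ALTER statement buffered in `sqls`
    writer.commit()            // flushes the buffered statements via executeUpdateBatch
  } catch {
    case e: Exception =>
      writer.abort()           // on failure Spark calls abort instead of commit
      throw e
  }
}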
package cn.it.logistics.etl.realtime.ext
import java.sql.{Connection, Date, PreparedStatement, ResultSet, Statement}
import java.text.SimpleDateFormat
import java.util
import org.apache.commons.lang3.StringUtils
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.InternalRow
import org.javatuples.Triplet
import ru.yandex.clickhouse.domain.ClickHouseDataType
import ru.yandex.clickhouse.response.{ClickHouseResultSet, ClickHouseResultSetMetaData}
import ru.yandex.clickhouse.{ClickHouseConnection, ClickHouseDataSource, ClickHouseStatement}
import ru.yandex.clickhouse.settings.ClickHouseProperties
import org.apache.spark.sql.types.{BooleanType, DataType, DataTypes, DateType, DoubleType, FloatType, IntegerType, LongType, StringType, StructField, StructType}
import scala.collection.mutable.ArrayBuffer
/**
 * Utility class for ClickHouse operations
 */
class ClickHouseHelper(options: ClickHouseOptions) extends Logging {
private val opType: String = options.getOpTypeField
private var connection: ClickHouseConnection = getConnection
private val id: String = options.getPrimaryKey
/**
 * Obtain a ClickHouse connection object
 */
def getConnection = {
//Get the ClickHouse connection string
val url: String = options.getURL
//Create the ClickHouseDataSource object
val clickHouseDataSource: ClickHouseDataSource = new ClickHouseDataSource(url, new ClickHouseProperties())
//Return the ClickHouse connection object
clickHouseDataSource.getConnection
}
/**
 * Return the schema information of the specified table
 * @return StructType: the schema of the Spark DataFrame
 */
def getSparkTableSchema: StructType = {
import collection.JavaConversions._
val clickHouseTableSchema: util.LinkedList[Triplet[String, String, String]] = getClickHouseTableSchema
//println(clickHouseTableSchema)
val fields = ArrayBuffer[StructField]()
//Build the schema from the ClickHouse table's columns and column types
for (trp <- clickHouseTableSchema) {
fields += StructField(trp.getValue0, getSparkSqlType(trp.getValue1))
}
//Return the StructType, which is the schema
StructType(fields)
}
/**
 * Return the columns and column types of the ClickHouse table
 */
def getClickHouseTableSchema = {
//Collection of columns
val fields: util.LinkedList[Triplet[String, String, String]] = new util.LinkedList[Triplet[String, String, String]]()
//Query the table and use the result set's metadata to obtain the column information
//ClickHouse connection objects
var connection: ClickHouseConnection = null
var statement: ClickHouseStatement = null
var resultSet: ClickHouseResultSet = null
var metaData: ClickHouseResultSetMetaData = null
try {
//Obtain the connection object
connection = getConnection
statement = connection.createStatement()
//We only need the columns and their types, not the data, so use a condition that never matches
val sql: String = s"SELECT * FROM ${options.getFullTable} WHERE 1=0"
resultSet = statement.executeQuery(sql).asInstanceOf[ClickHouseResultSet]
//Metadata of the specified table
metaData = resultSet.getMetaData.asInstanceOf[ClickHouseResultSetMetaData]
val columnCount: Int = metaData.getColumnCount
for (i <- 1 to columnCount) {
val columnName: String = metaData.getColumnName(i)
val columnTypeName: String = metaData.getColumnTypeName(i)
val javaTypeName: String = ClickHouseDataType.fromTypeString(columnTypeName).getJavaClass.getSimpleName
println("columnTypeName:" + columnTypeName)
println("javaTypeName:" + javaTypeName)
fields.add(new Triplet(columnName, columnTypeName, javaTypeName))
}
} catch {
case ex: Exception => ex.printStackTrace()
} finally {
if (resultSet != null) resultSet.close()
if (statement != null) statement.close()
if (connection != null) connection.close()
}
fields
}
def createTable(table: String, schema: StructType): String = {
val cols = ArrayBuffer[String]()
for (field <- schema.fields) {
val dataType = field.dataType
val ckColName = field.name
if (ckColName != opType) {
var ckColType = getClickhouseSqlType(dataType)
if (!StringUtils.isEmpty(ckColType)) {
if (ckColType.toLowerCase == "string") {
ckColType = "Nullable(" + ckColType + ")"
}
}
cols += ckColName + " " + ckColType
}
}
s"CREATE TABLE IF NOT EXISTS $table(${cols.mkString(",")},sign Int8,version UInt64) ENGINE=VersionedCollapsingMergeTree(sign, version) ORDER BY $id"
}
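// Illustrative example of the DDL this method emits (hypothetical schema: id LongType as the primary key,
// name StringType, plus an operation-type column that is skipped because its name equals opType):
// CREATE TABLE IF NOT EXISTS logistics.tbl_demo(id Int64,name Nullable(String),sign Int8,version UInt64)
//   ENGINE=VersionedCollapsingMergeTree(sign, version) ORDER BY id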
def executeUpdate(sql: String): Int = {
var state = 0
var st: ClickHouseStatement = null
try {
if (null == connection || connection.isClosed) {
connection = getConnection
}
st = connection.createStatement()
state = st.executeUpdate(sql)
} catch {
case e: Exception => logError(s"Failed to execute: $sql\n${e.getMessage}")
} finally {
//closeAll(connection, st)
}
state
}
def executeUpdateBatch(sqlArray: ArrayBuffer[String]) = {
//The buffered SQL statements may be a mix of insert, update and delete
var batchSQL: StringBuffer = new StringBuffer()
var statement: ClickHouseStatement = null
try {
if (connection == null || connection.isClosed) {
connection = getConnection
}
statement = connection.createStatement()
//Whether an INSERT statement has been seen
var insertFlag: Boolean = false
for (i <- 0 until sqlArray.length) {
val line: String = sqlArray(i)
//Insert
if (line.toLowerCase.contains("insert") && line.toLowerCase.contains("values")) {
//Locate the VALUES keyword
val offset: Int = line.indexOf("VALUES")
if (!insertFlag) {
//First INSERT: keep the prefix up to and including VALUES,
//e.g. INSERT INTO tbl_areas(citycode,id,lat,level,lng,mername,name,pid,pinyin,sname,yzcode,sign,version) VALUES
val prefix: String = line.substring(0, offset + 6)
batchSQL.append(prefix)
}
//Append only the value list of this INSERT
val suffix: String = line.substring(offset + 6)
batchSQL.append(suffix)
insertFlag = true
}
else if (line.toLowerCase.contains("update")) { //Update
//If INSERTs were collected before this update, flush them first
if (insertFlag) {
println("Combined batch INSERT statement: " + batchSQL.toString)
statement.executeUpdate(batchSQL.toString)
}
println("Single UPDATE statement: " + line)
statement.executeUpdate(line)
batchSQL = new StringBuffer()
insertFlag = false
} else if (line.toLowerCase().contains("delete")) { //Delete
//If INSERTs were collected before this delete, flush them first
if (insertFlag) {
println("Combined batch INSERT statement: " + batchSQL.toString)
statement.executeUpdate(batchSQL.toString)
}
println("Single DELETE statement: " + line)
statement.executeUpdate(line)
batchSQL = new StringBuffer()
insertFlag = false
}
//If only INSERTs remain, flush them after the last statement
if (!batchSQL.toString.isEmpty() && i == sqlArray.length - 1) {
println("Combined batch INSERT statement: " + batchSQL.toString)
statement.executeUpdate(batchSQL.toString)
}
}
} catch {
case ex: Exception => ex.printStackTrace()
}
}
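// Illustrative example of how executeUpdateBatch merges buffered statements (invented values):
//   INSERT INTO tbl_demo(id,name,sign,version) VALUES(1,'a',1,1700000000000)
//   INSERT INTO tbl_demo(id,name,sign,version) VALUES(2,'b',1,1700000000001)
// are concatenated into a single statement
//   INSERT INTO tbl_demo(id,name,sign,version) VALUES(1,'a',1,1700000000000)(2,'b',1,1700000000001)
// while each UPDATE/DELETE is executed on its own, flushing any pending INSERT batch first.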
def getStatement(table: String, schema: StructType, record: InternalRow): String = {
val opTypeValue: String = getFieldValue(opType, schema, record).toString
if (opTypeValue.toLowerCase() == "insert") {
getInsertStatement(table, schema, record)
}
else if (opTypeValue.toLowerCase() == "update") {
getUpdateStatement(table, schema, record)
}
else if (opTypeValue.toLowerCase() == "delete") {
getDeleteStatement(table, schema, record)
}
else {
""
}
}
def getSelectStatement(schema: StructType): String = {
s"SELECT ${schema.fieldNames.mkString(",")} FROM ${options.getFullTable}"
}
def getInsertStatement(table: String, schema: StructType, data: InternalRow): String = {
val fields = schema.fields
val names = ArrayBuffer[String]()
val values = ArrayBuffer[String]()
// If the DataFrame's fields are identical to the table's columns, the SQL is built with all columns:
// if (data.numFields == fields.length) {
// } else { // otherwise only the columns that actually exist in the DataFrame should be spliced into the SQL
// }
for (i <- 0 until fields.length) {
val field = fields(i)
val fieldType = field.dataType
val fieldName = field.name
if (fieldName != opType) {
val fieldValue = fieldType match {
case DataTypes.BooleanType => if (data.isNullAt(i)) "NULL" else s"${data.getBoolean(i)}"
case DataTypes.DoubleType => if (data.isNullAt(i)) "NULL" else s"${data.getDouble(i)}"
case DataTypes.FloatType => if (data.isNullAt(i)) "NULL" else s"${data.getFloat(i)}"
case DataTypes.IntegerType => if (data.isNullAt(i)) "NULL" else s"${data.getInt(i)}"
case DataTypes.LongType => if (data.isNullAt(i)) "NULL" else s"${data.getLong(i)}"
case DataTypes.ShortType => if (data.isNullAt(i)) "NULL" else s"${data.getShort(i)}"
case DataTypes.StringType => if (data.isNullAt(i)) "NULL" else s"'${data.getUTF8String(i).toString.trim}'"
//DateType is stored in InternalRow as the number of days since the epoch
case DataTypes.DateType => if (data.isNullAt(i)) "NULL" else s"'${new SimpleDateFormat("yyyy-MM-dd").format(new Date(data.getInt(i).toLong * 24L * 60 * 60 * 1000))}'"
case DataTypes.TimestampType => if (data.isNullAt(i)) "NULL" else s"'${new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date(data.getLong(i) / 1000))}'"
case DataTypes.BinaryType => if (data.isNullAt(i)) "NULL" else s"${data.getBinary(i)}"
case DataTypes.NullType => "NULL"
}
names += fieldName
values += fieldValue
}
}
if (names.length > 0 && values.length > 0) {
names += ("sign", "version")
values += ("1", System.currentTimeMillis().toString)
}
val strSql = s"INSERT INTO $table(${names.mkString(",")}) VALUES(${values.mkString(",")})"
strSql
}
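// Illustrative example of the INSERT built for a row whose opTypeField value is "insert"
// (invented table and values; sign=1 and version=System.currentTimeMillis() are appended automatically):
// INSERT INTO logistics.tbl_demo(id,name,sign,version) VALUES(1,'zhangsan',1,1700000000000)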
def getDeleteStatement(table: String, schema: StructType, data: InternalRow): String = {
val fields = schema.fields
val primaryKeyFields = if (options.getPrimaryKey.isEmpty) {
fields.filter(field => field.name == "id")
} else {
fields.filter(field => field.name == options.getPrimaryKey)
}
if (primaryKeyFields.length > 0) {
val primaryKeyField = primaryKeyFields(0)
val primaryKeyValue = getFieldValue(primaryKeyField.name, schema, data)
s"ALTER TABLE $table DELETE WHERE ${primaryKeyField.name} = $primaryKeyValue"
} else {
logError("==== 找不到主键,无法生成删除SQL!")
""
}
}
def getUpdateStatement(table: String, schema: StructType, data: InternalRow): String = {
val fields = schema.fields
val primaryKeyFields = if (options.getPrimaryKey.isEmpty) {
fields.filter(field => field.name == "id")
} else {
fields.filter(field => field.name == options.getPrimaryKey)
}
if (primaryKeyFields.length > 0) {
val primaryKeyField = primaryKeyFields(0)
val primaryKeyValue = getFieldValue(primaryKeyField.name, schema, data)
//Exclude the primary key and the operation-type column from the SET clause
val noPrimaryKeyFields = fields.filter(field => field.name != primaryKeyField.name && field.name != opType)
var sets = ArrayBuffer[String]()
for (i <- 0 until noPrimaryKeyFields.length) {
val noPrimaryKeyField = noPrimaryKeyFields(i)
val set = noPrimaryKeyField.name + "=" + getFieldValue(noPrimaryKeyField.name, schema, data).toString
sets += set
}
//ClickHouse's ALTER TABLE ... UPDATE separates the assignments with commas
s"ALTER TABLE $table UPDATE ${sets.mkString(", ")} WHERE ${primaryKeyField.name}=$primaryKeyValue"
} else {
logError("==== No primary key found; cannot build the UPDATE statement!")
""
}
}
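// Illustrative example of the mutation built for a row whose opTypeField value is "update"
// (invented table and numeric columns; ClickHouse runs ALTER TABLE ... UPDATE asynchronously as a mutation):
// ALTER TABLE logistics.tbl_demo UPDATE amount=12.5, weight=3 WHERE id=1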
private def getFieldValue(fieldName: String, schema: StructType, data: InternalRow): Any = {
var flag = true
var fieldValue: String = null
val fields = schema.fields
for (i <- 0 until fields.length if flag) {
val field = fields(i)
if (fieldName == field.name) {
fieldValue = field.dataType match {
case DataTypes.BooleanType => if (data.isNullAt(i)) "NULL" else s"${data.getBoolean(i)}"
case DataTypes.DoubleType => if (data.isNullAt(i)) "NULL" else s"${data.getDouble(i)}"
case DataTypes.FloatType => if (data.isNullAt(i)) "NULL" else s"${data.getFloat(i)}"
case DataTypes.IntegerType => if (data.isNullAt(i)) "NULL" else s"${data.getInt(i)}"
case DataTypes.LongType => if (data.isNullAt(i)) "NULL" else s"${data.getLong(i)}"
case DataTypes.ShortType => if (data.isNullAt(i)) "NULL" else s"${data.getShort(i)}"
case DataTypes.StringType => if (data.isNullAt(i)) "NULL" else s"${data.getUTF8String(i).toString.trim}"
//DateType is stored in InternalRow as the number of days since the epoch
case DataTypes.DateType => if (data.isNullAt(i)) "NULL" else s"'${new SimpleDateFormat("yyyy-MM-dd").format(new Date(data.getInt(i).toLong * 24L * 60 * 60 * 1000))}'"
case DataTypes.TimestampType => if (data.isNullAt(i)) "NULL" else s"${new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date(data.getLong(i) / 1000))}"
case DataTypes.BinaryType => if (data.isNullAt(i)) "NULL" else s"${data.getBinary(i)}"
case DataTypes.NullType => "NULL"
}
flag = false
}
}
fieldValue
}
def closeAll(connection: Connection = null, st: Statement = null, ps: PreparedStatement = null, rs: ResultSet = null): Unit = {
try {
if (rs != null && !rs.isClosed) rs.close()
if (ps != null && !ps.isClosed) ps.close()
if (st != null && !st.isClosed) st.close()
if (connection != null && !connection.isClosed) connection.close()
} catch {
case e: Exception => e.printStackTrace()
}
}
/**
* IntervalYear (Types.INTEGER, Integer.class, true, 19, 0),
* IntervalQuarter (Types.INTEGER, Integer.class, true, 19, 0),
* IntervalMonth (Types.INTEGER, Integer.class, true, 19, 0),
* IntervalWeek (Types.INTEGER, Integer.class, true, 19, 0),
* IntervalDay (Types.INTEGER, Integer.class, true, 19, 0),
* IntervalHour (Types.INTEGER, Integer.class, true, 19, 0),
* IntervalMinute (Types.INTEGER, Integer.class, true, 19, 0),
* IntervalSecond (Types.INTEGER, Integer.class, true, 19, 0),
* UInt64 (Types.BIGINT, BigInteger.class, false, 19, 0),
* UInt32 (Types.INTEGER, Long.class, false, 10, 0),
* UInt16 (Types.SMALLINT, Integer.class, false, 5, 0),
* UInt8 (Types.TINYINT, Integer.class, false, 3, 0),
* Int64 (Types.BIGINT, Long.class, true, 20, 0, "BIGINT"),
* Int32 (Types.INTEGER, Integer.class, true, 11, 0, "INTEGER", "INT"),
* Int16 (Types.SMALLINT, Integer.class, true, 6, 0, "SMALLINT"),
* Int8 (Types.TINYINT, Integer.class, true, 4, 0, "TINYINT"),
* Date (Types.DATE, Date.class, false, 10, 0),
* DateTime (Types.TIMESTAMP, Timestamp.class, false, 19, 0, "TIMESTAMP"),
* Enum8 (Types.VARCHAR, String.class, false, 0, 0),
* Enum16 (Types.VARCHAR, String.class, false, 0, 0),
* Float32 (Types.FLOAT, Float.class, true, 8, 8, "FLOAT"),
* Float64 (Types.DOUBLE, Double.class, true, 17, 17, "DOUBLE"),
* Decimal32 (Types.DECIMAL, BigDecimal.class, true, 9, 9),
* Decimal64 (Types.DECIMAL, BigDecimal.class, true, 18, 18),
* Decimal128 (Types.DECIMAL, BigDecimal.class, true, 38, 38),
* Decimal (Types.DECIMAL, BigDecimal.class, true, 0, 0, "DEC"),
* UUID (Types.OTHER, UUID.class, false, 36, 0),
* String (Types.VARCHAR, String.class, false, 0, 0, "LONGBLOB", "MEDIUMBLOB", "TINYBLOB", "MEDIUMTEXT", "CHAR", "VARCHAR", "TEXT", "TINYTEXT", "LONGTEXT", "BLOB"),
* FixedString (Types.CHAR, String.class, false, -1, 0, "BINARY"),
* Nothing (Types.NULL, Object.class, false, 0, 0),
* Nested (Types.STRUCT, String.class, false, 0, 0),
* Tuple (Types.OTHER, String.class, false, 0, 0),
* Array (Types.ARRAY, Array.class, false, 0, 0),
* AggregateFunction (Types.OTHER, String.class, false, 0, 0),
* Unknown (Types.OTHER, String.class, false, 0, 0);
*
* @param clickhouseDataType
* @return
*/
private def getSparkSqlType(clickhouseDataType: String) = clickhouseDataType match {
case "IntervalYear" => DataTypes.IntegerType
case "IntervalQuarter" => DataTypes.IntegerType
case "IntervalMonth" => DataTypes.IntegerType
case "IntervalWeek" => DataTypes.IntegerType
case "IntervalDay" => DataTypes.IntegerType
case "IntervalHour" => DataTypes.IntegerType
case "IntervalMinute" => DataTypes.IntegerType
case "IntervalSecond" => DataTypes.IntegerType
case "UInt64" => DataTypes.LongType //DataTypes.IntegerType;
case "UInt32" => DataTypes.LongType
case "UInt16" => DataTypes.IntegerType
case "UInt8" => DataTypes.IntegerType
case "Int64" => DataTypes.LongType
case "Int32" => DataTypes.IntegerType
case "Int16" => DataTypes.IntegerType
case "Int8" => DataTypes.IntegerType
case "Date" => DataTypes.DateType
case "DateTime" => DataTypes.TimestampType
case "Enum8" => DataTypes.StringType
case "Enum16" => DataTypes.StringType
case "Float32" => DataTypes.FloatType
case "Float64" => DataTypes.DoubleType
case "Decimal32" => DataTypes.createDecimalType
case "Decimal64" => DataTypes.createDecimalType
case "Decimal128" => DataTypes.createDecimalType
case "Decimal" => DataTypes.createDecimalType
case "UUID" => DataTypes.StringType
case "String" => DataTypes.StringType
case "FixedString" => DataTypes.StringType
case "Nothing" => DataTypes.NullType
case "Nested" => DataTypes.StringType
case "Tuple" => DataTypes.StringType
case "Array" => DataTypes.StringType
case "AggregateFunction" => DataTypes.StringType
case "Unknown" => DataTypes.StringType
case _ => DataTypes.NullType
}
private def getClickhouseSqlType(sparkDataType: DataType) = sparkDataType match {
case DataTypes.ByteType => "Int8"
case DataTypes.ShortType => "Int16"
case DataTypes.IntegerType => "Int32"
case DataTypes.FloatType => "Float32"
case DataTypes.DoubleType => "Float64"
case DataTypes.LongType => "Int64"
case DataTypes.DateType => "DateTime"
case DataTypes.TimestampType => "DateTime"
case DataTypes.StringType => "String"
case DataTypes.NullType => "String"
//Fallback for Spark types not mapped above (e.g. BooleanType, DecimalType), to avoid a MatchError
case _ => "String"
}
}
With the writer classes in place, the remaining two methods of ClickHouseDataSourceV2 simply wire them up:
/** Data writing in batch mode */
override def createWriter(writeUUID: String, schema: StructType, mode: SaveMode, options: DataSourceOptions): Optional[DataSourceWriter] = Optional.of(new ClickHouseWriter(writeUUID, schema, mode, null, new ClickHouseOptions(options.asMap())))
/** Data writing in streaming mode */
override def createStreamWriter(queryId: String, schema: StructType, mode: OutputMode, options: DataSourceOptions): StreamWriter = new ClickHouseWriter(queryId, schema, null, mode, new ClickHouseOptions(options.asMap()))
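Finally, a usage sketch. The option keys below (url, table, and any others such as opTypeField or primaryKey) are assumptions about what ClickHouseOptions expects and should be checked against that class; the data source itself is addressed by its fully qualified class name:
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("logistics-clickhouse").master("local[*]").getOrCreate()
val format = "cn.it.logistics.etl.realtime.ext.clickhouse.ClickHouseDataSourceV2"

// Batch read (option keys are assumed)
val df = spark.read.format(format)
  .option("url", "jdbc:clickhouse://node1:8123/logistics")
  .option("table", "tbl_waybill")
  .load()

// Batch write
df.write.format(format)
  .mode("append")
  .option("url", "jdbc:clickhouse://node1:8123/logistics")
  .option("table", "tbl_waybill_copy")
  .save()

// Streaming write from a streaming DataFrame `streamDF` (sketch only):
// streamDF.writeStream.format(format)
//   .outputMode("append")
//   .option("url", "jdbc:clickhouse://node1:8123/logistics")
//   .option("table", "tbl_waybill")
//   .option("checkpointLocation", "/tmp/ck-checkpoint")
//   .start()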