I am reading data from Kafka with Spark Structured Streaming (100,000 rows per second) and trying to insert all of it into HBase.
I am on Cloudera Hadoop 2.6 and Spark 2.3.
I tried something like what I have seen here:
eventhubs.writeStream
  .foreach(new MyHBaseWriter[Row])
  .option("checkpointLocation", checkpointDir)
  .start()
  .awaitTermination()
MyHBaseWriter looks like this:
class AtomeHBaseWriter[RECORD] extends HBaseForeachWriter[Row] {
  override val tableName: String = "hbase-table-name"

  override def toPut(record: Row): Put = {
    // Parse the JSON payload carried in the first column
    val data = JSON.parseFull(record.getString(0)).asInstanceOf[Some[Map[String, Object]]]
    val key = data.getOrElse(Map())("key") + ""
    val value = data.getOrElse(Map())("val") + ""

    // Build one Put per record
    val p = new Put(Bytes.toBytes(key))
    // Add columns ...
    p.addColumn(Bytes.toBytes(columnFamilyName), Bytes.toBytes(columnName), Bytes.toBytes(value))
    p
  }
}
The HBaseForeachWriter class looks like this:
trait HBaseForeachWriter[RECORD] extends ForeachWriter[RECORD] {
  val tableName: String

  def pool: Option[ExecutorService] = None
  def user: Option[User] = None

  private var hTable: Table = _
  private var connection: Connection = _

  override def open(partitionId: Long, version: Long): Boolean = {
    connection = createConnection()
    hTable = getHTable(connection)
    true
  }

  def createConnection(): Connection = {
    // I create the HBase Connection here
  }

  def getHTable(connection: Connection): Table = {
    connection.getTable(TableName.valueOf(Variables.getTableName()))
  }

  override def process(record: RECORD): Unit = {
    val put = toPut(record)
    hTable.put(put)
  }

  override def close(errorOrNull: Throwable): Unit = {
    hTable.close()
    connection.close()
  }

  def toPut(record: RECORD): Put
}
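createConnection is not shown above; for completeness, a minimal sketch of what it might look like, assuming the standard HBase client API and that hbase-site.xml is available on the classpath:

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory}

// Sketch only: HBaseConfiguration.create() is assumed to pick up the
// ZooKeeper quorum and other settings from hbase-site.xml on the classpath.
def createConnection(): Connection = {
  val conf = HBaseConfiguration.create()
  ConnectionFactory.createConnection(conf)
}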
So I am writing the rows one Put at a time, and even with 20 executors and 4 cores per executor the data does not get into HBase fast enough. What I need is to load the data in batches, but I am struggling with that because everything I find on the internet is implemented with RDDs and Map/Reduce.
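Roughly what I imagine is something like the sketch below. This is only an assumption on my part, not code I have running: it swaps the per-record Table.put for the HBase client's BufferedMutator, which buffers Puts client-side and sends them to the region servers in batches. HBaseBatchForeachWriter is just a name for the sketch, and createConnection and toPut would be the same as in the trait above.

import org.apache.hadoop.hbase.TableName
import org.apache.hadoop.hbase.client.{BufferedMutator, Connection, Put}
import org.apache.spark.sql.ForeachWriter

trait HBaseBatchForeachWriter[RECORD] extends ForeachWriter[RECORD] {
  val tableName: String

  private var connection: Connection = _
  private var mutator: BufferedMutator = _

  def createConnection(): Connection // same as in HBaseForeachWriter above

  override def open(partitionId: Long, version: Long): Boolean = {
    connection = createConnection()
    // BufferedMutator keeps a client-side write buffer and flushes it
    // to the region servers in bulk instead of doing one RPC per Put
    mutator = connection.getBufferedMutator(TableName.valueOf(tableName))
    true
  }

  override def process(record: RECORD): Unit = {
    // mutate() only adds the Put to the write buffer; the actual RPCs
    // happen when the buffer fills up or when flush()/close() is called
    mutator.mutate(toPut(record))
  }

  override def close(errorOrNull: Throwable): Unit = {
    mutator.flush()
    mutator.close()
    connection.close()
  }

  def toPut(record: RECORD): Put
}

The streaming side would stay exactly as in the first snippet, only with this writer passed to .foreach. Is that the right direction, or is there a proper bulk-load approach for Structured Streaming?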
https://stackoverflow.com/questions/56294039