首页
学习
活动
专区
工具
TVP
发布
社区首页 >问答首页 >使用结构化火花流在HBase中大容量插入数据

使用结构化火花流在HBase中大容量插入数据
EN

Stack Overflow用户
提问于 2019-05-24 21:49:54
回答 1查看 1.4K关注 0票数 4

我正在使用结构化火花流读取来自Kafka (每秒100.000行)的数据,并尝试将所有数据插入到HBase中。

我使用的是Cloudera Hadoop 2.6,我使用的是Spark 2.3

我试过了就像我见过here一样。

代码语言:javascript
复制
eventhubs.writeStream
 .foreach(new MyHBaseWriter[Row])
 .option("checkpointLocation", checkpointDir)
 .start()
 .awaitTermination()

MyHBaseWriter看起来像这样:

代码语言:javascript
复制
class AtomeHBaseWriter[RECORD] extends HBaseForeachWriter[Row] {
  override def toPut(record: Row): Put = {
    override val tableName: String = "hbase-table-name"

    override def toPut(record: Row): Put = {
        // Get Json
        val data = JSON.parseFull(record.getString(0)).asInstanceOf[Some[Map[String, Object]]]
        val key = data.getOrElse(Map())("key")+ ""
        val val = data.getOrElse(Map())("val")+ ""

        val p = new Put(Bytes.toBytes(key))
        //Add columns ... 
        p.addColumn(Bytes.toBytes(columnFamaliyName),Bytes.toBytes(columnName), Bytes.toBytes(val))

        p
     }
    }

HBaseForeachWriter类如下所示:

代码语言:javascript
复制
trait HBaseForeachWriter[RECORD] extends ForeachWriter[RECORD] {
  val tableName: String

  def pool: Option[ExecutorService] = None

  def user: Option[User] = None

  private var hTable: Table = _
  private var connection: Connection = _


  override def open(partitionId: Long, version: Long): Boolean = {
    connection = createConnection()
    hTable = getHTable(connection)
    true
  }

  def createConnection(): Connection = {
    // I create HBase Connection Here
  }

  def getHTable(connection: Connection): Table = {
    connection.getTable(TableName.valueOf(Variables.getTableName()))
  }

  override def process(record: RECORD): Unit = {
    val put = toPut(record)
    hTable.put(put)
  }

  override def close(errorOrNull: Throwable): Unit = {
    hTable.close()
    connection.close()
  }

  def toPut(record: RECORD): Put
}

所以我在这里逐行放入,即使我允许20个执行器和每个执行器有4个核心,我也不会将数据立即插入到HBase中。所以我需要做的是批量加载,但我很挣扎,因为我在互联网上找到的所有东西都是用RDDs和Map/Reduce实现的。

EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/56294039

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档