首页
学习
活动
专区
工具
TVP
发布
社区首页 >问答首页 >使用结构化火花流在HBase中大容量插入数据

使用结构化火花流在HBase中大容量插入数据
EN

Stack Overflow用户
提问于 2019-05-24 21:49:54
回答 1查看 1.4K关注 0票数 4

我正在使用结构化火花流读取来自Kafka (每秒100.000行)的数据,并尝试将所有数据插入到HBase中。

我使用的是Cloudera Hadoop 2.6,我使用的是Spark 2.3

我试过了就像我见过here一样。

代码语言:javascript
复制
eventhubs.writeStream
 .foreach(new MyHBaseWriter[Row])
 .option("checkpointLocation", checkpointDir)
 .start()
 .awaitTermination()

MyHBaseWriter看起来像这样:

代码语言:javascript
复制
class AtomeHBaseWriter[RECORD] extends HBaseForeachWriter[Row] {
  override def toPut(record: Row): Put = {
    override val tableName: String = "hbase-table-name"

    override def toPut(record: Row): Put = {
        // Get Json
        val data = JSON.parseFull(record.getString(0)).asInstanceOf[Some[Map[String, Object]]]
        val key = data.getOrElse(Map())("key")+ ""
        val val = data.getOrElse(Map())("val")+ ""

        val p = new Put(Bytes.toBytes(key))
        //Add columns ... 
        p.addColumn(Bytes.toBytes(columnFamaliyName),Bytes.toBytes(columnName), Bytes.toBytes(val))

        p
     }
    }

HBaseForeachWriter类如下所示:

代码语言:javascript
复制
trait HBaseForeachWriter[RECORD] extends ForeachWriter[RECORD] {
  val tableName: String

  def pool: Option[ExecutorService] = None

  def user: Option[User] = None

  private var hTable: Table = _
  private var connection: Connection = _


  override def open(partitionId: Long, version: Long): Boolean = {
    connection = createConnection()
    hTable = getHTable(connection)
    true
  }

  def createConnection(): Connection = {
    // I create HBase Connection Here
  }

  def getHTable(connection: Connection): Table = {
    connection.getTable(TableName.valueOf(Variables.getTableName()))
  }

  override def process(record: RECORD): Unit = {
    val put = toPut(record)
    hTable.put(put)
  }

  override def close(errorOrNull: Throwable): Unit = {
    hTable.close()
    connection.close()
  }

  def toPut(record: RECORD): Put
}

所以我在这里逐行放入,即使我允许20个执行器和每个执行器有4个核心,我也不会将数据立即插入到HBase中。所以我需要做的是批量加载,但我很挣扎,因为我在互联网上找到的所有东西都是用RDDs和Map/Reduce实现的。

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2019-05-25 02:27:33

据我所知,hbase接收记录的速度很慢。我对你有几点建议。

1) r

下面的属性可能会对您有所帮助。

hbase.client.write.buffer

说明 BufferedMutator写入缓冲区的默认大小,以字节为单位。更大的缓冲区在客户端和服务器端都需要更多的内存 - ,因为服务器实例化传递的写缓冲区来处理它的 - ,但是更大的缓冲区大小会减少产生的RPC的数量。有关服务器端内存使用量的估计,请评估hbase.client.write.buffer * hbase.regionserver.handler.count

默认2097152 (大约2 mb )

我更喜欢foreachBatch see spark docs (它是spark core中的一种foreachPartition )而不是foreach

还可以在你的hbase编写器中扩展ForeachWriter

open方法初始化put in process的数组列表,将put添加到close table.put(listofputs);中的put数组列表,然后在更新表后重置数组列表...

它所做的基本上就是上面提到的缓冲区大小是用2MB填充的,然后它会刷新到hbase表中。在此之前,记录不会进入hbase表。

你可以把它增加到10mb,这样....这样,RPC的数量将会减少。巨大的数据块将被刷新,并将在hbase表中。

当写缓冲区被填满并触发到hbase表的flushCommits时。

示例代码:在我的answer

2)关闭WAL您可以关闭WAL(预写日志危险是不可恢复的),但它会加快写入速度……如果不想恢复数据。

备注:如果您在hbase表上使用solr或cloudera搜索,则不应将其关闭,因为Solr将在WAL上工作。如果将其关闭,Solr索引将不起作用。这是我们很多人都会犯的一个常见错误。

如何关闭: https://hbase.apache.org/1.1/apidocs/org/apache/hadoop/hbase/client/Put.html#setWriteToWAL(boolean)

进一步研究的

  • 基本架构和link

正如我提到的,看跌期权列表是一种很好的方法。这是在结构化流式处理之前的旧方法(带有puts列表的foreachPartition),示例如下所示。其中foreachPartition针对每个分区进行操作,而不是针对每一行。

代码语言:javascript
复制
def writeHbase(mydataframe: DataFrame) = {
      val columnFamilyName: String = "c"
      mydataframe.foreachPartition(rows => {
        val puts = new util.ArrayList[ Put ]
        rows.foreach(row => {
          val key = row.getAs[ String ]("rowKey")
          val p = new Put(Bytes.toBytes(key))
          val columnV = row.getAs[ Double ]("x")
          val columnT = row.getAs[ Long ]("y")
          p.addColumn(
            Bytes.toBytes(columnFamilyName),
            Bytes.toBytes("x"),
            Bytes.toBytes(columnX)
          )
          p.addColumn(
            Bytes.toBytes(columnFamilyName),
            Bytes.toBytes("y"),
            Bytes.toBytes(columnY)
          )
          puts.add(p)
        })
        HBaseUtil.putRows(hbaseZookeeperQuorum, hbaseTableName, puts)
      })
    }

到sumup :

我的感觉是我们需要了解spark和hbase的心理学,才能使他们成为有效的组合。

票数 5
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/56294039

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档