A Small Example of Querying HBase from Spark

Why I wrote this

1) I happened to have some work that needed Spark to talk to HBase, so this post doubles as a notebook I can come back to the next time I write this code.

2) Querying by rowkey in particular took me quite a while to figure out, so I've written it up here.

3) It's been a long time since my last post, so here's a quick one.

Versions

Scala 2.11.11

Spark 2.1.1

HBase 2.0.5 (the pom below pulls in hbase-client/hbase-server 1.3.1)

Code

The hbase-site.xml used here is the hbase-site.xml from the /hbase/conf directory under the HBase installation; put it on the application classpath so that HBaseConfiguration.create() can pick it up.
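
If you'd rather not ship hbase-site.xml with the job, the same connection settings can also be set on the Configuration object in code. This is a minimal sketch of my own (not from the original post); the ZooKeeper hosts zk1,zk2,zk3 and the client port are placeholders to be replaced with the values from your own cluster's hbase-site.xml:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.mapreduce.TableInputFormat

// Placeholder ZooKeeper quorum -- copy the real values from your cluster's hbase-site.xml
val conf: Configuration = HBaseConfiguration.create()
conf.set("hbase.zookeeper.quorum", "zk1,zk2,zk3")
conf.set("hbase.zookeeper.property.clientPort", "2181")
conf.set(TableInputFormat.INPUT_TABLE, "EcodataFei")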

pom dependencies

 <properties>
        <mysql.version>6.0.5</mysql.version>
        <spring.version>4.3.6.RELEASE</spring.version>
        <spring.data.jpa.version>1.11.0.RELEASE</spring.data.jpa.version>
        <log4j.version>1.2.17</log4j.version>
        <quartz.version>2.2.3</quartz.version>
        <slf4j.version>1.7.22</slf4j.version>
        <hibernate.version>5.2.6.Final</hibernate.version>
        <camel.version>2.18.2</camel.version>
        <config.version>1.10</config.version>
        <jackson.version>2.8.6</jackson.version>
        <servlet.version>3.0.1</servlet.version>
        <net.sf.json.version>2.4</net.sf.json.version>
        <activemq.version>5.14.3</activemq.version>
        <spark.version>2.1.1</spark.version>
        <scala.version>2.11.11</scala.version>
        <hadoop.version>2.7.3</hadoop.version>
    </properties>
<dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <!-- Logging -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>jcl-over-slf4j</artifactId>
            <version>${slf4j.version}</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>${slf4j.version}</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>${slf4j.version}</version>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>${log4j.version}</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/mysql/mysql-connector-java -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.38</version>
        </dependency>


        <!-- https://mvnrepository.com/artifact/cn.hutool/hutool-all -->
        <dependency>
            <groupId>cn.hutool</groupId>
            <artifactId>hutool-all</artifactId>
            <version>5.4.7</version>
        </dependency>


        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-server</artifactId>
            <version>1.3.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>1.3.1</version>
        </dependency>

    </dependencies>

Queries

Full table scan
package com.bjfu.spark.demo.hbasedemo

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase
import org.apache.hadoop.hbase.{CellUtil, HBaseConfiguration}
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
  * @author chaird
  * @create 2020-11-13 22:04
  */
object HBaseDemo {
  def main(args: Array[String]): Unit = {
    val config: SparkConf = new SparkConf().setMaster("local[*]").setAppName("HBaseDemo")

    // Create the Spark context
    val sc = new SparkContext(config)
    // Create the HBase configuration (reads hbase-site.xml from the classpath) and set the table to scan
    val conf: Configuration = HBaseConfiguration.create()
    conf.set(TableInputFormat.INPUT_TABLE, "EcodataFei")

    // Full table scan via TableInputFormat
    val hbaseRDD: RDD[(ImmutableBytesWritable, Result)] = sc.newAPIHadoopRDD(conf,
      classOf[TableInputFormat],
      classOf[ImmutableBytesWritable], classOf[Result])
    hbaseRDD.foreach {
      // First way to display the results: print every cell value
      case (rowkey, result) => {
        val cells: Array[hbase.Cell] = result.rawCells()
        for (cell <- cells) {
          println(Bytes.toString(CellUtil.cloneValue(cell)))
        }
      }


      // Second way to display the results (kept commented out)
      //      case (rowkey, result) => {
      //        // rowKey
      //        val key: String = Bytes.toString(result.getRow)
      //        // column family "datatype", qualifier "288" -> value
      //        val value: String = Bytes.toString(result.getValue(Bytes.toBytes("datatype"), Bytes.toBytes("288")))
      //        println("rowKey:" + key + "  " + "value:" + value)
      //
      //      }
    }

    // Release resources
    sc.stop()
  }

}
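
As a small follow-up (my own addition, not in the original post): instead of printing inside foreach on the executors, the scanned (ImmutableBytesWritable, Result) pairs can be mapped into plain (rowkey, value) tuples and inspected from the driver. A sketch that assumes the same "datatype"/"288" column as above and could be dropped in right before sc.stop():

    // Turn each Result into a (rowkey, value) pair; getValue returns null if the column is absent
    val rows: RDD[(String, String)] = hbaseRDD.map { case (_, result) =>
      val key = Bytes.toString(result.getRow)
      val value = Bytes.toString(result.getValue(Bytes.toBytes("datatype"), Bytes.toBytes("288")))
      (key, value)
    }
    println("total rows: " + rows.count())
    rows.take(10).foreach(println)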
Query by rowKey
package com.bjfu.spark.demo.hbasedemo

import java.util

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{Result, Scan}
import org.apache.hadoop.hbase.filter.FilterList.Operator
import org.apache.hadoop.hbase.filter._
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.protobuf.ProtobufUtil
import org.apache.hadoop.hbase.util.{Base64, Bytes}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
  * @author chaird
  * @create 2020-11-14 20:39
  */
object HBaseConditionDemo {
  def main(args: Array[String]): Unit = {

    // Create the Spark context
    val config: SparkConf = new SparkConf().setMaster("local[*]").setAppName("HBaseDemo")
    val sc = new SparkContext(config)

    // Create the HBase configuration and set the target table
    val hbaseConf: Configuration = HBaseConfiguration.create()
    hbaseConf.set(TableInputFormat.INPUT_TABLE, "EcodataFei")

    // Build a rowkey filter list: one exact-match RowFilter per rowkey, OR-ed together via MUST_PASS_ONE
    val scan = new Scan()
    val rowkeys = List("17-2020-41-12 17:09", "2-2020-37-12 17:09")
    val filters = new util.ArrayList[Filter]()
    for (cookieid <- rowkeys) {
      val filter = new RowFilter(CompareFilter.CompareOp.EQUAL, new BinaryComparator(Bytes.toBytes(cookieid)))
      filters.add(filter)
    }
    val filterList = new FilterList(Operator.MUST_PASS_ONE, filters)
    scan.setFilter(filterList)

    // Serialize the Scan and hand it to TableInputFormat
    hbaseConf.set(TableInputFormat.SCAN, convertScanToString(scan))

    val hbaseRDD: RDD[(ImmutableBytesWritable, Result)] = sc.newAPIHadoopRDD(hbaseConf,
      classOf[TableInputFormat],
      classOf[ImmutableBytesWritable], classOf[Result])
    hbaseRDD.foreach {

      case (rowkey, result) => {
        // rowKey
        val key: String = Bytes.toString(result.getRow)
        // column family "datatype", qualifier "288" -> value
        val value: String = Bytes.toString(result.getValue(Bytes.toBytes("datatype"), Bytes.toBytes("288")))
        println("rowKey:" + key + "  " + "value:" + value)

      }
    }

    // Release resources
    sc.stop()
  }

  // Serialize a Scan into the Base64 string that TableInputFormat.SCAN expects
  def convertScanToString(scan: Scan) = {
    val proto = ProtobufUtil.toScan(scan)
    Base64.encodeBytes(proto.toByteArray)
  }


}
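
One alternative worth noting (my own assumption, not from the referenced post): when the rowkeys you need form a contiguous range, a start/stop row scan lets HBase skip irrelevant regions instead of filtering every row. A sketch that reuses hbaseConf and convertScanToString from the code above; with hbase-client 1.3.1 the calls are setStartRow/setStopRow, and the stop row is exclusive:

    // Scan only the rowkey range [start, stop); the example keys here are placeholders
    val rangeScan = new Scan()
    rangeScan.setStartRow(Bytes.toBytes("17-2020-41-12 17:09"))
    rangeScan.setStopRow(Bytes.toBytes("17-2020-41-12 17:10"))
    hbaseConf.set(TableInputFormat.SCAN, convertScanToString(rangeScan))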
Saving the result set
    // Collect one column ("datatype":"customer_id") from every matching row back to the driver
    val list: List[String] = hbaseRDD.map(
      x => Bytes.toString(x._2.getValue(Bytes.toBytes("datatype"), Bytes.toBytes("customer_id")))).collect().toList
    list.foreach(println(_))
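
collect() pulls everything back to the driver, which is fine for small result sets; for larger ones, a sketch like the following (my own addition, with a placeholder output path) writes directly from the executors instead:

    // Write the extracted column straight to storage without collecting to the driver
    hbaseRDD
      .map(x => Bytes.toString(x._2.getValue(Bytes.toBytes("datatype"), Bytes.toBytes("customer_id"))))
      .saveAsTextFile("hdfs:///tmp/hbase_customer_ids")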

References

裴大帅2021, “hbase根据rowkey多个值过滤查询(scala环境)” (filtering an HBase query by multiple rowkey values, Scala environment), Sina Blog
