前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >spark操作Hbase表

spark操作Hbase表

作者头像
Tim在路上
发布2020-08-04 21:40:41
8880
发布2020-08-04 21:40:41
举报
1. 创建conf和table
代码语言:javascript
复制
var tableName = "httpsystem_dev" 
val conf= HBaseConfiguration.create()
//设置要查询的表
conf.set(TableInputFormat.INPUT_TABLE,tableName)
val table = new HTable(conf,tableName)
2. 通过SparkAPI读取数据
代码语言:javascript
复制
val hbaseRDD = sc.newAPIHadoopRDD(hbaseConfiguration, classOf[TableInputFormat],
        classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
        classOf[org.apache.hadoop.hbase.client.Result])

返回的数据是一个ImmutableBytesWritable,和一个result组成的二元组,result就是一个列表

3. 通过扫描设置相查询数据
代码语言:javascript
复制
var scan = new Scan()
      scan.addFamily(Bytes.toBytes("0"))
      scan.addColumn(Bytes.toBytes("0"), Bytes.toBytes("ML_rule_juge_id"))
      scan.addColumn(Bytes.toBytes("0"), Bytes.toBytes("ML_juge_mal"))
      scan.addColumn(Bytes.toBytes("0"), Bytes.toBytes("ML_juge_type"))
      scan.addColumn(Bytes.toBytes("0"), Bytes.toBytes("DLCNN_juge_mal"))
      scan.addColumn(Bytes.toBytes("0"), Bytes.toBytes("DLCNN_juge_type"))

      //spark读取hbase转换rdd
      var proto = ProtobufUtil.toScan(scan)
      var scanToString = Base64.encodeBytes(proto.toByteArray)
      hbaseConfiguration.set(TableInputFormat.SCAN, scanToString)
4. 将RDD转换为Df
代码语言:javascript
复制
      //rdd返回df
      var rdd = hbaseRDD.map(new org.apache.spark.api.java.function.Function[(ImmutableBytesWritable, Result), Row] {
        override def call(v1: (ImmutableBytesWritable, Result)): Row = {
          var result: Result = v1._2
          var rowkey: String = Bytes.toString(result.getRow)
          var ML_juge_type: String = Bytes.toString(result.getValue(Bytes.toBytes("0"), Bytes.toBytes("ML_juge_type")))
          var ML_rule_juge_id: String = Bytes.toString(result.getValue(Bytes.toBytes("0"), Bytes.toBytes("ML_rule_juge_id")))
          var ML_juge_mal: String = Bytes.toString(result.getValue(Bytes.toBytes("0"), Bytes.toBytes("ML_juge_mal")))
          var DLCNN_juge_type: String = Bytes.toString(result.getValue(Bytes.toBytes("0"), Bytes.toBytes("DLCNN_juge_type")))
          var DLCNN_juge_mal: String = Bytes.toString(result.getValue(Bytes.toBytes("0"), Bytes.toBytes("DLCNN_juge_mal")))

          RowFactory.create(rowkey, ML_rule_juge_id, ML_juge_mal, ML_juge_type, DLCNN_juge_mal, DLCNN_juge_type)
        }
      })

      //创建df
      var df = sparkSession.createDataFrame(rdd, HttpParingSchema.struct)
5.数据的写入
代码语言:javascript
复制
val put = new Put(Bytes.toBytes("rowKey"))
put.add("cf","q","value")

批量写入

代码语言:javascript
复制
val rdd = sc.textFile("/data/produce/2015/2015-03-01.log") v
al data = rdd.map(_.split("\t")).map{x=>(x(0)+x(1),x(2))} 
val result = data.foreachPartition{x => {
val conf= HBaseConfiguration.create();
conf.set(TableInputFormat.INPUT_TABLE,"data");
conf.set("hbase.zookeeper.quorum","slave5,slave6,slave7");
conf.set("hbase.zookeeper.property.clientPort","2181");
conf.addResource("/home/hadoop/data/lib/hbase-site.xml");
val table = new HTable(conf,"data");
table.setAutoFlush(false,false);
table.setWriteBufferSize(3*1024*1024);
x.foreach{y => { var put= new Put(Bytes.toBytes(y._1));
put.add(Bytes.toBytes("v"),Bytes.toBytes("value"),Bytes.toBytes(y._2));table.put(put)
};
table.flushCommits}}}
6.使用Bulkload插入数据
代码语言:javascript
复制
val conf = HBaseConfiguration.create(); 
val tableName = "data1" val table = new HTable(conf,tableName)
conf.set(TableOutputFormat.OUTPUT_TABLE,tableName) 
lazy val job = Job.getInstance(conf)
job.setMapOutputKeyClass(classOf[ImmutableBytesWritable])
job.setMapOutputValueClass(classOf[KeyValue])
HFileOutputFormat.configureIncrementalLoad(job,table) 
val rdd = sc.textFile("/data/produce/2015/2015-03-01.log").map(_.split("@")).map{x => (DigestUtils.md5Hex(x(0)+x(1)).substring(0,3)+x(0)+x(1),x(2))}.sortBy(x =>x._1).map{x=>{val kv:KeyValue = new KeyValue(Bytes.toBytes(x._1),Bytes.toBytes("v"),Bytes.toBytes("value"),Bytes.toBytes(x._2+""));
(new ImmutableBytesWritable(kv.getKey),kv)}}
rdd.saveAsNewAPIHadoopFile("/tmp/data1",classOf[ImmutableBytesWritable],classOf[KeyValue],classOf[HFileOutputFormat],job.getConfiguration()) 
val bulkLoader = new LoadIncrementalHFiles(conf)
bulkLoader.doBulkLoad(new Path("/tmp/data1"),table)
本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1. 创建conf和table
  • 2. 通过SparkAPI读取数据
  • 3. 通过扫描设置相查询数据
  • 4. 将RDD转换为Df
  • 5.数据的写入
  • 6.使用Bulkload插入数据
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档