前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >spark集成hbase与hive数据转换与代码练习

spark集成hbase与hive数据转换与代码练习

作者头像
用户3003813
发布2018-09-06 14:28:58
4630
发布2018-09-06 14:28:58
举报
文章被收录于专栏:个人分享个人分享

  帮一个朋友写个样例,顺便练手啦~一直在做平台的各种事,但是代码后续还要精进啊。。。

代码语言:javascript
复制
 1 import java.util.Date
 2 
 3 import org.apache.hadoop.hbase.HBaseConfiguration
 4 import org.apache.hadoop.hbase.client.{Put, Scan, Result}
 5 import org.apache.hadoop.hbase.io.ImmutableBytesWritable
 6 import org.apache.hadoop.hbase.mapred.TableOutputFormat
 7 import org.apache.hadoop.hbase.mapreduce.TableInputFormat
 8 import org.apache.hadoop.hbase.util.Bytes
 9 import org.apache.hadoop.mapred.JobConf
10 import org.apache.log4j.{Level, Logger}
11 import org.apache.spark.rdd.RDD
12 import org.apache.spark.sql.DataFrame
13 import org.apache.spark.sql.hive.HiveContext
14 import org.apache.spark.{SparkContext, SparkConf}
15 
16 /**
17  * Created by ysy on 2/10/17.
18  */
19 object test {
20 
21     case class ysyTest(LS_certifier_no: String,loc: String,LS_phone_no: String)
22 
23     def main (args: Array[String]) {
24       val sparkConf = new SparkConf().setMaster("local").setAppName("ysy").set("spark.executor.memory", "1g")
25       val sc = new SparkContext(sparkConf)
26       val sqlContext = new HiveContext(sc)
27       sqlContext.sql("drop table pkq")
28       val columns = "LS_certifier_no,LS_location,LS_phone_no"
29       val hbaseRDD = dataInit(sc,"EVENT_LOG_LBS",columns).map(data =>{
30         val id =Bytes.toString(data._2.getValue("f1".getBytes, "LS_certifier_no".getBytes))
31         val loc = Bytes.toString(data._2.getValue("f1".getBytes, "LS_location".getBytes))
32         val phone = Bytes.toString(data._2.getValue("f1".getBytes, "LS_phone_no".getBytes))
33         (id,loc,phone)
34       })
35       val showData = hbaseRDD.foreach(println)
36       val datas = hbaseRDD.filter(_._1 != null).filter(_._2 != null).filter(_._3 != null)
37       val hiveDF = initHiveTableFromHbase(sc:SparkContext,sqlContext,datas)
38       writeHiveTableToHbase(sc,hiveDF)
39 
40 
41     }
42 
43   def initHiveTableFromHbase(sc:SparkContext,sqlContext: HiveContext,hiveRDD:RDD[(String,String,String)]) : DataFrame = {
44     val hRDD = hiveRDD.map(p => ysyTest(p._1,p._2,p._3))
45       val hiveRDDSchema = sqlContext.createDataFrame(hiveRDD)
46       hiveRDDSchema.registerTempTable("pkq")
47       hiveRDDSchema.show(10)
48       hiveRDDSchema
49   }
50 
51   def dataInit(sc : SparkContext,tableName : String,columns : String) : RDD[(ImmutableBytesWritable,Result)] = {
52     val configuration = HBaseConfiguration.create()
53     configuration.addResource("hbase-site.xml")
54     configuration.set(TableInputFormat.INPUT_TABLE,tableName )
55     val scan = new Scan
56     val column = columns.split(",")
57     for(columnName <- column){
58       scan.addColumn("f1".getBytes(),columnName.getBytes())
59     }
60     val hbaseRDD = sc.newAPIHadoopRDD(configuration,classOf[TableInputFormat],classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],classOf[org.apache.hadoop.hbase.client.Result])
61     System.out.println(hbaseRDD.count())
62     hbaseRDD
63   }
64 
65   def writeHiveTableToHbase(sc : SparkContext,hiveDF : DataFrame) = {
66     val configuration = HBaseConfiguration.create()
67     configuration.addResource("hbase-site.xml ")
68     configuration.set(TableOutputFormat.OUTPUT_TABLE,"EVENT_LOG_LBS")
69     val jobConf = new JobConf(configuration)
70     jobConf.setOutputFormat(classOf[TableOutputFormat])
71 
72     val putData = hiveDF.map(data =>{
73       val LS_certifier_no = data(0)
74       val LS_location = data(1)
75       val LS_phone_no = data(2)
76       (LS_certifier_no,LS_location,LS_phone_no)
77     })
78 
79     val rdd = putData.map(datas =>{
80       val put = new Put(Bytes.toBytes(Math.random()))
81       put.addColumn("f1".getBytes(),"LS_certifier_no".getBytes(),Bytes.toBytes(datas._1.toString))
82       put.addColumn("f1".getBytes(),"LS_location".getBytes(),Bytes.toBytes(datas._2.toString))
83       put.addColumn("f1".getBytes(),"LS_phone_no".getBytes(),Bytes.toBytes(datas._3.toString))
84       (new ImmutableBytesWritable, put)
85     })
86     val showRdd = rdd.foreach(println)
87     rdd.saveAsHadoopDataset(jobConf)
88   }
89 
90   }
本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2017-02-12 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档