前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >spark从hbase读数据到存入hbase数据两种版本写法

spark从hbase读数据到存入hbase数据两种版本写法

作者头像
gzq大数据
发布2021-04-19 18:01:04
7740
发布2021-04-19 18:01:04
举报
文章被收录于专栏:大数据那些事大数据那些事

spark2版本:

代码语言:javascript
复制
object SparkCoreTest {
  def main(args: Array[String]): Unit = {
    // 使用sparksession来创建对象
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkCoreTest")
    val spark: SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()
    // 设置读表和写表
    val readTable: String = "hydrogenation_flow_record"
    val writeTable: String = "test200"

    // 创建hbase输入的配置文件,并且把服务器上的hbase-site放进resources目录下
    val hBaseConfRead: Configuration = HBaseConfiguration.create()
    // inputtable代表是读数据的配置
    hBaseConfRead.set(TableInputFormat.INPUT_TABLE, readTable)


    //配置写入表,要定义一个Jobconf,与读表不同
    val hBaseConfWrite: Configuration = HBaseConfiguration.create()
    val jobConf = new JobConf(hBaseConfWrite)
    jobConf.setOutputFormat(classOf[TableOutputFormat])
    jobConf.set(TableOutputFormat.OUTPUT_TABLE, writeTable);

    // 创建hadooprdd算子,出来的rdd为一个元组对象,第一个元素类型为ImmutableBytesWritable,所以写入时也要转成同样的转子
    val hbaseRDD: RDD[(ImmutableBytesWritable, Result)] = spark.sparkContext.newAPIHadoopRDD(hBaseConfRead, classOf[TableInputFormat],
      classOf[ImmutableBytesWritable],
      classOf[Result])
    // 导入toDF变成dataframe的隐式依赖,让下面可以用toDF方法
    import spark.implicits._

    val sps: DataFrame = hbaseRDD.map(r => (
      Bytes.toString(r._2.getValue(Bytes.toBytes("SPSFlowInfo"), Bytes.toBytes("SPSFlowTotal"))),
      Bytes.toString(r._2.getRow)
    )).toDF("SPSFlowTotal", "row")
    // 创建出来的dataframe进行命名
    sps.createOrReplaceTempView("sps")
    // 执行sql语句
    val frame: DataFrame = spark.sql("SELECT sum(SPSFlowTotal) as A FROM sps WHERE row BETWEEN '4000069:1618539744390' and '4000069:1618539744426'")
    // 将查到的数据组装成元组类型,元组的第一个为qualifier,元组的第二个是从dataframe里读到的数据
    val tupleDS: Dataset[(String, String)] = frame.map(t => ("SPSFlowTotal", t(0).toString))
    // 配置输出到hbase的rdd,新建一个put,第一个为row,第二个为具体列,具体列可以填写列族列,值,可以同时加多个列
    val rdd: RDD[(ImmutableBytesWritable, Put)] = tupleDS.rdd.map { a => {
      val put: Put = new Put(Bytes.toBytes("34353454353"))
      put.addColumn(Bytes.toBytes("info"), Bytes.toBytes(a._1.toString), Bytes.toBytes(a._2))
      // 封装成元组时第一个必须为ImmutableBytesWritable,符合spark和hadoop的规范
      (new ImmutableBytesWritable, put)
    }
    }
    // 执行保存操作
    rdd.saveAsHadoopDataset(jobConf)
    // 关闭session
    spark.stop()
  }

}

spark老版本:

代码语言:javascript
复制
object SparkCoreTest {
  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkCoreTest")
//    val spark: SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()
    val sc: SparkContext = new SparkContext(sparkConf)

    val hBaseConf = HBaseConfiguration.create()
    hBaseConf.set(TableInputFormat.INPUT_TABLE, "hydrogenation_flow_record")
        val sqlContext = new SQLContext(sc)
        import sqlContext.implicits._
//    import spark.implicits._
    val hBaseConf1 = HBaseConfiguration.create()
    val jobConf = new JobConf(hBaseConf1)
    jobConf.setOutputFormat(classOf[TableOutputFormat])
    jobConf.set(TableOutputFormat.OUTPUT_TABLE, "test200");


    val hbaseRDD: RDD[(ImmutableBytesWritable, Result)] = sc.newAPIHadoopRDD(hBaseConf, classOf[TableInputFormat],
      classOf[ImmutableBytesWritable],
      classOf[Result])

    val sps: DataFrame = hbaseRDD.map(r => (
      Bytes.toString(r._2.getValue(Bytes.toBytes("SPSFlowInfo"), Bytes.toBytes("SPSFlowTotal"))),
      Bytes.toString(r._2.getRow)
    )).toDF("SPSFlowTotal", "row")
    //    sps.registerTempTable("sps")
    sps.createOrReplaceTempView("sps")

    val frame: DataFrame = sqlContext.sql("SELECT sum(SPSFlowTotal) as A FROM sps WHERE row BETWEEN '4000069:1618539744390' and '4000069:1618539744426'")
    val tupleDS: Dataset[(String, String)] = frame.map(t => ("SPSFlowTotal", t(0).toString))

    val rdd: RDD[(ImmutableBytesWritable, Put)] = tupleDS.rdd.map { a => {
      val put: Put = new Put(Bytes.toBytes("343534543533".toString))
      put.addColumn(Bytes.toBytes("info"), Bytes.toBytes(a._1.toString), Bytes.toBytes(a._2))
      (new ImmutableBytesWritable, put)
    }
    }
    rdd.saveAsHadoopDataset(jobConf)
    spark.stop()
  }

}
本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2021-04-16 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档