1)正好有一些Spark连接HBase的需求,这里当作笔记记录下来,以后自己写代码时可以参考
2)根据rowkey查询的写法我查了很久才找到,所以整理了一下
3)好久没发博客了,水一篇
Scala 2.11.11
Spark 2.1.1
HBase 2.0.5
其中hbase-site.xml为hbase安装目录下/hbase/conf里的hbase-site.xml
<!-- Version properties for the build. Of these, only spark.version, scala.version,
     slf4j.version and log4j.version are referenced by the dependencies section
     visible in this snippet; the remaining properties are not used by any
     dependency shown here. -->
<properties>
<!-- NOTE(review): the mysql-connector-java dependency shown in this snippet hard-codes 5.1.38 and does not use this property; confirm which version is intended -->
<mysql.version>6.0.5</mysql.version>
<spring.version>4.3.6.RELEASE</spring.version>
<spring.data.jpa.version>1.11.0.RELEASE</spring.data.jpa.version>
<log4j.version>1.2.17</log4j.version>
<quartz.version>2.2.3</quartz.version>
<slf4j.version>1.7.22</slf4j.version>
<hibernate.version>5.2.6.Final</hibernate.version>
<camel.version>2.18.2</camel.version>
<config.version>1.10</config.version>
<jackson.version>2.8.6</jackson.version>
<servlet.version>3.0.1</servlet.version>
<net.sf.json.version>2.4</net.sf.json.version>
<activemq.version>5.14.3</activemq.version>
<!-- Version used for the spark-streaming, spark-core and spark-sql artifacts -->
<spark.version>2.1.1</spark.version>
<!-- Version used for the scala-library artifact -->
<scala.version>2.11.11</scala.version>
<hadoop.version>2.7.3</hadoop.version>
</properties>
<!-- Dependencies for the Spark + HBase demo code: Spark modules, the Scala library,
     logging, a MySQL driver, hutool, and the HBase server/client artifacts. -->
<dependencies>
<!-- Spark Streaming; version comes from the spark.version property -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<!-- Scala standard library; version comes from the scala.version property -->
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
<!-- Spark core (provides SparkConf / SparkContext used by the demos) -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<!-- Spark SQL; version comes from the spark.version property -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<!-- Logging -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>jcl-over-slf4j</artifactId>
<version>${slf4j.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>${slf4j.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>${slf4j.version}</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>${log4j.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/mysql/mysql-connector-java -->
<!-- NOTE(review): version is hard-coded here; the mysql.version property (6.0.5) is not used; confirm which one is intended -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.38</version>
</dependency>
<!-- https://mvnrepository.com/artifact/cn.hutool/hutool-all -->
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>5.4.7</version>
</dependency>
<!-- HBase server artifact, pinned to 1.3.1. Presumably the source of TableInputFormat used by the demos; verify.
     NOTE(review): the accompanying notes mention HBase 2.0.5; confirm this version matches the cluster. -->
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
<version>1.3.1</version>
</dependency>
<!-- HBase client artifact, pinned to the same 1.3.1 version as hbase-server -->
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>1.3.1</version>
</dependency>
</dependencies>
package com.bjfu.spark.demo.hbasedemo
import com.google.common.collect.Table.Cell
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase
import org.apache.hadoop.hbase.{CellUtil, HBaseConfiguration}
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
/**
 * Full-table scan demo: reads every row of the HBase table "EcodataFei" through
 * `SparkContext.newAPIHadoopRDD` with `TableInputFormat` and prints the value of
 * every cell.
 *
 * The HBase connection settings are whatever `HBaseConfiguration.create()` picks up.
 *
 * @author chaird
 * @create 2020-11-13 22:04
 */
object HBaseDemo {
  /**
   * Runs the scan on a local Spark master and prints each cell value to stdout.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val config: SparkConf = new SparkConf().setMaster("local[*]").setAppName("HBaseDemo")
    // Create the Spark context
    val sc = new SparkContext(config)
    try {
      // Create the HBase configuration and select the table to read
      val conf: Configuration = HBaseConfiguration.create()
      conf.set(TableInputFormat.INPUT_TABLE, "EcodataFei")
      // Full table scan: one (row key, Result) pair per HBase row
      val hbaseRDD: RDD[(ImmutableBytesWritable, Result)] = sc.newAPIHadoopRDD(conf,
        classOf[TableInputFormat],
        classOf[ImmutableBytesWritable], classOf[Result])
      hbaseRDD.foreach {
        // Display style 1: print the value of every cell in the row
        case (_, result) =>
          result.rawCells().foreach { cell =>
            println(Bytes.toString(CellUtil.cloneValue(cell)))
          }
        // Display style 2: print the row key and one specific column
        // case (_, result) =>
        //   // row key
        //   val key: String = Bytes.toString(result.getRow)
        //   // column family, qualifier -> value
        //   val value: String = Bytes.toString(result.getValue(Bytes.toBytes("datatype"), Bytes.toBytes("288")))
        //   println("rowKey:" + key + " " + "value:" + value)
      }
    } finally {
      // Release resources even when the job fails
      sc.stop()
    }
  }
}
package com.bjfu.spark.demo.hbasedemo
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{Result, Scan}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.hadoop.hbase.util.{Base64, Bytes}
import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor, TableName}
import org.apache.hadoop.hbase.client.{HTable, Scan}
import org.apache.hadoop.hbase.filter.FilterList.Operator
import org.apache.hadoop.hbase.filter._
import java.util
import org.apache.hadoop.hbase.protobuf.ProtobufUtil
/**
 * Row-key lookup demo: reads from the HBase table "EcodataFei" only the rows whose
 * row key equals one of a fixed list of keys, and prints the row key together with
 * the value of column "288" in column family "datatype".
 *
 * The restriction is expressed as a `FilterList` (MUST_PASS_ONE) of `RowFilter`s set
 * on a `Scan`; the scan is serialized into the Hadoop configuration under
 * `TableInputFormat.SCAN` so that `TableInputFormat` applies it.
 *
 * @author chaird
 * @create 2020-11-14 20:39
 */
object HBaseConditionDemo {
  /**
   * Runs the filtered scan on a local Spark master and prints the matching rows.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    // Create the Spark context
    val config: SparkConf = new SparkConf().setMaster("local[*]").setAppName("HBaseDemo")
    val sc = new SparkContext(config)
    try {
      // Create the HBase configuration and select the table to read
      val hbaseConf: Configuration = HBaseConfiguration.create()
      hbaseConf.set(TableInputFormat.INPUT_TABLE, "EcodataFei")
      // Build one EQUAL row filter per wanted row key
      val scan = new Scan()
      val rowkeys = List("17-2020-41-12 17:09", "2-2020-37-12 17:09")
      val filters = new util.ArrayList[Filter]()
      rowkeys.foreach { rowkey =>
        filters.add(new RowFilter(CompareFilter.CompareOp.EQUAL, new BinaryComparator(Bytes.toBytes(rowkey))))
      }
      // MUST_PASS_ONE: a row is kept when it matches any one of the row filters
      val filterList = new FilterList(Operator.MUST_PASS_ONE, filters)
      scan.setFilter(filterList)
      hbaseConf.set(TableInputFormat.SCAN, convertScanToString(scan))
      val hbaseRDD: RDD[(ImmutableBytesWritable, Result)] = sc.newAPIHadoopRDD(hbaseConf,
        classOf[TableInputFormat],
        classOf[ImmutableBytesWritable], classOf[Result])
      hbaseRDD.foreach {
        case (_, result) =>
          // row key
          val key: String = Bytes.toString(result.getRow)
          // column family, qualifier -> value
          val value: String = Bytes.toString(result.getValue(Bytes.toBytes("datatype"), Bytes.toBytes("288")))
          println("rowKey:" + key + " " + "value:" + value)
      }
    } finally {
      // Release resources even when the job fails
      sc.stop()
    }
  }

  /**
   * Serializes a scan into the string form stored under `TableInputFormat.SCAN`:
   * the scan's protobuf bytes, Base64-encoded.
   *
   * @param scan the scan to serialize
   * @return the Base64 text of the protobuf-encoded scan
   */
  def convertScanToString(scan: Scan): String = {
    val proto = ProtobufUtil.toScan(scan)
    Base64.encodeBytes(proto.toByteArray)
  }
}
// Read the "customer_id" column of family "datatype" from every row as a String,
// bring all the values back to the driver, and print them one per line.
val list: List[String] = hbaseRDD
  .map { row =>
    val cellBytes = row._2.getValue(Bytes.toBytes("datatype"), Bytes.toBytes("customer_id"))
    Bytes.toString(cellBytes)
  }
  .collect()
  .toList
for (customerId <- list) println(customerId)