The previous post covered installing and configuring Flume, installing and configuring Kafka, and testing Flume + Kafka together.
This post uses Spark Streaming as the Kafka consumer. To cope with Spark Streaming application failures, the offsets read from the topic need to be recorded. At the end we run a joint Flume -> Kafka -> Spark Streaming test.
1) Using Spark KafkaUtils
Advanced sources section of the Spark Streaming programming guide:
http://spark.apache.org/docs/latest/streaming-programming-guide.html#advanced-sources
Spark Streaming + Kafka integration guide:
http://spark.apache.org/docs/latest/streaming-kafka-integration.html
It documents two artifacts:
spark-streaming-kafka-0-8
spark-streaming-kafka-0-10
Note:
1. Kafka 0.8 support is deprecated as of Spark 2.3.0.
2. The Kafka 0.10 integration is marked as experimental, so the API is potentially subject to change.
Since this project uses spark-core 2.2.1, spark-streaming-kafka-0-8 is used here.
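For reference, the dependencies can be pulled in with sbt roughly as below. This is a minimal sketch; the use of sbt and the 2.2.1 version are assumptions chosen to match the spark-core 2.2.1 mentioned above.

// build.sbt -- minimal sketch; versions are assumptions aligned with spark-core 2.2.1
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-streaming" % "2.2.1",
  "org.apache.spark" %% "spark-streaming-kafka-0-8" % "2.2.1"
)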
The 0.8 integration guide:
http://spark.apache.org/docs/latest/streaming-kafka-0-8-integration.html
There are two approaches to this - the old approach using Receivers and Kafka's high-level API, and a new approach (introduced in Spark 1.3) without using Receivers.
So 0.8 offers two integration approaches: a receiver-based one and a direct (receiver-less) one. According to the documentation the receiver-based approach has shortcomings, whereas with the direct approach the Spark Streaming partitions correspond one-to-one to the Kafka topic partitions, which is easier to reason about.
The direct approach is therefore used here.
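Stripped of offset management, the direct approach looks roughly like this. This is a minimal sketch; it reuses the broker addresses and the order topic from this post and simply prints each message value.

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

object DirectStreamSketch {
  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext(new SparkConf().setAppName("direct-sketch").setMaster("local[*]"), Seconds(1))
    // The 0-8 direct API accepts "bootstrap.servers" as an alias of "metadata.broker.list"
    val kafkaParams = Map[String, String]("bootstrap.servers" -> "hadoop2:9092,hadoop3:9092,hadoop4:9092")
    // Receiver-less direct stream: one Spark partition per Kafka topic partition
    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, Set("order"))
    stream.map(_._2).print() // print the value of each (key, value) record
    ssc.start()
    ssc.awaitTermination()
  }
}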
To recover from a Spark Streaming application failure, the application must persist the offsets after it has finished processing the current RDD. They can be stored in Redis, in Kafka, or even in ZooKeeper.
In this post they are stored in Kafka's built-in topic: __consumer_offsets
The code is as follows:
import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaCluster, KafkaUtils, OffsetRange}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import scala.collection.mutable
object Order_Streaming_Hbase_08Version {
def main(args: Array[String]): Unit = {
val Array(brokers,topics,groupId) = Array("hadoop2:9092,hadoop3:9092,hadoop4:9092","order","order_consume")
//Without the serializer setting below, ImmutableBytesWritable hits a serialization error -- switch to Spark's Kryo serializer
//(the exact Kryo config line is an assumed reconstruction of the setting this comment refers to)
val spark = SparkSession.builder().appName("Order_Streaming").master("local[*]")
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.config("spark.streaming.kafka.maxRatePerPartition", "5") //receive at most 5 records per second per partition
.getOrCreate()
val sc = spark.sparkContext
val ssc = new StreamingContext(sc,Seconds(1))
sc.setLogLevel("WARN")
println(spark.toString)
val kafkaParams = Map[String,String](
"bootstrap.servers" -> brokers, //Kafka broker list
"group.id" -> groupId //consumer group used on the Spark Streaming side -- a Kafka concept
)
val topic_set = Set(topics) //the Kafka topic(s) to read
val kafkaCluster = new KafkaCluster(kafkaParams = kafkaParams)
//Get the partition info of the topic(s) from Kafka
val getTopicPartitionSet = kafkaCluster.getPartitions(topic_set)
//Right means the partition info was fetched successfully, i.e. no error
if(getTopicPartitionSet.isRight){
val topicAndPartitionSet = getTopicPartitionSet.right.get
//Holds the offset at which to start reading, per topic-partition
val consumerOffsetsLong = new mutable.HashMap[TopicAndPartition,Long]()
//Fetch the offsets this consumer group (group.id) has committed for each topic-partition
val getConsumerOffSets = kafkaCluster.getConsumerOffsets(groupId, topicAndPartitionSet)
if(getConsumerOffSets.isLeft){ //no committed offsets found for this group.id -- it has never consumed
//start reading from the earliest offsets of the topic
val earliestOffsets = kafkaCluster.getEarliestLeaderOffsets(topicAndPartitionSet)
topicAndPartitionSet.foreach(tp=>{
//println(tp) //[order,0]
//println(earliestOffsets.right.get(tp)) //LeaderOffset(hadoop2,9092,7520)
consumerOffsetsLong.put(tp, earliestOffsets.right.get(tp).offset)
})
}else{ //the group has consumed before -- resume from its committed offsets
val committedOffsets = getConsumerOffSets.right.get
println(committedOffsets)
topicAndPartitionSet.foreach(tp=>{
consumerOffsetsLong.put(tp, committedOffsets(tp))
})
}
println("Offsets to start consuming from: "+consumerOffsetsLong) //Map([order,0] -> 7520, [order,1] -> 7523)
val kafkaClusterParamsBroadcast = ssc.sparkContext.broadcast(kafkaParams)
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder,String](ssc, kafkaParams, consumerOffsetsLong.toMap,(mmd: MessageAndMetadata[String, String]) => mmd.message())
var offsetRanges = Array.empty[OffsetRange]
stream.transform(rdd =>{
offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
rdd
}).foreachRDD(rdd =>{
println("这个rdd里的条目是:"+rdd.count())
// 处理业务逻辑---这种方式每个partition使用一个单例sparkSession.
rdd.foreachPartition(it=>{
//WARN SparkContext: Using an existing SparkContext; some configuration may not take effect.
val spark_tmp = SparkSession.builder().getOrCreate()
//HBase connection info (the ZooKeeper settings mirror the driver-side variant further below)
val conf = HBaseConfiguration.create()
conf.set("hbase.zookeeper.quorum","hadoop2,hadoop3,hadoop4")
conf.set("hbase.zookeeper.property.clientPort","2181")
conf.set(TableInputFormat.INPUT_TABLE, "order_streaming_hbase")
println("Inside rdd partition: SparkSession instance: "+spark_tmp.toString)
it.foreach(x=>{
val value = x.split("-")(1)
println("rdd-partion里的每个值:"+x)
//开始rowkey和结束一样代表精确查询某条数据
val startRowKey = "a1000001"
val endRowKey = "a1000001"
val column_family = "detail"
val column = "value"
conf.set(TableInputFormat.SCAN_ROW_START,startRowKey)
conf.set(TableInputFormat.SCAN_ROW_STOP,endRowKey)
conf.set(TableInputFormat.SCAN_COLUMN_FAMILY,column_family)
conf.set(TableInputFormat.SCAN_COLUMNS,column_family,column)
val hb_rdd = spark_tmp.sparkContext.newAPIHadoopRDD(conf,classOf[TableInputFormat],classOf[ImmutableBytesWritable],classOf[Result])
val hb_rdd_data = hb_rdd.take(1)
if (hb_rdd_data.length>0){
val result = hb_rdd_data(0)._2
val key = Bytes.toString(result.getRow)
val name = Bytes.toString(result.getValue(column_family.getBytes,column.getBytes))
println("Row key:" + key + " Name:" + name)
}
})
})
//#################### Alternative: collect the data to the driver and query HBase there
//HBase connection info
// val conf = HBaseConfiguration.create()
// conf.set("hbase.zookeeper.quorum","hadoop2,hadoop3,hadoop4")
// conf.set("hbase.zookeeper.property.clientPort","2181")
// conf.set(TableInputFormat.INPUT_TABLE, "order_streaming_hbase")
// val column_family = "detail"
// val column = "value"
// rdd.collect().foreach(x=>{
//
// val value = x.split("-")(1)
// println("rdd里"+value)
//
// //Using the same start and stop rowkey looks up exactly one row
// val startRowKey = "a1000001"
// val endRowKey = "a1000001"
// conf.set(TableInputFormat.SCAN_ROW_START,startRowKey)
// conf.set(TableInputFormat.SCAN_ROW_STOP,endRowKey)
// conf.set(TableInputFormat.SCAN_COLUMN_FAMILY,column_family)
// conf.set(TableInputFormat.SCAN_COLUMNS,column_family + ":" + column)
//
// val hb_rdd = sc.newAPIHadoopRDD(conf,classOf[TableInputFormat],classOf[ImmutableBytesWritable],classOf[Result])
//
// val hb_rdd_data = hb_rdd.take(1)
// if (hb_rdd_data.length>0){
// val result = hb_rdd_data(0)._2
// val row_key = Bytes.toString(result.getRow)
// val person_value = Bytes.toString(result.getValue(column_family.getBytes,column.getBytes))
//
// println("Row key:" + row_key + " value:" + person_value)
//
// val result_value = value.toDouble + person_value.toDouble
//
// println("成长值将被更新为:"+result_value)
//
//
//
// }
//
// })
//Commit this batch's offsets back to Kafka
val m = new mutable.HashMap[TopicAndPartition, Long]()
if (null != offsetRanges) {
offsetRanges.foreach(o => {
println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
val tp = o.topicAndPartition
println(tp)
m.put(tp, o.untilOffset)
})
//Persist the collected offsets in Kafka's __consumer_offsets topic via the 0-8 KafkaCluster API
kafkaCluster.setConsumerOffsets(groupId, m.toMap)
}
// println(m)
})
ssc.start()
ssc.awaitTermination()
}
}
}
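After the job has processed a few batches, the committed offsets can be read back through the same KafkaCluster API to confirm they really landed in __consumer_offsets. A small sketch; it reuses the broker list, the order topic, and the order_consume group from the code above:

import kafka.common.TopicAndPartition
import org.apache.spark.streaming.kafka.KafkaCluster

object CheckCommittedOffsets {
  def main(args: Array[String]): Unit = {
    val kafkaParams = Map[String, String]("bootstrap.servers" -> "hadoop2:9092,hadoop3:9092,hadoop4:9092")
    val kc = new KafkaCluster(kafkaParams)
    val partitions: Set[TopicAndPartition] = kc.getPartitions(Set("order")).right.get
    // Read back what the order_consume group has committed
    kc.getConsumerOffsets("order_consume", partitions) match {
      case Right(offsets) => offsets.foreach { case (tp, off) => println(s"$tp -> $off") }
      case Left(err) => println("No committed offsets found yet: " + err)
    }
  }
}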
Next post: a Kafka monitoring component, KafkaOffsetMonitor,
which shows how far each consumer group has read in each topic.