Flume-Kafka-SparkStreaming实战2

上一篇讲了Flume安装配置,kafka安装配置,Flume+Kafka调试。

这篇继续使用SparkStreaming作为kafka的消费者消费数据,为了防止SparkStreaming应用失败,需要记录所读取topic的offset。最后进行Flume->kafka->sparkStreaming联合调试。

一)使用Spark-KafkaUtil

spark-streaming文档高级消费源部分:

http://spark.apache.org/docs/latest/streaming-programming-guide.html#advanced-sources

spark-streaming kafka集成部分:

http://spark.apache.org/docs/latest/streaming-kafka-integration.html

里边有

spark-streaming-kafka-0-8

spark-streaming-kafka-0-10

两个版本

注意:1.Kafka 0.8 support is deprecated as of Spark 2.3.0.

2.Kafka 0.10This version of the integration is marked as experimental, so the API is potentially subject to change.

由于当前使用spark-core 2.2.1版本,所以采用spark-streaming-kafka-0-8

0.8的集成文档

http://spark.apache.org/docs/latest/streaming-kafka-0-8-integration.html

There are two approaches to this - the old approach usingReceivers and Kafka’s high-level API, and a new approach (introduced in Spark 1.3)without using Receivers.

0.8有两种集成方式,第一种有Receivers,第二种直连。参考文档第一种有缺陷,第二种直连方式 sparkStreaming的分区和kafka的topic分区保持一一对应,更易于理解。

所以采用直连方式。

为了防止sparkStreaming应用问题,需要sparkStreaming在处理完当前RDD后,将offset存储起来,可以存储在Redis里,kafka里,甚至zk里。

本文存储在kafka里的一个自带的topic里: __consumer_offsets

代码如下:

import java.io.

import kafka.common.TopicAndPartition

import kafka.message.MessageAndMetadata

import kafka.serializer.StringDecoder

//import org.apache.spark.streaming.kafka._

object Order_Streaming_Hbase_08Version {

def main(args: Array[String]): Unit = {

val Array(brokers,topics,groupId) = Array("hadoop2:9092,hadoop3:9092,hadoop4:9092","order","order_consume")

//不加这个,ImmutableBytesWritable出现序列化错误---加这个使用spark的序列化方式

val spark = SparkSession.builder().appName("Order_Streaming").master("local[*]")

.config("spark.streaming.kafka.maxRatePerPartition", "5") //每个Partition每秒接收5个

.getOrCreate()

val sc = spark.sparkContext

val ssc = new StreamingContext(sc,Seconds(1))

sc.setLogLevel("WARN")

println(spark.toString)

val kafkaParams = Map[String,String](

"bootstrap.servers" -> brokers, //kafka服务列表

"group.id" -> groupId , //SparkStreaming端的消费者组---kafka的概念

)

val topic_set = Set(topics) //kafka里存放的Topic

val kafkaCluster = new KafkaCluster(kafkaParams = kafkaParams)

//根据Kafka里的Topic获取Topic的分区信息

val getTopicPartitionSet = kafkaCluster.getPartitions(topic_set)

//可以获取到topic的partition信息,也就是没有错误

if(getTopicPartitionSet.isRight){

val topicAndPartitionSet = kafkaCluster.getPartitions(topic_set).right.get

//用来存储Topic-Per-partition的读取位置

var consumerOffsetsLong = new mutable.HashMap[TopicAndPartition,Long]()

//从kafka获取(消费组group.id)在topic-per-partition下的消费offset

val getConsumerOffSets = kafkaCluster.getConsumerOffsets(kafkaParams.get("group.id").toString,topicAndPartitionSet)

if(getConsumerOffSets.isLeft){ //没有找到group.id的消费OffSets--即没有消费过

//从Topic最老的Offset读取数据

val latestOffset = kafkaCluster.getEarliestLeaderOffsets(topicAndPartitionSet)

topicAndPartitionSet.foreach(tp=>{

//println(tp) //[order,0]

//println(latestOffset.right.get(tp)) //LeaderOffset(hadoop2,9092,7520)

})

}else{//以前消费过

val consumerOffsetsTemp = kafkaCluster.getConsumerOffsets(kafkaParams.get("group.id").toString,topicAndPartitionSet)

println(consumerOffsetsTemp.right)

println(consumerOffsetsTemp.left)

topicAndPartitionSet.foreach(tp=>{

})

}

println("这次应该从哪消费:"+consumerOffsetsLong) //Map([order,0] -> 7520, [order,1] -> 7523)

val kafkaClusterParamsBroadcast = ssc.sparkContext.broadcast(kafkaParams)

val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder,String](ssc, kafkaParams, consumerOffsetsLong.toMap,(mmd: MessageAndMetadata[String, String]) => mmd.message())

var offsetRanges = Array.empty[OffsetRange]

stream.transform(rdd =>{

offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

rdd

}).foreachRDD(rdd =>{

println("这个rdd里的条目是:"+rdd.count())

// 处理业务逻辑---这种方式每个partition使用一个单例sparkSession.

rdd.foreachPartition(it=>{

//WARN SparkContext: Using an existing SparkContext; some configuration may not take effect.

val spark_tmp = SparkSession.builder().getOrCreate()

//hbase信息

val conf = HBaseConfiguration.create()

conf.set(TableInputFormat.INPUT_TABLE, "order_streaming_hbase")

println("rdd-partion里:spark_session地址: "+spark_tmp.toString)

it.foreach(x=>{

val value = x.split("-")(1)

println("rdd-partion里的每个值:"+x)

//开始rowkey和结束一样代表精确查询某条数据

val startRowKey = "a1000001"

val endRowKey = "a1000001"

val column_family = "detail"

val column = "value"

conf.set(TableInputFormat.SCAN_ROW_START,startRowKey)

conf.set(TableInputFormat.SCAN_ROW_STOP,endRowKey)

conf.set(TableInputFormat.SCAN_COLUMN_FAMILY,column_family)

conf.set(TableInputFormat.SCAN_COLUMNS,column_family,column)

val hb_rdd = spark_tmp.sparkContext.newAPIHadoopRDD(conf,classOf[TableInputFormat],classOf[ImmutableBytesWritable],classOf[Result])

val hb_rdd_data = hb_rdd.take(1)

if (hb_rdd_data.length>0){

val result = hb_rdd_data(0)._2

val key = Bytes.toString(result.getRow)

val name = Bytes.toString(result.getValue(column_family.getBytes,column.getBytes))

println("Row key:" + key + " Name:" + name)

}

})

})

####################这种将数据收取到driver端

//hbase信息

// val conf = HBaseConfiguration.create()

// conf.set("hbase.zookeeper.quorum","hadoop2,hadoop3,hadoop4")

// conf.set("hbase.zookeeper.property.clientPort","2181")

// conf.set(TableInputFormat.INPUT_TABLE, "order_streaming_hbase")

// val column_family = "detail"

// val column = "value"

// rdd.collect().foreach(x=>{

//

// val value = x.split("-")(1)

// println("rdd里"+value)

//

// //开始rowkey和结束一样代表精确查询某条数据

// val startRowKey = "a1000001"

// val endRowKey = "a1000001"

// conf.set(TableInputFormat.SCAN_ROW_START,startRowKey)

// conf.set(TableInputFormat.SCAN_ROW_STOP,endRowKey)

// conf.set(TableInputFormat.SCAN_COLUMN_FAMILY,column_family)

// conf.set(TableInputFormat.SCAN_COLUMNS,column_family,column)

//

// val hb_rdd = sc.newAPIHadoopRDD(conf,classOf[TableInputFormat],classOf[ImmutableBytesWritable],classOf[Result])

//

// val hb_rdd_data = hb_rdd.take(1)

// if (hb_rdd_data.length>0){

// val result = hb_rdd_data(0)._2

// val row_key = Bytes.toString(result.getRow)

// val person_value = Bytes.toString(result.getValue(column_family.getBytes,column.getBytes))

//

// println("Row key:" + row_key + " value:" + person_value)

//

// val result_value = value.toDouble + person_value.toDouble

//

// println("成长值将被更新为:"+result_value)

//

//

//

// }

//

// })

//向kafka里写offset

val m = new mutable.HashMap[TopicAndPartition, Long]()

if (null != offsetRanges) {

offsetRanges.foreach(

println(s"$ $ $ $")

val tp = o.topicAndPartition

println(tp)

m.put(tp, o.untilOffset)

}

)

}

// println(m)

})

ssc.start()

ssc.awaitTermination()

}

}

}

下一篇:关于Kafka_monitor的一个组件=KafkaOffsetMonitor

能查看topic被哪个消费组读取到哪里了

  • 发表于:
  • 原文链接https://kuaibao.qq.com/s/20180830G0EZMO00?refer=cp_1026
  • 腾讯「腾讯云开发者社区」是腾讯内容开放平台帐号(企鹅号)传播渠道之一,根据《腾讯内容开放平台服务协议》转载发布内容。
  • 如有侵权,请联系 cloudcommunity@tencent.com 删除。

扫码关注腾讯云开发者

领取腾讯云代金券