1. Consumer
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Properties;
public class Consumer {
    public static void main(String[] args) {
        HashMap<String, Object> config = new HashMap<>();
        config.put("bootstrap.servers", "hadoop01:9092,hadoop02:9092,hadoop03:9092");
        config.put("key.deserializer", StringDeserializer.class.getName());
        config.put("value.deserializer", StringDeserializer.class.getName());
        config.put("group.id", "g000001");
        /**
         * Where to start consuming when there is no committed offset
         * [latest, earliest, none]
         */
        config.put("auto.offset.reset", "earliest");
        // Whether offsets are committed automatically
        //config.put("enable.auto.commit", "false");
        config.put("enable.auto.commit", "true");
config.put("","500");
        // Create a consumer client instance
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(config);
        consumer.subscribe(Arrays.asList("test"));
        while (true) {
            // Poll data; records come from all partitions assigned to this consumer
            ConsumerRecords<String, String> records = consumer.poll(2000);
            Iterator<ConsumerRecord<String, String>> iterator = records.iterator();
            while (iterator.hasNext()) {
                ConsumerRecord<String, String> record = iterator.next();
                System.out.println("record = " + record);
            }
        }
        // Release the connection (unreachable while the loop above never exits)
        // consumer.close();
    }
}
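The consumer above relies on automatic offset commits. If offsets should only be committed after the records have actually been processed, auto-commit can be disabled (the commented-out line above) and the offsets committed explicitly. A minimal sketch of that variant, written in Scala to match the later sections and reusing the broker list, group id, and topic from the example; commitSync() is the standard blocking commit call of the Kafka client:

import java.util.{Collections, Properties}
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.serialization.StringDeserializer
import scala.collection.JavaConverters._

object ManualCommitConsumer {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "hadoop01:9092,hadoop02:9092,hadoop03:9092")
    props.put("key.deserializer", classOf[StringDeserializer].getName)
    props.put("value.deserializer", classOf[StringDeserializer].getName)
    props.put("group.id", "g000001")
    props.put("auto.offset.reset", "earliest")
    // Disable auto-commit so the application decides when an offset counts as processed
    props.put("enable.auto.commit", "false")

    val consumer = new KafkaConsumer[String, String](props)
    consumer.subscribe(Collections.singletonList("test"))
    try {
      while (true) {
        val records = consumer.poll(2000L)
        records.asScala.foreach(record => println(s"record = $record"))
        // Commit the offsets of everything returned by the last poll
        consumer.commitSync()
      }
    } finally {
      consumer.close()
    }
  }
}

commitSync() blocks until the brokers confirm the commit; commitAsync() is the non-blocking alternative when lower latency matters more than immediate confirmation.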
2. Producer
package day12;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
public class Producer {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "hadoop01:9092,hadoop02:9092,hadoop03:9092");
        props.setProperty("key.serializer", StringSerializer.class.getName());
        props.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // How many acknowledgements the broker must return per send; the default is 1
        //props.setProperty("acks", "1");
        // Custom partitioner; the default is
        //org.apache.kafka.clients.producer.internals.DefaultPartitioner
        // props.setProperty("partitioner.class", "org.apache.kafka.clients.producer.internals.DefaultPartitioner");
        // Create a producer instance
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(props);
        int count = 0;
        while (count < 10000000) {
            // int partitionNum = count % 3;
            ProducerRecord<String, String> record = new ProducerRecord<>("test", 0, "", "NO:" + count);
            kafkaProducer.send(record);
            count++;
            Thread.sleep(1000);
        }
        kafkaProducer.close();
    }
}
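send() above is fire-and-forget, so a failed write goes unnoticed. A short Scala sketch of the same producer with acks set to all and the standard send callback, so every record is either confirmed or its error printed (broker list and topic carried over from the example; the object name is made up for illustration):

import java.util.Properties
import org.apache.kafka.clients.producer.{Callback, KafkaProducer, ProducerRecord, RecordMetadata}
import org.apache.kafka.common.serialization.StringSerializer

object AckedProducer {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.setProperty("bootstrap.servers", "hadoop01:9092,hadoop02:9092,hadoop03:9092")
    props.setProperty("key.serializer", classOf[StringSerializer].getName)
    props.setProperty("value.serializer", classOf[StringSerializer].getName)
    // Wait for all in-sync replicas to acknowledge each record
    props.setProperty("acks", "all")

    val producer = new KafkaProducer[String, String](props)
    (0 until 10).foreach { i =>
      val record = new ProducerRecord[String, String]("test", s"NO:$i")
      // The callback fires once the broker has accepted (or rejected) the record
      producer.send(record, new Callback {
        override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit = {
          if (exception != null) exception.printStackTrace()
          else println(s"partition=${metadata.partition()} offset=${metadata.offset()}")
        }
      })
    }
    producer.close()
  }
}

With acks=all the broker only acknowledges once all in-sync replicas have the record, which trades some latency for durability.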
3. Integrating Kafka with Spark Streaming
package day12
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.kafka.common.serialization.StringDeserializer
/**
 * Spark Streaming integrated with Kafka
 */
object SparkStreaming_kafka {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
    val conf = new SparkConf().setMaster("local[2]").setAppName(s"${this.getClass.getSimpleName}")
    val ssc = new StreamingContext(conf, Seconds(5))
    /**
     * Kafka parameters
     */
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "hadoop01:9092,hadoop02:9092,hadoop03:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "day12_005",
      "auto.offset.reset" -> "earliest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )
    // Topics to subscribe to
    val topics = Array("test")
    /**
     * Create the direct Kafka input stream
     */
    val stream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream(ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
    )
    /* val maped: DStream[(String, String)] = stream.map(record => (record.key, record.value))
    maped.foreachRDD(rdd => {
      // processing logic
      rdd.foreach(println)
    })*/
    stream.foreachRDD(rdd => {
      val offsetRange = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      val maped: RDD[(String, String)] = rdd.map(record => (record.key, record.value))
      // processing logic
      maped.foreach(println)
      // print the offset range of every partition in this batch
      for (o <- offsetRange) {
        println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
      }
    })
    // start the streaming job
    ssc.start()
    // block until the job is terminated
    ssc.awaitTermination()
  }
}
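The loop above only prints each batch's offset ranges; nothing is persisted, so a restart again depends on auto.offset.reset. When Kafka itself is an acceptable offset store, the direct stream can commit the processed ranges back to Kafka through the CanCommitOffsets interface of spark-streaming-kafka-0-10. A compact sketch of that variant (same brokers, topic, and group id as above; enable.auto.commit stays false because the commit is issued explicitly):

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010._

object SparkStreaming_kafka_CommitToKafka {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("CommitToKafka")
    val ssc = new StreamingContext(conf, Seconds(5))
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "hadoop01:9092,hadoop02:9092,hadoop03:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "day12_005",
      "auto.offset.reset" -> "earliest",
      // auto-commit must stay off; offsets are committed explicitly below
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )
    val stream = KafkaUtils.createDirectStream(ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](Array("test"), kafkaParams)
    )
    stream.foreachRDD { rdd =>
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      // processing logic
      rdd.map(record => (record.key, record.value)).foreach(println)
      // commit this batch's offsets back to Kafka (the __consumer_offsets topic)
      stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
    }
    ssc.start()
    ssc.awaitTermination()
  }
}

commitAsync() stores the offsets under the configured group id, giving at-least-once semantics: a batch may be reprocessed if the job dies between processing and the commit. The next section keeps the offsets in ZooKeeper instead.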
4. Managing Kafka Offsets with Zookeeper
package day12
import kafka.utils.{ZKGroupTopicDirs, ZkUtils}
import org.I0Itec.zkclient.ZkClient
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, HasOffsetRanges, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
 * Kafka offsets managed in ZooKeeper
 * Created by zhangjingcun on 2018/10/11 8:49.
 */
object SSCDirectKafka010_ZK_Offset {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
    val conf = new SparkConf().setMaster("local[*]").setAppName(s"${this.getClass.getSimpleName}")
    // batch interval of 2 seconds
    val ssc = new StreamingContext(conf, Seconds(2))
    val groupId = "day13_001"
    /**
     * Kafka parameters
     */
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "bigdata01:9092,bigdata02:9092,bigdata03:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> groupId,
      "auto.offset.reset" -> "earliest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )
    val topic = "testTopic"
    val topics = Array(topic)
    /**
     * When the application maintains the offsets itself, two questions arise:
     * 1. On the very first start, where should consumption begin? -> earliest
     * 2. On any later start, where should consumption resume? -> from the offsets stored last time,
     *    e.g. if offset 88 was stored, continue from there.
     *
     * ZKGroupTopicDirs only builds the ZooKeeper path strings
     */
    val zKGroupTopicDirs: ZKGroupTopicDirs = new ZKGroupTopicDirs(groupId, topic)
    /**
     * Resulting directory structure:
     * /consumers/day13_001/offsets/testTopic
     */
    val offsetDir = zKGroupTopicDirs.consumerOffsetDir
    // ZooKeeper connection string
    val zkGroups = "hadoop01:2181,hadoop02:2181,hadoop03:2181"
    /**
     * Create a ZkClient connection and check whether /consumers/day13_001/offsets/testTopic
     * has child nodes: if it does, offsets were stored before; if not, this is the first run.
     */
    val zkClient = new ZkClient(zkGroups)
    val childrenCount = zkClient.countChildren(offsetDir)
    val stream = if (childrenCount > 0) { // not the first start
      println("----------already started before------------")
      // holds the offsets read back from ZooKeeper
      var fromOffsets = Map[TopicPartition, Long]()
      // /consumers/day13_001/offsets/testTopic/0
      // /consumers/day13_001/offsets/testTopic/1
      // /consumers/day13_001/offsets/testTopic/2
      (0 until childrenCount).foreach(partitionId => {
        val offset = zkClient.readData[String](offsetDir + s"/${partitionId}")
        fromOffsets += (new TopicPartition(topic, partitionId) -> offset.toLong)
      })
      KafkaUtils.createDirectStream(ssc, LocationStrategies.PreferConsistent, ConsumerStrategies.Assign[String, String](fromOffsets.keys.toList, kafkaParams, fromOffsets))
    } else { // first start
      println("-------------first start-----------")
      KafkaUtils.createDirectStream(ssc, LocationStrategies.PreferConsistent, ConsumerStrategies.Subscribe[String, String](topics, kafkaParams))
    }
    stream.foreachRDD(rdd => {
      val offsetRange = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      val maped: RDD[(String, String)] = rdd.map(record => (record.key, record.value))
      // processing logic
      maped.foreach(println)
      // store and manage the offsets ourselves, one ZooKeeper node per partition
      for (o <- offsetRange) {
        //println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
        // write the end offset of this batch to ZooKeeper
        ZkUtils(zkClient, false).updatePersistentPath(offsetDir + "/" + o.partition, o.untilOffset.toString)
      }
    })
    ssc.start()
    ssc.awaitTermination()
  }
}
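Because the offsets end up as plain ZooKeeper nodes, the same ZkClient can also be used to inspect or reset them. A small sketch, assuming the group id, topic, and ZooKeeper quorum from the job above, that deletes the stored offsets so the next run behaves like a first start (deleteRecursive is the ZkClient call for removing a node and its children):

import kafka.utils.ZKGroupTopicDirs
import org.I0Itec.zkclient.ZkClient

object ResetZkOffsets {
  def main(args: Array[String]): Unit = {
    // same group id, topic and ZooKeeper quorum as in the job above
    val dirs = new ZKGroupTopicDirs("day13_001", "testTopic")
    val zkClient = new ZkClient("hadoop01:2181,hadoop02:2181,hadoop03:2181")
    // remove the whole offsets directory; the next run then falls back to auto.offset.reset
    zkClient.deleteRecursive(dirs.consumerOffsetDir)
    zkClient.close()
  }
}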