
Kafka Code Examples

曼路
Published 2018-10-18 15:14:42

1. Consumer

Code language: java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Properties;

public class Consumer {
    public static void main(String[] args){
        HashMap<String,Object > config = new HashMap<>();
        config.put("bootstrap.servers","hadoop01:9092,hadoop02:9092,hadoop03:9092");
        config.put("key.deserializer",StringDeserializer.class.getName());
        config.put("value.deserializer",StringDeserializer.class.getName());
        config.put("group.id","g000001");
        /**
         * Where to start consuming when there is no committed offset
         * [latest, earliest, none]
         */
        config.put("auto.offset.reset","earliest");
        //whether offsets should be committed automatically
        //config.put("enable.auto.commit","false");

        config.put("enable.auto.commit","true");

        config.put("","500");

        //create a consumer client instance
        KafkaConsumer<String,String> consumer = new KafkaConsumer<String, String>(config);
        consumer.subscribe(Arrays.asList("test"));

        while (true) {
            //poll records from all partitions assigned to this consumer
            ConsumerRecords<String, String> records = consumer.poll(2000);
            Iterator<ConsumerRecord<String, String>> iterator = records.iterator();
            while (iterator.hasNext()) {
                ConsumerRecord<String, String> record = iterator.next();
                System.out.println("record = " + record);
            }
        }

        //release the connection (unreachable here because of the infinite loop above)
        // consumer.close();
    }
}
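
The consumer above relies on auto-commit (enable.auto.commit=true). If you set it to false, as the commented-out line hints, the application has to commit offsets itself. Below is a minimal sketch of that variant; the class name and the group id "g000002" are illustrative assumptions, not part of the original example:

Code language: java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.Arrays;
import java.util.HashMap;

public class ManualCommitConsumer {
    public static void main(String[] args) {
        HashMap<String, Object> config = new HashMap<>();
        config.put("bootstrap.servers", "hadoop01:9092,hadoop02:9092,hadoop03:9092");
        config.put("key.deserializer", StringDeserializer.class.getName());
        config.put("value.deserializer", StringDeserializer.class.getName());
        config.put("group.id", "g000002");           // hypothetical group id
        config.put("auto.offset.reset", "earliest");
        config.put("enable.auto.commit", "false");   // offsets are committed manually below

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(config);
        consumer.subscribe(Arrays.asList("test"));

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(2000);
            for (ConsumerRecord<String, String> record : records) {
                System.out.println("record = " + record);
            }
            // synchronously commit the offsets of the batch just processed
            consumer.commitSync();
        }
    }
}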

2. Producer

Code language: java
package day12;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class Producer {
    public static void main(String[] args) throws  Exception{
        Properties props = new Properties();
        props.setProperty("bootstrap.servers","hadoop01:9092,hadoop02:9092,hadoop03:9092");
        props.setProperty("key.serializer",StringSerializer.class.getName());
        props.setProperty("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
        //acknowledgement mode when sending data; the default is 1
        //props.setProperty("acks","1");
        //custom partitioner; the default is
        //org.apache.kafka.clients.producer.internals.DefaultPartitioner

        // props.setProperty("partitioner.class","org.apache.kafka.clients.producer.internals.DefaultPartitioner");

        //create a producer instance
        KafkaProducer<String,String> kafkaProducer = new KafkaProducer<>(props);
        int count = 0;
        while(count < 10000000){
           // int partitionNum = count % 3;
            ProducerRecord<String, String> record = new ProducerRecord<>("test", 0, "", "NO:" + count);
            kafkaProducer.send(record);
            count++;
            Thread.sleep(1000);
        }

        kafkaProducer.close();
    }
}
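
The commented-out partitioner.class setting above is where a custom partitioner would be plugged in. A minimal sketch of such a class is shown below; the class name RoundRobinPartitioner is a hypothetical example that simply cycles through the topic's partitions:

Code language: java
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

import java.util.Map;

// Hypothetical partitioner: spreads records across all partitions with a simple counter.
public class RoundRobinPartitioner implements Partitioner {
    private int counter = 0;

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        // number of partitions for this topic (guard against an empty list)
        int numPartitions = Math.max(cluster.partitionsForTopic(topic).size(), 1);
        return Math.abs(counter++ % numPartitions);
    }

    @Override
    public void close() { }

    @Override
    public void configure(Map<String, ?> configs) { }
}

To use it, point the producer at the class instead of the default:
props.setProperty("partitioner.class", RoundRobinPartitioner.class.getName());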

3. Kafka Integration with Spark Streaming

Code language: scala
package day12

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.kafka.common.serialization.StringDeserializer

/**
  * Spark Streaming integration with Kafka
  */
object SparkStreaming_kafka {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
    val conf = new SparkConf().setMaster("local[2]").setAppName(s"${this.getClass.getSimpleName}")
    val ssc = new StreamingContext(conf,Seconds(5))


    /**
      * Kafka parameter map
      */
    val kafkaParams = Map[String,Object](
      "bootstrap.servers" -> "hadoop01:9092,hadoop02:9092,hadoop03:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "day12_005",
      "auto.offset.reset" -> "earliest",
      "enable.auto.comit" -> (false:java.lang.Boolean)
    )


    //specify the topics to subscribe to
    val topics = Array("test")

    /**
      * Specify the Kafka data source
      */
    val stream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream(ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
    )

    /* val maped: DStream[(String, String)] = stream.map(record => (record.key,record.value))
    maped.foreachRDD(rdd => {
    //processing logic
    rdd.foreach(println)

  })*/

    stream.foreachRDD(rdd => {
      val offsetRange = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      val maped: RDD[(String, String)] = rdd.map(record => (record.key,record.value))
      //processing logic
      maped.foreach(println)
      //print each offset range
      for(o <- offsetRange){
        println(s"${o.topic}  ${o.partition} ${o.fromOffset} ${o.untilOffset}")
      }
    })

    //start the streaming application
    ssc.start()

    //block until the application is terminated
    ssc.awaitTermination()
  }

}

4. Managing Kafka Offsets with Zookeeper

Code language: scala
package day12

import kafka.utils.{ZKGroupTopicDirs, ZkUtils}
import org.I0Itec.zkclient.ZkClient
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, HasOffsetRanges, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * Managing Kafka offsets in ZooKeeper
  * Created by zhangjingcun on 2018/10/11 8:49.
  */
object SSCDirectKafka010_ZK_Offset {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org.apache.spark").setLevel(Level.OFF)

    val conf = new SparkConf().setMaster("local[*]").setAppName(s"${this.getClass.getSimpleName}")
    // batch interval of 2 seconds
    val ssc = new StreamingContext(conf, Seconds(2))

    val groupId = "day13_001"

    /**
      * Kafka parameter map
      */
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "bigdata01:9092,bigdata02:9092,bigdata03:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> groupId,
      "auto.offset.reset" -> "earliest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    val topic = "testTopic"
    val topics = Array(topic)

    /**
      * If we maintain the offsets ourselves:
      *   1: On the very first start, where should consumption begin? earliest
      *   2: On later starts, where should consumption resume? From the offset we stored last time, e.g. a stored offset=88
      *
      * ZKGroupTopicDirs mainly builds the ZooKeeper path strings
      */
    val zKGroupTopicDirs: ZKGroupTopicDirs = new ZKGroupTopicDirs(groupId, topic)

    /**
      * Resulting directory structure:
      * /consumers/day13_001/offsets/testTopic
      */
    val offsetDir = zKGroupTopicDirs.consumerOffsetDir

    //ZooKeeper connection string
    val zkGroups = "hadoop01:2181,hadoop02:2181,hadoop03:2181"

    /**
      * Create a ZkClient connection
      * Check whether /consumers/day13_001/offsets/testTopic has child nodes: if it does, offsets were stored before; if not, this is the first run
      */
    val zkClient = new ZkClient(zkGroups)
    val childrenCount = zkClient.countChildren(offsetDir)

    val stream =  if(childrenCount>0){ //not the first start
      println("---------- started before ------------")
      //holds the offsets read back from ZooKeeper
       var fromOffsets = Map[TopicPartition, Long]()
      // /consumers/day13_001/offsets/testTopic/0
      // /consumers/day13_001/offsets/testTopic/1
      // /consumers/day13_001/offsets/testTopic/2
      (0  until childrenCount).foreach(partitionId => {
        val offset = zkClient.readData[String](offsetDir+s"/${partitionId}")
        fromOffsets += (new TopicPartition(topic, partitionId) -> offset.toLong)
      })
      KafkaUtils.createDirectStream(ssc, LocationStrategies.PreferConsistent, ConsumerStrategies.Assign[String, String](fromOffsets.keys.toList, kafkaParams, fromOffsets))
    } else { //first start
      println("------------- first start -----------")
      KafkaUtils.createDirectStream(ssc, LocationStrategies.PreferConsistent, ConsumerStrategies.Subscribe[String, String](topics, kafkaParams))
    }

    stream.foreachRDD(rdd=>{
      val offsetRange = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      val maped: RDD[(String, String)] = rdd.map(record => (record.key, record.value))
      //processing logic
      maped.foreach(println)

      //store and manage the offsets ourselves
      for(o<-offsetRange){
        //println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
        //write the end offset of each partition back to ZooKeeper
        ZkUtils(zkClient, false).updatePersistentPath(offsetDir+"/"+o.partition, o.untilOffset.toString)
      }
    })

    ssc.start()

    ssc.awaitTermination()
  }
}