
Integrating Spark Streaming, Kafka, and Redis


1. Integrating Kafka, Spark Streaming, and Redis for a word count

A producer writes random letters into the Kafka topic "wordcount"; a Spark Streaming job consumes the topic through the direct API, reduces each batch to (word, count) pairs, and accumulates the running totals in a Redis hash via HINCRBY.

GenerateWord.java (the producer): generates data into Kafka.

package day14;

/**
 * A producer that generates a random UUID key and a random lowercase letter,
 * used to drive the real-time word-count job whose results are stored in Redis.
 */
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.Random;
import java.util.UUID;

public class GenerateWord {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "hadoop01:9092,hadoop02:9092,hadoop03:9092"); // Kafka broker list
        // Keys and values travel over the network, so both need a serializer
        props.setProperty("key.serializer", StringSerializer.class.getName());
        props.setProperty("value.serializer", StringSerializer.class.getName());

        // Create a producer client instance
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(props);
        while (true) {
            Thread.sleep(500);
            String key = UUID.randomUUID().toString();
            int value = new Random().nextInt(26) + 97;   // 97..122 -> 'a'..'z'
            char word = (char) value;
            ProducerRecord<String, String> record = new ProducerRecord<>("wordcount", key, String.valueOf(word));
            kafkaProducer.send(record);
            System.out.println("record = " + record);
        }
    }
}
MyNetWordCountRedis.scala
package day14

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import utils.JPools

/**
  * Integrate Kafka, Spark Streaming, and Redis to implement a word count.
  */
object MyNetWordCountRedis {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
    // Create the StreamingContext
    val conf = new SparkConf().setAppName(s"${this.getClass.getName}").setMaster("local[*]")
    // Rate-limit the pull from Kafka: max records per batch = 5 * (number of partitions) * (batch interval in seconds)
    conf.set("spark.streaming.kafka.maxRatePerPartition", "5")
    conf.set("spark.streaming.kafka.stopGracefullyOnShutdown", "true") // shut down gracefully
    val ssc = new StreamingContext(conf, Seconds(2))

    // Consumer group id
    val groupId = "day14_001"
    // Topic to consume
    val topic = "wordcount"

    /**
      * Kafka parameters
      */
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "hadoop01:9092,hadoop02:9092,hadoop03:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> groupId,
      "auto.offset.reset" -> "earliest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    // Connect to the Kafka source
    val stream = KafkaUtils.createDirectStream(ssc,
      LocationStrategies.PreferConsistent, // location strategy: spread partitions evenly over available executors
      ConsumerStrategies.Subscribe[String, String](Array(topic), kafkaParams))

    // foreachRDD runs on the driver once per batch
    stream.foreachRDD(rdd => {
      val reduced = rdd.map(x => (x.value(), 1)).reduceByKey(_ + _)
      reduced.foreachPartition(partition => {
        // One Redis connection per partition
        val redis = JPools.getJedis
        // HINCRBY accumulates each word's count across batches in the "wordcount" hash
        partition.foreach(x => redis.hincrBy("wordcount", x._1, x._2.toLong))
        redis.close()
      })
    })

    ssc.start()
    ssc.awaitTermination()
  }
}

JPools.scala (a simple Redis connection pool)

package utils

import org.apache.commons.pool2.impl.GenericObjectPoolConfig
import redis.clients.jedis.{Jedis, JedisPool}

/**
  * A simple Redis connection pool.
  */
object JPools {

  private val poolConfig = new GenericObjectPoolConfig()
  poolConfig.setMaxIdle(5)     // max idle connections kept in the pool (default 8)
  poolConfig.setMaxTotal(2000) // max total connections (default 8)

  // The pool itself is private and never exposed
  private lazy val jedisPool = new JedisPool(poolConfig, "hadoop02")

  def getJedis: Jedis = {
    val jedis = jedisPool.getResource
    jedis.select(0) // use database 0
    jedis
  }
}
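
To spot-check the pipeline end to end, the running totals can be read back from the "wordcount" hash. A minimal sketch reusing the JPools object above; ShowCounts is a hypothetical helper name, not part of the original post:

package utils

import scala.collection.JavaConverters._

// Hypothetical helper: prints the word counts accumulated by the streaming job.
object ShowCounts {
  def main(args: Array[String]): Unit = {
    val jedis = JPools.getJedis
    // HGETALL returns every field (word) and value (running total) of the hash
    jedis.hgetAll("wordcount").asScala.toSeq.sortBy(_._1)
      .foreach { case (word, count) => println(s"$word -> $count") }
    jedis.close()
  }
}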

2. Using Redis to manage Kafka offsets

The scheme: offsets live in a Redis hash keyed by the consumer group id, with one field per "topic-partition" holding the last untilOffset written for that partition. On startup the job reads this hash; if it is empty the job subscribes normally, otherwise it resumes from the saved offsets with ConsumerStrategies.Assign.

JedisOffSet.scala (reads the offsets saved in Redis for a given groupId)
package day14

import java.util

import org.apache.kafka.common.TopicPartition
import utils.JPools

import scala.collection.JavaConverters._

object JedisOffSet {

  def apply(groupid: String): Map[TopicPartition, Long] = {
    var fromDbOffset = Map[TopicPartition, Long]()
    val jedis = JPools.getJedis

    // Each field of the hash is "topic-partition", each value is the saved offset
    val topicPartitionOffset: util.Map[String, String] = jedis.hgetAll(groupid)
    val topicPartitionOffsetList: List[(String, String)] = topicPartitionOffset.asScala.toList

    for (topicPL <- topicPartitionOffsetList) {
      // Split on the last '-' so topic names that themselves contain hyphens still parse correctly
      val idx = topicPL._1.lastIndexOf("-")
      val topic = topicPL._1.substring(0, idx)
      val partition = topicPL._1.substring(idx + 1).toInt
      fromDbOffset += (new TopicPartition(topic, partition) -> topicPL._2.toLong)
    }
    jedis.close()
    fromDbOffset
  }
}
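
A quick way to see what the function recovers before wiring it into the streaming job (a minimal sketch; OffsetCheck is a hypothetical name):

package day14

// Hypothetical helper: prints the offsets currently saved in Redis for the group.
object OffsetCheck {
  def main(args: Array[String]): Unit = {
    val offsets = JedisOffSet("day14_001") // Map[TopicPartition, Long]
    if (offsets.isEmpty)
      println("no saved offsets; the job will subscribe and start from auto.offset.reset")
    else
      offsets.foreach { case (tp, off) => println(s"${tp.topic}-${tp.partition} -> $off") }
  }
}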
Streaming_Kafka_Redis_Offset.scala
package day14

import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, HasOffsetRanges, KafkaUtils, LocationStrategies}
import utils.JPools

/**
  * Word count with Spark Streaming, Kafka, and Redis,
  * with offsets managed in Redis instead of Kafka.
  */
object Streaming_Kafka_Redis_Offset {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
    // Create the StreamingContext
    val conf = new SparkConf().setAppName(s"${this.getClass.getName}").setMaster("local[*]")
    // Rate-limit the pull from Kafka: max records per batch = 5 * (number of partitions) * (batch interval in seconds)
    conf.set("spark.streaming.kafka.maxRatePerPartition", "5")
    conf.set("spark.streaming.kafka.stopGracefullyOnShutdown", "true") // shut down gracefully
    val ssc = new StreamingContext(conf, Seconds(2))

    // Consumer group id
    val groupId = "day14_001"
    // Topic to consume
    val topic = "wordcount"

    /**
      * Kafka parameters
      */
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "hadoop01:9092,hadoop02:9092,hadoop03:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> groupId,
      "auto.offset.reset" -> "earliest",
      "enable.auto.commit" -> (false: java.lang.Boolean) // offsets are committed manually, to Redis
    )

    // Offsets recovered from Redis, if any
    val fromOffset: Map[TopicPartition, Long] = JedisOffSet(groupId)

    // Connect to the Kafka source: subscribe fresh, or resume from the saved offsets
    val stream = if (fromOffset.isEmpty) {
      KafkaUtils.createDirectStream(ssc,
        LocationStrategies.PreferConsistent,
        ConsumerStrategies.Subscribe[String, String](Array(topic), kafkaParams))
    } else {
      KafkaUtils.createDirectStream(ssc,
        LocationStrategies.PreferConsistent,
        ConsumerStrategies.Assign[String, String](fromOffset.keys, kafkaParams, fromOffset))
    }

    // foreachRDD runs on the driver once per batch
    stream.foreachRDD(rdd => {
      // This batch's offset ranges; must be taken from the source RDD before any transformation
      val offsetRange = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

      val reduced = rdd.map(x => (x.value(), 1)).reduceByKey(_ + _)

      reduced.foreachPartition(partition => {
        // One Redis connection per partition
        val redis = JPools.getJedis
        partition.foreach(x => redis.hincrBy("wordcount", x._1, x._2.toLong))
        redis.close()
      })

      // Save the offsets to Redis only after the batch has been processed
      val jedis = JPools.getJedis
      for (o <- offsetRange) {
        jedis.hset(groupId, o.topic + "-" + o.partition, o.untilOffset.toString)
      }
      jedis.close()
    })

    ssc.start()
    ssc.awaitTermination()
  }
}
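
Note that the word counts and the offsets are written in separate Redis calls, so a crash between the two will replay the batch and double-count it: at-least-once semantics. Since the aggregate here is at most 26 words per batch, one way to tighten this is to collect it to the driver and commit counts and offsets together in a single MULTI/EXEC transaction. A minimal sketch of an alternative foreachRDD body under that assumption (not the original code above):

stream.foreachRDD(rdd => {
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  // Small per-batch aggregate, safe to collect to the driver
  val counts = rdd.map(x => (x.value(), 1)).reduceByKey(_ + _).collect()

  val jedis = JPools.getJedis
  val tx = jedis.multi() // commands are queued; nothing applies until exec()
  counts.foreach { case (word, n) => tx.hincrBy("wordcount", word, n.toLong) }
  for (o <- offsetRanges) {
    tx.hset(groupId, o.topic + "-" + o.partition, o.untilOffset.toString)
  }
  tx.exec() // counts and offsets become visible atomically
  jedis.close()
})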