前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >spark streaming写入kafka性能优化

spark streaming写入kafka性能优化

作者头像
大数据技术架构
发布2019-08-16 15:43:08
1.5K0
发布2019-08-16 15:43:08
举报

本文原文(点击下面阅读原文即可进入) https://blog.csdn.net/xianpanjia4616/article/details/81432869

在实际的项目中,有时候我们需要把一些数据实时的写回到kafka中去,一般的话我们是这样写的,如下:

代码语言:javascript
复制
kafkaStreams.foreachRDD(rdd => {
      if (!rdd.isEmpty()) {
        rdd.foreachPartition(pr => {
          val properties = new Properties()
          properties.put("group.id", "jaosn_")
          properties.put("acks", "all")
          properties.put("retries ", "1")
          properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, PropertiesScalaUtils.loadProperties("broker"))
          properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName) //key的序列化;
          properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
          val producer = new KafkaProducer[String, String](properties)
          pr.foreach(pair => {
            producer.send(new ProducerRecord(PropertiesScalaUtils.loadProperties("topic") + "_error", pair.value()))
          })
        })
      }
    })

但是这种写法有很严重的缺点,对于每个rdd的每一个partition的数据,每一次都需要创建一个KafkaProducer,显然这种做法是不太合理的,而且会带来性能问题,导致写的速度特别慢,那怎么解决这个问题呢?

1、首先,我们需要将KafkaProducer利用lazy val的方式进行包装如下:

代码语言:javascript
复制
package kafka

import java.util.concurrent.Future
import org.apache.kafka.clients.producer.{ KafkaProducer, ProducerRecord, RecordMetadata }


class broadcastKafkaProducer[K,V](createproducer:() => KafkaProducer[K,V]) extends Serializable{
  lazy val producer = createproducer()
  def send(topic:String,key:K,value:V):Future[RecordMetadata] = producer.send(new ProducerRecord[K,V](topic,key,value))
  def send(topic: String, value: V): Future[RecordMetadata] = producer.send(new ProducerRecord[K, V](topic, value))
}



object broadcastKafkaProducer {
  import scala.collection.JavaConversions._
  def apply[K,V](config:Map[String,Object]):broadcastKafkaProducer[K,V] = {
    val createProducerFunc  = () => {
      val producer = new KafkaProducer[K,V](config)
      sys.addShutdownHook({
        producer.close()
      })
      producer
    }
    new broadcastKafkaProducer(createProducerFunc)
  }
  def apply[K, V](config: java.util.Properties): broadcastKafkaProducer[K, V] = apply(config.toMap)
}

2、之后我们利用广播变量的形式,将KafkaProducer广播到每一个executor,如下:

代码语言:javascript
复制
// 广播 broadcastKafkaProducer 到每一个excutors上面;
    val kafkaProducer: Broadcast[broadcastKafkaProducer[String, String]] = {
      val kafkaProducerConfig = {
        val p = new Properties()
        p.put("group.id", "jaosn_")
        p.put("acks", "all")
        p.put("retries ", "1")
        p.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, PropertiesScalaUtils.loadProperties("broker"))
        p.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName) //key的序列化;
        p.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
        p
      }
      scc.sparkContext.broadcast(broadcastKafkaProducer[String, String](kafkaProducerConfig))
    }

3、然后我们就可以在每一个executor上面将数据写入到kafka中了

代码语言:javascript
复制
kafkaStreams.foreachRDD(rdd => {
      if (!rdd.isEmpty()) {
        rdd.foreachPartition(pr => {
          pr.foreach(pair => {
            kafkaProducer.value.send("topic_name",pair.value())
          })
        })
      }
    })

这样的话,就不需要每次都去创建了。先写到这儿吧。经过测试优化过的写法性能是之前的几十倍。如果有写的不对的地方,欢迎大家指正。

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2019-05-17,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 大数据技术架构 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档