spark streaming写入kafka性能优化

本文原文(点击下面阅读原文即可进入) https://blog.csdn.net/xianpanjia4616/article/details/81432869

在实际的项目中,有时候我们需要把一些数据实时的写回到kafka中去,一般的话我们是这样写的,如下:

kafkaStreams.foreachRDD(rdd => {
      if (!rdd.isEmpty()) {
        rdd.foreachPartition(pr => {
          val properties = new Properties()
          properties.put("group.id", "jaosn_")
          properties.put("acks", "all")
          properties.put("retries ", "1")
          properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, PropertiesScalaUtils.loadProperties("broker"))
          properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName) //key的序列化;
          properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
          val producer = new KafkaProducer[String, String](properties)
          pr.foreach(pair => {
            producer.send(new ProducerRecord(PropertiesScalaUtils.loadProperties("topic") + "_error", pair.value()))
          })
        })
      }
    })

但是这种写法有很严重的缺点,对于每个rdd的每一个partition的数据,每一次都需要创建一个KafkaProducer,显然这种做法是不太合理的,而且会带来性能问题,导致写的速度特别慢,那怎么解决这个问题呢?

1、首先,我们需要将KafkaProducer利用lazy val的方式进行包装如下:

package kafka

import java.util.concurrent.Future
import org.apache.kafka.clients.producer.{ KafkaProducer, ProducerRecord, RecordMetadata }


class broadcastKafkaProducer[K,V](createproducer:() => KafkaProducer[K,V]) extends Serializable{
  lazy val producer = createproducer()
  def send(topic:String,key:K,value:V):Future[RecordMetadata] = producer.send(new ProducerRecord[K,V](topic,key,value))
  def send(topic: String, value: V): Future[RecordMetadata] = producer.send(new ProducerRecord[K, V](topic, value))
}



object broadcastKafkaProducer {
  import scala.collection.JavaConversions._
  def apply[K,V](config:Map[String,Object]):broadcastKafkaProducer[K,V] = {
    val createProducerFunc  = () => {
      val producer = new KafkaProducer[K,V](config)
      sys.addShutdownHook({
        producer.close()
      })
      producer
    }
    new broadcastKafkaProducer(createProducerFunc)
  }
  def apply[K, V](config: java.util.Properties): broadcastKafkaProducer[K, V] = apply(config.toMap)
}

2、之后我们利用广播变量的形式,将KafkaProducer广播到每一个executor,如下:

// 广播 broadcastKafkaProducer 到每一个excutors上面;
    val kafkaProducer: Broadcast[broadcastKafkaProducer[String, String]] = {
      val kafkaProducerConfig = {
        val p = new Properties()
        p.put("group.id", "jaosn_")
        p.put("acks", "all")
        p.put("retries ", "1")
        p.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, PropertiesScalaUtils.loadProperties("broker"))
        p.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName) //key的序列化;
        p.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
        p
      }
      scc.sparkContext.broadcast(broadcastKafkaProducer[String, String](kafkaProducerConfig))
    }

3、然后我们就可以在每一个executor上面将数据写入到kafka中了

kafkaStreams.foreachRDD(rdd => {
      if (!rdd.isEmpty()) {
        rdd.foreachPartition(pr => {
          pr.foreach(pair => {
            kafkaProducer.value.send("topic_name",pair.value())
          })
        })
      }
    })

这样的话,就不需要每次都去创建了。先写到这儿吧。经过测试优化过的写法性能是之前的几十倍。如果有写的不对的地方,欢迎大家指正。

原文发布于微信公众号 - 大数据手稿笔记(gh_fe6a620edd40)

原文发表时间:2019-05-17

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

扫码关注云+社区

领取腾讯云代金券