Spark Tips 2: 在Spark Streaming中均匀分配从Kafka directStream 中读出的数据

下面这段code用于在Spark Streaming job中读取Kafka的message:

......

JavaPairInputDStream<String,String> messages =KafkaUtils.createDirectStream(

jssc,

String.class,

String.class,

StringDecoder.class,

StringDecoder.class,

kafkaParams,

topicsSet

);

JavaDStream<String>lines = messages.map(newFunction<Tuple2<String, String>, String>(){

@Override

publicString call(Tuple2<String, String> tuple2){

returntuple2._2();

}

});

lines.foreachRDD(newFunction<JavaRDD<String>,Void>(){

@Override

publicVoid call(JavaRDD<String> strJavaRDD)throws Exception {

int rddPartitionNum = strJavaRDD.partitions().size();

List<String>messages = strJavaRDD.collect();

List<String>sizeStrs = newArrayList<String>();

for(String message: messages) {

if(message== null)

continue;

StringlogStr = "message size is" + message.length();

strs.add(logStr);

}

saveToLog(outputLogPath,strs);

returnnull;

}

});

......

以上代码虽然可以正常运行,不过却出现了一个问题:当message size非常大(比如10MB/message)的时候,spark端的处理速度非常缓慢,在3brokers的Kafka + 32 nodes的spark上运行时(本job的executorinstance # =16, 1 core/instance),基本上在<10messages/second的速度。

但是如果单独看Kafka的pullmessage的速度,要快得多,所以bottleneck不是Kafka。具体看16个worker(executorinstance)的log,会发现,同一个duration中,只有2个worker在运行。于是加入上面红色一行代码,发现rddPartitionNum是2。而map function是按照RDD的partition的数量来分配到worker上去的。strJavaRDD一共只有2个partition,所有,每次只有2个worker在工作。

为什么strJavaRDD只有2个partition呢?因为Kafka配置中的default partition number只有2个,在创建topic的时候,没有制定专门的partitionnumber,所以采用了defaultpartition number,导致当前topic只有2个partition。于是修改Kafka的server.properties文件中的num.partitions如下:

------------------

num.partitions=16

------------------

NOTE:当然,也可以用repartition()method对strJavaRDD进行repartition,不过这样需要shuffle数据,对于job的性能有所影响。

这样修改过之后,果然新建的topic具有了16个partition。可是在向新生成的topic中publishmessage之后却发现,并不是所有partition中都有数据。显然publish到Kafka中的数据没有平均分布。究其原因,是因为在publishmessage的时候,没有制定partitionclass name,因此,partition class就采用了DefaultPartitioner。在Kafka0.8.1.1(我们采用的Kafka版本)中,其代码如下:

package kafka.producer

import kafka.utils._

class DefaultPartitioner(props: VerifiableProperties =null)extends Partitioner {

private val random = newjava.util.Random

def partition(key: Any,numPartitions: Int): Int = {

Utils.abs(key.hashCode) % numPartitions

}

}

又因为publish message的时候没有制定key,因此,在partitionclass的partitionmethod中,key == null,而null.hashCode = 0。因此所有的数据都进入到了一个partition当中。

修复这一问题的方法是,添加一个人工的partition class,使得在无key的情况下message平均分配,例如下面这个:

public classSimplePartitioner implements Partitioner { public SimplePartitioner(VerifiableProperties props) { } public int partition(Objectkey, int numPartitions) { int intKey =0; if(key != null) { try { Integer.parseInt((String) key); } catch (exception e) {} }

Random rand = newRandom(); int randomKey= rand.nextInt(numPartitions); partition =(intKey + randomKey) % numPartitions; returnpartition; } }

然后在KafkaProducer中加入这个partitionclass:

… … Properties props = new Properties(); props.put("partitioner.class", partitionerClassName); … … // other properties kafka.producer.ProducerConfig config = newkafka.producer.ProducerConfig(props); kafka.javaapi.producer.Producer producer = new kafka.javaapi.producer.Producer(config); … … KeyedMessage<String, String> data = new KeyedMessage<String,String>(topic, message);

producer.send(data); … …

这样做之后,所有publish到Kafkatopic中的message便平均分配到了16个partition,在sparkstreamingjob中被读取出之后也就是均匀分布到了16个executor core中运行。达到了load balance的效果。

原文发布于微信公众号 - 悦思悦读(yuesiyuedu)

原文发表时间:2015-11-14

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏Albert陈凯

Spark自定义累加器的实现

Spark自定义累加器的实现 Java版本: package com.luoxuehuan.sparkproject.spark; import org.apa...

4385
来自专栏牛肉圆粉不加葱

Spark Task 的执行流程③ - 执行 task

创建、分发 Task一文中我们提到 TaskRunner(继承于 Runnable) 对象最终会被提交到 Executor 的线程池中去执行,本文就将对该执行过...

831
来自专栏Spark学习技巧

RDD的join和Dstream的join有什么区别?

Dstream这个类实际上支持的只是Spark Streaming的基础操作算子,比如: map, filter 和window.PairDStreamFunc...

2141
来自专栏Spark学习技巧

hashpartitioner-Spark分区计算器

一点点回忆 年初了,帮助大家回忆一下spark的重要知识点。 首先,我们回顾的知识点是RDD的五大特性: 1,一系列的分区。 2,一个函数作用于分区上。 3,R...

4249
来自专栏岑玉海

Spark源码系列(五)分布式缓存

这一章想讲一下Spark的缓存是如何实现的。这个persist方法是在RDD里面的,所以我们直接打开RDD这个类。 def persist(newLevel...

3995
来自专栏Spark生态圈

[spark] DAGScheduler划分stage源码解析

Spark Application只有遇到action操作时才会真正的提交任务并进行计算,DAGScheduler 会根据各个RDD之间的依赖关系形成一个DAG...

2072
来自专栏行者悟空

Spark核心数据结构RDD的定义

3574
来自专栏浪淘沙

SparkStreaming编程实现

3.MyNetworkTotalWordCountV2.scala(开发自己的实时词频统计程序(累计单词出现次数))

2165
来自专栏星汉技术

原 荐 Spark框架核心概念

3988
来自专栏木东居士的专栏

Spark源码解析:DStream

2614

扫码关注云+社区

领取腾讯云代金券