首页
学习
活动
专区
工具
TVP
发布
精选内容/技术社群/优惠产品,尽在小程序
立即前往

Spark Tips 2: 在Spark Streaming中均匀分配从Kafka directStream 中读出的数据

下面这段code用于在Spark Streaming job中读取Kafka的message:

......

JavaPairInputDStream<String,String> messages =KafkaUtils.createDirectStream(

jssc,

String.class,

String.class,

StringDecoder.class,

StringDecoder.class,

kafkaParams,

topicsSet

);

JavaDStream<String>lines = messages.map(newFunction<Tuple2<String, String>, String>(){

@Override

publicString call(Tuple2<String, String> tuple2){

returntuple2._2();

}

});

lines.foreachRDD(newFunction<JavaRDD<String>,Void>(){

@Override

publicVoid call(JavaRDD<String> strJavaRDD)throws Exception {

int rddPartitionNum = strJavaRDD.partitions().size();

List<String>messages = strJavaRDD.collect();

List<String>sizeStrs = newArrayList<String>();

for(String message: messages) {

if(message== null)

continue;

StringlogStr = "message size is" + message.length();

strs.add(logStr);

}

saveToLog(outputLogPath,strs);

returnnull;

}

});

......

以上代码虽然可以正常运行,不过却出现了一个问题:当message size非常大(比如10MB/message)的时候,spark端的处理速度非常缓慢,在3brokers的Kafka + 32 nodes的spark上运行时(本job的executorinstance # =16, 1 core/instance),基本上在<10messages/second的速度。

但是如果单独看Kafka的pullmessage的速度,要快得多,所以bottleneck不是Kafka。具体看16个worker(executorinstance)的log,会发现,同一个duration中,只有2个worker在运行。于是加入上面红色一行代码,发现rddPartitionNum是2。而map function是按照RDD的partition的数量来分配到worker上去的。strJavaRDD一共只有2个partition,所有,每次只有2个worker在工作。

为什么strJavaRDD只有2个partition呢?因为Kafka配置中的default partition number只有2个,在创建topic的时候,没有制定专门的partitionnumber,所以采用了defaultpartition number,导致当前topic只有2个partition。于是修改Kafka的server.properties文件中的num.partitions如下:

------------------

num.partitions=16

------------------

NOTE:当然,也可以用repartition()method对strJavaRDD进行repartition,不过这样需要shuffle数据,对于job的性能有所影响。

这样修改过之后,果然新建的topic具有了16个partition。可是在向新生成的topic中publishmessage之后却发现,并不是所有partition中都有数据。显然publish到Kafka中的数据没有平均分布。究其原因,是因为在publishmessage的时候,没有制定partitionclass name,因此,partition class就采用了DefaultPartitioner。在Kafka0.8.1.1(我们采用的Kafka版本)中,其代码如下:

package kafka.producer

import kafka.utils._

class DefaultPartitioner(props: VerifiableProperties =null)extends Partitioner {

private val random = newjava.util.Random

def partition(key: Any,numPartitions: Int): Int = {

Utils.abs(key.hashCode) % numPartitions

}

}

又因为publish message的时候没有制定key,因此,在partitionclass的partitionmethod中,key == null,而null.hashCode = 0。因此所有的数据都进入到了一个partition当中。

修复这一问题的方法是,添加一个人工的partition class,使得在无key的情况下message平均分配,例如下面这个:

public classSimplePartitioner implements Partitioner { public SimplePartitioner(VerifiableProperties props) { } public int partition(Objectkey, int numPartitions) { int intKey =0; if(key != null) { try { Integer.parseInt((String) key); } catch (exception e) {} }

Random rand = newRandom(); int randomKey= rand.nextInt(numPartitions); partition =(intKey + randomKey) % numPartitions; returnpartition; } }

然后在KafkaProducer中加入这个partitionclass:

… … Properties props = new Properties(); props.put("partitioner.class", partitionerClassName); … … // other properties kafka.producer.ProducerConfig config = newkafka.producer.ProducerConfig(props); kafka.javaapi.producer.Producer producer = new kafka.javaapi.producer.Producer(config); … … KeyedMessage<String, String> data = new KeyedMessage<String,String>(topic, message);

producer.send(data); … …

这样做之后,所有publish到Kafkatopic中的message便平均分配到了16个partition,在sparkstreamingjob中被读取出之后也就是均匀分布到了16个executor core中运行。达到了load balance的效果。

下一篇
举报
领券