下面这段code用于在Spark Streaming job中读取Kafka的message:
......
JavaPairInputDStream<String,String> messages =KafkaUtils.createDirectStream(
jssc,
String.class,
String.class,
StringDecoder.class,
StringDecoder.class,
kafkaParams,
topicsSet
);
JavaDStream<String>lines = messages.map(newFunction<Tuple2<String, String>, String>(){
@Override
publicString call(Tuple2<String, String> tuple2){
returntuple2._2();
}
});
lines.foreachRDD(newFunction<JavaRDD<String>,Void>(){
@Override
publicVoid call(JavaRDD<String> strJavaRDD)throws Exception {
int rddPartitionNum = strJavaRDD.partitions().size();
List<String>messages = strJavaRDD.collect();
List<String>sizeStrs = newArrayList<String>();
for(String message: messages) {
if(message== null)
continue;
StringlogStr = "message size is" + message.length();
strs.add(logStr);
}
saveToLog(outputLogPath,strs);
returnnull;
}
});
......
以上代码虽然可以正常运行,不过却出现了一个问题:当message size非常大(比如10MB/message)的时候,spark端的处理速度非常缓慢,在3brokers的Kafka + 32 nodes的spark上运行时(本job的executorinstance # =16, 1 core/instance),基本上在<10messages/second的速度。
但是如果单独看Kafka的pullmessage的速度,要快得多,所以bottleneck不是Kafka。具体看16个worker(executorinstance)的log,会发现,同一个duration中,只有2个worker在运行。于是加入上面红色一行代码,发现rddPartitionNum是2。而map function是按照RDD的partition的数量来分配到worker上去的。strJavaRDD一共只有2个partition,所有,每次只有2个worker在工作。
为什么strJavaRDD只有2个partition呢?因为Kafka配置中的default partition number只有2个,在创建topic的时候,没有制定专门的partitionnumber,所以采用了defaultpartition number,导致当前topic只有2个partition。于是修改Kafka的server.properties文件中的num.partitions如下:
------------------
num.partitions=16
------------------
NOTE:当然,也可以用repartition()method对strJavaRDD进行repartition,不过这样需要shuffle数据,对于job的性能有所影响。
这样修改过之后,果然新建的topic具有了16个partition。可是在向新生成的topic中publishmessage之后却发现,并不是所有partition中都有数据。显然publish到Kafka中的数据没有平均分布。究其原因,是因为在publishmessage的时候,没有制定partitionclass name,因此,partition class就采用了DefaultPartitioner。在Kafka0.8.1.1(我们采用的Kafka版本)中,其代码如下:
package kafka.producer
import kafka.utils._
class DefaultPartitioner(props: VerifiableProperties =null)extends Partitioner {
private val random = newjava.util.Random
def partition(key: Any,numPartitions: Int): Int = {
Utils.abs(key.hashCode) % numPartitions
}
}
又因为publish message的时候没有制定key,因此,在partitionclass的partitionmethod中,key == null,而null.hashCode = 0。因此所有的数据都进入到了一个partition当中。
修复这一问题的方法是,添加一个人工的partition class,使得在无key的情况下message平均分配,例如下面这个:
public classSimplePartitioner implements Partitioner { public SimplePartitioner(VerifiableProperties props) { } public int partition(Objectkey, int numPartitions) { int intKey =0; if(key != null) { try { Integer.parseInt((String) key); } catch (exception e) {} }
Random rand = newRandom(); int randomKey= rand.nextInt(numPartitions); partition =(intKey + randomKey) % numPartitions; returnpartition; } }
然后在KafkaProducer中加入这个partitionclass:
… … Properties props = new Properties(); props.put("partitioner.class", partitionerClassName); … … // other properties kafka.producer.ProducerConfig config = newkafka.producer.ProducerConfig(props); kafka.javaapi.producer.Producer producer = new kafka.javaapi.producer.Producer(config); … … KeyedMessage<String, String> data = new KeyedMessage<String,String>(topic, message);
producer.send(data); … …
这样做之后,所有publish到Kafkatopic中的message便平均分配到了16个partition,在sparkstreamingjob中被读取出之后也就是均匀分布到了16个executor core中运行。达到了load balance的效果。