我有一个队列,我正在使用Spark Streaming从队列中读取数据。我需要将这些数据(在一些计算之后)写入2个不同的表(两个表的计算是不同的)。我注意到我的代码只执行第一个JavaPairDStream,而不执行另一个。我在两个PairStreams上都有输出操作。
我还注意到,如果队列中的数据很小-比方说只有一条消息,那么它会被插入到两个表中,但当队列中的数据很大时就不是这样了
我的代码如下所示-
public static void main(String ar[]) {
JavaReceiverInputDStream<String> receiverStream = RabbitMQUtils.createJavaStream(streamCtx, String.class, rabbitMqConParams, messageHandler);
//this is first pair stream
JavaPairDStream<String, Integer> map1 = receiverStream.flatMapToPair(new PairFlatMapFunction<String, String, Integer>() {...}
JavaPairDStream<String, Integer> red1 = map1.reduceByKey(new Function2<Integer, Integer, Integer>() {...}
red1.foreachRDD(new VoidFunction<JavaPairRDD<String, Integer>>() {
//i write to hbase from here to "x" table
}
//this is second pair stream
JavaPairDStream<String, Integer> map2 = receiverStream.flatMapToPair(new PairFlatMapFunction<String, String, Integer>() {...}
JavaPairDStream<String, Integer> red2 = map2.reduceByKey(new Function2<Integer, Integer, Integer>() {...}
red2.foreachRDD(new VoidFunction<JavaPairRDD<String, Integer>>() {
//i write to hbase from here to "y" table
}
}
这有可能吗?
发布于 2016-09-08 03:30:08
上面的代码没有问题。对于我的一个用例,我也在做同样的事情。
可能的情况是,在HBase
中写入数据的代码需要花费一些时间。
您能否尝试在SparkConf
中设置以下属性以控制流输入速率。它将帮助您调试该问题。
// In case if reading from Kafka
conf.set("spark.streaming.kafka.maxRatePerPartition", 100);
// In case of other streams
conf.set("spark.streaming.receiver.maxRate", 100);
上述属性控制每个接收器将接收数据的接收器的最大输入速率(每秒的记录数)。实际上,每个流每秒最多只能使用100
记录。有关流配置的更多详细信息,请参阅http://spark.apache.org/docs/latest/configuration.html#spark-streaming。
https://stackoverflow.com/questions/39380295
复制