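I'm trying to use Spark in Java 8 to get the average number of words and characters in each RDD pulled from the Twitter API. However, I'm having trouble doing this with streams. My code is as follows: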
//Create the stream.
JavaReceiverInputDStream<Status> twitterStream = TwitterUtils.createStream(jssc);
//Outputs the text of tweets to a JavaDStream.
JavaDStream<String> statuses = twitterStream.map(Status::getText);
//Get the average number of words & characters in each RDD pulled during streaming.
statuses.foreachRDD(rdd -> {
long c = rdd.count();
long wc = rdd.map(s -> s.split(" ").length).reduce(Integer::sum);
long cc = rdd.map(s -> s.split("").length).reduce(Integer::sum);
long avgWc = wc / c;
long avgCc = cc / c;
System.out.println(wc / c);
System.out.println(cc / c);
return avgWc, avgCc;});
The error I get is that foreachRDD expects a void return type, but I am returning a long.
How can I get around this? Is there another way to solve this problem?
Posted on 2021-05-03 04:12:41
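One possible solution is to use JavaDStream.transform. Unlike foreachRDD, its function returns a new RDD, so the result stays inside the Spark Streaming API as a new DStream: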
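import java.util.Collections;
import scala.Tuple2;

JavaDStream<String> statuses = ...
JavaDStream<Tuple2<Long, Long>> avgs = statuses.transform(rdd -> {
    long c = rdd.count();
    if (c == 0) {
        // Empty batch: reduce() would throw on an empty RDD, so emit nothing for this interval.
        return jssc.sparkContext().<Tuple2<Long, Long>>emptyRDD();
    }
    long wc = rdd.map(s -> s.split(" ").length).reduce(Integer::sum);
    long cc = rdd.map(s -> s.split("").length).reduce(Integer::sum);
    long avgWc = wc / c; // integer division truncates; use double for fractional averages
    long avgCc = cc / c;
    // transform() must return an RDD, so wrap the pair of averages in a one-element RDD.
    return jssc.sparkContext().parallelize(
            Collections.singletonList(new Tuple2<>(avgWc, avgCc)));
});
// print() is an output operation, so it also triggers the computation for every batch.
avgs.print();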
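The per-batch arithmetic inside the transform can be checked without Spark. The sketch below is a plain-Java stand-in, assuming a batch is simply a List<String> of tweet texts; the class and method names (BatchAverages, averages) are hypothetical. It uses double to avoid truncating the averages, counts characters with String.length() instead of splitting on the empty regex, and returns an empty Optional for an empty batch instead of dividing by zero.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;

// Hypothetical plain-Java stand-in for the per-batch logic; no Spark involved.
public class BatchAverages {

    // Returns {avgWords, avgChars} for one batch, or empty for an empty batch
    // (the Spark version would otherwise call reduce() on an empty RDD).
    public static Optional<double[]> averages(List<String> batch) {
        long c = batch.size();
        if (c == 0) {
            return Optional.empty();
        }
        // Words: split on single spaces, as in the answer above.
        long wc = batch.stream().mapToLong(s -> s.split(" ").length).sum();
        // Characters: String.length() counts UTF-16 chars directly.
        long cc = batch.stream().mapToLong(String::length).sum();
        return Optional.of(new double[] {(double) wc / c, (double) cc / c});
    }

    public static void main(String[] args) {
        List<String> batch = Arrays.asList("hello spark streaming", "hi");
        double[] avg = averages(batch).get();
        System.out.println(avg[0] + " " + avg[1]);                        // 2.0 11.5
        System.out.println(averages(Collections.emptyList()).isPresent()); // false
    }
}
```

Here the batch has 4 words and 23 characters across 2 tweets, so the averages are 2.0 and 11.5.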
Posted on 2021-05-03 02:19:09
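If the return type is void, you cannot return data. However, the function passed to foreachRDD runs on the driver, so you can create a list outside of foreachRDD and add the values to it, as shown below: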
// Synchronized, because the streaming job thread writes to the list while other driver code may read it.
List<Data> listData = Collections.synchronizedList(new ArrayList<>());
statuses.foreachRDD(rdd -> {
    long c = rdd.count();
    if (c == 0) {
        return; // skip empty batches: reduce() throws on an empty RDD
    }
    long wc = rdd.map(s -> s.split(" ").length).reduce(Integer::sum);
    long cc = rdd.map(s -> s.split("").length).reduce(Integer::sum);
    long avgWc = wc / c;
    long avgCc = cc / c;
    System.out.println(avgWc);
    System.out.println(avgCc);
    listData.add(new Data(avgWc, avgCc));
});
Data is a class with two fields, avgWc and avgCc, as shown below:
public class Data {
long avgWc;
long avgCc;
public long getAvgWc() {
return avgWc;
}
public void setAvgWc(long avgWc) {
this.avgWc = avgWc;
}
public long getAvgCc() {
return avgCc;
}
public void setAvgCc(long avgCc) {
this.avgCc = avgCc;
}
public Data(long avgWc, long avgCc) {
super();
this.avgWc = avgWc;
this.avgCc = avgCc;
}
public Data() {
super();
}
}
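The same pattern (a callback that cannot return anything, so it writes into a collection captured from the enclosing scope) can be shown in plain Java. In this hypothetical sketch, Consumer plays the role of the void function that foreachRDD accepts, and forEachBatch / collectAverages are illustrative names, not Spark APIs. In real Spark this only works because the foreachRDD body runs on the driver; a list captured inside an rdd.map(...) lambda would be copied to the executors and the driver's copy would never change.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical illustration: Consumer<T> stands in for foreachRDD's void callback.
public class VoidCallbackDemo {

    // Stand-in for a streaming source: hands each batch to a callback that returns nothing.
    static void forEachBatch(List<List<String>> batches, Consumer<List<String>> callback) {
        for (List<String> batch : batches) {
            callback.accept(batch);
        }
    }

    // Collects the per-batch average word count through a captured list.
    public static List<Long> collectAverages(List<List<String>> batches) {
        // The lambda cannot "return" a result, so it appends to this list instead.
        List<Long> results = Collections.synchronizedList(new ArrayList<>());
        forEachBatch(batches, batch -> {
            if (batch.isEmpty()) {
                return; // nothing to average for an empty batch
            }
            long wc = batch.stream().mapToLong(s -> s.split(" ").length).sum();
            results.add(wc / batch.size()); // integer division, as in the answer
        });
        return results;
    }

    public static void main(String[] args) {
        List<List<String>> batches = Arrays.asList(
                Arrays.asList("a b c", "d"),
                Collections.<String>emptyList(),
                Arrays.asList("one two three"));
        System.out.println(collectAverages(batches)); // [2, 3]
    }
}
```

The empty middle batch is skipped, so only two averages are recorded.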
Let me know if this helps, or if you need more clarification.
https://stackoverflow.com/questions/67357935