首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >foreachRDD在Twitter API的J8 Spark Streaming中为每个RDD提取平均单词数和字符数

foreachRDD在Twitter API的J8 Spark Streaming中为每个RDD提取平均单词数和字符数
EN

Stack Overflow用户
提问于 2021-05-02 23:13:29
回答 2查看 111关注 0票数 2

我正在尝试使用Java8中的spark从Twitter API中获取每个RDD中的平均单词数和字符数。然而,我在使用streams来实现这一点时遇到了问题。我的代码如下:

代码语言:javascript
运行
复制
//Create the stream.
JavaReceiverInputDStream<Status> twitterStream = TwitterUtils.createStream(jssc);
//Outputs the text of tweets to a JavaDStream.
JavaDStream<String> statuses = twitterStream.map(Status::getText);
//Get the average number of words & characters in each RDD pulled during streaming.
statuses.foreachRDD(rdd -> {
            long c = rdd.count();
            long wc = rdd.map(s -> s.split(" ").length).reduce(Integer::sum);
            long cc = rdd.map(s -> s.split("").length).reduce(Integer::sum);
            long avgWc = wc / c;
            long avgCc = cc / c;
            System.out.println(wc / c);
            System.out.println(cc / c);
        return avgWc, avgCc;});

我得到的错误是foreachRDD期望的返回类型是空的,而我的返回是一个长格式。

我怎么才能避免这个问题呢?有没有其他方法可以解决这个问题?

EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2021-05-03 04:12:41

一种可能的解决方案是使用JavaDStream.transform。此函数允许停留在SparkStreaming-API中:

代码语言:javascript
运行
复制
JavaDStream<String> statuses = ...
JavaDStream<Tuple2<Long, Long>> avgs = statuses.transform(rdd -> {
            long c = rdd.count();
            long wc = rdd.map(s -> s.split(" ").length).reduce(Integer::sum);
            long cc = rdd.map(s -> s.split("").length).reduce(Integer::sum);
            long avgWc = wc / c;
            long avgCc = cc / c;
            //System.out.println(wc / c);
            //System.out.println(cc / c);
            return jssc.sparkContext().parallelize(Collections.singletonList(Tuple2.apply(avgWc, avgCc)));
        }
);
avgs.print();
票数 1
EN

Stack Overflow用户

发布于 2021-05-03 02:19:09

如果返回类型为void,则不可能返回数据。您可以在"foreachRDD“函数之外创建一个列表,并传递如下所示的值:

代码语言:javascript
运行
复制
List<Data> listData=new ArrayList();
statuses.foreachRDD(rdd -> {
            long c = rdd.count();
            long wc = rdd.map(s -> s.split(" ").length).reduce(Integer::sum);
            long cc = rdd.map(s -> s.split("").length).reduce(Integer::sum);
            long avgWc = wc / c;
            long avgCc = cc / c;
            System.out.println(wc / c);
            System.out.println(cc / c);
            Data data=new Data();
            data.setAvgCc(avgCc);
            data.setAvgWc(avgWc);
            listData.add(data);
        });

Data是一个具有两个变量avgCc和AvgWc的类,如下所示

代码语言:javascript
运行
复制
public class Data {
    long avgWc;
    long avgCc;
    public long getAvgWc() {
        return avgWc;
    }
    public void setAvgWc(long avgWc) {
        this.avgWc = avgWc;
    }
    public long getAvgCc() {
        return avgCc;
    }
    public void setAvgCc(long avgCc) {
        this.avgCc = avgCc;
    }
    public Data(long avgWc, long avgCc) {
        super();
        this.avgWc = avgWc;
        this.avgCc = avgCc;
    }
    public Data() {
        super();
    }
}

如果有帮助,请让我知道。或者你需要更多的澄清。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/67357935

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档