前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >【Spark篇】---SparkStreaming算子操作transform和updateStateByKey

【Spark篇】---SparkStreaming算子操作transform和updateStateByKey

作者头像
LhWorld哥陪你聊算法
发布2018-09-13 14:16:47
1.1K0
发布2018-09-13 14:16:47
举报

一、前述

今天分享一篇SparkStreaming常用的算子transform和updateStateByKey。

  • 可以通过transform算子,对Dstream做RDD到RDD的任意操作。其实就是DStream的类型转换。

算子内,拿到的RDD算子外,代码是在Driver端执行的,每个batchInterval执行一次,可以做到动态改变广播变量。

  • 为SparkStreaming中每一个Key维护一份state状态,通过更新函数对该key的状态不断更新。

二、具体细节

        1、transform 是一个transformation类算子

package com.spark.sparkstreaming;

import java.util.ArrayList;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

import com.google.common.base.Optional;

import scala.Tuple2;
/**
 * 过滤黑名单
 * transform操作
 * DStream可以通过transform做RDD到RDD的任意操作。
 * @author root
 *
 */
public class TransformOperator {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf();
        conf.setMaster("local[2]").setAppName("transform");
        JavaStreamingContext jsc = new JavaStreamingContext(conf,Durations.seconds(5));
        
        //模拟黑名单
        List<Tuple2<String,Boolean>> blackList = new ArrayList<Tuple2<String,Boolean>>();
        blackList.add(new Tuple2<String,Boolean>("zhangsan",true));
        //将黑名单转换成RDD
        final JavaPairRDD<String, Boolean> blackNameRDD = jsc.sparkContext().parallelizePairs(blackList);
        
        //接受socket数据源
        JavaReceiverInputDStream<String> nameList = jsc.socketTextStream("node5", 9999);
        JavaPairDStream<String, String> pairNameList = 
                nameList.mapToPair(new PairFunction<String, String, String>() {

            /**
             *这块代码在Driver端执行。
             */
            private static final long serialVersionUID = 1L;

            @Override
            public Tuple2<String, String> call(String s) throws Exception {
                return new Tuple2<String, String>(s.split(" ")[1], s);
            }
        });
        JavaDStream<String> transFormResult =
                pairNameList.transform(new Function<JavaPairRDD<String,String>, JavaRDD<String>>() {

            /**
             * 
             */
            private static final long serialVersionUID = 1L;

            @Override
            public JavaRDD<String> call(JavaPairRDD<String, String> nameRDD)
                    throws Exception {
                /**
                 * nameRDD:
                 *   ("zhangsan","1 zhangsan")
                 *   ("lisi","2 lisi")
                 *   ("wangwu","3 wangwu")
                 * blackNameRDD:
                 *   ("zhangsan",true)
                 *   
                 * ("zhangsan",("1 zhangsan",[true]))
                 * 
                 */
                JavaPairRDD<String, Tuple2<String, Optional<Boolean>>> leftOuterJoin = 
                        nameRDD.leftOuterJoin(blackNameRDD);
                //打印下leftOuterJoin
                /*leftOuterJoin.foreach(new VoidFunction<Tuple2<String,Tuple2<String,Optional<Boolean>>>>() {
                    
                    *//**
                     * 
                     *//*
                    private static final long serialVersionUID = 1L;

                    @Override
                    public void call(Tuple2<String, Tuple2<String, Optional<Boolean>>> t)
                            throws Exception {
                        System.out.println(t);
                    }
                });*/
                
                
                //过滤:true的留下,false的过滤
                //("zhangsan",("1 zhangsan",[true]))
                JavaPairRDD<String, Tuple2<String, Optional<Boolean>>> filter = 
                        leftOuterJoin.filter(new Function<Tuple2<String,Tuple2<String,Optional<Boolean>>>, Boolean>() {

                    /**
                     * 
                     */
                    private static final long serialVersionUID = 1L;

                    @Override
                    public Boolean call(Tuple2<String, Tuple2<String, Optional<Boolean>>> tuple)throws Exception {
                        if(tuple._2._2.isPresent()){
                            return !tuple._2._2.get();
                        }
                        return true;
                    }
                });
                
                JavaRDD<String> resultJavaRDD = filter.map(new Function<Tuple2<String,Tuple2<String,Optional<Boolean>>>, String>() {

                    /**
                     * 
                     */
                    private static final long serialVersionUID = 1L;

                    @Override
                    public String call(
                            Tuple2<String, Tuple2<String, Optional<Boolean>>> tuple)
                            throws Exception {
                        
                        return tuple._2._1;
                    }
                });
                
                //返回过滤好的结果
                return resultJavaRDD;
            }
        });
        
        transFormResult.print();
        
        jsc.start();
        jsc.awaitTermination();
        jsc.stop();
    }
}

 2、UpdateStateByKey算子(相当于对不同批次的累加和更新)

UpdateStateByKey的主要功能:
 * 1、为Spark Streaming中每一个Key维护一份state状态,state类型可以是任意类型的, 可以是一个自定义的对象,那么更新函数也可以是自定义的。
 * 2、通过更新函数对该key的状态不断更新,对于每个新的batch而言,Spark Streaming会在使用updateStateByKey的时候为已经存在的key进行state的状态更新

*  使用到updateStateByKey要开启checkpoint机制和功能。

*   多久会将内存中的数据写入到磁盘一份?

         如果batchInterval设置的时间小于10秒,那么10秒写入磁盘一份。如果batchInterval设置的时间大于10秒,那么就会batchInterval时间间隔写入磁盘一份。

 java代码:

package com.spark.sparkstreaming;

import java.util.Arrays;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

import com.google.common.base.Optional;

import scala.Tuple2;

/**
 * UpdateStateByKey的主要功能:
 * 1、为Spark Streaming中每一个Key维护一份state状态,state类型可以是任意类型的, 可以是一个自定义的对象,那么更新函数也可以是自定义的。
 * 2、通过更新函数对该key的状态不断更新,对于每个新的batch而言,Spark Streaming会在使用updateStateByKey的时候为已经存在的key进行state的状态更新
 * 
 * hello,3
 * spark,2
 * 
 * 如果要不断的更新每个key的state,就一定涉及到了状态的保存和容错,这个时候就需要开启checkpoint机制和功能 
 * 
 * 全面的广告点击分析
 * @author root
 *
 * 有何用?   统计广告点击流量,统计这一天的车流量,统计点击量
 */

public class UpdateStateByKeyOperator {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("UpdateStateByKeyDemo");
        JavaStreamingContext jsc = new JavaStreamingContext(conf, Durations.seconds(5));
        /**
         * 设置checkpoint目录
         * 
         * 多久会将内存中的数据(每一个key所对应的状态)写入到磁盘上一份呢?
         *     如果你的batch interval小于10s  那么10s会将内存中的数据写入到磁盘一份
         *     如果bacth interval 大于10s,那么就以bacth interval为准
         * 
         * 这样做是为了防止频繁的写HDFS
         */
        JavaSparkContext sparkContext = jsc.sparkContext();
        sparkContext.setCheckpointDir("./checkpoint");
        
//         jsc.checkpoint("hdfs://node1:9000/spark/checkpoint");
//         jsc.checkpoint("./checkpoint");
         
        JavaReceiverInputDStream<String> lines = jsc.socketTextStream("node5", 9999);

        JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            /**
             * 
             */
            private static final long serialVersionUID = 1L;

            @Override
            public Iterable<String> call(String s) {
                return Arrays.asList(s.split(" "));
            }
        });

        JavaPairDStream<String, Integer> ones = words.mapToPair(new PairFunction<String, String, Integer>() {
            /**
             * 
             */
            private static final long serialVersionUID = 1L;

            @Override
            public Tuple2<String, Integer> call(String s) {
                return new Tuple2<String, Integer>(s, 1);
            }
        });

        JavaPairDStream<String, Integer> counts = 
                ones.updateStateByKey(new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() {

            /**
             * 
             */
            private static final long serialVersionUID = 1L;

            @Override
            public Optional<Integer> call(List<Integer> values, Optional<Integer> state) throws Exception {
                /**
                 * values:经过分组最后 这个key所对应的value  [1,1,1,1,1]
                 * state:这个key在本次之前之前的状态
                */
                Integer updateValue = 0 ;
                 if(state.isPresent()){
                     updateValue = state.get();
                 }
                 
                 for (Integer value : values) {
                     updateValue += value;
                }
                return Optional.of(updateValue);
            }
        });

 scala代码:

package com.bjsxt.sparkstreaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming.Durations
import org.apache.spark.streaming.StreamingContext

object Operator_UpdateStateByKey {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setMaster("local[2]").setAppName("updateStateByKey")
    val jsc = new StreamingContext(conf,Durations.seconds(5))
    //设置日志级别
    jsc.sparkContext.setLogLevel("WARN")
    //设置checkpoint路径
    jsc.checkpoint("hdfs://node1:9000/spark/checkpoint")
    
    val lineDStream = jsc.socketTextStream("node5", 9999)
    val wordDStream = lineDStream.flatMap { _.split(" ") }
    val pairDStream = wordDStream.map { (_,1)}
    
    val result = pairDStream.updateStateByKey((seq:Seq[Int],option:Option[Int])=>{
      var value = 0
      value += option.getOrElse(0)
      for(elem <- seq){
        value +=elem
      }
      
     Option(value)
    })
    
    result.print()
    jsc.start()
    jsc.awaitTermination()
    jsc.stop()
  }
}

 结果:

 可见从启动以来一直维护这个累加状态!!!

 2、windows窗口函数(实现一阶段内的累加 ,而不是程序启动时)

  假设每隔5s 1个batch,上图中窗口长度为15s,窗口滑动间隔10s。

窗口长度和滑动间隔必须是batchInterval的整数倍。如果不是整数倍会检测报错。

优化后的window操作要保存状态所以要设置checkpoint路径,没有优化的window操作可以不设置checkpoint路径。

package com.spark.sparkstreaming;

import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

import scala.Tuple2;

/**
 * 基于滑动窗口的热点搜索词实时统计
 * @author root
 *
 */
public class WindowOperator {
    
    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setMaster("local[2]")
                .setAppName("WindowHotWord"); 
        
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(5));
        /**
         * 设置日志级别为WARN
         *
         */
        jssc.sparkContext().setLogLevel("WARN");
        /**
         * 注意:
         *  没有优化的窗口函数可以不设置checkpoint目录
         *  优化的窗口函数必须设置checkpoint目录         
         */
//           jssc.checkpoint("hdfs://node1:9000/spark/checkpoint");
           jssc.checkpoint("./checkpoint");
        JavaReceiverInputDStream<String> searchLogsDStream = jssc.socketTextStream("node04", 9999);
        //word    1
        JavaDStream<String> searchWordsDStream = searchLogsDStream.flatMap(new FlatMapFunction<String, String>() {

            /**
             * 
             */
            private static final long serialVersionUID = 1L;

            @Override
            public Iterable<String> call(String t) throws Exception {
                return Arrays.asList(t.split(" "));
            }
        });
        
        // 将搜索词映射为(searchWord, 1)的tuple格式
        JavaPairDStream<String, Integer> searchWordPairDStream = searchWordsDStream.mapToPair(
                
                new PairFunction<String, String, Integer>() {

                    private static final long serialVersionUID = 1L;

                    @Override
                    public Tuple2<String, Integer> call(String searchWord)
                            throws Exception {
                        return new Tuple2<String, Integer>(searchWord, 1);
                    }
                    
                });
        /**
         * 每隔10秒,计算最近60秒内的数据,那么这个窗口大小就是60秒,里面有12个rdd,在没有计算之前,这些rdd是不会进行计算的。
         * 那么在计算的时候会将这12个rdd聚合起来,然后一起执行reduceByKeyAndWindow操作 ,
         * reduceByKeyAndWindow是针对窗口操作的而不是针对DStream操作的。
         */
            JavaPairDStream<String, Integer> searchWordCountsDStream = 
                
                searchWordPairDStream.reduceByKeyAndWindow(new Function2<Integer, Integer, Integer>() {

                    private static final long serialVersionUID = 1L;

                    @Override
                    public Integer call(Integer v1, Integer v2) throws Exception {
                        return v1 + v2;
                    }
        }, Durations.seconds(15), Durations.seconds(5)); //窗口长度,滑动间隔
        
        
        /**
         * window窗口操作优化:不用设置checkpoint目录。
         */
//         JavaPairDStream<String, Integer> searchWordCountsDStream = 
//        
//         searchWordPairDStream.reduceByKeyAndWindow(new Function2<Integer, Integer, Integer>() {
//
//            private static final long serialVersionUID = 1L;
//
//            @Override
//            public Integer call(Integer v1, Integer v2) throws Exception {
//                return v1 + v2;
//            }
//            
//        },new Function2<Integer, Integer, Integer>() {
//
//            private static final long serialVersionUID = 1L;
//
//            @Override
//            public Integer call(Integer v1, Integer v2) throws Exception {
//                return v1 - v2;
//            }
//            
//        }, Durations.seconds(15), Durations.seconds(5));    

          searchWordCountsDStream.print();
        
        jssc.start();
        jssc.awaitTermination();
        jssc.close();
    }

}

 Scala代码:

package com.bjsxt.sparkstreaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming.Durations
import org.apache.spark.streaming.StreamingContext

object Operator_Window {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setMaster("local[2]").setAppName("updateStateByKey")
    val jsc = new StreamingContext(conf,Durations.seconds(5))
    //设置日志级别
    jsc.sparkContext.setLogLevel("WARN")
    //设置checkpoint路径
    jsc.checkpoint("hdfs://node1:9000/spark/checkpoint")
    val lineDStream = jsc.socketTextStream("node04", 9999)
    val wordDStream = lineDStream.flatMap { _.split(" ") }
    val mapDStream = wordDStream.map { (_,1)}
    
    
    //window没有优化后的
    val result = mapDStream.reduceByKeyAndWindow((v1:Int,v2:Int)=>{
        v1+v2
      }, Durations.seconds(60), Durations.seconds(10))
      
   //优化后的
//   val result = mapDStream.reduceByKeyAndWindow((v1:Int,v2:Int)=>{
//       v1+v2
//     }, (v1:Int,v2:Int)=>{
//       v1-v2
//     }, Durations.seconds(60), Durations.seconds(10))

    result.print()
    jsc.start()
    jsc.awaitTermination()
    jsc.stop()
  }
}

 结果:

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2018-02-09 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档