前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Spark Streaming自定义Receivers

Spark Streaming自定义Receivers

作者头像
岑玉海
发布2018-02-28 17:18:53
6510
发布2018-02-28 17:18:53
举报
文章被收录于专栏:岑玉海岑玉海

自定义一个Receiver

代码语言:javascript
复制
 class SocketTextStreamReceiver(host: String, port: Int(
         extends NetworkReceiver[String]
       {
         protected lazy val blocksGenerator: BlockGenerator =
           new BlockGenerator(StorageLevel.MEMORY_ONLY_SER_2)

         protected def onStart() = {
           blocksGenerator.start()
           val socket = new Socket(host, port)
           val dataInputStream = new BufferedReader(new InputStreamReader(socket.getInputStream(), "UTF-8"))
           var data: String = dataInputStream.readLine()
           while (data != null) {
             blocksGenerator += data
             data = dataInputStream.readLine()
           }
         }

         protected def onStop() {
           blocksGenerator.stop()
         }
       }

An Actor as Receiver

代码语言:javascript
复制
 class SocketTextStreamReceiver (host:String,
         port:Int,
         bytesToString: ByteString => String) extends Actor with Receiver {

          override def preStart = IOManager(context.system).connect(host, port)

          def receive = {
           case IO.Read(socket, bytes) => pushBlock(bytesToString(bytes))
         }

       }

A Sample Spark Application

代码语言:javascript
复制
  val ssc = new StreamingContext(master, "WordCountCustomStreamSource",
      Seconds(batchDuration))
  //使用自定义的receiver
  val lines = ssc.networkStream[String](new SocketTextStreamReceiver(
      "localhost", 8445))

  //或者使用这个自定义的actor Receiver
  val lines2 = ssc.actorStream[String](Props(new SocketTextStreamReceiver(
      "localhost",8445, z => z.utf8String)),"SocketReceiver") */

提交成功之后,启动Netcat测试一下

代码语言:javascript
复制
$ nc -l localhost 8445 hello world hello hello

下面是合并多个输入流的方法:

代码语言:javascript
复制
  val lines = ssc.actorStream[String](Props(new SocketTextStreamReceiver(
      "localhost",8445, z => z.utf8String)),"SocketReceiver")

  // Another socket stream receiver
  val lines2 = ssc.actorStream[String](Props(new SocketTextStreamReceiver(
      "localhost",8446, z => z.utf8String)),"SocketReceiver")

  val union = lines.union(lines2)
本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2014-03-03 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 自定义一个Receiver
  • An Actor as Receiver
  • A Sample Spark Application
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档