前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Spark笔记15-Spark数据源及操作

Spark笔记15-Spark数据源及操作

作者头像
皮大大
发布2021-03-02 15:41:16
7480
发布2021-03-02 15:41:16
举报
文章被收录于专栏:机器学习/数据可视化

数据输入源

Spark Streaming中的数据来源主要是

  • 系统文件源
  • 套接字流
  • RDD对列流
  • 高级数据源Kafka
文件流
  • 交互式环境下执行
代码语言:javascript
复制
# 创建文件存放的目录
cd /usr/loca/spark/mycode
mkdir streaming
cd streaming
mkdir logfile
cd logfile   # 对这个子目录进行数据监控
代码语言:javascript
复制
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
ssc = StreamingContext(sc, 10)   # 每10秒监听;交互式环境下自带sc实例对象
lines = ssc.textFileStream(".../logfile")  # 创建文件流,监控目录的全称地址
words = lines.flatMap(lambda line:line.split(' '))  # 通过flatMap操作将数据进行lambda操作,再进行拍平
wordCounts = words.map(lambda x:(x,1)).reduceByKey(lambda a,b: a+b)
wordCounts.pprint()  # 在交互式环境下查看
ssc.start()   # 启动流计算
ssc.awaitTermination()  # 等待流计算结束
套接字流
创建客户端和服务端

tcp编程包含客户端和服务端,通信过程:

  • 服务端先进行端口的绑定,再进入监听和阻塞状态,等待来自客户端的连接
  • 客户端发送请求,连接到指定的端口号,服务端收到请求,完成通信过程

SparkStreaming扮演的是客户端的角色,不断的发送数据。

代码语言:javascript
复制
# 创建文件存放的目录
cd /usr/loca/spark/mycode
mkdir streaming
cd streaming
mkdir socket
cd socket
代码语言:javascript
复制
# vim NetworkWordCount.py:扮演的是客户端角色

from __future__ import print_function
import sys
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

if __name__ == "__main__":
  if len(sys.argv) != 3:   # 第一个参数默认是self
    print("Usage: NetworkWordCount.py<hostname><port>", file=sys.stderr)
    exit(-1)   # 参数长度不够,自动退出
  sc = SparkContext(appName="pythonStreamingNetworkWordCount")
  ssc = StreamingContext(sc, 1)   # 流计算的指挥官
  lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))  # 定义套接字类型的输入源
  counts = lines.flatMap(lambda line:line.split(" ").map(lambda word:(word,1)).reduceByKey(lambda a,b: a+b)

  counts.pprint()
  ssc.start()
  ssc.awaitTermination()


# 服务端的角色
# 在linux中:nc -lk 9999
cd /usr/local/spark/mycode/streaming/socket
/usr/local/spark/bin/spark-submit NetworkWordCount.py localhost 9999
代码语言:javascript
复制
# 使用socket编程实现自定义数据源
# DataSourceSocket.py

import socket
server = socket.socket()  #  生成对象
server.bind("localhose", 9999)   # 设置监听的机器和端口号
server.listen(1)
while 1:
  conn,addr = server.accept()  # 使用两个值进行接受
  print("connect success! connection is from %s" %addr[0])
  print("sending data....")
  conn.send("I love hadoop  I love spark hadoop is good spark is fast".encode())  # 打印正在传输的数据
  conn.close()
  print("connection is broken.")
如何启动
代码语言:javascript
复制
cd /usr/local/spark/mycode/streaming/socket
/usr/local/spark/bin/spark-submit DataSourceSocket.py

# 启动客户端
cd /usr/local/spark/mycode/streaming/socket
/usr/local/spark/bin/spark-submit NetworkWordCount.py localhost 9999
RDD队列流
代码语言:javascript
复制
cd /usr/local/spark/mycode/streaming/rddqueue
/usr/local/spark/bin/spark-submit RDDQueueStream.py
代码语言:javascript
复制
# RDDQueueStream.py

import time
from pyspark import SparkContext
from pyspark.Streaming import StreamingContext

if __name__ == "__main__":
  sc = sparkContext(appName="pythonStreamingQueueStream")
  ssc = StreamingContext(sc, 2)   # 数据流指挥官的生成
  rddQueue = []
  for i in range(5):
    rddQueue += [ssc.sparkContext.parallelize[j for j in range(1,1001)], 10]   #. 创建RDD队列流
    time.sleep(1)

  inputStream = ssc.queueStream(rddQueue)
  mappedStream = inputStream.map(lambda x:(x %10, 1))
  reduceStream = mappedStream.reduceByKey(lambda a,b: a + b)
  reduceStream.pprint()
  ssc.start()
  ssc.stop(stopSparkContext=True, stopGraceFully=True)
Kafka(Apache)
功能

不同类型的分布式系统(关系数据库、NoSQL数据库、流处理系统等)可以统一接入到Kafka,实现和Hadoop各个组件之间的不同类型数据的实现高效交换

信息传递的枢纽,主要功能是:

  • 高吞吐量的分布式发布订阅消息系统
  • 同时满足在线实时处理和批量离线处理
组件
  • Broker:一个或者多个服务器
  • Topic:每条消息发布到Kafka集群的消息都有一个类别,这个类别就是Topic
    • 不同的topic消息分开存储
    • 用户不必关心数据存放位置,只需要指定消息的topic即可产生或者消费数据
  • partition:每个topic分布在一个或者多个分区上
  • Producer:生产者,负责发布消息
  • Consumer:向Broker读取消息额客户端
  • Consumer Group:所属组

Kafka的运行是依赖于Zookeeper

启动Kafka
spark 配置

先下载jar包:

代码语言:javascript
复制
# 将下载解压后的jar包拷贝到spark的jars目录下
cd /usr/local/spark/jars
mkdir kafka
cd ~
cp ./spark-streaming-kafka-0.8_2.11-2.4.0.jar /usr/local/spark/jars/kafka

# 将Kafka安装目录下的libs目录下的所有文件复制到spark的jars目录下
cd /usr/local/kafka/libs
cp ./* /usr/local/spark/jars/kafka   # 进入libs目录后,将当权目录下的所有文件进行拷贝
修改spark配置文件
代码语言:javascript
复制
cd /usr/local/spark/conf
vim spark-env.sh
kafka数据源
代码语言:javascript
复制
# kafkaWordCount.py

from __future__ import print_function
import sys
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

if __name__ == "__main__":
  if len(sys.argv) != 3:   # 第一个参数默认是self
    print("Usage: kafkaWordCount.py<hostname><port>", file=sys.stderr)
    exit(-1)   # 参数长度不够,自动退出
  sc = SparkContext(appName="pythonStreamingKafkaWordCount")
  ssc = StreamingContext(sc, 1)   # 流计算的指挥官

  zkQuorum,topic = sys.argv[1:]
	kvs = KafkaUtils.createStream(ssc, zkQuorum, "spark-streaming-consumer", {topic:1})   # 建立数据源

  lines = kvs.map(lambda x:x[1])
  counts = lines.flatMap(lambda line:line.split(" ").map(lambda word:(word,1)).reduceByKey(lambda a,b: a+b)    # 第二个 map 函数的作用是形成键值对,因为 reduceByKeyd 的参数必须是键值对

  counts.pprint()
  ssc.start()
  ssc.awaitTermination()
执行过程
代码语言:javascript
复制
cd /usr/local/spark/mycode/streaming/kafka
/usr/local/spark/bin/spark-submit ./kafkaWordCount.py locaohost:2181 wordsendertest   # 2181是ZK服务器的地址,wordsendertest是topic的名称
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2019-11-2,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 数据输入源
    • 文件流
      • 套接字流
        • 创建客户端和服务端
        • 如何启动
      • RDD队列流
        • Kafka(Apache)
          • 功能
          • 组件
          • 启动Kafka
        • spark 配置
          • 修改spark配置文件
        • kafka数据源
          • 执行过程
          相关产品与服务
          数据库
          云数据库为企业提供了完善的关系型数据库、非关系型数据库、分析型数据库和数据库生态工具。您可以通过产品选择和组合搭建,轻松实现高可靠、高可用性、高性能等数据库需求。云数据库服务也可大幅减少您的运维工作量,更专注于业务发展,让企业一站式享受数据上云及分布式架构的技术红利!
          领券
          问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档