前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >利用Spark 实现数据的采集、清洗、存储和分析

利用Spark 实现数据的采集、清洗、存储和分析

原创
作者头像
brzhang
修改2024-01-28 17:23:52
4790
修改2024-01-28 17:23:52
举报
文章被收录于专栏:玩转全栈玩转全栈

学习本文,你将了解spark是干啥的,以及他的核心的特性是什么,然后了解这些核心特性的情况下,我们会继续学习,如何使用spark进行数据的采集/清洗/存储/和分析。

spark是干啥的

Apache Spark 是一个用于大规模数据处理的统一分析引擎。它提供了高级的数据分析能力和支持多种数据源的灵活性。Spark 的主要特点包括:

  • 快速处理:利用内存计算,Spark 能够快速处理大量数据。
  • 易于使用:提供了 Scala、Java、Python 和 R 等多种编程语言的接口,本文为了简单,使用Python进行示例的讲解,因为我已经装了Python的环境。
  • 多组件支持:包括 Spark SQL(用于处理结构化数据)、Spark Streaming(用于处理实时数据)、MLlib(机器学习库)和 GraphX(图计算框架)。

当然,做大规模数据处理的引擎可不止spark一个,还有 hadoop MapReduce,Flink等等,了解一下他们的特性,对于我们做项目选型也是非常重要的。

特性/框架

Apache Spark

Hadoop MapReduce

Apache Flink

Apache Storm

处理速度

快(内存计算)

较慢(磁盘计算)

快(流处理)

快(实时流处理)

实时处理

微批处理

不适合

低延迟真实时处理

低延迟实时处理

易用性

高(支持多种语言)

一般(主要 Java)

一般(需了解流处理概念)

一般(低级 API)

生态系统

丰富(SQL、MLlib等)

有限(Hadoop生态系统)

一般

一般

处理模型

基于 RDD

基于 MapReduce 模型

基于数据流

基于数据流

内存管理

JVM 管理

JVM 管理

自有内存管理系统

JVM 管理

容错性

一般

适用场景

大数据批处理、复杂分析

大批量数据处理

需要真实时处理的场景

低延迟实时处理需求

一个demo,使用spark做数据采集,清洗,存储,分析

好吧,废话也不在多说了,开始我们的demo环节了,Spark 可以从多种数据源(例如 HDFS、Cassandra、HBase 和 S3)读取数据,对于数据的清洗包括过滤、合并、格式化转换,处理后的数据可以存储回文件系统、数据库或者其他数据源,最后的工序就是用存储的清洗过的数据进行分析了。

假设我们有一个 CSV 格式的数据文件,其中包含了用户的信息,比如姓名、年龄和国籍。我们的目标是读取这个文件,清洗数据(比如去除无效或不完整的记录),并对年龄进行平均值计算,最后将处理后的数据存储到一个新的文件中。

其中有一些异常数据是需要我们清洗的,数据格式如下图所示:

代码环节:数据读取,从一个原始的 csv 文件里面读取,清洗是对一些脏数据进行清洗,这里是清理掉年龄为负数的项目,数据分析是看看这些人群的平均年龄。

代码语言:javascript
复制
from pyspark.sql import SparkSession
from pyspark.sql.functions import mean
# 初始化 Spark 会话
spark = SparkSession.builder.appName("UserDataAnalysis").getOrCreate()
# 读取 CSV 文件
df = spark.read.csv("users.csv", header=True, inferSchema=True)

print(df.show())
# 清洗数据,例如去除年龄为 null 或非正常值的行
df_clean = df.filter(df.Age > 0)

print(df_clean.show())

# 计算年龄的平均值
avg_age = df_clean.select(mean("Age")).collect()[0][0]
print(f"Average age: {avg_age}")

# 将处理后的数据存储为新的 CSV 文件
# df_clean.write.csv("result.csv", header=True)

# 关闭 Spark 会话
spark.stop()

执行一下看看:

这里,可以看到,我们讲异常数据首先讲异常数据清理掉,然后使用 avg_age = df_clean.select(mean("Age")).collect()[0][0] 计算了一下平均年龄,符合预期。至于数据的存储,我们可以直接以csv的方式存在本地。df_clean.write.csv("result.csv", header=True)

以下是我存储的清洗后的数据的一个示例:

总结

本文这个例子对于 spark 来说应该算是高射炮打文字了,spark 在做数据清洗上绝对不是仅仅这么点刷子,我们这里使用 spark sql 对结构化数据做了简单的清洗,你可能了解过,我们还可以使用 Spark MLlib 或 Spark ML 来进行数据质量检查和数据 profiling,以识别数据中的异常值、离群值、噪声等问题。另外对于数据分析,我们可以使用 Spark MLlib 或 Spark ML 来进行机器学习和统计分析,如回归、分类、聚类、降维等,甚至使用 Spark GraphX 来进行图数据分析,如社区检测、页面排名等。另外,在数据可视化方面, Spark 连接外部可视化工具,如 Tableau、PowerBI、QlikView 等,来可视化数据。

因此本文就是一个 spark 入门而已,门槛很低的那种,一些高端的玩法还需要下点功夫去探索。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • spark是干啥的
  • 一个demo,使用spark做数据采集,清洗,存储,分析
  • 总结
相关产品与服务
数据保险箱
数据保险箱(Cloud Data Coffer Service,CDCS)为您提供更高安全系数的企业核心数据存储服务。您可以通过自定义过期天数的方法删除数据,避免误删带来的损害,还可以将数据跨地域存储,防止一些不可抗因素导致的数据丢失。数据保险箱支持通过控制台、API 等多样化方式快速简单接入,实现海量数据的存储管理。您可以使用数据保险箱对文件数据进行上传、下载,最终实现数据的安全存储和提取。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档