Spark教程(二)Spark连接MongoDB

如何导入数据

数据可能有各种格式,虽然常见的是HDFS,但是因为在Python爬虫中数据库用的比较多的是MongoDB,所以这里会重点说说如何用spark导入MongoDB中的数据。

当然,首先你需要在自己电脑上安装spark环境,简单说下,在这里下载spark,同时需要配置好JAVAScala环境。

这里建议使用Jupyter notebook,会比较方便,在环境变量中这样设置

PYSPARK_DRIVER_PYTHON=jupyter 
PYSPARK_DRIVER_PYTHON_OPTS=notebook ./bin/pyspark

如果你的环境中有多个Python版本,同样可以制定你想要使用的解释器,我这里是python36,根据需求修改。

PYSPARK_PYTHON=/usr/bin/python36

启动命令

进入spark根目录,./bin/pyspark这是最简单的启动命令,默认会打开Python的交互式解释器,但是由于我们上面有设置过,会打开Jupyter notebook,接下来变成会方便很多。

先来看看最简单的例子:

>>> textFile = spark.read.text("README.md")

>>> textFile.count()  # Number of rows in this DataFrame126

>>> textFile.first()  # First row in this DataFrameRow(value=u'# Apache Spark')

>>> linesWithSpark = textFile.filter(textFile.value.contains("Spark"))


>>> textFile.filter(textFile.value.contains("Spark")).count()  # How many lines contain "Spark"?15

这里有我之前写过的例子,可以照着写一遍 basic_exercise

我们的启动方式是./bin/pyspark,我们可以家后面加很多参数,比如说如若我们要连接MongoDB,就需要这样

完整的可以参考Spark Connector Python Guide

./bin/pyspark --conf "spark.mongodb.input.uri=mongodb://127.0.0.1/test.myCollection?readPreference=primaryPreferred" \
              --conf "spark.mongodb.output.uri=mongodb://127.0.0.1/test.myCollection" \
              --packages org.mongodb.spark:mongo-spark-connector_2.11:2.3.0

这里有两个uri,分别是inputoutput,对应读取的数据库和写入的数据库,最后面的packages相当于引入的包的名字,我一般喜欢在代码中定义。

读取/保存数据

这里我们可以增加参数option,在这里设置想要读取的数据库地址,注意格式。

读取数据

df = spark.read.format("com.mongodb.spark.sql.DefaultSource").option("uri","mongodb://127.0.0.1/people.contacts").load()

保存数据

people.write.format("com.mongodb.spark.sql.DefaultSource").mode("append").option("uri","mongodb://127.0.0.1/people.contacts").option("database","people").option("collection", "contacts").save()

简单对比下,option还可以定义databasecollection,这样就不需要在启动Spark时定义。

以上是官网推荐的连接方式,这里需要说的是另一种,如果我没有从命令行中启动,而是直接新建一个py文件,该如何操作?

搜索相关资料后,发现是这样

#!/usr/bin/env python
# -*- coding: utf-8 -*-

__author__ = 'zhangslob'

import os
from pyspark.sql import SparkSession

# set PYSPARK_PYTHON to python36
os.environ['PYSPARK_PYTHON'] = '/usr/bin/python36'

# load mongo data
input_uri = "mongodb://127.0.0.1:spark.spark_test"
output_uri = "mongodb://127.0.0.1:spark.spark_test"

my_spark = SparkSession\
    .builder\
    .appName("MyApp")\
    .config("spark.mongodb.input.uri", input_uri)\
    .config("spark.mongodb.output.uri", output_uri)\
    .config('spark.jars.packages','org.mongodb.spark:mongo-spark-connector_2.11:2.2.0')\
    .getOrCreate()

df = my_spark.read.format('com.mongodb.spark.sql.DefaultSource').load()

必须要增加默认设置('spark.jars.packages','org.mongodb.spark:mongo-spark-connector_2.11:2.2.0'),否则会报错。

原文发布于微信公众号 - Python爬虫与算法进阶(zhangslob)

原文发表时间:2018-09-03

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏会跳舞的机器人

RabbitMQ简介以及应用

2、Queue:队列,rabbitmq的内部对象,用于存储消息,其属性类似于Exchange,同样可以设置是否持久化、自动删除等。 消费者重Queue中获取消息...

15720
来自专栏王二麻子IT技术交流园地

《跟我学IDEA》四、配置模板(提高代码编写效率)

上一篇博文,我们学习了idea的一些实用配置,相信大家也对idea这个开发工具有了一个大概的了解。今天我们来学习模板的配置,idea提供很多模板从而提高编写代码...

96270
来自专栏架构师小秘圈

HBase极简教程

HBase 系统架构 HBase是Apache Hadoop的数据库,能够对大型数据提供随机、实时的读写访问。HBase的目标是存储并处理大型的数据。HBase...

51060
来自专栏一个爱瞎折腾的程序猿

asp.net core使用Swashbuckle.AspNetCore(swagger)生成接口文档

开局一张图,然后开始编,一些基本的asp.net core东西就不再赘述,本文只对Swashbuckle.AspNetCore的几个使用要点进行描述。

18610
来自专栏分布式系统进阶

Kafka源码分析-启动流程

使用getPropsFromArgs方法来获取各配置项, 然后将启动和停止动作全部代理给KafkaServerStartable类;

22600
来自专栏Ken的杂谈

ASP.NET(C#) 发送邮件帮助类Mailhelper

21810
来自专栏极客慕白的成长之路

光棍节程序员闯关秀过关攻略

查看源代码,发现a标签的颜色和背景色一样,导致了我们看不到a过关地址,方法很简单,直接复制源代码中的key即可,或者ctrl+a然后单击进入下一关即可

14150
来自专栏Hadoop实操

如何使用HBase存储文本文件

54830
来自专栏小灰灰

报警系统QuickAlarm使用手册

本片将主要说明QuickAlarm该如何使用,以及使用时需要注意事项 1. 基本使用姿势 首先我们不做任何的自定义操作,全部依靠系统默认的实现,我们的使用步骤如...

440160
来自专栏Jerry的SAP技术分享

如何处理错误信息 Pricing procedure could not be determined

当给一个SAP CRM Quotation文档的行项目维护一个产品时,遇到如下错误信息:Pricing procedure could not be deter...

28180

扫码关注云+社区

领取腾讯云代金券