我正试图按如下方式将spark (pyspark)连接到mongodb:
conf = SparkConf()
conf.set('spark.mongodb.input.uri', default_mongo_uri)
conf.set('spark.mongodb.output.uri', default_mongo_uri)
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)
spark = SparkSession \
.builder \
.appName("my-app") \
.config("spark.mongodb.input.uri", default_mongo_uri) \
.config("spark.mongodb.output.uri", default_mongo_uri) \
.getOrCreate()但当我做以下事情时:
users = spark.read.format("com.mongodb.spark.sql.DefaultSource") \
.option("uri", '{uri}.{col}'.format(uri=mongo_uri, col='users')).load()我知道这个错误:
java.lang.ClassNotFoundException:未能找到数据源: com.mongodb.spark.sql.DefaultSource
我也做了同样的事情,我从火花公子壳,我能够检索数据。这是我运行的命令:
pyspark --conf "spark.mongodb.input.uri=mongodb_uri" --conf "spark.mongodb.output.uri=mongodburi" --packages org.mongodb.spark:mongo-spark-connector_2.11:2.2.2但是在这里我们可以选择指定我们需要使用的包。但是独立的应用程序和脚本呢?我如何配置芒果火花连接器那里。
有什么想法吗?
发布于 2019-01-01 16:58:01
这是我在木星笔记本上做的:
from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
working_directory = 'jars/*'
my_spark = SparkSession \
.builder \
.appName("myApp") \
.config("spark.mongodb.input.uri=mongodb://127.0.0.1/test.myCollection") \
.config("spark.mongodb.output.uri=mongodb://127.0.0.1/test.myCollection") \
.config('spark.driver.extraClassPath', working_directory) \
.getOrCreate()
people = my_spark.createDataFrame([("JULIA", 50), ("Gandalf", 1000), ("Thorin", 195), ("Balin", 178), ("Kili", 77),
("Dwalin", 169), ("Oin", 167), ("Gloin", 158), ("Fili", 82), ("Bombur", 22)], ["name", "age"])
people.write.format("com.mongodb.spark.sql.DefaultSource").mode("append").save()
df = my_spark.read.format("com.mongodb.spark.sql.DefaultSource").load()
df.select('*').where(col("name") == "JULIA").show()因此,您将看到以下内容:

发布于 2019-07-11 06:27:20
如果您正在使用SparkContext & SparkSession,您已经在SparkConf中提到了连接器jar包,请检查以下代码:
from pyspark import SparkContext,SparkConf
conf = SparkConf().set("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.11:2.3.2")
sc = SparkContext(conf=conf)
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("myApp") \
.config("spark.mongodb.input.uri", "mongodb://xxx.xxx.xxx.xxx:27017/sample1.zips") \
.config("spark.mongodb.output.uri", "mongodb://xxx.xxx.xxx.xxx:27017/sample1.zips") \
.getOrCreate()
df = spark.read.format("com.mongodb.spark.sql.DefaultSource").load()
df.printSchema()如果只使用 SparkSession,则使用以下代码:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("myApp") \
.config("spark.mongodb.input.uri", "mongodb://xxx.xxx.xxx.xxx:27017/sample1.zips") \
.config("spark.mongodb.output.uri", "mongodb://xxx.xxx.xxx.xxx:27017/sample1.zips") \
.config('spark.jars.packages', 'org.mongodb.spark:mongo-spark-connector_2.11:2.3.2') \
.getOrCreate()
df = spark.read.format("com.mongodb.spark.sql.DefaultSource").load()
df.printSchema()发布于 2019-02-20 16:55:53
我还遇到了同样的错误"java.lang.ClassNotFoundException:未能找到数据源: com.mongodb.spark.sql.DefaultSource“,同时试图从Spark2.3连接到MongoDB。
我不得不下载并复制mongo-火花连接器_2.11 JAR文件到jars目录中进行火花安装。
这解决了我的问题,我成功地调用了我的火花代码通过火花提交。
希望能帮上忙。
https://stackoverflow.com/questions/50963444
复制相似问题