首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >Spark在写入elasticsearch时不支持arraylist吗?

Spark在写入elasticsearch时不支持arraylist吗?
EN

Stack Overflow用户
提问于 2015-07-14 23:15:17
回答 3查看 2.4K关注 0票数 4

我有以下结构:

代码语言:javascript
运行
复制
mylist = [{"key1":"val1"}, {"key2":"val2"}]
myrdd = value_counts.map(lambda item: ('key', { 
    'field': somelist 
}))

我收到错误: 15/02/10 15:54:08信息scheduler.TaskSetManager:在执行器ip-10-80-15-145.ec2.阶段2.0 (TID 6)中丢失任务1.0。内部: org.apache.spark.SparkException ( java.util.ArrayList类型的数据不能使用)重复1

代码语言:javascript
运行
复制
rdd.saveAsNewAPIHadoopFile( 
            path='-', 
            outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat", 
            keyClass="org.apache.hadoop.io.NullWritable", 
            valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable", 
            conf={ 
        "es.nodes" : "localhost", 
        "es.port" : "9200", 
        "es.resource" : "mboyd/mboydtype" 
    }) 

我希望文档在写入ES时最终是这样的:

代码语言:javascript
运行
复制
{
field:[{"key1":"val1"}, {"key2":"val2"}]
}
EN

回答 3

Stack Overflow用户

发布于 2015-11-06 00:35:08

这个游戏有点晚了,但这是我们昨天遇到这个问题后提出的解决方案。将'es.input.json': 'true'添加到您的conf中,然后对数据运行json.dumps()

修改您的示例,如下所示:

代码语言:javascript
运行
复制
import json

rdd = sc.parallelize([{"key1": ["val1", "val2"]}])
json_rdd = rdd.map(json.dumps)
json_rdd.saveAsNewAPIHadoopFile( 
    path='-', 
    outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat", 
    keyClass="org.apache.hadoop.io.NullWritable", 
    valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable", 
    conf={ 
        "es.nodes" : "localhost", 
        "es.port" : "9200", 
        "es.resource" : "mboyd/mboydtype",
        "es.input.json": "true"
    }
) 
票数 3
EN

Stack Overflow用户

发布于 2016-05-23 22:00:31

刚刚遇到了这个问题,解决方案通过将所有列表转换为元组来解决。转换为json也是如此。

票数 3
EN

Stack Overflow用户

发布于 2018-06-20 15:14:43

我觉得在其他答案中有几点缺失,比如你必须从你的RDD返回一个2元组(我不知道为什么),并且还需要Elasticsearch hadoop jar文件来使其工作。因此,我将编写我必须遵循的整个过程,以使其工作。

  1. 下载Elasticsearch Hadoop jar文件。您可以从central maven repository下载它(最新版本在大多数情况下都可以使用-查看他们的official requirements README了解更多信息)。
  2. 使用以下最少的代码片段为演示创建一个run.py文件。

import json import pymongo_spark pymongo_spark.activate() from pyspark import SparkContext,SparkConf conf = SparkConf().setAppName('demo').setMaster('local') sc = SparkContext(conf=conf) rdd = sc.parallelize([{"key1":"val1","val2"}]) final_rdd = rdd.map(json.dumps).map(lambda x:('key',x)) final_rdd.saveAsNewAPIHadoopFile( path='-',pathkeyClass="org.apache.hadoop.io.NullWritable",valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",conf={ "es.nodes“:"","es.port”:"9200","es.resource“:”索引名称/文档类型名称“,"es.input.json":"true“})使用以下命令./bin/spark-submit --jars /path/to/your/jar/file/elasticsearch-hadoop-5.6.4.jar --driver-class-path /path/to/you/jar/file/elasticsearch-hadoop-5.6.4.jar --master yarn /path/to/your/run/file/run.py运行

  • 作业

哈!

票数 3
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/31410608

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档