首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >将键/值对的Pyspark解析为.csv格式

将键/值对的Pyspark解析为.csv格式
EN

Stack Overflow用户
提问于 2017-08-02 15:41:58
回答 1查看 1.4K关注 0票数 1

我正在构建一个解析器,它接受"key"="value“对的原始文本文件,并使用PySpark写入tabular/..csv结构。

在我被困的地方,我可以访问函数中的键和值来构造每个csv_row,甚至可以检查键是否等于预期键(col_list)的列表,但是当我在lambda中调用函数processCsv时,我不知道如何将每个csv_row附加到用于保存.csv行最终列表的列表l_of_l的全局列表中。

如何以键/值格式遍历RDD的每个记录并解析为.csv格式?如您所见,我的最终列表(l_of_l)是空的,但是我可以在循环中得到每一行.令人沮丧。

感谢所有建议!

原始文本结构(foo.log):

代码语言:javascript
复制
"A"="foo","B"="bar","C"="baz"
"A"="oof","B"="rab","C"="zab"
"A"="aaa","B"="bbb","C"="zzz"

迄今的做法:

代码语言:javascript
复制
from pyspark import SparkContext
from pyspark import SQLContext
from pyspark.sql import Row

sc=SparkContext('local','foobar')
sql = SQLContext(sc)

# Read raw text to RDD
lines=sc.textFile('foo.log')
records=lines.map(lambda x: x.replace('"', '').split(","))

print 'Records pre-transform:\n'
print records.take(100)
print '------------------------------\n'

def processRecord(record, col_list):    
    csv_row=[]
    for idx, val in enumerate(record):
        key, value = val.split('=')        
        if(key==col_list[idx]):
            # print 'Col name match'
            # print value
            csv_row.append(value)
        else:
            csv_row.append(None)
            print 'Key-to-Column Mismatch, dropping value.'
    print csv_row
    global l_of_l
    l_of_l.append(csv_row)

l_of_l=[]
colList=['A', 'B', 'C']
records.foreach(lambda x: processRecord(x, col_list=colList))

print 'Final list of lists:\n'
print l_of_l

输出:

代码语言:javascript
复制
Records pre-transform:
[[u'A=foo', u'B=bar', u'C=baz'], [u'A=oof', u'B=rab', u'C=zab'], [u'A=aaa', u'B=bbb', u'C=zzz']]
------------------------------

[u'foo', u'bar', u'baz']
[u'oof', u'rab', u'zab']
[u'aaa', u'bbb', u'zzz']

Final list of lists:
[]
EN

Stack Overflow用户

回答已采纳

发布于 2017-08-02 16:49:08

尝试以下功能:

代码语言:javascript
复制
def processRecord(record, col_list):    
    csv_row=list()
    for idx, val in enumerate(record):
        key, value = val.split('=')        
        if(key==col_list[idx]):
            # print 'Col name match'
            # print value
            csv_row.append(value)
        else:
            csv_row.append(None)
            # print 'Key-to-Column Mismatch, dropping value.'
    return csv_row

然后

代码语言:javascript
复制
colList=['A', 'B', 'C']
l_of_l = records.map(lambda x: processRecord(x, col_list=colList)).collect()

print 'Final list of lists:\n'
print l_of_l

应给予

代码语言:javascript
复制
Final list of lists: 
[[u'foo', u'bar', u'baz'], [u'oof', u'rab', u'zab'], [u'aaa', u'bbb', u'zzz']]
票数 1
EN
查看全部 1 条回答
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/45465322

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档