首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >如何将数据转换为所需的格式并写入文件- Python + Apache Beam

如何将数据转换为所需的格式并写入文件- Python + Apache Beam
EN

Stack Overflow用户
提问于 2021-08-16 20:22:39
回答 1查看 117关注 0票数 0

我有一个.ndjson文件,如下所示:

代码语言:javascript
复制
{"property_id": "107", "transaction_unique_id": "{C3C3F9B5-FB9E-362B-E053-6B04A8C03ACC}", "price": 80000, "date_of_transfer": "2021-05-07 00:00", "postcode": "BL2 2GY", "property_type": "F", "old_new": "N", "duration": "L", "PAON": "14", "SAON": "", "street": "RIVER VIEW COURT", "locality": "", "town_city": "BOLTON", "district": "BOLTON", "country": "GREATER MANCHESTER", "PPD_category_type": "A", "record_status": "A"}
{"property_id": "108", "transaction_unique_id": "{C3C3F9B5-FB9F-362B-E053-6B04A8C03ACC}", "price": 330000, "date_of_transfer": "2021-02-26 00:00", "postcode": "SK6 4AN", "property_type": "S", "old_new": "N", "duration": "F", "PAON": "18", "SAON": "", "street": "GUYWOOD LANE", "locality": "ROMILEY", "town_city": "STOCKPORT", "district": "STOCKPORT", "country": "GREATER MANCHESTER", "PPD_category_type": "A", "record_status": "A"}
{"property_id": "109", "transaction_unique_id": "{C3C3F9B5-FBA0-362B-E053-6B04A8C03ACC}", "price": 215000, "date_of_transfer": "2021-02-19 00:00", "postcode": "M1 2BL", "property_type": "F", "old_new": "N", "duration": "L", "PAON": "40", "SAON": "APARTMENT 022", "street": "HILTON STREET", "locality": "", "town_city": "MANCHESTER", "district": "MANCHESTER", "country": "GREATER MANCHESTER", "PPD_category_type": "A", "record_status": "A"}
{"property_id": "109", "transaction_unique_id": "{C3C3F9B5-FBD3-362B-E053-6B04A8C03ACC}", "price": 226500, "date_of_transfer": "2021-02-08 00:00", "postcode": "M1 2BL", "property_type": "F", "old_new": "N", "duration": "L", "PAON": "40", "SAON": "APARTMENT 727", "street": "HILTON STREET", "locality": "", "town_city": "MANCHESTER", "district": "MANCHESTER", "country": "GREATER MANCHESTER", "PPD_category_type": "A", "record_status": "A"}
{"property_id": "109", "transaction_unique_id": "{C3C3F9B5-FBF8-362B-E053-6B04A8C03ACC}", "price": 262000, "date_of_transfer": "2021-05-14 00:00", "postcode": "M1 2BL", "property_type": "F", "old_new": "N", "duration": "L", "PAON": "40", "SAON": "APARTMENT 025", "street": "HILTON STREET", "locality": "", "town_city": "MANCHESTER", "district": "MANCHESTER", "country": "GREATER MANCHESTER", "PPD_category_type": "A", "record_status": "A"}
...

我使用Apache Beam读取数据,并按property_id对数据进行分组,然后将输出写入json文件,但数据如下所示:

代码语言:javascript
复制
('107', [PPD(property_id='107', transaction_unique_id='{C3C3F9B5-FB9E-362B-E053-6B04A8C03ACC}', price=80000, date_of_transfer='2021-05-07 00:00', postcode='BL2 2GY', property_type='F', old_new='N', duration='L', PAON='14', SAON='', street='RIVER VIEW COURT', locality='', town_city='BOLTON', district='BOLTON', country='GREATER MANCHESTER', PPD_category_type='A', record_status='A')])
('108', [PPD(property_id='108', transaction_unique_id='{C3C3F9B5-FB9F-362B-E053-6B04A8C03ACC}', price=330000, date_of_transfer='2021-02-26 00:00', postcode='SK6 4AN', property_type='S', old_new='N', duration='F', PAON='18', SAON='', street='GUYWOOD LANE', locality='ROMILEY', town_city='STOCKPORT', district='STOCKPORT', country='GREATER MANCHESTER', PPD_category_type='A', record_status='A')])
('109', [PPD(property_id='109', transaction_unique_id='{C3C3F9B5-FBA0-362B-E053-6B04A8C03ACC}', price=215000, date_of_transfer='2021-02-19 00:00', postcode='M1 2BL', property_type='F', old_new='N', duration='L', PAON='40', SAON='APARTMENT 022', street='HILTON STREET', locality='', town_city='MANCHESTER', district='MANCHESTER', country='GREATER MANCHESTER', PPD_category_type='A', record_status='A'), PPD(property_id='109', transaction_unique_id='{C3C3F9B5-FBD3-362B-E053-6B04A8C03ACC}', price=226500, date_of_transfer='2021-02-08 00:00', postcode='M1 2BL', property_type='F', old_new='N', duration='L', PAON='40', SAON='APARTMENT 727', street='HILTON STREET', locality='', town_city='MANCHESTER', district='MANCHESTER', country='GREATER MANCHESTER', PPD_category_type='A', record_status='A'), PPD(property_id='109', transaction_unique_id='{C3C3F9B5-FBF8-362B-E053-6B04A8C03ACC}', price=262000, date_of_transfer='2021-05-14 00:00', postcode='M1 2BL', property_type='F', old_new='N', duration='L', PAON='40', SAON='APARTMENT 025', street='HILTON STREET', locality='', town_city='MANCHESTER', district='MANCHESTER', country='GREATER MANCHESTER', PPD_category_type='A', record_status='A')])
...

我们可以看到,对于property_id = '109',它将三条记录组合在一起,但上面的输出格式实际上是weird...Does。知道为什么吗?我如何将其转换为换行符分隔的JSON格式,然后写入JSON文件?

预期的格式类似于(不确定这是否是有效的换行符分隔的json格式,但想法是将相同property_id (例如109)的事务包含在一个数组中):

代码语言:javascript
复制
{"property_id": "107", "transaction_unique_id": "{C3C3F9B5-FB9E-362B-E053-6B04A8C03ACC}", "price": 80000, "date_of_transfer": "2021-05-07 00:00", "postcode": "BL2 2GY", "property_type": "F", "old_new": "N", "duration": "L", "PAON": "14", "SAON": "", "street": "RIVER VIEW COURT", "locality": "", "town_city": "BOLTON", "district": "BOLTON", "country": "GREATER MANCHESTER", "PPD_category_type": "A", "record_status": "A"}
{"property_id": "108", "transaction_unique_id": "{C3C3F9B5-FB9F-362B-E053-6B04A8C03ACC}", "price": 330000, "date_of_transfer": "2021-02-26 00:00", "postcode": "SK6 4AN", "property_type": "S", "old_new": "N", "duration": "F", "PAON": "18", "SAON": "", "street": "GUYWOOD LANE", "locality": "ROMILEY", "town_city": "STOCKPORT", "district": "STOCKPORT", "country": "GREATER MANCHESTER", "PPD_category_type": "A", "record_status": "A"}
{"property_id": "109", "transactions": [{"transaction_unique_id": "{C3C3F9B5-FBA0-362B-E053-6B04A8C03ACC}", "price": 215000, "date_of_transfer": "2021-02-19 00:00", "postcode": "M1 2BL", "property_type": "F", "old_new": "N", "duration": "L", "PAON": "40", "SAON": "APARTMENT 022", "street": "HILTON STREET", "locality": "", "town_city": "MANCHESTER", "district": "MANCHESTER", "country": "GREATER MANCHESTER", "PPD_category_type": "A", "record_status": "A"},{"transaction_unique_id": "{C3C3F9B5-FBD3-362B-E053-6B04A8C03ACC}", "price": 226500, "date_of_transfer": "2021-02-08 00:00", "postcode": "M1 2BL", "property_type": "F", "old_new": "N", "duration": "L", "PAON": "40", "SAON": "APARTMENT 727", "street": "HILTON STREET", "locality": "", "town_city": "MANCHESTER", "district": "MANCHESTER", "country": "GREATER MANCHESTER", "PPD_category_type": "A", "record_status": "A"},{"transaction_unique_id": "{C3C3F9B5-FBF8-362B-E053-6B04A8C03ACC}", "price": 262000, "date_of_transfer": "2021-05-14 00:00", "postcode": "M1 2BL", "property_type": "F", "old_new": "N", "duration": "L", "PAON": "40", "SAON": "APARTMENT 025", "street": "HILTON STREET", "locality": "", "town_city": "MANCHESTER", "district": "MANCHESTER", "country": "GREATER MANCHESTER", "PPD_category_type": "A", "record_status": "A"}]}
...

有没有人能帮上忙,我是新来的,如果能帮上忙,我会很感激。谢谢。

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2021-08-16 22:36:38

我假设PPD是一个命名元组,您将获取一个PPD对象的PCollection,并将它们分组如下

代码语言:javascript
复制
grouped = (
    ppd_pcoll
    | beam.Map(lambda ppd: (ppd.property_id, property_id)
    | beam.GroupByKey())

现在grouped是一个二元组的PCollection,其中第一个是属性id字符串,第二个是可迭代的PPD(具有该属性id)。

为了得到你想要的东西,你需要把它映射到你想要的字典,输出为json,例如

代码语言:javascript
复制
to_write_to_json = (grouped
 | beam.MapTuple(lambda property_id, ppds: {
      'property_id': property_id,
      'transactions': [ppd_to_transaction(ppd) for ppd in ppds],
     })

其中,ppd_to_transaction是一个函数,它接受您的PPD对象并返回一个带有所需事务属性的字典。

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/68808813

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档