我有一个.ndjson文件,如下所示:
{"property_id": "107", "transaction_unique_id": "{C3C3F9B5-FB9E-362B-E053-6B04A8C03ACC}", "price": 80000, "date_of_transfer": "2021-05-07 00:00", "postcode": "BL2 2GY", "property_type": "F", "old_new": "N", "duration": "L", "PAON": "14", "SAON": "", "street": "RIVER VIEW COURT", "locality": "", "town_city": "BOLTON", "district": "BOLTON", "country": "GREATER MANCHESTER", "PPD_category_type": "A", "record_status": "A"}
{"property_id": "108", "transaction_unique_id": "{C3C3F9B5-FB9F-362B-E053-6B04A8C03ACC}", "price": 330000, "date_of_transfer": "2021-02-26 00:00", "postcode": "SK6 4AN", "property_type": "S", "old_new": "N", "duration": "F", "PAON": "18", "SAON": "", "street": "GUYWOOD LANE", "locality": "ROMILEY", "town_city": "STOCKPORT", "district": "STOCKPORT", "country": "GREATER MANCHESTER", "PPD_category_type": "A", "record_status": "A"}
{"property_id": "109", "transaction_unique_id": "{C3C3F9B5-FBA0-362B-E053-6B04A8C03ACC}", "price": 215000, "date_of_transfer": "2021-02-19 00:00", "postcode": "M1 2BL", "property_type": "F", "old_new": "N", "duration": "L", "PAON": "40", "SAON": "APARTMENT 022", "street": "HILTON STREET", "locality": "", "town_city": "MANCHESTER", "district": "MANCHESTER", "country": "GREATER MANCHESTER", "PPD_category_type": "A", "record_status": "A"}
{"property_id": "109", "transaction_unique_id": "{C3C3F9B5-FBD3-362B-E053-6B04A8C03ACC}", "price": 226500, "date_of_transfer": "2021-02-08 00:00", "postcode": "M1 2BL", "property_type": "F", "old_new": "N", "duration": "L", "PAON": "40", "SAON": "APARTMENT 727", "street": "HILTON STREET", "locality": "", "town_city": "MANCHESTER", "district": "MANCHESTER", "country": "GREATER MANCHESTER", "PPD_category_type": "A", "record_status": "A"}
{"property_id": "109", "transaction_unique_id": "{C3C3F9B5-FBF8-362B-E053-6B04A8C03ACC}", "price": 262000, "date_of_transfer": "2021-05-14 00:00", "postcode": "M1 2BL", "property_type": "F", "old_new": "N", "duration": "L", "PAON": "40", "SAON": "APARTMENT 025", "street": "HILTON STREET", "locality": "", "town_city": "MANCHESTER", "district": "MANCHESTER", "country": "GREATER MANCHESTER", "PPD_category_type": "A", "record_status": "A"}
...我使用Apache Beam读取数据,并按property_id对数据进行分组,然后将输出写入json文件,但数据如下所示:
('107', [PPD(property_id='107', transaction_unique_id='{C3C3F9B5-FB9E-362B-E053-6B04A8C03ACC}', price=80000, date_of_transfer='2021-05-07 00:00', postcode='BL2 2GY', property_type='F', old_new='N', duration='L', PAON='14', SAON='', street='RIVER VIEW COURT', locality='', town_city='BOLTON', district='BOLTON', country='GREATER MANCHESTER', PPD_category_type='A', record_status='A')])
('108', [PPD(property_id='108', transaction_unique_id='{C3C3F9B5-FB9F-362B-E053-6B04A8C03ACC}', price=330000, date_of_transfer='2021-02-26 00:00', postcode='SK6 4AN', property_type='S', old_new='N', duration='F', PAON='18', SAON='', street='GUYWOOD LANE', locality='ROMILEY', town_city='STOCKPORT', district='STOCKPORT', country='GREATER MANCHESTER', PPD_category_type='A', record_status='A')])
('109', [PPD(property_id='109', transaction_unique_id='{C3C3F9B5-FBA0-362B-E053-6B04A8C03ACC}', price=215000, date_of_transfer='2021-02-19 00:00', postcode='M1 2BL', property_type='F', old_new='N', duration='L', PAON='40', SAON='APARTMENT 022', street='HILTON STREET', locality='', town_city='MANCHESTER', district='MANCHESTER', country='GREATER MANCHESTER', PPD_category_type='A', record_status='A'), PPD(property_id='109', transaction_unique_id='{C3C3F9B5-FBD3-362B-E053-6B04A8C03ACC}', price=226500, date_of_transfer='2021-02-08 00:00', postcode='M1 2BL', property_type='F', old_new='N', duration='L', PAON='40', SAON='APARTMENT 727', street='HILTON STREET', locality='', town_city='MANCHESTER', district='MANCHESTER', country='GREATER MANCHESTER', PPD_category_type='A', record_status='A'), PPD(property_id='109', transaction_unique_id='{C3C3F9B5-FBF8-362B-E053-6B04A8C03ACC}', price=262000, date_of_transfer='2021-05-14 00:00', postcode='M1 2BL', property_type='F', old_new='N', duration='L', PAON='40', SAON='APARTMENT 025', street='HILTON STREET', locality='', town_city='MANCHESTER', district='MANCHESTER', country='GREATER MANCHESTER', PPD_category_type='A', record_status='A')])
...我们可以看到,对于property_id = '109',它将三条记录组合在一起,但上面的输出格式实际上是weird...Does。知道为什么吗?我如何将其转换为换行符分隔的JSON格式,然后写入JSON文件?
预期的格式类似于(不确定这是否是有效的换行符分隔的json格式,但想法是将相同property_id (例如109)的事务包含在一个数组中):
{"property_id": "107", "transaction_unique_id": "{C3C3F9B5-FB9E-362B-E053-6B04A8C03ACC}", "price": 80000, "date_of_transfer": "2021-05-07 00:00", "postcode": "BL2 2GY", "property_type": "F", "old_new": "N", "duration": "L", "PAON": "14", "SAON": "", "street": "RIVER VIEW COURT", "locality": "", "town_city": "BOLTON", "district": "BOLTON", "country": "GREATER MANCHESTER", "PPD_category_type": "A", "record_status": "A"}
{"property_id": "108", "transaction_unique_id": "{C3C3F9B5-FB9F-362B-E053-6B04A8C03ACC}", "price": 330000, "date_of_transfer": "2021-02-26 00:00", "postcode": "SK6 4AN", "property_type": "S", "old_new": "N", "duration": "F", "PAON": "18", "SAON": "", "street": "GUYWOOD LANE", "locality": "ROMILEY", "town_city": "STOCKPORT", "district": "STOCKPORT", "country": "GREATER MANCHESTER", "PPD_category_type": "A", "record_status": "A"}
{"property_id": "109", "transactions": [{"transaction_unique_id": "{C3C3F9B5-FBA0-362B-E053-6B04A8C03ACC}", "price": 215000, "date_of_transfer": "2021-02-19 00:00", "postcode": "M1 2BL", "property_type": "F", "old_new": "N", "duration": "L", "PAON": "40", "SAON": "APARTMENT 022", "street": "HILTON STREET", "locality": "", "town_city": "MANCHESTER", "district": "MANCHESTER", "country": "GREATER MANCHESTER", "PPD_category_type": "A", "record_status": "A"},{"transaction_unique_id": "{C3C3F9B5-FBD3-362B-E053-6B04A8C03ACC}", "price": 226500, "date_of_transfer": "2021-02-08 00:00", "postcode": "M1 2BL", "property_type": "F", "old_new": "N", "duration": "L", "PAON": "40", "SAON": "APARTMENT 727", "street": "HILTON STREET", "locality": "", "town_city": "MANCHESTER", "district": "MANCHESTER", "country": "GREATER MANCHESTER", "PPD_category_type": "A", "record_status": "A"},{"transaction_unique_id": "{C3C3F9B5-FBF8-362B-E053-6B04A8C03ACC}", "price": 262000, "date_of_transfer": "2021-05-14 00:00", "postcode": "M1 2BL", "property_type": "F", "old_new": "N", "duration": "L", "PAON": "40", "SAON": "APARTMENT 025", "street": "HILTON STREET", "locality": "", "town_city": "MANCHESTER", "district": "MANCHESTER", "country": "GREATER MANCHESTER", "PPD_category_type": "A", "record_status": "A"}]}
...有没有人能帮上忙,我是新来的,如果能帮上忙,我会很感激。谢谢。
发布于 2021-08-16 22:36:38
我假设PPD是一个命名元组,您将获取一个PPD对象的PCollection,并将它们分组如下
grouped = (
ppd_pcoll
| beam.Map(lambda ppd: (ppd.property_id, property_id)
| beam.GroupByKey())现在grouped是一个二元组的PCollection,其中第一个是属性id字符串,第二个是可迭代的PPD(具有该属性id)。
为了得到你想要的东西,你需要把它映射到你想要的字典,输出为json,例如
to_write_to_json = (grouped
| beam.MapTuple(lambda property_id, ppds: {
'property_id': property_id,
'transactions': [ppd_to_transaction(ppd) for ppd in ppds],
})其中,ppd_to_transaction是一个函数,它接受您的PPD对象并返回一个带有所需事务属性的字典。
https://stackoverflow.com/questions/68808813
复制相似问题