我想用Python从一个文件/流中读取多个JSON对象,一次一个。不幸的是,在文件结束之前,json.load()
只能使用.read()
s;似乎没有任何方法可以使用它来读取单个对象或懒惰地遍历对象。
有没有办法做到这一点?使用标准库将是理想的,但如果有第三方库,我会使用它。
目前,我将每个对象放在单独的一行上,并使用json.loads(f.readline())
,但我真的不希望需要这样做。
示例用法
example.py
import my_json as json
import sys
for o in json.iterload(sys.stdin):
print("Working on a", type(o))
in.txt
{"foo": ["bar", "baz"]} 1 2 [] 4 5 6
示例会话
$ python3.2 example.py < in.txt
Working on a dict
Working on a int
Working on a int
Working on a list
Working on a int
Working on a int
Working on a int
发布于 2017-05-26 05:33:19
这里有一个简单得多的解决方案。秘诀在于尝试、失败并使用异常中的信息来正确解析。唯一的限制是文件必须是可查找的。
def stream_read_json(fn):
import json
start_pos = 0
with open(fn, 'r') as f:
while True:
try:
obj = json.load(f)
yield obj
return
except json.JSONDecodeError as e:
f.seek(start_pos)
json_str = f.read(e.pos)
obj = json.loads(json_str)
start_pos += e.pos
yield obj
编辑:我刚刚注意到这只适用于Python >=3.5。对于较早的情况,失败会返回一个ValueError,您必须从字符串中解析出位置,例如
def stream_read_json(fn):
import json
import re
start_pos = 0
with open(fn, 'r') as f:
while True:
try:
obj = json.load(f)
yield obj
return
except ValueError as e:
f.seek(start_pos)
end_pos = int(re.match('Extra data: line \d+ column \d+ .*\(char (\d+).*\)',
e.args[0]).groups()[0])
json_str = f.read(end_pos)
obj = json.loads(json_str)
start_pos += end_pos
yield obj
发布于 2011-07-31 08:18:08
你当然可以做到。您只需直接使用raw_decode
即可。此实现将整个文件加载到内存中并对该字符串进行操作(与json.load
非常相似);如果您有大文件,则可以将其修改为只在必要时从文件中读取,不会有太大困难。
import json
from json.decoder import WHITESPACE
def iterload(string_or_fp, cls=json.JSONDecoder, **kwargs):
if isinstance(string_or_fp, file):
string = string_or_fp.read()
else:
string = str(string_or_fp)
decoder = cls(**kwargs)
idx = WHITESPACE.match(string, 0).end()
while idx < len(string):
obj, end = decoder.raw_decode(string, idx)
yield obj
idx = WHITESPACE.match(string, end).end()
用法:正如您所要求的,它是一个生成器。
发布于 2012-04-18 00:40:00
我想提供一个解决方案。关键思想是“尝试”解码:如果它失败了,给它更多的提要,否则使用偏移信息来准备下一次解码。
然而,当前的json模块不能容忍要解码的字符串头中的空格,所以我必须将它们去掉。
import sys
import json
def iterload(file):
buffer = ""
dec = json.JSONDecoder()
for line in file:
buffer = buffer.strip(" \n\r\t") + line.strip(" \n\r\t")
while(True):
try:
r = dec.raw_decode(buffer)
except:
break
yield r[0]
buffer = buffer[r[1]:].strip(" \n\r\t")
for o in iterload(sys.stdin):
print("Working on a", type(o), o)
=========================我已经测试了几个txt文件,它工作得很好。(在1.txt)
{"foo": ["bar", "baz"]
}
1 2 [
] 4
{"foo1": ["bar1", {"foo2":{"A":1, "B":3}, "DDD":4}]
}
5 6
(在2.txt)
{"foo"
: ["bar",
"baz"]
}
1 2 [
] 4 5 6
(in.txt,您的姓名首字母)
{"foo": ["bar", "baz"]} 1 2 [] 4 5 6
(本尼迪克特测试用例的输出)
python test.py < in.txt
('Working on a', <type 'list'>, [u'hello'])
('Working on a', <type 'dict'>, {u'goodbye': 1})
('Working on a', <type 'int'>, 1)
('Working on a', <type 'int'>, 2)
('Working on a', <type 'dict'>, {})
('Working on a', <type 'int'>, 2)
('Working on a', <type 'int'>, 9)
('Working on a', <type 'int'>, 78)
('Working on a', <type 'int'>, 4)
('Working on a', <type 'int'>, 5)
('Working on a', <type 'dict'>, {u'animals': [u'dog', u'lots of mice', u'cat']})
https://stackoverflow.com/questions/6886283
复制相似问题