490 lines (416 sloc) 16.611 kb
# -*- coding: utf8 -*- | |
---|---|
import sys | |
import math | |
#---------------------------------------- | |
debug_flag = False # set to True to enable debug() output
disp_flag = False # set to True to enable disp() output
# debug info output controller | |
def debug(*args):
    """Print the space-joined arguments under a separator when debug_flag is on."""
    if debug_flag != True:
        return
    message = ' '.join(map(str, args))
    # Single-argument parenthesized prints emit the same text as the
    # print statement form on Python 2.
    print('----------------------------------------')
    print(message)
    print('\n')
# display info output controller
def disp(*args):
    """Print the space-joined arguments under a separator when disp_flag is on."""
    if disp_flag != True:
        return
    message = ' '.join(map(str, args))
    # Single-argument parenthesized prints emit the same text as the
    # print statement form on Python 2.
    print('----------------------------------------')
    print(message)
    print('\n')
#---------------------------------------- | |
class JsonParser(dict):
    """A small hand-written JSON reader/writer with a dict-like interface.

    The parsed document is stored in ``self.dict``; the mapping protocol
    methods (``[]``, ``in``, ``len``, iteration, ``update``, ``keys`` ...)
    all delegate to it.  ``self.string`` holds the text currently being
    parsed and ``self.idx`` is the cursor into that text.

    The parser is deliberately lenient in a few places, kept for backward
    compatibility: separating commas are optional and may repeat, numbers
    may carry a leading ``+`` or end in a bare ``.``, and ``\\b`` is skipped
    as whitespace between tokens.
    """

    # Python 2/3 compatibility: resolve the text/number types once.
    try:
        _STRING_TYPES = (str, unicode)
        _NUMBER_TYPES = (int, long, float)
        _unichr = staticmethod(unichr)
    except NameError:  # Python 3 has no unicode/long/unichr
        _STRING_TYPES = (str,)
        _NUMBER_TYPES = (int, float)
        _unichr = staticmethod(chr)

    # Characters skipped between tokens.
    _WHITESPACE = frozenset(u' \n\r\t\b')
    _HEX_DIGITS = frozenset(u'0123456789abcdefABCDEF')
    # Escape letter (after the backslash) -> decoded character.
    _PARSE_ESCAPES = {
        u'"': u'"',
        u'\\': u'\\',
        u'/': u'/',
        u'b': u'\b',
        u'f': u'\f',
        u'n': u'\n',
        u'r': u'\r',
        u't': u'\t',
    }
    # Character -> escaped form written by dumpString.
    _DUMP_ESCAPES = {
        u'/': u'\\/',
        u'"': u'\\"',
        u'\\': u'\\\\',
        u'\b': u'\\b',
        u'\f': u'\\f',
        u'\n': u'\\n',
        u'\r': u'\\r',
        u'\t': u'\\t',
    }

    def __init__(self, *args, **kwargs):
        """Create a parser, optionally pre-populated like ``dict(*args, **kwargs)``."""
        self.dict = {}
        self.string = ''
        self.idx = 0
        self.dict.update(*args, **kwargs)

    # ---------------------------------------- loading / parsing

    def load(self, string):
        """Parse the JSON object in *string* and store it in ``self.dict``.

        Raises JsonParseError when the text is not a well-formed object.
        """
        self.string = string
        self.idx = 0
        disp('load from string: ', string)
        self.dict = self.parseObject()
        disp('load into dict: ', self.dict)

    def loadJson(self, f):
        """Read the UTF-8 file at path *f* and parse it with load().

        IOError is reported through debug() and re-raised.
        """
        try:
            # Binary read + explicit decode behaves the same on Python 2 and 3.
            with open(f, 'rb') as fd:
                text = fd.read().decode('utf8')
        except IOError:
            debug("Unexpected error: ", sys.exc_info()[0])
            raise
        self.load(text)

    def parseObject(self):
        """Parse ``{ "key": value, ... }`` at the cursor and return a dict."""
        error = u'error occurs in object parsing'
        if self.nextToken() != JSON_TOKEN.LEFT_BRACE:
            raise JsonParseError(error)
        json_dict = {}
        while True:
            token = self.getToken()
            if token == JSON_TOKEN.COMMA:
                self.nextToken()
            elif token == JSON_TOKEN.RIGHT_BRACE:
                self.nextToken()
                return json_dict
            elif token == JSON_TOKEN.NONE:
                # End of input or an unrecognised character.
                raise JsonParseError(error)
            else:
                # Anything else must be the start of a "key": value pair.
                key = self.parseString()
                debug('string: ', key)
                if self.nextToken() != JSON_TOKEN.COLON:
                    raise JsonParseError(error)
                value = self.parseValue()
                debug('value: ', value)
                json_dict[key] = value

    def parseArray(self):
        """Parse ``[ value, ... ]`` at the cursor and return a list."""
        error = u'error occurs in array parsing'
        if self.nextToken() != JSON_TOKEN.LEFT_BRACKET:
            raise JsonParseError(error)
        json_list = []
        while True:
            token = self.getToken()
            if token == JSON_TOKEN.COMMA:
                self.nextToken()
            elif token == JSON_TOKEN.RIGHT_BRACKET:
                debug('array parsing stops')
                self.nextToken()
                return json_list
            elif token == JSON_TOKEN.NONE:
                raise JsonParseError(error)
            else:
                json_list.append(self.parseValue())

    def _peek(self):
        """Return the character at the cursor, or u'' at end of input."""
        if self.idx < len(self.string):
            return self.string[self.idx]
        return u''

    def _scanDigits(self):
        """Advance the cursor over a run of 0-9 and return the run (maybe empty)."""
        start = self.idx
        length = len(self.string)
        while self.idx < length and u'0' <= self.string[self.idx] <= u'9':
            self.idx += 1
        return self.string[start:self.idx]

    def parseNumber(self):
        """Parse a number at the cursor.

        Returns an int when there is no fraction or exponent, else a float.
        Raises JsonParseError on malformed or truncated input (never
        IndexError/ValueError).
        """
        error = u'error occurs in number parsing'
        self.handleIndent()
        if self.getToken() != JSON_TOKEN.NUMBER:
            raise JsonParseError(error)
        start = self.idx

        # integer part: optional sign followed by at least one digit
        if self._peek() in (u'+', u'-'):
            self.idx += 1
        if not self._scanDigits():
            raise JsonParseError(error)
        debug('integer_part: ', self.string[start:self.idx])
        if self._peek() not in (u'.', u'e', u'E'):
            return int(self.string[start:self.idx])

        # decimal part (a bare trailing '.' is tolerated, as before)
        if self._peek() == u'.':
            decimal_start = self.idx
            self.idx += 1
            self._scanDigits()
            debug('decimal_list: ', self.string[decimal_start:self.idx])

        # exponent part: e|E, optional sign, at least one digit
        if self._peek() in (u'e', u'E'):
            exponent_start = self.idx
            self.idx += 1
            if self._peek() in (u'+', u'-'):
                self.idx += 1
            if not self._scanDigits():
                raise JsonParseError(error)
            debug('exponent_part: ', self.string[exponent_start:self.idx])

        ret = float(self.string[start:self.idx])
        debug('number parsing result: ', ret)
        return ret

    def parseString(self):
        """Parse a double-quoted string at the cursor and return its value.

        Raises JsonParseError for a missing opening quote, an unterminated
        string, an unknown escape, or a malformed ``\\uXXXX`` escape.
        """
        error = u'error occurs in string parsing'
        text = self.string
        length = len(text)
        self.handleIndent()
        if self.idx >= length or text[self.idx] != u'"':
            raise JsonParseError(error)
        self.idx += 1  # skip opening "
        pieces = []
        while self.idx < length:
            ch = text[self.idx]
            if ch == u'"':
                self.idx += 1  # skip closing "
                result = u''.join(pieces)
                disp('string parsing result: ', result)
                return result
            if ch != u'\\':
                pieces.append(ch)
                self.idx += 1
                continue
            # escape sequence: move onto the character after the backslash
            self.idx += 1
            if self.idx >= length:
                raise JsonParseError(error)
            esc = text[self.idx]
            if esc == u'u':
                hex_digits = text[self.idx + 1:self.idx + 5]
                # Validate by hand: int(x, 16) would also accept '0x1f', '+1a' ...
                if len(hex_digits) != 4 or not all(
                        digit in self._HEX_DIGITS for digit in hex_digits):
                    raise JsonParseError(error)
                pieces.append(self._unichr(int(hex_digits, 16)))
                self.idx += 5  # 'u' plus four hex digits
            elif esc in self._PARSE_ESCAPES:
                pieces.append(self._PARSE_ESCAPES[esc])
                self.idx += 1
            else:
                raise JsonParseError(error)
        # Ran off the end of the input without seeing the closing quote.
        raise JsonParseError(error)

    def parseValue(self):
        """Parse whichever JSON value starts at the cursor and return it."""
        token = self.getToken()
        if token == JSON_TOKEN.DOUBLE_QUOTE:
            return self.parseString()
        if token == JSON_TOKEN.NUMBER:
            return self.parseNumber()
        if token == JSON_TOKEN.LEFT_BRACE:
            return self.parseObject()
        if token == JSON_TOKEN.LEFT_BRACKET:
            return self.parseArray()
        if token == JSON_TOKEN.TRUE:
            self.nextToken()
            return True
        if token == JSON_TOKEN.FALSE:
            self.nextToken()
            return False
        if token == JSON_TOKEN.NULL:
            self.nextToken()
            return None
        raise JsonParseError(u'error occurs in value parsing')

    def handleIndent(self):
        """Advance the cursor past whitespace (space, \\n, \\r, \\t and \\b)."""
        length = len(self.string)
        while self.idx < length and self.string[self.idx] in self._WHITESPACE:
            self.idx += 1

    def getToken(self):
        """Peek at the next token without consuming it."""
        return self.handleToken(False)

    def nextToken(self):
        """Return the next token and move the cursor past it."""
        return self.handleToken(True)

    def getResult(self, token, tidx, change_flag):
        """Return *token*, committing the cursor to *tidx* when *change_flag* is set."""
        if change_flag:
            self.idx = tidx
        return token

    def handleToken(self, change_flag):
        """Classify the token at the cursor (after skipping whitespace).

        Returns a JSON_TOKEN constant; JSON_TOKEN.NONE means end of input or
        an unrecognised character (including a stray backslash).  The cursor
        only moves past the token when *change_flag* is true.
        """
        length = len(self.string)
        self.handleIndent()
        tidx = self.idx
        # end of string
        if tidx >= length:
            return self.getResult(JSON_TOKEN.NONE, tidx, change_flag)
        # single-character tokens
        ch = self.string[tidx]
        if ch == u'{':
            return self.getResult(JSON_TOKEN.LEFT_BRACE, tidx + 1, change_flag)
        elif ch == u'}':
            return self.getResult(JSON_TOKEN.RIGHT_BRACE, tidx + 1, change_flag)
        elif ch == u',':
            return self.getResult(JSON_TOKEN.COMMA, tidx + 1, change_flag)
        elif ch == u':':
            return self.getResult(JSON_TOKEN.COLON, tidx + 1, change_flag)
        elif ch == u'[':
            return self.getResult(JSON_TOKEN.LEFT_BRACKET, tidx + 1, change_flag)
        elif ch == u']':
            return self.getResult(JSON_TOKEN.RIGHT_BRACKET, tidx + 1, change_flag)
        elif ch == u'"':
            return self.getResult(JSON_TOKEN.DOUBLE_QUOTE, tidx + 1, change_flag)
        elif (u'0' <= ch <= u'9') or ch == u'+' or ch == u'-':
            # Only the first character is classified; parseNumber reads the rest.
            return self.getResult(JSON_TOKEN.NUMBER, tidx + 1, change_flag)
        # parse true, false and null
        literals = (
            (u'true', JSON_TOKEN.TRUE),
            (u'false', JSON_TOKEN.FALSE),
            (u'null', JSON_TOKEN.NULL),
        )
        for literal, token in literals:
            if self.string.startswith(literal, tidx):
                return self.getResult(token, tidx + len(literal), change_flag)
        disp('this is none: ', ch)
        return self.getResult(JSON_TOKEN.NONE, tidx, change_flag)

    # ---------------------------------------- dumping / serialising

    def dump(self):
        """Return the stored document serialised as a JSON object string."""
        disp('cur dict: ', self.dict)
        return self.dumpObject(self.dict)

    def dumpJson(self, f):
        """Serialise the stored document and write it, UTF-8 encoded, to path *f*.

        IOError is reported through debug() and re-raised.
        """
        # Serialise first so a dumping error cannot leave a truncated file.
        data = self.dump().encode('utf8')
        try:
            with open(f, 'wb') as fd:
                fd.write(data)
        except IOError:
            debug("Unexpected error: ", sys.exc_info()[0])
            raise

    def loadDict(self, d):
        """Use *d* (not copied) as the stored document."""
        self.dict = d

    def dumpDict(self):
        """Return the stored document dict (not a copy)."""
        return self.dict

    def dumpObject(self, py_dict):
        """Serialise a mapping as a JSON object; non-string keys are skipped."""
        members = []
        for key, value in py_dict.items():
            if isinstance(key, self._STRING_TYPES):
                members.append(
                    self.dumpString(key) + u':' + self.dumpValue(value))
                debug('object append: ', members)
        text = u'{' + u','.join(members) + u'}'
        debug('dump ojbect: ', text)
        return text

    def dumpArray(self, py_array):
        """Serialise a sequence as a JSON array."""
        text = u'[' + u','.join(
            [self.dumpValue(item) for item in py_array]) + u']'
        debug('dump array: ', text)
        return text

    def dumpString(self, py_string):
        """Serialise a string as a quoted, ASCII-only JSON string literal."""
        pieces = [u'"']
        for ch in py_string:
            code = ord(ch)
            if ch in self._DUMP_ESCAPES:
                pieces.append(self._DUMP_ESCAPES[ch])
            elif code > 0xFFFF:
                # Outside the BMP: JSON requires a UTF-16 surrogate pair.
                code -= 0x10000
                pieces.append(u'\\u%04x\\u%04x' % (
                    0xD800 + (code >> 10), 0xDC00 + (code & 0x3FF)))
            elif code < 0x20 or not self.isAscii(ch):
                # Remaining control characters and all non-ASCII characters.
                pieces.append(u'\\u%04x' % code)
            else:
                pieces.append(ch)
        pieces.append(u'"')
        return u''.join(pieces)

    def isAscii(self, ch):
        """Return True when the single character *ch* is in the ASCII range."""
        return ord(ch) < 128

    def dumpNumber(self, py_number):
        """Serialise a number; floats use repr() so no precision is lost."""
        if isinstance(py_number, float):
            # str(float) keeps only 12 significant digits on Python 2.
            return repr(py_number)
        return str(py_number)

    def dumpValue(self, py_value):
        """Serialise any supported Python value to JSON text.

        Raises JsonParseError for unsupported types.
        """
        if isinstance(py_value, self._STRING_TYPES):
            return self.dumpString(py_value)
        # bool must be tested before the numeric types: bool is an int subclass.
        if py_value is True:
            return u'true'
        if py_value is False:
            return u'false'
        if py_value is None:
            return u'null'
        if isinstance(py_value, self._NUMBER_TYPES):
            return u'' + self.dumpNumber(py_value)
        if isinstance(py_value, dict):
            return self.dumpObject(py_value)
        if isinstance(py_value, (list, tuple)):
            return self.dumpArray(py_value)
        raise JsonParseError(u'error occurs in value dumping')

    # ---------------------------------------- mapping protocol (delegates to self.dict)

    def __getitem__(self, key):
        return self.dict.__getitem__(key)

    def __setitem__(self, key, value):
        return self.dict.__setitem__(key, value)

    def __delitem__(self, key):
        return self.dict.__delitem__(key)

    def __contains__(self, key):
        return self.dict.__contains__(key)

    def __iter__(self):
        return iter(self.dict)

    def __len__(self):
        return len(self.dict)

    def update(self, *args, **dict_args):
        self.dict.update(*args, **dict_args)

    # Without these the inherited dict versions would read the (always
    # empty) base mapping instead of the stored document.
    def keys(self):
        return self.dict.keys()

    def values(self):
        return self.dict.values()

    def items(self):
        return self.dict.items()

    def get(self, key, default=None):
        return self.dict.get(key, default)
class JSON_TOKEN:
    """Integer codes for the token kinds recognised by the tokenizer."""
    LEFT_BRACE = 0 # {
    RIGHT_BRACE = 1 # }
    COMMA = 2 # ,
    COLON = 3 # :
    LEFT_BRACKET = 4 # [
    RIGHT_BRACKET = 5 # ]
    DOUBLE_QUOTE = 6 # "
    NUMBER = 7 # 0-9, + (positive) and - (negative)
    TRUE = 8 # true
    FALSE = 9 # false
    NULL = 10 # null
    NONE = 11 # end of input or any other character
    BACKSLASH = 12 # \ (stray backslash outside a string)
class JsonParseError(Exception):
    """Raised when JSON text cannot be parsed or a value cannot be dumped.

    The message is available as ``value`` and, like any exception, as
    ``args[0]``.
    """

    def __init__(self, value):
        # Initialise the base class so args, pickling and tracebacks carry
        # the message too.
        super(JsonParseError, self).__init__(value)
        self.value = value

    def __str__(self):
        return repr(self.value)
def cmp_dict(dict1, dict2):
    """Return True when two dicts hold the same keys with equal values.

    Two None arguments compare equal; None against anything else, or any
    non-dict argument, compares unequal.  Values that are dicts are compared
    recursively; all other values are compared with ``!=``.
    """
    if dict1 is None and dict2 is None:
        return True
    if dict1 is None or dict2 is None:
        return False
    if not isinstance(dict1, dict) or not isinstance(dict2, dict):
        return False
    if len(dict1) != len(dict2):
        return False
    # Check every key, not just the first one.
    for key, value1 in dict1.items():
        if key not in dict2:
            return False
        value2 = dict2[key]
        if isinstance(value1, dict):
            if not cmp_dict(value1, value2):
                return False
        elif value1 != value2:
            return False
    return True
if __name__ == '__main__':
    #----------------------------------------
    # test code 2: load a file, dump it, reload the dump and compare
    load_file_path = './test/json_test_cases.txt'
    dump_file_path = './test/json_dump_file.txt'

    parser = JsonParser()

    # first pass: parse the hand-written test cases
    parser.loadJson(load_file_path)
    first_pass = parser.dumpDict()
    disp('parsed dict: ', first_pass)

    # second pass: write the document out and parse our own output
    parser.dumpJson(dump_file_path)
    parser.loadJson(dump_file_path)
    second_pass = parser.dumpDict()
    disp('parsed dict: ', second_pass)

    disp('cmp result:', cmp_dict(first_pass, second_pass))