完全流式XML解析器?

内容来源于 Stack Overflow,并遵循CC BY-SA 3.0许可协议进行翻译与使用

  • 回答 (1)
  • 关注 (0)
  • 查看 (88)

我正在尝试使用请求lxmlbase64io来使用Exchange GetAttachment webservice 。此服务在SOAP XML HTTP响应中返回base64编码的文件。文件内容包含在单个XML元素中的单行中。只是一个例子,但问题更为笼统。GetAttachment

我想将解码后的文件内容直接流式传输到磁盘,而不会在任何时候将附件的全部内容存储在内存中,因为附件可能是几百MB。

我尝试过这样的事情:

r = requests.post('https://example.com/EWS/Exchange.asmx', data=..., stream=True)
with open('foo.txt', 'wb') as f:
    for action, elem in lxml.etree.iterparse(GzipFile(fileobj=r.raw)):
    if elem.tag == 't:Content':
        b64_encoder = Base64IO(BytesIO(elem.text))
        f.write(b64_encoder.read())

lxml仍然存储附件的副本elem.text。有没有什么办法可以创建一个完全流式XML解析器,它也可以直接从输入流中流式传输元素的内容?

提问于
用户回答回答于

iterparse在这种情况下不要使用。该iterparse()方法只能发出元素开始和结束事件,因此在找到结束XML标记时会向您提供元素中的任何文本

而是使用SAX解析器接口。这是XML解析库的通用标准,用于将解析后的数据传递给内容处理程序。该ContentHandler.characters()回调是块(假设执行XML库实际上使这种可能性使用)通过字符数据。这是来自ElementTree API的低级API,并且Python标准库已经捆绑了Expat解析器来驱动它。

那么流程就变成了:

  • 将传入的请求流包装起来GzipFile以便于解压缩。或者,更好的是,response.raw.decode_content = True根据服务器设置的内容编码,将解压缩设置并保留到请求库。
  • GzipFile实例或原始流传递给使用创建的解析器的.parse()方法xml.sax.make_parser()。解析器然后继续以块的形式从流中读取。通过make_parser()首先使用您可以启用名称空间处理等功能(如果Exchange决定更改用于每个名称空间的短前缀,则可确保代码不会中断)。
  • characters()使用大量XML数据调用内容处理程序方法; 检查正确的元素启动事件,以便知道何时期望base64数据。您可以一次解码(多个)4个字符的块中的 base64数据,并将其写入文件。我不会base64io在这里使用,只是做你自己的分块。

简单的内容处理程序可以是:

from xml.sax import handler
from base64 import b64decode

class AttachmentContentHandler(handler.ContentHandler):
    types_ns = 'http://schemas.microsoft.com/exchange/services/2006/types'

    def __init__(self, filename):
        self.filename = filename

    def startDocument(self):
        self._buffer = None
        self._file = None

    def startElementNS(self, name, *args):
        if name == (self.types_ns, 'Content'):
            # we can expect base64 data next
            self._file = open(self.filename, 'wb')
            self._buffer = []

    def endElementNS(self, name, *args):
        if name == (self.types_ns, 'Content'):
            # all attachment data received, close the file
            try:
                if self._buffer:
                    raise ValueError("Incomplete Base64 data")
            finally:
                self._file.close()
                self._file = self._buffer = None

    def characters(self, data):
        if self._buffer is None:
            return
        self._buffer.append(data)
        self._decode_buffer()

    def _decode_buffer(self):
        remainder = ''
        for data in self._buffer:
            available = len(remainder) + len(data)
            overflow = available % 4
            if remainder:
                data = (remainder + data)
                remainder = ''
            if overflow:
                remainder, data = data[-overflow:], data[:-overflow]
            if data:
                self._file.write(b64decode(data))
        self._buffer = [remainder] if remainder else []

你会像这样使用它:

import requests
from xml.sax import make_parser, handler

parser = make_parser()
parser.setFeature(handler.feature_namespaces, True)
parser.setContentHandler(AttachmentContentHandler('foo.txt'))

r = requests.post('https://example.com/EWS/Exchange.asmx', data=..., stream=True)
r.raw.decode_content = True  # if content-encoding is used, decompress as we read
parser.parse(r.raw)

这将以高达64KB(默认IncrementalParser缓冲区大小)的块来解析输入XML ,因此附件数据在最多48KB的原始数据块中被解码。

我可能会扩展内容处理程序以获取目标目录,然后查找<t:Name>提取文件名的元素,然后使用它将数据提取到找到的每个附件的正确文件名。您还需要验证您实际上是在处理GetAttachmentResponse文档,并处理错误响应。

扫码关注云+社区

领取腾讯云代金券