如何使用io.TextIOWrapper打包流?

内容来源于 Stack Overflow,并遵循CC BY-SA 3.0许可协议进行翻译与使用

  • 回答 (6)
  • 关注 (0)
  • 查看 (2540)

我如何打包一个开放的二进制流 - 一个Python 2文件,一个Python 3 io.BufferedReader,一个io.BytesIO - 在io.TextIOWrapper

我试图编写的代码将保持不变:

  • 运行在Python 2上。
  • 运行在Python 3上。
  • 使用从标准库生成的二进制流(即我无法控制它们的类型)
  • 使用二进制流进行双测试(即没有文件句柄,不能重新打开)。
  • io.TextIOWrapper它包装指定的流。

io.TextIOWrapper是需要的,因为标准库的其他部分都需要它的API。其他类似文件的类型存在,但不要提供正确的api。

将二进制流包装为subprocess.Popen.stdout属性:

import subprocess
import io

gnupg_subprocess = subprocess.Popen(
        ["gpg", "--version"], stdout=subprocess.PIPE)
gnupg_stdout = io.TextIOWrapper(gnupg_subprocess.stdout, encoding="utf-8")

在单元测试中,流被替换为io.BytesIO实例来控制其内容,而不触及任何子进程或文件系统。

gnupg_subprocess.stdout = io.BytesIO("Lorem ipsum".encode("utf-8"))

这在Python 3的标准库创建的流上运行得很好。但是,相同的代码在Python 2生成的流上失败:

[Python 2]
>>> type(gnupg_subprocess.stdout)
<type 'file'>
>>> gnupg_stdout = io.TextIOWrapper(gnupg_subprocess.stdout, encoding="utf-8")
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
AttributeError: 'file' object has no attribute 'readable'
提问于
用户回答回答于

我也需要这个,但基于这里的线索,我确定使用Python 2的io模块是不可能的。 虽然这打破了对文件的“特殊待遇”规则,但我使用的技术是为文件(代码如下)创建一个非常薄的包装器,然后可以将其包装在io.BufferedReader中,然后可以将其传递给io .TextIOWrapper构造函数。 单元测试将是一件痛苦的事情,很显然新的代码路径不能在Python 3上测试。

顺便说一句,open()的结果可以直接传递给Python 3中的io.TextIOWrapper的原因是因为二进制模式open()实际上返回一个io.BufferedReader实例(至少在Python 3.4上, 那时我正在测试)。

import io
import six  # for six.PY2

if six.PY2:
    class _ReadableWrapper(object):
        def __init__(self, raw):
            self._raw = raw

        def readable(self):
            return True

        def writable(self):
            return False

        def seekable(self):
            return True

        def __getattr__(self, name):
            return getattr(self._raw, name)

def wrap_text(stream, *args, **kwargs):
    # Note: order important here, as 'file' doesn't exist in Python 3
    if six.PY2 and isinstance(stream, file):
        stream = io.BufferedReader(_ReadableWrapper(stream))

    return io.TextIOWrapper(stream)

热门问答

如何设置物联网通信的规则引擎转发到微信小程序?

DylanRichard

腾讯 · 产品经理 (已认证)

万物互联的时代,欢迎来到IoT的世界
推荐

正如参考文章所述,通过规则引擎述是将设备上行数据转发到服务端而不是直接到小程序,需要开发者基于API SDK用于服务端开发,并于小程序进行上下行通信。

在房间内切出app/闪退后还继续收费吗?

腾讯音视频小蔡

深圳市腾讯科技 · 软件开发工程师 (已认证)

QQ:471381568
推荐
1、切出app调用pause的时候是在房间的话,会一直收费。 2、比如A在程序闪退后,如果没有重连,90秒后A会自动退出房间。 3、扣费有疑问可以先看下运营指引:https://cloud.tencent.com/document/product/607/17448 如果对扣费还...... 展开详请

腾讯IM web端sdk 1.7版本,更新到当前最新,在哪里可以查看有什么改动呢?

使用微信小程序也可以做腾讯云的标准视频直播吗?使用微信小程序做腾讯云的视频直播和im即时通讯可行吗?

美女视频一起走向共同富裕
推荐
支持的 IM部分,集成云通信 https://cloud.tencent.com/document/product/269/36838 标准直播部分,使用live-pusher和live-player标签,然后用云直播即可 https://cloud.tencent.com/do...... 展开详请

为什么我通过控制台vnc看到的界面和用vnc viewer看的的界面不一样???

HappyLau谈云计算专注于公有云,私有云解决方案,在kubernetes,openstack,kvm,ceph,linux,shell有丰富的实战经验。
推荐

控制台页面使用的是novnc,novnc是一种基于web页面使用的vnc客户端,详情可参考https://github.com/novnc/noVNC介绍,常规的vnc viewer是图形的客户端,所使用的依赖库有所不同,因此显示也不一样。

API获取域名列表一直AuthFailure,code:4100?

zqfan

腾讯 · 高级工程师 (已认证)

推荐
推荐使用SDK进行调用:https://github.com/QcloudApi/qcloudapi-sdk-python 如果要自己写,也请参考下签名文档中python的代码示例,或者SDK中的源码:https://github.com/QcloudApi/qcloudapi-...... 展开详请

所属标签

扫码关注云+社区

领取腾讯云代金券

年度创作总结 领取年终奖励