首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >从sys.stdin获取输入,非阻塞

从sys.stdin获取输入,非阻塞
EN

Stack Overflow用户
提问于 2014-02-15 00:02:55
回答 6查看 46.5K关注 0票数 24

我正在为一个竞争开发一个机器人,它通过sys.stdin接收它的输入,并使用Python的print()作为输出。我有以下几点:

代码语言:javascript
复制
import sys

def main():
    while True:
        line = sys.stdin.readline()
        parts = line.split()
        if len(parts) > 0:
            # do stuff

问题是输入是通过流输入的,并使用上面的内容阻止我打印任何东西,直到流关闭为止。我能做些什么才能让这件事成功?

EN

回答 6

Stack Overflow用户

回答已采纳

发布于 2015-09-03 18:28:27

通过关闭阻塞,您一次只能读取一个字符。因此,无法让readline()在非阻塞上下文中工作。我猜你只是想看一下按键来控制机器人。

我在Linux上使用select.select()没有运气,我创建了一种调整termios设置的方法。因此,这是特定于Linux的,但适用于我:

代码语言:javascript
复制
import atexit, termios
import sys, os
import time


old_settings=None

def init_any_key():
   global old_settings
   old_settings = termios.tcgetattr(sys.stdin)
   new_settings = termios.tcgetattr(sys.stdin)
   new_settings[3] = new_settings[3] & ~(termios.ECHO | termios.ICANON) # lflags
   new_settings[6][termios.VMIN] = 0  # cc
   new_settings[6][termios.VTIME] = 0 # cc
   termios.tcsetattr(sys.stdin, termios.TCSADRAIN, new_settings)


@atexit.register
def term_any_key():
   global old_settings
   if old_settings:
      termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_settings)


def any_key():
   ch_set = []
   ch = os.read(sys.stdin.fileno(), 1)
   while ch is not None and len(ch) > 0:
      ch_set.append( ord(ch[0]) )
      ch = os.read(sys.stdin.fileno(), 1)
   return ch_set


init_any_key()
while True:
   key = any_key()
   if key is not None:
      print(key)
   else:
      time.sleep(0.1)

一个更好的视窗或跨平台的答案是:非阻塞控制台输入?

票数 13
EN

Stack Overflow用户

发布于 2018-03-09 11:29:52

您可以使用选择器进行句柄I/O复用:

https://docs.python.org/3/library/selectors.html

试试这个:

代码语言:javascript
复制
#! /usr/bin/python3

import sys
import fcntl
import os
import selectors

# set sys.stdin non-blocking
orig_fl = fcntl.fcntl(sys.stdin, fcntl.F_GETFL)
fcntl.fcntl(sys.stdin, fcntl.F_SETFL, orig_fl | os.O_NONBLOCK)

# function to be called when enter is pressed
def got_keyboard_data(stdin):
    print('Keyboard input: {}'.format(stdin.read()))

# register event
m_selector = selectors.DefaultSelector()
m_selector.register(sys.stdin, selectors.EVENT_READ, got_keyboard_data)

while True:
    sys.stdout.write('Type something and hit enter: ')
    sys.stdout.flush()
    for k, mask in m_selector.select():
        callback = k.data
        callback(k.fileobj)

以上代码将保持在行上。

代码语言:javascript
复制
for k, mask in m_selector.select():

在注册事件发生之前,返回一个selector_key实例(k)和被监视事件的掩码。

在上面的例子中,我们只注册了一个事件(输入按键):

代码语言:javascript
复制
m_selector.register(sys.stdin, selectors.EVENT_READ, got_keyboard_data)

选择器键实例定义如下:

代码语言:javascript
复制
abstractmethod register(fileobj, events, data=None)

因此,寄存器方法将k.data设置为回调函数got_keyboard_data,并在按下Enter键时调用它:

代码语言:javascript
复制
callback = k.data
callback(k.fileobj)

一个更完整的示例(希望更有用)是将来自用户的stdin数据与来自网络的传入连接进行多路复用:

代码语言:javascript
复制
import selectors
import socket
import sys
import os
import fcntl

m_selector = selectors.DefaultSelector()

# set sys.stdin non-blocking
def set_input_nonblocking():
    orig_fl = fcntl.fcntl(sys.stdin, fcntl.F_GETFL)
    fcntl.fcntl(sys.stdin, fcntl.F_SETFL, orig_fl | os.O_NONBLOCK)

def create_socket(port, max_conn):
    server_addr = ('localhost', port)
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    server.setblocking(False)
    server.bind(server_addr)
    server.listen(max_conn)
    return server

def read(conn, mask):
    global GO_ON
    client_address = conn.getpeername()
    data = conn.recv(1024)
    print('Got {} from {}'.format(data, client_address))
    if not data:
         GO_ON = False

def accept(sock, mask):
    new_conn, addr = sock.accept()
    new_conn.setblocking(False)
    print('Accepting connection from {}'.format(addr))
    m_selector.register(new_conn, selectors.EVENT_READ, read)

def quit():
    global GO_ON
    print('Exiting...')
    GO_ON = False


def from_keyboard(arg1, arg2):
    line = arg1.read()
    if line == 'quit\n':
        quit()
    else:
        print('User input: {}'.format(line))

GO_ON = True
set_input_nonblocking()

# listen to port 10000, at most 10 connections
server = create_socket(10000, 10)

m_selector.register(server, selectors.EVENT_READ, accept)
m_selector.register(sys.stdin, selectors.EVENT_READ, from_keyboard)

while GO_ON:
    sys.stdout.write('>>> ')
    sys.stdout.flush()
    for k, mask in m_selector.select():
        callback = k.data
        callback(k.fileobj, mask)


# unregister events
m_selector.unregister(sys.stdin)

# close connection
server.shutdown()
server.close()

#  close select
m_selector.close()

您可以使用两个终端进行测试。第一个航站楼:

代码语言:javascript
复制
$ python3 test.py 
>>> bla

打开另一个终端并运行:

代码语言:javascript
复制
 $ nc localhost 10000
 hey!

回到第一个

代码语言:javascript
复制
>>> qwerqwer     

结果(在主终端上看到):

代码语言:javascript
复制
$ python3 test.py 
>>> bla
User input: bla

>>> Accepting connection from ('127.0.0.1', 39598)
>>> Got b'hey!\n' from ('127.0.0.1', 39598)
>>> qwerqwer     
User input: qwerqwer

>>> 
票数 9
EN

Stack Overflow用户

发布于 2014-11-06 13:45:16

代码语言:javascript
复制
#-----------------------------------------------------------------------
# Get a character from the keyboard.  If Block is True wait for input,
# else return any available character or throw an exception if none is
# available.  Ctrl+C isn't handled and continues to generate the usual
# SIGINT signal, but special keys like the arrows return the expected 
# escape sequences.
#
# This requires:
#
#    import sys, select
#
# This was tested using python 2.7 on Mac OS X.  It will work on any
# Linux system, but will likely fail on Windows due to select/stdin
# limitations.
#-----------------------------------------------------------------------

def get_char(block = True):
    if block or select.select([sys.stdin], [], [], 0) == ([sys.stdin], [], []):
        return sys.stdin.read(1)
    raise error('NoChar')
票数 7
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/21791621

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档