Streams- – 异步I / O(Python教程)(参考资料)
流
Streams是高级异步/等待就绪原语,用于处理网络连接。Streams允许发送和接收数据而不使用回调或低级协议和传输.
以下是使用asynciostreams编写的TCP echo客户端示例:
import asyncioasync def tcp_echo_client(message): reader, writer = await asyncio.open_connection( "127.0.0.1", 8888) print(f"Send: {message!r}") writer.write(message.encode()) data = await reader.read(100) print(f"Received: {data.decode()!r}") print("Close the connection") writer.close() await writer.wait_closed()asyncio.run(tcp_echo_client("Hello World!"))
另请参阅下面的示例部分.
流函数
以下顶级asyncio函数可用于创建和使用流:
- coroutine
asyncio.
open_connection
(host=None, port=None, *, loop=None, limit=None, ssl=None, family=0, proto=0, flags=0, sock=None, local_addr=None, server_hostname=None, ssl_handshake_timeout=None) -
建立网络连接并返回一对
(reader, writer)
objects.返回reader和writer对象是
StreamReader
和StreamWriter
classes.loop参数是可选的,当从协程等待这个函数时,总是可以自动确定.
limit确定使用的缓冲区大小限制
StreamReader
实例。默认情况下limit设置为64 KiB.其余的参数直接传递给
loop.create_connection()
.版本3.7中的新功能:ssl_handshake_timeout参数.
- coroutine
asyncio.
start_server
(client_connected_cb, host=None, port=None, *, loop=None, limit=None, family=socket.AF_UNSPEC, flags=socket.AI_PASSIVE, sock=None, backlog=100, ssl=None, reuse_address=None, reuse_port=None, ssl_handshake_timeout=None, start_serving=True) -
启动套接字服务器.
每当建立新的客户端连接时,都会调用client_connected_cb回调。它收到了
(reader, writer)
pairas两个参数,StreamReader
和StreamWriter
classes.client_connected_cb可以是一个简单的可调用或协程功能;如果它是一个协程功能,它将自动安排为
Task
.loop参数是可选的,并且当从协程中等待这个方法时,总是可以自动确定.
limit确定被使用的缓冲区大小限制
StreamReader
实例。默认情况下limit设置为64 KiB.其余的参数直接传递给
loop.create_server()
.版本3.7中的新功能:ssl_handshake_timeout和start_servingparameters.
Unix套接字
- coroutine
asyncio.
open_unix_connection
(path=None, *, loop=None, limit=None, ssl=None, sock=None, server_hostname=None, ssl_handshake_timeout=None) -
建立一个Unix套接字连接并返回一对
(reader, writer)
.相近
open_connection()
但是在Unix套接字上运行.另见
loop.create_unix_connection()
.可用性:Unix .
版本3.7中新增: ssl_handshake_timeout参数
更改版本3.7: path参数现在可以是类似路径的对象
- coroutine
asyncio.
start_unix_server
(client_connected_cb, path=None, *, loop=None, limit=None, sock=None, backlog=100, ssl=None, ssl_handshake_timeout=None, start_serving=True) -
启动一个Unix套接字服务器.
类似于
start_server()
但适用于Unix套接字.参见
loop.create_unix_server()
.文档//可用性:Unix.
新版本3.7: ssl_handshake_timeout和start_serving参数
更改版本3.7: path参数现在可以是路径类对象.
StreamReader
- class
asyncio.
StreamReader
-
表示读者对象提供API从IO流中读取数据.
不建议直接实例化StreamReader对象;使用
open_connection()
和start_server()
instead.- coroutine
read
(n=-1) -
读到n字节。如果没有提供n,或设置为
-1
读到EOF并返回所有读取的字节.如果收到EOF并且内部缓冲区为空,则返回一个空的
bytes
object.
- coroutine
readline
() -
读一行,其中“line”是一个字节发送序列
\n
.如果收到EOF并且找不到
\n
,则该方法返回部分读取数据.如果收到EOF并且内部缓冲区为空,则返回一个空的
bytes
object.
- coroutine
readexactly
(n) -
准确地说nbytes
//如果在
IncompleteReadError
之前达到EOF,则调出n。使用IncompleteReadError.partial
属性获取部分读取的数据.
- coroutine
readuntil
(separator=b”\n”) -
从流中读取数据,直到找到separator.
成功后,数据和分隔符将从内部缓冲区中删除(消耗)。返回的数据最后将包括这些参与者.
如果读取的数据量超过配置的流限制,则会引发
LimitOverrunError
异常,并将数据保留在内部缓冲区中并可以读取再次如果在找到完整的分隔符之前达到EOF,则会引发
IncompleteReadError
异常,并重置内部缓冲区。IncompleteReadError.partial
属性可以包含分隔符的一部分.新版本3.5.2.
at_eof
()-
返回
True
如果缓冲区是空的并且feed_eof()
被调用的话
- coroutine
StreamWriter
- class
asyncio.
StreamWriter
-
表示一个提供API的编写器对象,用于写入IO流的数据.
不建议实例化StreamWriter objectsdirectly;使用
open_connection()
和start_server()
instead.can_write_eof
()-
返回True如果基础传输支持
write_eof()
方法,False否则
write_eof
// ()-
关闭流后的写入结束缓冲的writedata是冲的.
transport
-
返回底层的asyncio transport.
get_extra_info
(name, default=None)-
获取可选的运输信息;看
BaseTransport.get_extra_info()
了解详情.
writelines
(data)-
写一个列表(或任何可迭代的)字节到流.
该方法不受流量控制。调用
writelines()
之后应该drain()
.
- coroutine
drain
() -
等到适合继续写入流中。例如:
writer.write(data)await writer.drain()
这是一种与undersIO写缓冲区交互的流控制方法。当缓冲区的大小达到高水印时,drain()阻塞直到缓冲区的大小耗尽到低水位线并且可以恢复写入。当没有什么可以等待时,
drain()
立即返回.
close
( )-
关闭溪流
is_closing
()-
返回
True
如果溪流关闭或正在关闭的过程中版本3.7.
例子
使用流的TCP echo客户端
TCP echo客户端使用asyncio.open_connection()
function:
import asyncioasync def tcp_echo_client(message): reader, writer = await asyncio.open_connection( "127.0.0.1", 8888) print(f"Send: {message!r}") writer.write(message.encode()) data = await reader.read(100) print(f"Received: {data.decode()!r}") print("Close the connection") writer.close()asyncio.run(tcp_echo_client("Hello World!"))
参见
TCP echo客户端协议示例使用低级loop.create_connection()
方法
TCP回送服务器使用流
TCP回显服务器使用asyncio.start_server()
功能:
import asyncioasync def handle_echo(reader, writer): data = await reader.read(100) message = data.decode() addr = writer.get_extra_info("peername") print(f"Received {message!r} from {addr!r}") print(f"Send: {message!r}") writer.write(data) await writer.drain() print("Close the connection") writer.close()async def main(): server = await asyncio.start_server( handle_echo, "127.0.0.1", 8888) addr = server.sockets[0].getsockname() print(f"Serving on {addr}") async with server: await server.serve_forever()asyncio.run(main())
获取HTTP头
简单示例查询命令行上传递的URL的HTTP头:
import asyncioimport urllib.parseimport sysasync def print_http_headers(url): url = urllib.parse.urlsplit(url) if url.scheme == "https": reader, writer = await asyncio.open_connection( url.hostname, 443, ssl=True) else: reader, writer = await asyncio.open_connection( url.hostname, 80) query = ( f"HEAD {url.path or "/"} HTTP/1.0\r\n" f"Host: {url.hostname}\r\n" f"\r\n" ) writer.write(query.encode("latin-1")) while True: line = await reader.readline() if not line: break line = line.decode("latin1").rstrip() if line: print(f"HTTP header> {line}") # Ignore the body, close the socket writer.close()url = sys.argv[1]asyncio.run(print_http_headers(url))
用法:
python example.py http://example.com/path/page.html
或使用HTTPS:
python example.py https://example.com/path/page.html
注册一个打开的套接字以等待使用流的数据
使用open_connection()
函数
import asyncioimport socketasync def wait_for_data(): # Get a reference to the current event loop because # we want to access low-level APIs. loop = asyncio.get_running_loop() # Create a pair of connected sockets. rsock, wsock = socket.socketpair() # Register the open socket to wait for data. reader, writer = await asyncio.open_connection(sock=rsock) # Simulate the reception of data from the network loop.call_soon(wsock.send, "abc".encode()) # Wait for data data = await reader.read(100) # Got data, we are done: close the socket print("Received:", data.decode()) writer.close() # Close the second socket wsock.close()asyncio.run(wait_for_data())
等待套接字接收数据的协同程序还
使用协议注册一个打开的套接字等待数据示例使用低级协议和loop.create_connection()
方法。
观察读取事件的文件描述符示例使用低级loop.add_reader()
方法来查看文件描述符.
评论被关闭。