协同程序和任务 – 异步I / O(Python教程)(参考资料)
协同程序和任务
本节概述了与协同程序和任务一起使用的高级asyncio API .
协同程序
使用async / await语法声明的协同程序是编写asyncio应用程序的首选方法。例如,下面的代码片段(需要Python 3.7+)打印“hello”,等待1秒,然后打印“world”:
>>> import asyncio>>> async def main():... print("hello")... await asyncio.sleep(1)... print("world")>>> asyncio.run(main())helloworld
请注意,只是调用一个协程将不会安排它执行:
>>> main()<coroutine object main at 0x1053bb7c8>
-
等待协程。以下代码片段在等待1秒后打印“你好”,然后在等待another2秒:
import asyncioimport timeasync def say_after(delay, what): await asyncio.sleep(delay) print(what)async def main(): print(f"started at {time.strftime("%X")}") await say_after(1, "hello") await say_after(2, "world") print(f"finished at {time.strftime("%X")}")asyncio.run(main())
预期输出:
started at 17:13:52helloworldfinished at 17:13:55
-
asyncio.create_task()
与asyncio一起运行coroutines的函数Tasks
.让我们修改上面的例子并运行两个
say_after
coroutines concurrently:async def main(): task1 = asyncio.create_task( say_after(1, "hello")) task2 = asyncio.create_task( say_after(2, "world")) print(f"started at {time.strftime("%X")}") # Wait until both tasks are completed (should take # around 2 seconds.) await task1 await task2 print(f"finished at {time.strftime("%X")}")
请注意,预期输出现在显示该片段比以前快1秒:
started at 17:14:32helloworldfinished at 17:14:34
Awaitables
我们说对象是等待对象如果它可以用在await
表达。许多asyncio API被设计为接受awaitables.
有三种主要类型awaitable对象: coroutines , 任务,和期货.
Python协程是awaitables,因此可以从其他协程中等待:
import asyncioasync def nested(): return 42async def main(): # Nothing happens if we just call "nested()". # A coroutine object is created but not awaited, # so it *won"t run at all*. nested() # Let"s do it differently now and await it: print(await nested()) # will print "42".asyncio.run(main())
重要
在本文档中,术语“coroutine”可以用于两个密切相关的概念:
- 一个 coroutine function:
async def
功能; - 一个 coroutine object:通过调用coroutine function.
Tasks
当一个协程包装使用Task这样的函数进入asyncio.create_task()
自动调度即可快速运行:
import asyncioasync def nested(): return 42async def main(): # Schedule nested() to run soon concurrently # with "main()". task = asyncio.create_task(nested()) # "task" can now be used to cancel "nested()", or # can simply be awaited to wait until it is complete: await taskasyncio.run(main())
期货
A Future
是特殊的低级等待对象表示异步操作的最终结果
当一个Future对象是awaited这意味着协程将一直等到Future在其他地方被解析.
需要asyncio中的未来对象以允许基于回调的代码与async / await一起使用.
通常没有必要在应用程序级代码创建Future对象.
可以等待有时会被库和一些asyncioAPI暴露的未来对象:
async def main(): await function_that_returns_a_future_object() # this is also valid: await asyncio.gather( function_that_returns_a_future_object(), some_python_coroutine() )
返回Future对象的低级函数的一个很好的例子loop.run_in_executor()
.
运行asyncio程序
创建任务
asyncio.
create_task
(coro)-
包裹coro 协程进入
Task
并安排执行。返回Task对象.任务在
get_running_loop()
,RuntimeError
返回的循环中执行,如果没有正在运行的循环当前线程则被引发这个函数已经添加到Python 3.7 。在PHP 3.7之前,低级
asyncio.ensure_future()
功能可以用来代替:async def coro(): ...# In Python 3.7+task = asyncio.create_task(coro())...# This works in all Python versions but is less readabletask = asyncio.ensure_future(coro())...
版本3.7.
睡眠
- coroutine
asyncio.
sleep
(delay, result=None, *, loop=None) -
阻挡delayseconds.
如果result提供,当协程完成时它返回给调用者.
sleep()
总是暂停当前任务,允许其他任务运行loop参数已弃用并计划在Python 3.10.中删除
协程示例每秒显示当前日期5秒:
import asyncioimport datetimeasync def display_date(): loop = asyncio.get_running_loop() end_time = loop.time() + 5.0 while True: print(datetime.datetime.now()) if (loop.time() + 1.0) >= end_time: break await asyncio.sleep(1)asyncio.run(display_date())
同时运行任务
- awaitable
asyncio.
gather
(*aws, loop=None, return_exceptions=False) -
在序列中运行等待对象aws concurrently.
如果在aws中任何等待的是协程,它会被自动调度为任务.
如果所有等待都成功完成,则结果是返回值的anaggregate列表。结果值的顺序与中的等待顺序相对应aws.
如果return_exceptions是
False
(默认),firstraised异常立即传播到gather()
上的任务。aws序列中的其他等待将不会被取消并将继续运行.如果return_exceptions是
True
,异常被视为成功结果,并在结果列表中汇总.如果
gather()
是cancelled,所有提交的等待(尚未完成)也是cancelled.如果aws序列中的任何任务或未来是cancelled,它就好像它被提升了
CancelledError
–gather()
来电是不在这种情况下取消。这是为了防止一个提交的任务/未来的取消导致其他任务/期货被取消.示例:
import asyncioasync def factorial(name, number): f = 1 for i in range(2, number + 1): print(f"Task {name}: Compute factorial({i})...") await asyncio.sleep(1) f *= i print(f"Task {name}: factorial({number}) = {f}")async def main(): # Schedule three calls *concurrently*: await asyncio.gather( factorial("A", 2), factorial("B", 3), factorial("C", 4), )asyncio.run(main())# Expected output:## Task A: Compute factorial(2)...# Task B: Compute factorial(2)...# Task C: Compute factorial(2)...# Task A: factorial(2) = 2# Task B: Compute factorial(3)...# Task C: Compute factorial(3)...# Task B: factorial(3) = 6# Task C: Compute factorial(4)...# Task C: factorial(4) = 24
更改版本3.7:如果gather本身被取消,取消传播无论return_exceptions.
屏蔽取消
- awaitable
asyncio.
shield
(aw, *, loop=None) -
保护等待对象来自
cancelled
.如果aw是一个协程,它被自动安排为一个任务.
声明:
res = await shield(something())
相当于:
res = await something()
except如果包含它的协程被取消,那么运行的任务
something()
没有取消。从something()
的角度来看,取消没有发生。虽然它的来电仍然被取消,所以“等待”的表达仍然提出了CancelledError
.如果
something()
被其他方式取消(即从内部取消)也会取消shield()
.如果希望完全忽略取消(不推荐)
shield()
功能应与try / exceptclause结合使用,如下所示:try: res = await shield(something())except CancelledError: res = None
超时
- coroutine
asyncio.
wait_for
(aw, timeout, *, loop=None) -
等待aw 等待完成超时.
如果aw是一个协程,它自动被安排为一个任务.
timeout可以是
None
或者一个浮点数或第二个等号的int数。如果timeout是None
,阻止直到将来完成.如果发生超时,它会取消任务并引发
asyncio.TimeoutError
.为了避免任务
cancellation
,请将其包装在shield()
.该函数将等到将来实际取消,所以总等待时间可能超过timeout.
如果等待取消,未来aw也被取消了
loop参数已弃用并计划在Python 3.10.中删除
例:
async def eternity(): # Sleep for one hour await asyncio.sleep(3600) print("yay!")async def main(): # Wait for at most 1 second try: await asyncio.wait_for(eternity(), timeout=1.0) except asyncio.TimeoutError: print("timeout!")asyncio.run(main())# Expected output:## timeout!
更改版本3.7:什么时候 aw由于超时被取消,
wait_for
等待aw被取消。以前,它立刻提起asyncio.TimeoutError
Waiting Primitives
- coroutine
asyncio.
wait
(aws, *, loop=None, timeout=None, return_when=ALL_COMPLETED) -
跑等待的物品 在里面 aws同时设置并阻塞,直到return_when.
指定的条件如果在aws中任何等待是一个协程,它会被自动调度为一个任务。将协同程序对象传递给
wait()
直接被弃用,因为它导致令人困惑的行为.返回两组任务/期货:
(done, pending)
.用法:
done, pending = await asyncio.wait(aws)
loop参数已弃用并计划在Python 3.10.中删除
timeout(浮点数或整数),如果指定,可用于控制返回前等待的最大秒数.
注意此函数不会引发
asyncio.TimeoutError
在超时发生时未完成的未来或任务在第二组中简单回复.return_when表示此函数何时返回。它必须是以下常量之一:
常量 说明 FIRST_COMPLETED
当任何未完成或取消时,该功能将返回. FIRST_EXCEPTION
当任何结束时通过引发异常来返回该函数。如果没有未来会引发异常,则相当于 ALL_COMPLETED
.ALL_COMPLETED
当所有未完成或被取消时,该功能将返回. 当发生超时时
wait_for()
,wait()
不会取消未来.注意
wait()
将协同程序自动调度为任务,然后返回(done, pending)
sets中隐式创建的Task对象。因此,以下代码将无法正常工作:async def foo(): return 42coro = foo()done, pending = await asyncio.wait({coro})if coro in done: # This branch will never be run!
以下是如何修复上面的代码片段:
async def foo(): return 42task = asyncio.create_task(foo())done, pending = await asyncio.wait({task})if task in done: # Everything will work as expected now.
直接将协程对象传递给
wait()
已弃用.
从其他线程调度
asyncio.
run_coroutine_threadsafe
(coro, loop)-
提交协程到给定的事件循环。Thread-safe.
返回
concurrent.futures.Future
等待来自另一个OS线程的结果.这个函数是从一个不同的OS线程调用的循环正在运行。示例:
# Create a coroutinecoro = asyncio.sleep(1, result=3)# Submit the coroutine to a given loopfuture = asyncio.run_coroutine_threadsafe(coro, loop)# Wait for the result with an optional timeout argumentassert future.result(timeout) == 3
如果在协程中引发异常,则会通知返回的Future。它也可以用来取消事件循环中的任务:
try: result = future.result(timeout)except asyncio.TimeoutError: print("The coroutine took too long, cancelling the task...") future.cancel()except Exception as exc: print(f"The coroutine raised an exception: {exc!r}")else: print(f"The coroutine returned: {result!r}")
参见文档的并发和多线程部分.
与其他asyncio不同函数这个函数需要loop参数明确传递
新版本3.5.1.
Introspection
任务对象
- class
asyncio.
Task
(coro, *, loop=None) -
一个
Future-like
运行Python的对象 coroutine 。不是thread-safe.任务用于在事件循环中运行协同程序。如果一个协程在Future上等待,则该任务暂停执行协程并等待Future的完成。当未来是done,包裹的协程的执行恢复.
事件循环使用协作调度:事件循环一次运行任务。当一个Task等待完成aFuture时,事件循环运行其他任务,回调或执行操作.
使用高级
asyncio.create_task()
函数创建任务,或者使用低级loop.create_task()
或ensure_future()
函数。任务的手动实例化不鼓励取消正在运行的任务使用
cancel()
方法。调用它会导致Task在包裹的协同程序中抛出一个CancelledError
异常。如果在取消期间正在等待Futureobject的协程,则Future对象将被取消.cancelled()
可以用来检查任务是否被取消。如果包装的协程没有压缩True
异常并且实际上被取消,方法返回CancelledError
。asyncio.Task
继承自Future
所有的API除了Future.set_result()
和Future.set_exception()
.任务支持
contextvars
模块。创建Taskis时,它会复制当前上下文,然后在复制的上下文中运行其coroutine .更改版本3.7:添加了对
contextvars
模块的支持cancel
// ()-
请求取消任务。
这个安排
CancelledError
在事件循环的下一个循环中被抛到包装的协程的异常.协程有机会通过用
try
……except CancelledError
…finally
因此,不像Future.cancel()
,Task.cancel()
并不保证任务将被取消,虽然完全抑制取消并不常见,并且积极劝阻.以下示例说明协同程序如何拦截取消请求:
async def cancel_me(): print("cancel_me(): before sleep") try: # Wait for 1 hour await asyncio.sleep(3600) except asyncio.CancelledError: print("cancel_me(): cancel sleep") raise finally: print("cancel_me(): after sleep")async def main(): # Create a "cancel_me" Task task = asyncio.create_task(cancel_me()) # Wait for 1 second await asyncio.sleep(1) task.cancel() try: await task except asyncio.CancelledError: print("main(): cancel_me is cancelled now")asyncio.run(main())# Expected output:## cancel_me(): before sleep# cancel_me(): cancel sleep# cancel_me(): after sleep# main(): cancel_me is cancelled now
result
()-
返回任务的结果.
如果任务是done,包装的coroutineis的结果返回(或者如果协程引发异常,则会重新提升。)
如果任务已经cancelled,这个方法引起了
CancelledError
exception.如果Task的结果还没有,这个方法引起了
InvalidStateError
例外。
exception
()-
返回Task的例外.
如果包装的协程引发异常,则返回异常。如果包装的协程正常返回,则此方法返回
None
.如果任务已经cancelled,则此方法会引发
CancelledError
例外如果任务还没有done,这个方法提出了
InvalidStateError
异常
add_done_callback
// (callback, *, context=None )-
添加一个回调,当任务为done.
此方法仅应用于基于低级回调的代码.
有关详细信息,请参阅
Future.add_done_callback()
的文档.
remove_done_callback
(callback)-
从回调列表中删除callback.
此方法仅应用于基于低级回调的代码.
有关详细信息,请参阅
Future.remove_done_callback()
的文档.
get_stack
(*, limit=None)-
返回此任务的堆栈帧列表.
如果未完成包装协程,则会返回挂起的堆栈。如果协程已成功完成或被取消,则返回一个空列表。如果协程被异常终止,则返回回溯帧列表.
框架总是从最旧到最新排序.
只有一个堆叠框架可用于悬挂的协程
可选limit参数设置framesto返回的最大数量;默认情况下,返回所有可用的帧。返回列表的排序因whethera堆栈或返回的追溯而有所不同:返回最新的astack帧,但返回最旧的追溯帧。(这与回溯模块的行为相匹配。)
print_stack
(*, limit=None, file=None)-
打印此任务的堆栈或回溯.
对于由
get_stack()
.limit参数传递给
get_stack()
直接file参数是输出写入的I / O流;默认输出写入
sys.stderr
.
- classmethod
all_tasks
(loop=None) -
返回一组事件循环的所有任务.
默认情况下,返回当前事件循环的所有任务。如果loop是
None
,get_event_loop()
function用于获取当前循环这个方法是弃用并将在Python 3.9中删除。使用
asyncio.all_tasks()
功能代替
- classmethod
current_task
(loop=None) -
返回当前正在运行的任务或
None
.如果loop是
None
,get_event_loop()
函数用于获取当前循环这个方法是弃用并将被删除inPython3.9。使用
asyncio.current_task()
functioninstead.
基于生成器的协同程序
注意
支持基于生成器的协程弃用 andis计划在Python 3.10中删除.
Generator-based coroutines preate async / await语法。它们是使用yield from
表达式来awaiton Futures和其他协程的Python生成器.