使用asyncio进行开发 – 异步I / O(Python教程)(参考资料)
使用asyncio进行开发
异步编程与经典的“顺序”编程不同.
本页列出了常见的错误和陷阱,并解释了如何避免它们.
调试模式
默认情况下,asyncio在生产模式下运行。为了简化开发,asyncio有debug mode.
有几种方法可以启用asyncio调试模式:
- 将
PYTHONASYNCIODEBUG
环境变量设置为1
. - 使用
-X
dev
Python命令行选项. - Passing
debug=True
到asyncio.run()
. - 调用
loop.set_debug()
.
除了启用调试模式外,还要考虑:
-
将 asyncio logger 的日志级别设置为
logging.DEBUG
,例如,可以在启动应用程序时运行以下代码片段:logging.basicConfig(level=logging.DEBUG)
-
配置
warnings
模块以显示ResourceWarning
警告。一种方法是使用-W
default
命令行选项.
启用调试模式时:
并发和多线程
事件循环在线程(通常是主线程)中运行,并在其线程中执行所有回调和任务。当一个Task在事件循环中运行时,没有其他任务可以在同一个线程中运行。当一个Taskexecutes an await
表达式,运行的任务被挂起,事件循环执行下一个任务.
要从另一个OS线程调度回调,应该使用loop.call_soon_threadsafe()
方法。例:
loop.call_soon_threadsafe(callback, *args)
几乎所有的asyncio对象都不是线程安全的,除非有从Task或回调外部使用它们的代码,否则这通常不是问题。如果需要这样的代码来调用低级别的asyncio API,那么应该使用loop.call_soon_threadsafe()
方法,例如:
loop.call_soon_threadsafe(fut.cancel)
要从不同的OS线程调度一个协程对象,run_coroutine_threadsafe()
应该使用函数。它返回一个concurrent.futures.Future
来访问结果:
async def coro_func(): return await asyncio.sleep(1, 42)# Later in another OS thread:future = asyncio.run_coroutine_threadsafe(coro_func(), loop)# Wait for the result:result = future.result()
要处理信号和执行子进程,事件循环必须在主线程中出现
// loop.run_in_executor()
//可以与concurrent.futures.ThreadPoolExecutor
一起使用来在不同的OS线程中执行阻塞代码而不会阻塞事件循环运行的OS线程.
运行阻塞代码
阻塞(CPU不应该直接调用代码。例如,如果函数执行CPU密集型计算1秒钟,则所有并发asyncio任务和IO操作将延迟1秒
执行程序可用于在不同的线程中运行任务,甚至可以在不同的进程中运行,以避免阻塞使用事件循环阻止OS线程。见loop.run_in_executor()
更详细的方法
登录
asyncio使用logging
模块和所有日志记录都是通过"asyncio"
logger.
默认日志级别是logging.INFO
,可以轻松调整:
logging.getLogger("asyncio").setLevel(logging.WARNING)
检测永不停息的协程
当调用协程函数但未等待(例如coro()
而不是await coro()
)或者协程没有安排asyncio.create_task()
时,asynciowill发出RuntimeWarning
:
import asyncioasync def test(): print("never scheduled")async def main(): test()asyncio.run(main())
输出:
test.py:7: RuntimeWarning: coroutine "test" was never awaited test()
在调试模式下输出:
test.py:7: RuntimeWarning: coroutine "test" was never awaitedCoroutine created at (most recent call last) File "../t.py", line 9, in <module> asyncio.run(main(), debug=True) < .. > File "../t.py", line 7, in main test() test()
通常的解决方法是等待协程或调用asyncio.create_task()
函数:
async def main(): await test()
检测从未检索到的异常
如果Future.set_exception()
虽然调用了Future对象,但异常永远不会传播到用户代码。在这种情况下,当theFuture对象被垃圾收集时,asyncio会发出一条日志消息.
未处理异常的示例:
import asyncioasync def bug(): raise Exception("not consumed")async def main(): asyncio.create_task(bug())asyncio.run(main())
输出:
Task exception was never retrievedfuture: <Task finished coro=<bug() done, defined at test.py:3> exception=Exception("not consumed")>Traceback (most recent call last): File "test.py", line 4, in bug raise Exception("not consumed")Exception: not consumed
启用调试模式以获取创建任务的跟踪:
asyncio.run(main(), debug=True)
在调试模式下输出:
Task exception was never retrievedfuture: <Task finished coro=<bug() done, defined at test.py:3> exception=Exception("not consumed") created at asyncio/tasks.py:321>source_traceback: Object created at (most recent call last): File "../t.py", line 9, in <module> asyncio.run(main(), debug=True)< .. >Traceback (most recent call last): File "../t.py", line 4, in bug raise Exception("not consumed")Exception: not consumed
评论被关闭。