队列 – 异步I / O(Python教程)(参考资料)
队列
asyncio队列的设计类似于queue
模块的类。尽管asyncio队列不是线程安全的,但它们被设计为专门用于异步/等待代码.
请注意,asyncio队列的方法没有timeout参数;使用asyncio.wait_for()
使用atimeout.
另见下面的示例部分.
队列
- class
asyncio.
Queue
(maxsize=0, *, loop=None) -
先进先出(FIFO)队列
如果maxsize小于或等于零,队列大小是无限的。如果是大于的整数
0
, 然后await put()
当队列达到maxsize直到一个项被get()
.删除不同于标准库线程
queue
,队列的大小总是已知的,可以通过调用qsize()
方法返回//这个课是不是线程安全的.
maxsize
-
队列中允许的项目数量
empty
()-
返回
True
如果队列是空的,False
除此以外。
- coroutine
get
() -
从队列中删除并返回一个项目。如果队列是空的,请等待一个项目可用.
get_nowait
()-
如果一个项目立即可用,则返回一个项目,否则加注
QueueEmpty
.
- coroutine
join
() -
阻止,直到队列中的所有项目都被收到并处理完毕.
每当项目添加到队列时,未完成任务的计数就会增加。每当消费者协程调用时,计数就会下降
task_done()
表示该项目已被检索,并且其上的所有工作都已完成。当未完成任务的数量减少到零时,join()
unblocks.
qsize
()-
返回队列中的项目数.
优先级队列
- class
asyncio.
PriorityQueue
-
Queue
的变种按优先级顺序检索条目(最低的第一个).条目通常是形式的元组
(priority_number, data)
.
LIFO Queue
- class
asyncio.
LifoQueue
-
Queue
的一个变体,它首先检索最近的addedentries(last in,first out).
例外
- exception
asyncio.
QueueEmpty
-
在
get_nowait()
方法调用空队列时引发此异常.
- exception
asyncio.
QueueFull
-
在
put_nowait()
方法被调用到队列已经到达maxsize.
时引发异常例子
队列可用于在几个并发任务之间分配工作负载:
import asyncioimport randomimport timeasync def worker(name, queue): while True: # Get a "work item" out of the queue. sleep_for = await queue.get() # Sleep for the "sleep_for" seconds. await asyncio.sleep(sleep_for) # Notify the queue that the "work item" has been processed. queue.task_done() print(f"{name} has slept for {sleep_for:.2f} seconds")async def main(): # Create a queue that we will use to store our "workload". queue = asyncio.Queue() # Generate random timings and put them into the queue. total_sleep_time = 0 for _ in range(20): sleep_for = random.uniform(0.05, 1.0) total_sleep_time += sleep_for queue.put_nowait(sleep_for) # Create three worker tasks to process the queue concurrently. tasks = [] for i in range(3): task = asyncio.create_task(worker(f"worker-{i}", queue)) tasks.append(task) # Wait until the queue is fully processed. started_at = time.monotonic() await queue.join() total_slept_for = time.monotonic() - started_at # Cancel our worker tasks. for task in tasks: task.cancel() # Wait until all worker tasks are cancelled. await asyncio.gather(*tasks, return_exceptions=True) print("====") print(f"3 workers slept in parallel for {total_slept_for:.2f} seconds") print(f"total expected sleep time: {total_sleep_time:.2f} seconds")asyncio.run(main())
评论被关闭。