– 启动并行任务 – 并发执行(Python教程)(参考资料)
concurrent.futures
– 启动并行任务
版本3.2.
新增源代码: Lib / concurrent / futures / thread.py和Lib / concurrent / futures / process.py
concurrent.futures
模块提供了一个同步执行callables的高级接口.
可以使用ThreadPoolExecutor
使用线程执行异步执行,也可以使用ProcessPoolExecutor
。两者都实现了相同的接口,由抽象Executor
类。
执行者对象
- class
concurrent.futures.
Executor
-
一个抽象类,提供异步执行调用的方法。它不应该直接使用,而是通过它的具体子类.
submit
(fn, *args, **kwargs)-
安排可调用的,fn,以
fn(*args **kwargs)
并返回Future
表示可执行的执行的对象.with ThreadPoolExecutor(max_workers=1) as executor: future = executor.submit(pow, 323, 1235) print(future.result())
map
(func, *iterables, timeout=None, chunksize=1)-
相近
map(func, *iterables)
除了:- /iterables是立即收集而不是懒惰地收集;
- func是异步执行的,并且可以同时调用func几次调用
返回的迭代器提出
concurrent.futures.TimeoutError
如果__next__()
调用结果后timeout秒从原始调用到Executor.map()
.timeout可以是int或float。如果没有指定timeout或None
,则等待时间没有限制.如果func调用引发异常,那么当从iterator
中检索到它的值时,该异常将被调用。当使用
ProcessPoolExecutor
时,这个方法会调整iterables分成几个块,它作为单独的任务提交给池。这些块的(近似)大小可以通过设置chunksize到正整数。对于很长的迭代,使用chunksize的大值可以显着提高性能,而默认大小为1.对于ThreadPoolExecutor
, chunksize没有效果.在版本3.5中更改:添加了chunksize参数
shutdown
// (wait=True )-
向执行者发出信号,表示当目前待处理的期货完成执行时,它应该释放它正在使用的任何资源。打电话给
Executor.submit()
和Executor.map()
停机后制作RuntimeError
.如果wait是
True
然后这个方法将不会返回,直到所有的待处理期货都完成执行并且已经释放了与执行者相关的资源。如果wait是False
然后这个方法将立即返回,并且当所有待处理的期货完成执行时,与执行者相关的资源将会出现。无论wait,整个Python程序都不会退出,直到所有未完成的期货都执行完毕.如果使用
with
语句,你可以避免显式调用这个方法,这将关闭Executor
(好像Executor.shutdown()
被调用wait设置为True
):import shutilwith ThreadPoolExecutor(max_workers=4) as e: e.submit(shutil.copy, "src1.txt", "dest1.txt") e.submit(shutil.copy, "src2.txt", "dest2.txt") e.submit(shutil.copy, "src3.txt", "dest3.txt") e.submit(shutil.copy, "src4.txt", "dest4.txt")
ThreadPoolExecutor
ThreadPoolExecutor
是Executor
使用线程池异步执行调用的子类.
当可调用与Future
腰部相关的另一个Future
的结果时,就会发生死锁。例如:
import timedef wait_on_b(): time.sleep(5) print(b.result()) # b will never complete because it is waiting on a. return 5def wait_on_a(): time.sleep(5) print(a.result()) # a will never complete because it is waiting on b. return 6executor = ThreadPoolExecutor(max_workers=2)a = executor.submit(wait_on_b)b = executor.submit(wait_on_a)
And:
def wait_on_future(): f = executor.submit(pow, 5, 2) # This will never complete because there is only one worker thread and # it is executing this function. print(f.result())executor = ThreadPoolExecutor(max_workers=1)executor.submit(wait_on_future)
- class
concurrent.futures.
ThreadPoolExecutor
(max_workers=None, thread_name_prefix=””, initializer=None, initargs=()) -
一个
Executor
子类,最多使用一个池max_workers异步执行调用的线程initializer是一个可选的可调用函数,在每个工作线程的开头调用;initargs是传递给初始化器的参数元组。应该 initializer提出异常,所有当前正在裁员的工作都会提高
BrokenThreadPool
,以及任何向工作组提交更多工作的企图.在版本3.5中更改:如果max_workers是
None
如果没有给出,它将默认为机器上的处理器数量,乘以5
,假设ThreadPoolExecutor
通常用于重叠I / O而不是CPU工作,并且工人的数量应该高于ProcessPoolExecutor
.版本3.6中新增:thread_name_prefix添加了一个参数,允许用户控制池创建的工作线程的
threading.Thread
名称,以便于调试.更改版本3.7:添加了initializer和initargs参数
ThreadPoolExecutor示例
import concurrent.futuresimport urllib.requestURLS = ["http://www.foxnews.com/", "http://www.cnn.com/", "http://europe.wsj.com/", "http://www.bbc.co.uk/", "http://some-made-up-domain.com/"]# Retrieve a single page and report the URL and contentsdef load_url(url, timeout): with urllib.request.urlopen(url, timeout=timeout) as conn: return conn.read()# We can use a with statement to ensure threads are cleaned up promptlywith concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor: # Start the load operations and mark each future with its URL future_to_url = {executor.submit(load_url, url, 60): url for url in URLS} for future in concurrent.futures.as_completed(future_to_url): url = future_to_url[future] try: data = future.result() except Exception as exc: print("%r generated an exception: %s" % (url, exc)) else: print("%r page is %d bytes" % (url, len(data)))
ProcessPoolExecutor
ProcessPoolExecutor
类是一个Executor
子类,它使用一个进程池来异步执行调用.ProcessPoolExecutor
使用multiprocessing
模块,它可以左右全局解释器锁但也意味着可以执行和返回只有可选择的对象.
__main__
模块必须可由工作子进程导入。这意味着ProcessPoolExecutor
将无法在交互式解释器中工作.
调用Executor
或Future
来自可调用的方法提交到ProcessPoolExecutor
会导致在deadlock.
- class
concurrent.futures.
ProcessPoolExecutor
(max_workers=None, mp_context=None, initializer=None, initargs=()) -
Executor
使用最多max_workers进程池异步执行调用的子类。如果max_workers是None
或未通过,它将默认为机器上的处理器数量。如果max_workers低于或等于0
,则aValueError
将被提升.mp_context可以是多处理上下文或无。它将用于发射工人。如果mp_context是None
或没有给出,则使用defaultmultiprocessing上下文.initializer是一个可选的可调用函数,在每个工作进程的开始时调用;initargs是传递给初始化器的参数元组。应该 initializer提出异常,所有正在进行的工作都将提高
BrokenProcessPool
,以及任何向工作池提交更多工作的尝试.更改版本3.3:当其中一个工作进程突然终止,现在出现了
BrokenProcessPool
错误。以前,行为是未定义的,但执行者或其未来的操作经常会冻结或死锁.更改版本3.7:添加了mp_context参数以允许用户控制start_method对于由池创建的工作进程
添加了initializer和initargs arguments.
ProcessPoolExecutor示例
import concurrent.futuresimport mathPRIMES = [ 112272535095293, 112582705942171, 112272535095293, 115280095190773, 115797848077099, 1099726899285419]def is_prime(n): if n % 2 == 0: return False sqrt_n = int(math.floor(math.sqrt(n))) for i in range(3, sqrt_n + 1, 2): if n % i == 0: return False return Truedef main(): with concurrent.futures.ProcessPoolExecutor() as executor: for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)): print("%d is prime: %s" % (number, prime))if __name__ == "__main__": main()
未来对象
Future
类封装了callable的异步执行.Future
实例由Executor.submit()
.
- class
concurrent.futures.
Future
-
创建封装可调用的异步执行。
Future
实例是由Executor.submit()
创建的,不应该直接创建,除了测试.cancel
()-
取消通话。如果当前正在执行调用并且无法取消,则该方法将返回
False
,否则呼叫将被取消,方法将返回True
.
cancelled
()-
如果呼叫被成功取消,则返回
True
.
running
()-
返回
True
如果电话当前正在执行且无法解决.
done
()-
如果呼叫成功取消或结束运行,请回复
True
result
(timeout=None)-
返回调用返回的值。如果呼叫尚未完成,则此方法将等待timeout秒。如果呼叫未在timeout秒内完成,则会引发
concurrent.futures.TimeoutError
。timeout可以bean int或float。如果没有指定timeout或None
,则等待时间有限.如果在完成之前取消将来取消,那么
CancelledError
将被提出。如果提出调用,这个方法会引发同样的异常.
exception
(timeout=None)-
返回通话引发的异常。如果呼叫尚未完成,则此方法将等到timeout秒。如果呼叫没有在timeout秒内完成,则会引发
concurrent.futures.TimeoutError
。timeout可以bean int或float。如果没有指定timeout或None
,等待时间就没有了.如果未来在完成之前被取消那么
CancelledError
将被提升.如果通话完成而没有提高,则
None
被退回.
add_done_callback
(fn)-
将可调用的fn附加到将来。fn当未来被取消或结束时,将以未来作为唯一的论点来召唤.
添加的callables按照添加的顺序调用,并且总是在属于添加它们的进程的线程中调用。如果可赎回提出
Exception
子类,它将被记录和签名。如果callable引发BaseException
子类,则行为未定义.如果将来已经完成或被取消,fn将立即考虑
以下
Future
方法用于单元测试和Executor
实施.set_running_or_notify_cancel
()-
这个方法只能由
Executor
在执行与Future
和unittests相关的工作之前的实现如果方法返回
False
那么Future
被取消,即。Future.cancel()
被叫了回来真的。任何等待Future
完成的线程(即通过as_completed()
或wait()
)都会被唤醒如果方法返回
True
然后Future
未被取消并且已经处于运行状态,即呼叫Future.running()
将返回 真正.此方法只能调用一次,在
Future.set_result()
要么Future.set_exception()
被叫了
set_result
(result)-
将与
Future
关联的工作结果设置为result.此方法仅应由
Executor
实现使用andunit tests.
set_exception
(exception)-
将与
Future
关联的工作结果设置为Exception
exception.此方法仅应由
Executor
implements andunit tests使用
模块函数
concurrent.futures.
wait
(fs, timeout=None, return_when=ALL_COMPLETED)-
等待
Future
由Executor
给出的实例(可能由不同的fs实例创建)完成。返回一组named2元组。第一集,名为done
,包含在等待完成之前完成(已完成或已取消)的期货。第二集,名为not_done
,包含未完成的期货.timeout可用于控制返回前等待的最大秒数。timeout可以是int或float。如果没有指定timeout或
None
,等待时间没有限制.return_when表示此功能何时返回。它必须是以下常量之一:
常量 描述 FIRST_COMPLETED
当任何未来完成或被取消时,该函数将返回. FIRST_EXCEPTION
当任何结束时通过引发异常来返回该函数。如果没有未来会引发异常,则相当于 ALL_COMPLETED
.ALL_COMPLETED
所有功能完成或取消后,该功能将返回.
concurrent.futures.
as_completed
(fs, timeout=None)-
返回
Future
实例(可能是由不同的Executor
实例)由给出fs这使得期货完整(已完成或被取消)。由fs给出的任何复制的期货将被退回一次。任何在as_completed()
被称为将首先产生。返回的iteratorraises一个concurrent.futures.TimeoutError
如果__next__()
被调用,结果在timeout秒后原始调用as_completed()
. timeout可以是int或float。如果timeout未指定或None
,等待时间没有限制.
也可以看看
- PEP 3148 – 期货 – 异步执行计算
- 描述此功能的提议包含在Python标准库中.
异常类
- exception
concurrent.futures.
CancelledError
-
在取消未来时引发.
- exception
concurrent.futures.
TimeoutError
-
当未来的操作超过给定的超时时引发
- exception
concurrent.futures.
BrokenExecutor
-
来自
RuntimeError
当执行器由于某种原因而被破坏时,会引发此异常类,并且不能用于提交或执行新任务.新版本3.7.
- exception
concurrent.futures.thread.
BrokenThreadPool
-
来自
BrokenExecutor
当ThreadPoolExecutor
的一个工人初始化失败时,会引发这个异常类.版本3.7.
- exception
concurrent.futures.process.
BrokenProcessPool
-
来自
BrokenExecutor
(原RuntimeError
),当一个ProcessPoolExecutor
的工作者以非清洁方式终止时(例如,如果它是从外面被杀死的话),就会引发这个异常类.版本3.3.