– 基于进程的并行性 – 并发执行(Python教程)(参考资料)
multiprocessing
– 基于流程的并行性
源代码:LIB /多
介绍
multiprocessing
是一个使用类似于的API支持生成进程的包threading
模块。multiprocessing
package提供本地和远程并发,通过使用子进程而不是线程有效地侧面执行全局解释器锁。Dueto,multiprocessing
模块允许程序员在给定的机器上充分利用多个处理器。它可以在Unix和Windows上运行.
multiprocessing
模块还介绍了中没有注册的APIthreading
模块。一个很好的例子就是Pool
这个对象提供了一种方便的方法,可以跨多个输入值并行执行函数,在进程间分配输入数据(数据并行)。以下示例演示了在模块中定义此类函数的常见做法,即子进程可以成功导入该模块。这个使用Pool
,
from multiprocessing import Pooldef f(x): return x*xif __name__ == "__main__": with Pool(5) as p: print(p.map(f, [1, 2, 3]))
将打印到标准输出
[1, 4, 9]
Process
类
在multiprocessing
,通过创建Process
对象,然后调用它start()
方法。Process
遵循threading.Thread
。一个简单的amultiprocess程序的例子是
from multiprocessing import Processdef f(name): print("hello", name)if __name__ == "__main__": p = Process(target=f, args=("bob",)) p.start() p.join()
为了显示所涉及的各个进程ID,这里有一个扩展的例子:
from multiprocessing import Processimport osdef info(title): print(title) print("module name:", __name__) print("parent process:", os.getppid()) print("process id:", os.getpid())def f(name): info("function f") print("hello", name)if __name__ == "__main__": info("main line") p = Process(target=f, args=("bob",)) p.start() p.join()
为了解释为什么if __name__ == "__main__"
部分是必要的,请参阅编程指南.
上下文和启动方法
根据平台,multiprocessing
支持三种启动流程的方法。这些start methods是
- spawn
父进程启动一个新的python解释器进程。子进程只会继承那些破坏进程对象
run()
方法所需的资源。特别是,不会继承父进程中不必要的文件描述符和句柄。与使用fork或forkserver.相比,使用此方法启动进程的速度较慢,可在Unix和Windows上使用。Windows上的默认值
- fork
父进程使用
os.fork()
来分叉Python解释器。子进程在开始时与父进程有效地相似。父进程的所有资源都由子进程继承。请注意,安全分叉amultithreaded进程是有问题的.仅在Unix上可用。Unix上的默认值
- forkserver
当程序启动并选择forkserver启动方法,启动服务器进程。从那时起,无论何时需要新进程,父进程都会连接到服务器并请求它分叉一个新进程。fork服务器进程是单线程的,所以使用
os.fork()
。Nounnecessary资源是继承的.在Unix平台上可用,支持在Unix管道上传递文件描述符.
版本3.4更改:spawn添加到所有unix平台上,并且forkserver添加了很多unix平台.Child进程不再继承Windows上所有父类的继承句柄.
在Unix上使用spawn或forkserver启动方法也会启动semaphore tracker跟踪由程序进程创建的未链接的命名信号的进程。当所有进程都退出时,信号量跟踪器取消链接任何剩余的信号量。通常应该没有,但是如果某个进程被信号杀死,那么可能会有一些“泄露”的信号量。(取消链接命名的semaphoresis是一个严重的问题,因为系统只允许有限的数量,并且在下次重新启动之前它们不会自动取消链接。)
要选择启动方法,请使用主模块的set_start_method()
在if __name__ == "__main__"
子句中。例如:
import multiprocessing as mpdef foo(q): q.put("hello")if __name__ == "__main__": mp.set_start_method("spawn") q = mp.Queue() p = mp.Process(target=foo, args=(q,)) p.start() print(q.get()) p.join()
set_start_method()
在程序中不应该多次使用.
或者,你可以使用get_context()
获得一个contextobject。Context对象与multiprocessingmodule具有相同的API,并且允许在同一程序中使用多个start方法.
import multiprocessing as mpdef foo(q): q.put("hello")if __name__ == "__main__": ctx = mp.get_context("spawn") q = ctx.Queue() p = ctx.Process(target=foo, args=(q,)) p.start() print(q.get()) p.join()
请注意,与一个上下文相关的对象可能与不同上下文的过程不兼容。特别是使用fork上下文不能传递给使用spawn或forkserver启动方法启动的进程
想要使用特定启动方法的库应该使用get_context()
避免干扰图书馆用户的选择.
警告
"spawn"
和"forkserver"
start方法目前不能与“冻结”的可执行文件一起使用(即,像PyInstaller 和cx_Freeze )在Unix上."fork"
启动方法确实有效.
在进程之间交换对象
multiprocessing
支持进程之间的两种通信通道:
队列
管道
Pipe()
函数返回一对由apipe连接的连接对象,默认为双工(双向)。例如:from multiprocessing import Process, Pipedef f(conn): conn.send([42, None, "hello"]) conn.close()if __name__ == "__main__": parent_conn, child_conn = Pipe() p = Process(target=f, args=(child_conn,)) p.start() print(parent_conn.recv()) # prints "[42, None, "hello"]" p.join()返回的两个连接对象
Pipe()
代表管道的两端。每个连接对象都有send()
和recv()
方法(等等)。请注意,如果两个进程(或线程)同时尝试读取或写入管道的same末端,则管道中的数据可能会损坏。当然,同时使用管道不同端的进程不存在损坏的风险.
进程之间的同步
multiprocessing
包含来自threading
的所有同步原语的等价物。例如,一个人可以使用一个锁来确保一次只有一个进程打印到标准输出:
from multiprocessing import Process, Lockdef f(l, i): l.acquire() try: print("hello world", i) finally: l.release()if __name__ == "__main__": lock = Lock() for num in range(10): Process(target=f, args=(lock, num)).start()
不使用来自不同进程的锁定输出可能会混淆掉
共享状态进程之间
如上所述,在进行并发编程时,通常最好尽量避免使用共享状态。当使用多个进程时尤其如此.
但是,如果你真的需要使用一些共享数据,那么multiprocessing
提供了几种方法.
共享内存
数据可以使用
Value
要么Array
。例如,以下代码from multiprocessing import Process, Value, Arraydef f(n, a): n.value = 3.1415927 for i in range(len(a)): a[i] = -a[i]if __name__ == "__main__": num = Value("d", 0.0) arr = Array("i", range(10)) p = Process(target=f, args=(num, arr)) p.start() p.join() print(num.value) print(arr[:])会打印
3.1415927[0, -1, -2, -3, -4, -5, -6, -7, -8, -9]
"d"
和"i"
创建时使用的参数num
和arr
使用的那种类型array
模块:"d"
表示双精度浮点数和"i"
表示有符号整数。这些共享对象将是进程和thread-safe.为了更灵活地使用共享内存,可以使用
multiprocessing.sharedctypes
支持创建从共享内存分配的任意ctypes对象的模块.
Server process
由
Manager()
控制一个服务器进程,该进程保留Python对象并允许其他进程使用proxies来操作它们.一位经理
Manager()
将支持类型list
,dict
,Namespace
,Lock
,RLock
,Semaphore
,BoundedSemaphore
,Condition
,Event
,Barrier
,Queue
,Value
和Array
。例如,from multiprocessing import Process, Managerdef f(d, l): d[1] = "1" d["2"] = 2 d[0.25] = None l.reverse()if __name__ == "__main__": with Manager() as manager: d = manager.dict() l = manager.list(range(10)) p = Process(target=f, args=(d, l)) p.start() p.join() print(d) print(l)会打印
{0.25: None, 1: "1", "2": 2}[9, 8, 7, 6, 5, 4, 3, 2, 1, 0]服务器进程管理器比使用共享内存对象更灵活,因为它们可以支持任意对象类型。此外,单个管理器可以通过网络在不同计算机上的进程共享。但是,它们比使用共享内存要慢.
使用工作池
Pool
类表示一个workerprocesses池。它有一些方法允许以几种不同的方式将任务卸载到workerprocesses .
例如:
from multiprocessing import Pool, TimeoutErrorimport timeimport osdef f(x): return x*xif __name__ == "__main__": # start 4 worker processes with Pool(processes=4) as pool: # print "[0, 1, 4,..., 81]" print(pool.map(f, range(10))) # print same numbers in arbitrary order for i in pool.imap_unordered(f, range(10)): print(i) # evaluate "f(20)" asynchronously res = pool.apply_async(f, (20,)) # runs in *only* one process print(res.get(timeout=1)) # prints "400" # evaluate "os.getpid()" asynchronously res = pool.apply_async(os.getpid, ()) # runs in *only* one process print(res.get(timeout=1)) # prints the PID of that process # launching multiple evaluations asynchronously *may* use more processes multiple_results = [pool.apply_async(os.getpid, ()) for i in range(4)] print([res.get(timeout=1) for res in multiple_results]) # make a single worker sleep for 10 secs res = pool.apply_async(time.sleep, (10,)) try: print(res.get(timeout=1)) except TimeoutError: print("We lacked patience and got a multiprocessing.TimeoutError") print("For the moment, the pool remains available for more work") # exiting the "with"-block has stopped the pool print("Now the pool is closed and no longer available")
请注意,池的方法只能由创建它的进程使用.
注意
这个包中的功能要求__main__
模块由孩子们提供。这包含在编程指南中但是值得指出这里。这意味着一些例子,比如multiprocessing.pool.Pool
示例在交互式解释器中不起作用。例如:
>>> from multiprocessing import Pool>>> p = Pool(5)>>> def f(x):... return x*x...>>> p.map(f, [1,2,3])Process PoolWorker-1:Process PoolWorker-2:Process PoolWorker-3:Traceback (most recent call last):AttributeError: "module" object has no attribute "f"AttributeError: "module" object has no attribute "f"AttributeError: "module" object has no attribute "f"
(如果你试试这个,它实际上会以半随机的方式输出三个完整的回溯,然后你可能会以某种方式对主进程进行tostop。)
参考
multiprocessing
包大多复制了threading
module.
Process
和例外
- class
multiprocessing.
Process
(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None) -
流程对象表示在单独的流程中运行的活动。该
Process
class具有threading.Thread
.的所有方法的等价物。应始终使用关键字参数调用构造函数。group应该总是
None
;它只是为了兼容threading.Thread
. target是run()
方法调用的可调用对象。它默认为None
,这意味着什么都没有。name是进程名称(详见name
).args是目标调用的参数元组。kwargs是目标调用的关键字参数的adictionary。如果提供,则仅关键字daemon参数设置进程daemon
flagtoTrue
要么False
。如果None
(默认值),此标志将从创建进程中继承.默认情况下,没有参数传递给target.
如果子类重写构造函数,则必须确保它调用基类构造函数(
Process.__init__()
)在做任何其他事情之前进行处理.在版本3.3中更改:添加了daemon论据
run
//()-
表示进程活动的方法.
你可以在子类中覆盖这个方法。标准
run()
方法调用传递给对象构造函数的可调用对象作为目标参数(如果有),分别从args和kwargs参数中获取顺序和关键字参数.
start
()-
启动进程的活动
每个进程对象最多必须调用一次。它安排对象的
run()
方法在一个单独的过程中调用.
join
( [timeout])-
如果可选参数timeout是
None
(默认值),则方法会阻塞到join()
方法被调用终止。如果timeout是正数,它最多会阻塞timeout秒。注意方法返回None
如果其进程终止或方法超时。检查进程exitcode
确定是否终止了一个过程可以连接多次.
进程无法加入自身,因为这会导致死锁。在启动进程之前尝试加入进程是错误的.
name
-
流程的名称。该名称是用于识别目的的字符串。它没有语义。多个进程可以给出相同的名称.
初始名称由构造函数设置。如果没有为构造函数提供明确的名称,则表示名称’Process-N 1 :N 2 :…:N k ‘是构造的,在N k 是其父母的第N个孩子.
is_alive
()-
返回该过程是否还活着.
粗略地说,从
start()
方法返回,直到子进程终止.
daemon
-
进程的守护进程标志,一个布尔值。这必须在调用
start()
之前设置.初始值是从创建进程继承的.
当一个进程退出时,它会尝试终止所有的守护进程子进程.
请注意,不允许守护进程创建子进程。否则,如果守护进程在其父进程退出时被终止,则守护进程会使其子进行孤立。另外,这些是不 Unix守护进程或服务,如果非守护进程已经退出,它们是正常进程(并且不会被连接)。
另外到
threading.Thread
API,Process
对象还支持以下属性和方法:pid
-
返回进程ID。在生成该过程之前,这将是
None
.
exitcode
-
孩子的退出代码。如果这个过程还没有终止,那将是
None
。负值-N表示孩子被信号终止N.
authkey
-
进程的身份验证密钥(字节字符串).
初始化
multiprocessing
时,使用os.urandom()
.为主进程分配一个随机字符串当创建一个
Process
对象时,它会继承其父进程的身份验证密钥,虽然这可以通过将authkey
设置为另一个字节字符串来改变.查看验证密钥.
sentinel
-
A系统对象的数字句柄,当进程结束时将变为“就绪”.
如果你想使用
multiprocessing.connection.wait()
等待几个事件,你可以使用这个值。否则调用join()
更简单在Windows上,这是一个OS句柄,可用于
WaitForSingleObject
和WaitForMultipleObjects
API调用系列。在Unix上,这是一个文件描述符,可用于select
模块中的原语新版本3.3.
terminate
( )-
终止流程。在Unix上,这是使用
SIGTERM
信号完成的;在Windows上使用TerminateProcess()
。请注意,退出处理程序和最终条款等不会被执行.注意进程的后代进程将not被终止 – 他们将成为孤儿.
警告
如果在关联进程使用管道队列时使用此方法,则管道或队列可能会损坏并可能被其他进程无法使用。类似地,如果进程已获取锁或信号量等,则终止它可能导致其他进程死锁.
kill
()-
和
terminate()
相同但在Unix上使用SIGKILL
信号新版本3.7.
close
()-
关闭
Process
对象,释放与之关联的所有资源。ValueError
如果基础进程仍在运行,则引发此异常。一次close()
返回成功,Process
对象意志ValueError
.版本3.7.
注意
start()
,join()
,is_alive()
,terminate()
和exitcode
方法只能由创建过程对象的进程调用.使用
Process
:>>> import multiprocessing, time, signal>>> p = multiprocessing.Process(target=time.sleep, args=(1000,))>>> print(p, p.is_alive())<Process(Process-1, initial)> False>>> p.start()>>> print(p, p.is_alive())<Process(Process-1, started)> True>>> p.terminate()>>> time.sleep(0.1)>>> print(p, p.is_alive())<Process(Process-1, stopped[SIGTERM])> False>>> p.exitcode == -signal.SIGTERMTrue
- exception
multiprocessing.
ProcessError
-
所有的基类
multiprocessing
exceptions.
- exception
multiprocessing.
BufferTooShort
-
当// @//提供缓冲区对象太小而无法读取消息时
Connection.recv_bytes_into()
引发异常如果
e
是BufferTooShort
的实例e.args[0]
将消息作为字节串.
- exception
multiprocessing.
AuthenticationError
-
出现身份验证错误时引发.
- exception
multiprocessing.
TimeoutError
-
超时到期时超时的方法提示.
管道和队列
当使用多个进程时,通常使用消息传递来进行进程之间的通信,并避免必须使用任何同步原语,如locks.
对于传递消息,可以使用Pipe()
(用于两个连接之间的连接)或队列(允许多个制作者和消费者).
Queue
, SimpleQueue
和JoinableQueue
types是多生产者,多用户FIFO 队列,在标准库的queue.Queue
类上建模。它们的不同之处在于Queue
缺少task_done()
和join()
在Python 2.5中引入的方法queue.Queue
class.
如果你使用JoinableQueue
然后你必须来电话JoinableQueue.task_done()
对于从队列中删除的每个任务,或者这些用于计算未完成任务数量的文件最终可能会溢出,引发异常.
注意也可以使用管理器对象创建共享队列 – 请参阅经理.
导入它们
当一个对象被放入一个队列时,该对象被腌制并且后续的线程会被刷新腌制数据到底层管道。这有一些后果有点令人惊讶,但不应该造成任何实际困难 – 如果他们真的很烦你那么你可以改为使用一个用管理器.
- 创建的队列。一个空队列,在队列的
empty()
方法返回False
和get_nowait()
canreturn而不抬起queue.Empty
. - 如果多个进程将对象排入队列,则可能无序地在另一端接收对象。但是,由相同进程排队的对象将始终处于相对于彼此的预期顺序中.
警告
如果在尝试使用Process.terminate()
时使用os.kill()
或Queue
,那么队列中的数据可能会被破坏。这可能导致任何其他进程在以后尝试使用队列时获得异常.
警告
如上所述,如果子进程已将项目放在队列中(并且它没有使用JoinableQueue.cancel_join_thread
),那么该进程将不会终止,直到所有缓冲的项目都被刷新到管道.
这意味着如果您尝试加入该过程,您可能会遇到死锁,除非您确定已经消耗了已放入队列的所有项目。类似地,如果子进程是非守护进程,则父进程在尝试加入所有非守护进程的子进程时可能会在退出时挂起.
请注意,使用管理器创建的队列没有此问题。请参阅编程指南.
multiprocessing.
Pipe
([duplex] )-
返回一对
(conn1, conn2)
的Connection
代表管子末端的物体如果duplex是
True
(默认值)然后管道是双向的。如果duplex是False
管道是单向的:conn1
只能用于接收信息,而conn2
只能用于发送信息.
- class
multiprocessing.
Queue
( [maxsize] ) -
返回使用管道和几个锁/信号量实现的进程共享队列。当一个进程首先将一个项目放入队列时,会启动一个feederthread,它将对象从缓冲区传输到管道中.
通常
queue.Empty
和queue.Full
来自标准库queue
模块的异常被提升到信号超时Queue
执行queue.Queue
的所有方法除外fortask_done()
和join()
.qsize
()-
返回队列的大致大小。由于多线程处理/多处理语义,这个数字不可靠.
注意这可能会在Mac OS X等Unix平台上引起
NotImplementedError
sem_getvalue()
没有实现.
empty
()-
如果队列是空的,请回复
True
,否则False
。由于多线程/多处理语义,这是不可靠的.
full
()-
返回
True
如果队列已满,False
否则。由于多线程/多处理语义,这是不可靠的.
put
(obj [, block [, timeout]])-
把obj放入队列。如果可选参数block是
True
(默认值)和timeout是None
(默认值),则必要时阻止untila空闲插槽可用。如果timeout是一个正数,如果nofree插槽在那段时间内可用,它最多会阻塞timeout秒并引发queue.Full
异常。否则(block是False
如果一个空闲插槽可以立即使用,将一个项目放在队列中,否则提起queue.Full
异常(timeout在这种情况下被忽略).
put_nowait
(obj)-
相当于
put(obj, False)
.
get
( [block [, timeout] ] )-
从队列中删除并返回一个项目。如果可选的args block是
True
(默认值)和timeout是None
(默认值),则在项目可用之前阻止if必要。如果timeout是一个正数,它最多会阻塞timeout秒,如果在那段时间内没有可用的项目,则会引发queue.Empty
异常。否则(阻止是False
),如果一个项目立即可用,则返回一个项目,否则提高queue.Empty
异常(在这种情况下timeout被忽略).
get_nowait
()-
相当于
get(False)
.
multiprocessing.Queue
还有一些queue.Queue
中没有的附加方法。对于大多数代码,这些方法通常是不必要的:close
()-
表示当前进程不再有数据放在此队列中。一旦将所有buffereddata刷新到管道,后台线程将退出。当队列被垃圾收集时自动调用.
join_thread
()-
加入后台主题。这只能在调用
close()
之后使用。它会阻塞,直到后台线程退出,确保缓冲区中的所有数据都已刷新到管道.默认情况下,如果进程不是队列的创建者,那么在退出时它将尝试加入队列的后台线程。这个过程可以调用
cancel_join_thread()
来join_thread()
什么也不做.
cancel_join_thread
()-
预防
join_thread()
阻挡。特别是,这可以防止后台线程在处理时自动连接 – 请参阅join_thread()
.此方法的更好名称可能是
allow_exit_without_flush()
。它可能会导致enqueueddata丢失,你几乎肯定不需要使用它。只有当你需要当前进程立即退出而不等待将入队数据刷新到底层管道时,它才真正存在,并且你不关心丢失的数据.
注意
此类的功能需要在主机操作系统上运行共享的信号量实现。没有一个,这个类的功能将被禁用,并尝试实例化
Queue
会导致ImportError
。Sebpo-3770了解更多信息。对于下面列出的任何专用队列类型也是如此.
- class
multiprocessing.
SimpleQueue
-
这是一个简化的
Queue
类型,非常接近锁定Pipe
.empty
()-
返回
True
如果队列是空的,False
除此以外。
get
()-
从队列中删除并返回一个项目.
put
(item)-
放item进入队列
- class
multiprocessing.
JoinableQueue
([maxsize] ) -
JoinableQueue
,一个Queue
子类,是一个task_done()
和join()
方法的队列task_done
// ()-
表示以前排队的任务已完成。由queueconsumers使用。每个
get()
用于获取任务,后续调用task_done()
告诉队列任务处理完成.如果
join()
当前正在阻止,它将在处理完所有项时恢复(意味着task_done()
调用已被put()
进入队列的每个项目).提出一个
ValueError
如果被叫的次数多于队列中放置的物品的数量.
join
()-
阻止,直到队列中的所有物品都被取出并处理完毕.
每当项目被添加到thequeue时,未完成任务的数量就会增加。每当消费者打电话
task_done()
表示该项目已被检索,所有工作都已完成。当未完成任务的数量降至零时,join()
unblocks.
Miscellaneous
multiprocessing.
active_children
()-
返回当前进程中所有活孩子的清单.
调用它具有“加入”任何已经完成的过程的副作用.
multiprocessing.
cpu_count
()-
返回系统中的CPU数量.
这个数字不等于当前进程可以使用的CPU数量。使用
len(os.sched_getaffinity(0))
可以获得可用的CPU数量//可以提高
NotImplementedError
.也可以看看
os.cpu_count()
multiprocessing.
current_process
()-
退回
Process
对应当前进程的对象.的类比
threading.current_thread()
.
multiprocessing.
freeze_support
()-
添加对使用
multiprocessing
已被冻结以生成Windows可执行文件。(已经用 py2exe ,测试了PyInstaller 和cx_Freeze.)需要在
if __name__ =="__main__"
主模块的行。例如:from multiprocessing import Process, freeze_supportdef f(): print("hello world!")if __name__ == "__main__": freeze_support() Process(target=f).start()
如果
freeze_support()
行被省略,那么尝试运行frozenexecutable会引发RuntimeError
.调用
freeze_support()
在Windows以外的任何操作系统上调用时无效。另外,如果Windows上的Python解释器正常运行该模块(该程序尚未冻结),那么freeze_support()
没有效果.
multiprocessing.
get_all_start_methods
()-
返回支持的启动方法列表,第一个是默认值。可能的启动方法是
"fork"
,"spawn"
和"forkserver"
。仅在Windows上"spawn"
isavailable。在Unix上"fork"
和"spawn"
都是支持的,"fork"
是默认的.新版本3.4.
multiprocessing.
get_context
(method=None)-
返回一个与
multiprocessing
module//如果method是
None
然后返回默认上下文。否则如果指定的启动方法method应该是"fork"
,"spawn"
,"forkserver"
.ValueError
不可用.新版本3.4.
multiprocessing.
get_start_method
(allow_none=False)-
返回用于启动进程的启动方法的名称.
如果启动方法尚未修复且allow_none如果为false,则start方法固定为默认值并返回名称。如果启动方法尚未修复且allow_none为真,则
None
返回.返回值可以是
"fork"
,"spawn"
,"forkserver"
或None
."fork"
是Unix上的默认值,而"spawn"
是Windows上的默认值.版本3.4.
multiprocessing.
set_executable
()-
设置启动子进程时要使用的Python解释器的路径。(默认使用
sys.executable
)。嵌入器可能需要一些像set_executable(os.path.join(sys.exec_prefix, "pythonw.exe"))
在他们可以创建子进程之前
版本3.4更改:现在在
"spawn"
使用启动方法.
multiprocessing.
set_start_method
(method)-
设置应该用于启动子进程的方法.method可以是
"fork"
,"spawn"
或"forkserver"
.请注意,这应该被调用一次,它应该被保护在
if __name__ == "__main__"
主要模块的条款.新版本3.4.
注意
multiprocessing
不包含threading.active_count()
, threading.enumerate()
,threading.settrace()
, threading.setprofile()
,threading.Timer
的类似物,或threading.local
.
连接对象
连接对象允许发送和接收可选对象或字符串。它们可以被认为是面向消息的连接套接字.
连接对象通常使用Pipe
– 也可以看看听众和客户.
- class
multiprocessing.connection.
Connection
-
send
(obj)-
将对象发送到连接的另一端,应该使用
recv()
.该对象必须是可选择的。非常大的泡菜(大约32 MiB +,虽然它取决于操作系统)可能会引起
ValueError
异常
recv
// ()-
使用
send()
。阻止直到有东西要收到。引发EOFError
如果没有什么可以收到而另一端被关闭了
fileno
()-
返回连接使用的文件描述符或句柄.
close
()-
关闭连接.
当连接被垃圾收集时自动调用.
poll
([timeout] )-
返回是否有任何数据可供阅读
如果没有指定timeout那么它将立即返回。如果timeout是一个数字,然后指定以秒为单位的最大时间。如果timeout是
None
则使用无限超时.注意可以使用
multiprocessing.connection.wait()
.
send_bytes
(buffer[, offset[, size] ] )-
从发送字节数据字节对象作为一个完整的信息
如果offset然后给出数据从中的那个位置读取buffer。如果size然后给出从缓冲区读取的许多字节。非常大的缓冲区(大约32 MiB +,虽然它取决于操作系统)可能会引发
ValueError
异常
recv_bytes
( [maxlength] )-
返回从连接另一端发送的字节数据的完整消息作为字符串。阻止直到有东西接收。起见
EOFError
如果没有什么东西可以接收而另一端已经关闭了如果指定了maxlength并且消息长于maxlength那么
OSError
就会被提起来连接将不再可读.改版3.3:这个功能用来举起
IOError
,现在是OSError
.
recv_bytes_into
(buffer[, offset] )-
读入buffer从连接另一端发送的字节数据的完整消息,并返回消息中的字节数。阻止直到有东西要收到。引发
EOFError
如果没有什么可以收到而另一端是封闭的.buffer必须是一个可写的字节对象。如果offset然后给出消息将从该位置写入缓冲区。偏移量必须是一个非负整数,小于buffer的长度(以字节为单位).
如果缓冲区太短,那么
BufferTooShort
异常israised和完整的消息可用作e.args[0]
其中e
是异常实例.
更改版本3.3:现在可以使用
Connection.send()
和Connection.recv()
.版本3.3中的新功能:连接对象现在支持上下文管理协议 – 请参阅上下文管理器类型.
__enter__()
返回连接对象,__exit__()
调用close()
.
例如:
>>> from multiprocessing import Pipe>>> a, b = Pipe()>>> a.send([1, "hello", None])>>> b.recv()[1, "hello", None]>>> b.send_bytes(b"thank you")>>> a.recv_bytes()b"thank you">>> import array>>> arr1 = array.array("i", range(5))>>> arr2 = array.array("i", [0] * 10)>>> a.send_bytes(arr1)>>> count = b.recv_bytes_into(arr2)>>> assert count == len(arr1) * arr1.itemsize>>> arr2array("i", [0, 1, 2, 3, 4, 0, 0, 0, 0, 0])
警告
Connection.recv()
方法会自动取消收到的数据,这可能会带来安全风险,除非您可以信任发送邮件的过程.
因此,除非使用Pipe()
生成连接对象,否则在执行某种身份验证后,只应使用recv()
和send()
方法。见认证密钥.
警告
如果进程在尝试读取或写入管道时被杀死,则管道中的数据可能会被破坏,因为可能无法确定消息边界所在的位置.
同步原语
通常,同步原语在多进程程序中不像在多线程程序中那样必要。请参阅threading
module.
请注意,也可以使用managerobject创建同步原语 – 请参阅经理.
- class
multiprocessing.
Barrier
(parties[, action[, timeout] ] ) -
屏障对象:的克隆
threading.Barrier
.3.3版本中的新功能
- class
multiprocessing.
BoundedSemaphore
([value] ) -
一个有界的信号量对象:的密切类比
threading.BoundedSemaphore
.存在与其近似类比的独立差异:它
acquire
method’sfirst参数名为block,与Lock.acquire()
.一致注意
在Mac OS X上,这与
Semaphore
无法区分,因为sem_getvalue()
未在该平台上实现.
- class
multiprocessing.
Condition
( [lock] ) -
一个条件变量:
threading.Condition
.的别名如果指定了lock那么它应该是
Lock
或RLock
来自multiprocessing
.改变了版本3.3:
wait_for()
方法被添加了
- class
multiprocessing.
Event
-
克隆
threading.Event
.
- class
multiprocessing.
Lock
-
一个非递归的锁定对象:一个非常类似的
threading.Lock
。一旦进程或线程获得了一个锁,后续尝试从任何进程或线程获取它将阻塞直到它被释放;任何进程或线程都可以释放它。适用于线程的threading.Lock
的概念和行为在multiprocessing.Lock
中复制,因为它适用于进程或线程,除非另有说明.注意
Lock
实际上是一个工厂函数,它返回用adefault上下文初始化的multiprocessing.synchronize.Lock
的实例.Lock
支持上下文管理器协议,因此可以使用在with
陈述中acquire
(block=True, timeout=None)-
获取锁定,阻止或无阻塞.
与block参数设置为
True
(默认值),方法callwill阻塞,直到锁处于解锁状态,然后将其设置为locked并返回True
。请注意,第一个参数的名称与threading.Lock.acquire()
.中的名称不同于block参数设置为
False
,方法调用不阻塞。如果锁当前处于锁定状态,则返回False
;否则将锁设置为锁定状态并返回True
.当使用timeout的正浮点值调用时,只要锁定无法获取锁定,最多可以阻止timeout指定的秒数。对timeout等于timeout零。调用timeout的价值
None
(默认值)将超时时间设置为无限期。注意治疗阴性或None
的值timeout与threading.Lock.acquire()
中的实现行为不同。如果timeout参数设置为block并且因此被赋值,则False
参数没有实际应用。返回True
如果已经获得锁定,或者如果超时时间已经过去False
.
release
()-
解锁。这可以从任何进程或线程调用,而不仅仅是最初获取锁的进程或线程.
行为与
threading.Lock.release()
除了在未锁定的锁上调用时,ValueError
被养了
- class
multiprocessing.
RLock
-
递归锁定对象:的近似模拟
threading.RLock
。必须由获取它的进程或线程释放递归锁。一旦进程或线程获得了递归锁,相同的处理器线程可以再次获取它而不会阻塞;该过程或线程必须在每次获取时释放一次.注意
RLock
实际上是一个工厂函数,它返回用adefault上下文初始化的multiprocessing.synchronize.RLock
的实例.RLock
支持上下文管理器协议因此可用于with
陈述acquire
(block=True, timeout=None)-
获取锁,阻塞或无阻塞.
用block参数设置调用至
True
,阻止直到锁处于解锁状态(不属于任何进程或线程),取消锁定已被当前进程或线程拥有。然后当前进程或线程取得锁的所有权(如果它还没有所有权)并且锁内的递归级别增加1,导致返回值为True
。请注意,与threading.RLock.acquire()
的实现相比,第一个参数的行为有几个不同,从参数本身的名称开始.当用block参数设置为
False
,不要阻塞。如果锁已被另一个进程或线程获取(因而拥有),则当前进程或线程不占用所有权,并且锁中的递归级别不会更改,从而导致返回值为False
。如果锁处于解锁状态,则当前进程或线程获取所有权并递增递归级别,导致返回值为True
.使用和timeout参数与
Lock.acquire()
中的相同。请注意,timeout的某些行为与threading.RLock.acquire()
.
release
()-
释放锁定,递减递归级别。如果在递减级别之后递归级别为零,则将锁定重置为已解锁(未被任何进程或线程拥有),并且如果任何其他进程或线程被阻塞等待锁定被解锁,则允许其中一个进程继续。如果在递减之后递归级别仍为非零,则锁定保持锁定并由调用进程正确读取
只有在调用进程或线程拥有锁时才调用此方法。对
AssertionError
如果此方法由所有者以外的处理器线程调用,或者锁定处于未锁定(无主)状态,则引发此方法。请注意,在这种情况下引发的异常类型与threading.RLock.release()
.
- class
multiprocessing.
Semaphore
([value] ) -
信号量对象:的密切类比
threading.Semaphore
.存在与其近似类比的独立差异:它
acquire
method’sfirst参数名为block,与一致Lock.acquire()
.
注意
在Mac OS X上,sem_timedwait
不支持,所以调用acquire()
超时将使用睡眠循环模拟该函数行为.
注意
如果由产生的SIGINT信号按Ctrl-C 通过调用BoundedSemaphore.acquire()
, Lock.acquire()
,RLock.acquire()
, Semaphore.acquire()
, Condition.acquire()
要么 Condition.wait()
然后电话会立即中断并且KeyboardInterrupt
会被提出来
这与threading
当等效的阻塞调用正在进行时,SIGINT将被签名.
注意
该软件包的某些功能需要在主机操作系统上运行共享的信号量实现。没有一个,multiprocessing.synchronize
模块将被禁用,尝试导入它将导致ImportError
。Seebpo-3770了解更多信息.
经理
管理器提供了一种创建可在不同进程之间共享的数据的方法,包括在不同机器上运行的进程之间通过网络共享。管理器对象控制管理shared objects。其他进程可以使用proxies来访问共享对象.
-
返回一个开始的
SyncManager
可用于在进程之间共享对象的对象。返回的managerobject对应于一个衍生的子进程,并且有一些方法可以创建共享对象并返回相应的代理.
一旦它们被垃圾收集或者父进程退出,它们就会被关闭。经理类定义在multiprocessing.managers
module:
- class
multiprocessing.managers.
BaseManager
( [address [, authkey] ]) -
创建一个BaseManager对象.
一旦创建一个应该调用
start()
或get_server().serve_forever()
以确保管理器对象引用一个启动管理器进程.address是管理器进程侦听新连接的地址。如果address是
None
则选择任意一个.authkey是用于检查服务器进程的传入连接的有效性的验证密钥。如果authkey是
None
然后使用current_process().authkey
。否则使用authkey并且它必须是字节串.start
( [initializer [, initargs] ] )-
启动子进程以启动管理器。如果initializer不是
None
那么子进程会在启动时调用initializer(*initargs)
.
get_server
()-
返回一个
Server
对象,它表示对Manager进行控制的实际服务器。Server
对象支持serve_forever()
方法:>>> from multiprocessing.managers import BaseManager>>> manager = BaseManager(address=("", 50000), authkey=b"abc")>>> server = manager.get_server()>>> server.serve_forever()
Server
还有address
属性
connect
// ()-
将本地管理器对象连接到远程管理器进程:
>>> from multiprocessing.managers import BaseManager>>> m = BaseManager(address=("127.0.0.1", 5000), authkey=b"abc")>>> m.connect()
register
(typeid [, callable [, proxytype [, exposed [, method_to_typeid [, create_method] ] ] ] ] )-
一种类方法,可以用来注册类型或可以与经理类一起调用.
typeid是一个“类型标识符“用于识别共享对象的特定类型。这必须是一个字符串.
callable是一个可调用的,用于为这个typeidentifier创建对象。如果使用
connect()
方法,或者如果create_method参数是False
那么这可以保留为None
.proxytype是
BaseProxy
的子类,用于为共享对象创建代理这个 typeid。如果None
则会自动创建一个proxyclass。exposed用于指定一个方法名称序列,代码应该允许使用
BaseProxy._callmethod()
来访问这个typeid。(如果exposed是None
则使用proxytype._exposed_
而不是它存在。)如果没有指定暴露列表,则可以访问共享对象的所有“公共方法”。(这里的“公共方法”是指任何具有__call__()
方法的属性,其名称不以"_"
开头。)method_to_typeid是用于指定那些暴露的返回类型的映射应返回代理的方法。它映射方法名称totypeid字符串。(如果method_to_typeid是
None
则使用proxytype._method_to_typeid_
而不是它存在。)如果amethod的名称不是这个映射的关键字,或者映射是None
然后方法返回的对象将被value.create_method确定是否应该使用名称typeid这可以用来告诉服务器进程创建一个newshared对象并为它返回一个代理。默认情况下它是
True
.
BaseManager
实例也有一个只读属性:address
-
管理员使用的地址.
更改版本3.3: Manager对象支持上下文管理协议 – 请参阅上下文管理器类型.
__enter__()
启动服务器进程(如果尚未启动),然后返回管理器对象。__exit__()
classshutdown()
.以前的版本
__enter__()
如果它还没有启动,则没有启动管理器的服务器进程.
- class
multiprocessing.managers.
SyncManager
-
BaseManager
的子类可以用于进程的同步。这种类型的对象由multiprocessing.Manager()
.它的方法创建并返回代理对象,以便跨进程同步多种常用数据类型。这主要包括共享列表和词典.
Barrier
(parties [, action[, timeout] ] )-
创建一个共享的
threading.Barrier
对象并为它返回aproxy .版本3.3.
BoundedSemaphore
( [value])-
创建一个共享
threading.BoundedSemaphore
对象和为它返回aproxy .
Condition
([lock] )-
创建共享
threading.Condition
对象并返回代理forit.如果lock然后它应该是
threading.Lock
要么threading.RLock
宾语。改版3.3:
wait_for()
方法被添加了
Event
()-
创建共享
threading.Event
对象并返回它的代理.
Lock
()-
创建共享
threading.Lock
对象并返回它的代理.
Namespace
()-
创建一个共享
Namespace
对象并为其返回一个代理.
Queue
( [maxsize])-
创建一个共享的
queue.Queue
对象并为它返回一个代理.
RLock
()-
创建一个共享
threading.RLock
对象并返回它的代理.
Semaphore
( [value])-
创建共享
threading.Semaphore
object并返回一个代理forit.
Array
(typecode, sequence)-
创建一个数组并为它返回一个代理.
Value
(typecode, value)-
创建一个具有可写
value
属性的对象,并为其返回一个代理.
dict
()dict
(mapping)dict
(sequence)-
创建一个共享的
dict
对象并为其返回一个代理.
list
( )list
(sequence)-
创建共享
list
对象并返回它的代理.
版本3.6更改:共享对象能够嵌套。例如,共享容器(如共享列表)可以包含其他共享对象,这些对象将由
SyncManager
.
- class
multiprocessing.managers.
Namespace
-
可以使用
SyncManager
.注册的类型。命名空间对象没有公共方法,但是具有可写属性。它表示显示其属性的值.
但是,当使用代理命名空间对象时,以
"_"
开头的属性将是代理的属性,而不是其他属性:>>> manager = multiprocessing.Manager()>>> Global = manager.Namespace()>>> Global.x = 10>>> Global.y = "hello">>> Global._z = 12.3 # this is an attribute of the proxy>>> print(Global)Namespace(x=10, y="hello")
自定义管理器
要创建一个自己的管理器,可以创建一个BaseManager
的子类,并使用register()
classmethod向管理器类注册新的类型或可编辑器。例如:
from multiprocessing.managers import BaseManagerclass MathsClass: def add(self, x, y): return x + y def mul(self, x, y): return x * yclass MyManager(BaseManager): passMyManager.register("Maths", MathsClass)if __name__ == "__main__": with MyManager() as manager: maths = manager.Maths() print(maths.add(4, 3)) # prints 7 print(maths.mul(7, 8)) # prints 56
使用远程管理器
可以在一台计算机上运行管理服务器,并让客户端从其他计算机上使用它(假设所涉及的防火墙允许它).
运行以下命令为远程客户端可以访问的单个共享队列创建服务器:
>>> from multiprocessing.managers import BaseManager>>> from queue import Queue>>> queue = Queue()>>> class QueueManager(BaseManager): pass>>> QueueManager.register("get_queue", callable=lambda:queue)>>> m = QueueManager(address=("", 50000), authkey=b"abracadabra")>>> s = m.get_server()>>> s.serve_forever()
一个客户端可以按如下方式访问服务器:
>>> from multiprocessing.managers import BaseManager>>> class QueueManager(BaseManager): pass>>> QueueManager.register("get_queue")>>> m = QueueManager(address=("foo.bar.org", 50000), authkey=b"abracadabra")>>> m.connect()>>> queue = m.get_queue()>>> queue.put("hello")
另一个客户端也可以使用它:
>>> from multiprocessing.managers import BaseManager>>> class QueueManager(BaseManager): pass>>> QueueManager.register("get_queue")>>> m = QueueManager(address=("foo.bar.org", 50000), authkey=b"abracadabra")>>> m.connect()>>> queue = m.get_queue()>>> queue.get()"hello"
>>> from multiprocessing import Process, Queue>>> from multiprocessing.managers import BaseManager>>> class Worker(Process):... def __init__(self, q):... self.q = q... super(Worker, self).__init__()... def run(self):... self.q.put("local hello")...>>> queue = Queue()>>> w = Worker(queue)>>> w.start()>>> class QueueManager(BaseManager): pass...>>> QueueManager.register("get_queue", callable=lambda: queue)>>> m = QueueManager(address=("", 50000), authkey=b"abracadabra")>>> s = m.get_server()>>> s.serve_forever()
代理对象
代理是引用的对象一个共享的对象,它(大概)生活在一个不同的过程中。据说共享对象是代理的指示对象。多个代理对象可能具有相同的指示对象.
代理对象具有调用其指示对象的相应方法的方法(尽管并非所有指示对象的方法都必须通过代理可用)。通过这种方式,代理可以像它的指示对象一样使用:
>>> from multiprocessing import Manager>>> manager = Manager()>>> l = manager.list([i*i for i in range(10)])>>> print(l)[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]>>> print(repr(l))<ListProxy object, typeid "list" at 0x...>>>> l[4]16>>> l[2:5][4, 9, 16]
请注意,将str()
应用于代理将返回指示对象的表示,而应用repr()
将返回代理的表示.
代理对象的一个重要特性是它们是可选择的,因此它们可以在进程之间被绕过。因此,指示对象可以包含代理对象。这允许嵌套这些管理列表,dicts和其他代理对象:
>>> a = manager.list()>>> b = manager.list()>>> a.append(b) # referent of a now contains referent of b>>> print(a, b)[<ListProxy object, typeid "list" at ...>] []>>> b.append("hello")>>> print(a[0], b)["hello"] ["hello"]
同样,dict和list代理可以互相嵌套:
>>> l_outer = manager.list([ manager.dict() for i in range(2) ])>>> d_first_inner = l_outer[0]>>> d_first_inner["a"] = 1>>> d_first_inner["b"] = 2>>> l_outer[1]["c"] = 3>>> l_outer[1]["z"] = 26>>> print(l_outer[0]){"a": 1, "b": 2}>>> print(l_outer[1]){"c": 3, "z": 26}
如果标准(非代理)list
要么 dict
对象包含在指示对象中,对这些可变值的修改将不会通过管理器传播,因为代理无法知道何时修改其中包含的值。但是,将值存储在容器代理中(触发__setitem__
在代理对象上)确实通过管理器传播,因此为了有效地修改这样的项,可以将修改后的值重新分配给容器代理:
# create a list proxy and append a mutable object (a dictionary)lproxy = manager.list()lproxy.append({})# now mutate the dictionaryd = lproxy[0]d["a"] = 1d["b"] = 2# at this point, the changes to d are not yet synced, but by# updating the dictionary, the proxy is notified of the changelproxy[0] = d
对于大多数用例来说,这种方法可能不如使用嵌套的代理对象方便,但也可以对同步进行一定程度的控制.
注意
代理类型在multiprocessing
什么都不做,以支持价值比较。所以,例如,我们有:
>>> manager.list([1,2,3]) == [1,2,3]False
在进行比较时,应该使用指示物的副本.
- class
multiprocessing.managers.
BaseProxy
-
代理对象是
BaseProxy
._callmethod
(methodname[, args[, kwds] ] )-
调用并返回代理的指示方法的结果.
如果
proxy
是一个代理,其引用是obj
然后表达式proxy._callmethod(methodname, args, kwds)
将在管理器的进程中评估表达式
getattr(obj, methodname)(*args, **kwds)
.
返回的值将是调用结果的副本或新共享对象的代理 – 请参阅method_to_typeid
BaseManager.register()
.如果通过该调用引发异常,则由
_callmethod()
重新引发。如果在管理器的进程中引发了一些其他异常,则将其转换为RemoteError
异常并由_callmethod()
.请特别注意,如果methodname没有exposed.
会引发异常
_callmethod()
:>>> l = manager.list(range(10))>>> l._callmethod("__len__")10>>> l._callmethod("__getitem__", (slice(2, 7),)) # equivalent to l[2:7][2, 3, 4, 5, 6]>>> l._callmethod("__getitem__", (20,)) # equivalent to l[20]Traceback (most recent call last):...IndexError: list index out of range
_getvalue
()-
返回指示物的副本.
如果指示物是不可取消的,则会引发异常.
__repr__
()-
返回代理对象的表示.
__str__
()-
返回指示物的表示.
进程池
人们可以创建一个流程池来执行提交给它的任务Pool
class
- class
multiprocessing.pool.
Pool
( [processes [, initializer [, initargs [, maxtasksperchild [, context] ] ] ] ] ) -
一个进程池对象,它控制可以提交作业的工作进程池。它支持带有超时和回调的异步结果,并具有并行映射实现.
processes是要使用的工作进程数。如果processes是
None
然后使用os.cpu_count()
返回的数字如果initializer不是
None
那么每个工作进程在启动时都会调用initializer(*initargs)
.maxtasksperchild是工作进程在退出之前可以完成的任务数,并用新的工作进程替换,以便释放未使用的资源。默认的maxtasksperchild是
None
,只要pool.context可用于指定用于启动工作进程的上下文,它就是工作进程的存在。通常使用函数
multiprocessing.Pool()
或上下文对象的Pool()
方法创建池。在这两种情况下context都是恰当的.注意池对象的方法只能由创建池的进程调用.
新版本3.2:maxtasksperchild
版本3.4中的新功能:context
注意
Pool
中的工作进程通常用于完成池的工作队列。在其他系统中发现的频繁模式(例如Apache,mod_wsgi等)来释放工作人员所拥有的资源是允许池中的工作者在退出,清理并生成新进程以替换旧工作之前仅完成一项工作。一。maxtasksperchild的Pool
论证将这种能力暴露给最终用户.apply
(func [, args [, kwds] ] )-
用func和关键字参数args调用kwds。它阻止直到结果准备好了。鉴于此块,
apply_async()
更适合并行执行工作。另外,func只在游泳池的一名工人中执行.
apply_async
(func [, args [, kwds[, callback [, error_callback] ] ] ] )-
一个变种
apply()
返回结果对象的方法如果callback如果指定它应该是一个可调用的接受单个参数。当结果准备就绪时callback应用于,即使呼叫失败,在这种情况下error_callback取而代之的是
如果指定了error_callback那么它应该是一个可调用的,它可以接受一个参数。如果目标函数失败,则使用异常实例调用error_callback。
回调应立即完成,否则处理结果的线程将被阻止.
map
(func, iterable [, chunksize])-
A平行等价的
map()
内置函数(虽然它只支持iterable参数)。它会阻塞,直到结果准备好此方法将可迭代切换为多个块,并将其作为单独的任务提交给进程池。这些块的(近似)大小可以通过设置chunksize一个正整数.
注意它可能导致很长的迭代次数的高内存使用。考虑使用
imap()
或imap_unordered()
明确chunksize提高效率的选择.
map_async
(func, iterable [, chunksize [, callback [, error_callback]]] )-
返回结果对象的
map()
方法的变种.如果指定了callback那么它应该是一个可调用的,接受单个参数。当结果准备就绪时callback应用于,即使呼叫失败,在这种情况下应用error_callback。
如果error_callback是如果指定它应该是一个可调用的whichaccepts一个参数。如果目标函数失败,则使用异常实例调用error_callback。
应该立即完成回调,否则处理结果的线程将被阻止.
imap
(func, iterable [, chunksize] )-
map()
.的一个更懒的版本chunksize参数与使用的相同通过
map()
方法。对于使用chunksize的较大值的非常长的迭代,可以使作业完成 比使用1
.的默认值更快,如果chunksize是
1
然后next()
由imap()
方法返回的迭代器的方法有一个可选的timeout参数:next(timeout)
如果在multiprocessing.TimeoutError
内无法返回结果会引发timeout秒.
imap_unordered
(func, iterable [, chunksize])-
和
imap()
一样来自迭代器的结果应该被认为是任意的。(只有当只有一个工人程序才能保证“正确”的顺序。)
starmap
(func, iterable [, chunksize] )-
喜欢
map()
,除了iterable的元素应该是作为参数解压缩的迭代.Hence iterable of
[(1,2), (3, 4)]
结果[func(1,2),func(3,4)]
.版本3.3.
starmap_async
(func, iterable [, chunksize [, callback[, error_callback] ] ] )-
starmap()
和map_async()
的组合迭代iterable迭代和调用func与iterables unpacked.Returns结果对象.新版本3.3.
close
( )-
防止将更多任务提交到池中。一旦所有任务完成,工人进程将退出.
terminate
()-
立即停止工作流程而不完成未完成的工作。当池对象被垃圾收集时
terminate()
会立即响起
join
// ()-
等待工人进程退出。在使用
close()
之前必须调用terminate()
或join()
.
版本3.3中的新内容:池对象现在支持上下文管理协议 – 请参阅 Context Manager Types .
__enter__()
返回pool对象,__exit__()
调用terminate()
.
- class
multiprocessing.pool.
AsyncResult
-
Pool.apply_async()
和Pool.map_async()
.get
返回的结果类( [timeout] )-
到达时返回结果。如果timeout不是
None
并且结果没有在timeout秒内到达那么multiprocessing.TimeoutError
被提出。如果远程调用raisean异常,那么该异常将被get()
.
wait
( [timeout] )重新调整-
等到结果可用或直到timeout秒通过
ready
()-
返回通话是否已完成.
successful
()-
返回调用是否完成而不引发异常。威利斯
AssertionError
如果结果没有准备好
以下示例演示了如何使用池:
from multiprocessing import Poolimport timedef f(x): return x*xif __name__ == "__main__": with Pool(processes=4) as pool: # start 4 worker processes result = pool.apply_async(f, (10,)) # evaluate "f(10)" asynchronously in a single process print(result.get(timeout=1)) # prints "100" unless your computer is *very* slow print(pool.map(f, range(10))) # prints "[0, 1, 4,..., 81]" it = pool.imap(f, range(10)) print(next(it)) # prints "0" print(next(it)) # prints "1" print(it.next(timeout=1)) # prints "4" unless your computer is *very* slow result = pool.apply_async(time.sleep, (10,)) print(result.get(timeout=1)) # raises multiprocessing.TimeoutError
监听器和客户端
通常在进程之间传递消息是使用队列或使用Connection
由Pipe()
.
返回的对象然而,multiprocessing.connection
模块允许一些额外的灵活性。它基本上提供了一个面向消息的高级API,用于处理套接字或Windows命名管道。它还支持digestauthentication使用hmac
模块,同时用于轮询多个连接.
multiprocessing.connection.
deliver_challenge
(connection, authkey )-
将随机生成的消息发送到连接的另一端并等待回复.
如果回复与使用authkey的消息的摘要匹配,则将欢迎消息发送给另一个连接结束。否则
AuthenticationError
被抬起来
multiprocessing.connection.
answer_challenge
(connection, authkey)-
接收消息,用authkey作为密钥来计算消息的摘要,然后发送摘要
//如果没有收到欢迎信息,那么
AuthenticationError
就会被提起来
multiprocessing.connection.
Client
(address [, family [, authkey] ] )-
尝试建立与使用地址address的监听器的连接,返回
Connection
.连接的类型由family参数,但这通常可以省略,因为它通常可以从address的格式推断出来。(参见地址格式)
如果给出authkey而不是None,它应该是一个字节字符串,并将用作密钥基于HMAC的身份验证挑战。如果authkey为None,则无法进行身份验证。如果验证失败,则会引发
AuthenticationError
。查看验证密钥.
- class
multiprocessing.connection.
Listener
( [address [, family [, backlog [, authkey] ] ] ] ) -
绑定套接字或Windows命名管道的包装器,它正在“监听”连接.
address是由侦听器对象的绑定套接字或命名管道使用的地址。
注意
如果使用的地址为“0.0.0.0”,则该地址将不是Windows上的可连接端点。如果你需要一个可连接的终点,你应该使用’127.0.0.1′.
family是要使用的套接字(或命名管道)的类型。这可以是字符串
"AF_INET"
(对于TCP套接字),"AF_UNIX"
(对于Unixdomain套接字)或"AF_PIPE"
(对于Windows命名管道)。其中只有第一个保证可用。如果family是None
那么家庭是从address的格式推断出来的。如果address也是None
则选择默认值。此默认值是系列,它被认为是最快的。见地址格式。注意,如果family是"AF_UNIX"
并且地址是None
那么套接字将在使用tempfile.mkstemp()
.创建的aprivate临时目录中创建如果监听器对象使用套接字然后backlog(默认情况下为1)一旦被绑定就传递给套接字的
listen()
方法如果给出authkey而不是None,它应该是一个字节字符串,并将用作基于HMAC的身份验证质询的密钥。如果authkey为None,则无法进行身份验证。如果验证失败,则会引发
AuthenticationError
。查看验证密钥.accept
()-
接受listenerobject的绑定套接字或命名管道上的连接,并返回一个
Connection
对象。如果尝试验证失败,则AuthenticationError
被引发
close
// ()-
关闭侦听器对象的绑定套接字或命名管道。当监听器被垃圾收集时,这会被自动调用。但是建议明确地称之为.
监听器对象具有以下只读属性:
address
-
监听器对象使用的地址.
last_accepted
-
上次接受连接的地址。如果这是不可用的那么它是
None
.
版本3.3中的新功能:监听器对象现在支持上下文管理协议 – 请参阅上下文管理器类型.
__enter__()
返回thelistener对象,__exit__()
叫close()
.
multiprocessing.connection.
wait
(object_list, timeout=None)-
等到object_list准备好了。返回object_list准备好了。如果timeout然后呼叫阻塞最多那么多秒。如果timeout是
None
然后它将无限期地阻塞。负超时相当于零超时.对于Unix和Windows,一个对象可以出现在object_list if if is
- 一个可读的
Connection
对象; - 一个连接且可读的
socket.socket
对象;或者 -
sentinel
Process
对象的
属性当有数据可以从中读取时,连接或套接字对象就绪,或者另一端已关闭.
Unix:
wait(object_list, timeout)
几乎相当于select.select(object_list, [], [], timeout)
。区别在于,如果select.select()
被信号中断,它会错误地OSError
,错误号为EINTR
,而wait()
则不会.Windows :object_list中的一个项必须是一个可以等待的整数句柄(根据Win32函数的文档使用的定义
WaitForMultipleObjects()
)或它可以是一个带有fileno()
方法的对象,它返回asocket句柄或管道句柄。(请注意,管道把手和套筒把手是不是等待把手。)3.3版本中的新功能.
- 一个可读的
以下服务器代码创建一个使用"secret password"
asan身份验证密钥的侦听器。然后它等待连接并将一些数据发送到客户端:
from multiprocessing.connection import Listenerfrom array import arrayaddress = ("localhost", 6000) # family is deduced to be "AF_INET"with Listener(address, authkey=b"secret password") as listener: with listener.accept() as conn: print("connection accepted from", listener.last_accepted) conn.send([2.25, None, "junk", float]) conn.send_bytes(b"hello") conn.send_bytes(array("i", [42, 1729]))
以下代码连接到服务器并从服务器接收一些数据:
from multiprocessing.connection import Clientfrom array import arrayaddress = ("localhost", 6000)with Client(address, authkey=b"secret password") as conn: print(conn.recv()) # => [2.25, None, "junk", float] print(conn.recv_bytes()) # => "hello" arr = array("i", [0, 0, 0, 0, 0]) print(conn.recv_bytes_into(arr)) # => 8 print(arr) # => array("i", [42, 1729, 0, 0, 0])
以下代码使用wait()
towait对于来自多个进程的消息:
import time, randomfrom multiprocessing import Process, Pipe, current_processfrom multiprocessing.connection import waitdef foo(w): for i in range(10): w.send((i, current_process().name)) w.close()if __name__ == "__main__": readers = [] for i in range(4): r, w = Pipe(duplex=False) readers.append(r) p = Process(target=foo, args=(w,)) p.start() # We close the writable end of the pipe now to be sure that # p is the only process which owns a handle for it. This # ensures that when p closes its handle for the writable end, # wait() will promptly report the readable end as being ready. w.close() while readers: for r in wait(readers): try: msg = r.recv() except EOFError: readers.remove(r) else: print(msg)
认证密钥
当使用Connection.recv
,收到的数据是自动取消的。不幸的是,从不受信任的来源中删除数据是一种安全风险。因此Listener
和Client()
使用hmac
模块提供摘要认证.
认证密钥是一个字节字符串,可以被认为是一个密码:一旦一个连接建立两端将要求另一方知道认证密钥。(证明bothends正在使用相同的密钥不涉及在连接时发送密钥。)
如果请求验证但没有指定验证密钥,则返回值使用current_process().authkey
(见Process
)。此值将由当前进程创建的任何Process
对象自动继承。这意味着(默认情况下)多进程程序的所有进程将共享单个身份验证密钥,可在设置它们之间的连接时使用。
使用os.urandom()
.
记录
有一些支持日志记录的可用。但请注意logging
包不使用进程共享锁,因此可以(取决于处理程序类型)来自不同进程的消息混淆.
multiprocessing.
get_logger
()-
返回
multiprocessing
。如有必要,将创建一个新的.首次创建时,记录器具有级别
logging.NOTSET
和nodefault处理程序。发送到此记录器的消息默认不会传播到根记录器.请注意,在Windows上,子进程只会继承父进程记录器的级别 – 记录器的任何其他自定义都不会被继承.
multiprocessing.
log_to_stderr
( )-
这个函数执行对
get_logger()
的调用,但除了返回get_logger创建的记录器之外,还添加了一个处理程序,使用格式sys.stderr
将outputout输出到"[%(levelname)s/%(processName)s] %(message)s"
.
下面是打开日志记录的示例会话:
>>> import multiprocessing, logging>>> logger = multiprocessing.log_to_stderr()>>> logger.setLevel(logging.INFO)>>> logger.warning("doomed")[WARNING/MainProcess] doomed>>> m = multiprocessing.Manager()[INFO/SyncManager-...] child process calling self.run()[INFO/SyncManager-...] created temp directory /.../pymp-...[INFO/SyncManager-...] manager serving at "/.../listener-...">>> del m[INFO/MainProcess] sending shutdown message to manager[INFO/SyncManager-...] manager exiting with exitcode 0
有关完整的日志记录级别表,请参阅logging
module.
multiprocessing.dummy
模块
multiprocessing.dummy
复制multiprocessing
但不仅仅是threading
module.
编程指南
使用multiprocessing
.
所有启动方法
以下适用于所有启动方法.
避免共享状态
Picklability
确保代理方法的参数是可选择的.
代理的安全性
不要使用多个线程中的代理对象,除非你用锁来保护它.
(使用same代理的不同进程从来没有问题。)
加入僵尸进程
在Unix上,当一个进程完成但尚未加入时,它就变成了一个僵尸。永远不应该有很多,因为每次都有一个新的进程开始(或active_children()
被称为)所有尚未加入的已完成的流程将被加入。同时调用完成过程的Process.is_alive
将加入这个过程。即便如此,明确加入你开始的所有过程也许是很好的实践.
更好地继承比pickle / unpickle
使用时spawn要么 forkserver从multiprocessing
需要可以选择,以便子进程可以使用它们。但是,通常应该避免使用管道或队列将共享对象发送到其他进程。相反,您应该安排程序,以便需要访问其他地方创建的共享资源的进程可以从祖先进程继承它.
避免终止进程
使用
Process.terminate
停止进程的方法可能导致进程当前正在使用的任何共享资源(例如锁,信号量,管道和队列)被破坏或不可用于其他进程.因此,最好只考虑使用带有输出缓冲的“文件类对象”的
Process.terminate
从不使用任何共享资源的进程.
加入使用队列的进程
请记住,将项目放入队列的进程将在终止之前等待,直到所有缓冲的项目都由“馈线”线到下面的管道。(子进程可以调用队列的
Queue.cancel_join_thread
方法来避免这种行为。)这意味着每当你使用队列时,你需要确保已经放入队列的所有项目在加入进程之前,最终将删除队列。否则,您无法确定队列中的输入项的进程是否将终止。还要记住,非守护进程会自动加入.
会出现死锁的示例如下:
from multiprocessing import Process, Queuedef f(q): q.put("X" * 1000000)if __name__ == "__main__": queue = Queue() p = Process(target=f, args=(queue,)) p.start() p.join() # this deadlocks obj = queue.get()这里的修复方法是交换最后两行(或者只是删除
p.join()
行).
明确地将资源传递给子进程
在Unix上使用fork在start方法中,子进程可以使用全局资源来使用在父进程中创建的共享资源。但是,最好将对象作为参数传递给子进程的构造函数.
使代码(可能)与Windows和其他启动方法兼容,这也确保了只要子进程是仍然活着的对象将不会在父进程中被垃圾收集。如果在对象是在父进程中进行垃圾收集时释放了一些资源,这可能很重要.
例如
from multiprocessing import Process, Lockdef f(): ... do something using "lock" ...if __name__ == "__main__": lock = Lock() for i in range(10): Process(target=f).start()应该改写为
from multiprocessing import Process, Lockdef f(l): ... do something using "l" ...if __name__ == "__main__": lock = Lock() for i in range(10): Process(target=f, args=(lock,)).start()
小心更换sys.stdin
带有“像对象一样的文件”
multiprocessing
最初无条件地调用:os.close(sys.stdin.fileno())在
multiprocessing.Process._bootstrap()
方法中 – 这导致了进程中的进程问题。这已经改为:sys.stdin.close()sys.stdin = open(os.open(os.devnull, os.O_RDONLY), closefd=False)这解决了进程相互冲突的根本问题,导致错误的文件描述符错误,但是引入了一个潜在的危险应用程序取代
sys.stdin()
。这种危险是如果多个进程调用close()
在这个类文件对象上,可能导致同一个数据被多次刷新到对象,导致损坏.如果您编写类似文件的对象并实现自己的缓存,则可以通过在附加到缓存时存储pid并在pid更改时丢弃缓存来使其成为fork-safe。例如:
@propertydef cache(self): pid = os.getpid() if pid != self._pid: self._pid = pid self._cache = [] return self._cache有关更多信息,请参阅bpo-5155,bpo-5313和bpo-5331
spawn和forkserver启动方法
还有一些额外的限制哪个不适用于fork启动方法.
更多的可挑选性
全局变量
主模块的安全导入
确保主模块可以通过新的Python解释器安全地导入没有引起意外的副作用(例如启动新进程).
例如,使用spawn或forkserver启动方法运行以下模块将失败,并且
RuntimeError
:from multiprocessing import Processdef foo(): print("hello")p = Process(target=foo)p.start()而应该使用
if__name__ == "__main__":
来保护程序的“入口点”,如下所示:from multiprocessing import Process, freeze_support, set_start_methoddef foo(): print("hello")if __name__ == "__main__": freeze_support() set_start_method("spawn") p = Process(target=foo) p.start()(
freeze_support()
行可以省略如果程序运行正常而不是冻结。)这允许新生成的Python解释器安全地导入模块,然后运行模块的
foo()
功能适用类似的限制如果在mainmodule中创建了一个池或管理器
示例
演示如何创建和使用自定义管理器和代理:
from multiprocessing import freeze_supportfrom multiprocessing.managers import BaseManager, BaseProxyimport operator##class Foo: def f(self): print("you called Foo.f()") def g(self): print("you called Foo.g()") def _h(self): print("you called Foo._h()")# A simple generator functiondef baz(): for i in range(10): yield i*i# Proxy type for generator objectsclass GeneratorProxy(BaseProxy): _exposed_ = ["__next__"] def __iter__(self): return self def __next__(self): return self._callmethod("__next__")# Function to return the operator moduledef get_operator_module(): return operator##class MyManager(BaseManager): pass# register the Foo class; make `f()` and `g()` accessible via proxyMyManager.register("Foo1", Foo)# register the Foo class; make `g()` and `_h()` accessible via proxyMyManager.register("Foo2", Foo, exposed=("g", "_h"))# register the generator function baz; use `GeneratorProxy` to make proxiesMyManager.register("baz", baz, proxytype=GeneratorProxy)# register get_operator_module(); make public functions accessible via proxyMyManager.register("operator", get_operator_module)##def test(): manager = MyManager() manager.start() print("-" * 20) f1 = manager.Foo1() f1.f() f1.g() assert not hasattr(f1, "_h") assert sorted(f1._exposed_) == sorted(["f", "g"]) print("-" * 20) f2 = manager.Foo2() f2.g() f2._h() assert not hasattr(f2, "f") assert sorted(f2._exposed_) == sorted(["g", "_h"]) print("-" * 20) it = manager.baz() for i in it: print("<%d>" % i, end=" ") print() print("-" * 20) op = manager.operator() print("op.add(23, 45) =", op.add(23, 45)) print("op.pow(2, 94) =", op.pow(2, 94)) print("op._exposed_ =", op._exposed_)##if __name__ == "__main__": freeze_support() test()
使用Pool
:
import multiprocessingimport timeimport randomimport sys## Functions used by test code#def calculate(func, args): result = func(*args) return "%s says that %s%s = %s" % ( multiprocessing.current_process().name, func.__name__, args, result )def calculatestar(args): return calculate(*args)def mul(a, b): time.sleep(0.5 * random.random()) return a * bdef plus(a, b): time.sleep(0.5 * random.random()) return a + bdef f(x): return 1.0 / (x - 5.0)def pow3(x): return x ** 3def noop(x): pass## Test code#def test(): PROCESSES = 4 print("Creating pool with %d processes\n" % PROCESSES) with multiprocessing.Pool(PROCESSES) as pool: # # Tests # TASKS = [(mul, (i, 7)) for i in range(10)] + \ [(plus, (i, 8)) for i in range(10)] results = [pool.apply_async(calculate, t) for t in TASKS] imap_it = pool.imap(calculatestar, TASKS) imap_unordered_it = pool.imap_unordered(calculatestar, TASKS) print("Ordered results using pool.apply_async():") for r in results: print("\t", r.get()) print() print("Ordered results using pool.imap():") for x in imap_it: print("\t", x) print() print("Unordered results using pool.imap_unordered():") for x in imap_unordered_it: print("\t", x) print() print("Ordered results using pool.map() --- will block till complete:") for x in pool.map(calculatestar, TASKS): print("\t", x) print() # # Test error handling # print("Testing error handling:") try: print(pool.apply(f, (5,))) except ZeroDivisionError: print("\tGot ZeroDivisionError as expected from pool.apply()") else: raise AssertionError("expected ZeroDivisionError") try: print(pool.map(f, list(range(10)))) except ZeroDivisionError: print("\tGot ZeroDivisionError as expected from pool.map()") else: raise AssertionError("expected ZeroDivisionError") try: print(list(pool.imap(f, list(range(10))))) except ZeroDivisionError: print("\tGot ZeroDivisionError as expected from list(pool.imap())") else: raise AssertionError("expected ZeroDivisionError") it = pool.imap(f, list(range(10))) for i in range(10): try: x = next(it) except ZeroDivisionError: if i == 5: pass except StopIteration: break else: if i == 5: raise AssertionError("expected ZeroDivisionError") assert i == 9 print("\tGot ZeroDivisionError as expected from IMapIterator.next()") print() # # Testing timeouts # print("Testing ApplyResult.get() with timeout:", end=" ") res = pool.apply_async(calculate, TASKS[0]) while 1: sys.stdout.flush() try: sys.stdout.write("\n\t%s" % res.get(0.02)) break except multiprocessing.TimeoutError: sys.stdout.write(".") print() print() print("Testing IMapIterator.next() with timeout:", end=" ") it = pool.imap(calculatestar, TASKS) while 1: sys.stdout.flush() try: sys.stdout.write("\n\t%s" % it.next(0.02)) except StopIteration: break except multiprocessing.TimeoutError: sys.stdout.write(".") print() print()if __name__ == "__main__": multiprocessing.freeze_support() test()
一个示例显示如何使用队列将任务提供给一组workerprocesses并收集结果:
import timeimport randomfrom multiprocessing import Process, Queue, current_process, freeze_support## Function run by worker processes#def worker(input, output): for func, args in iter(input.get, "STOP"): result = calculate(func, args) output.put(result)## Function used to calculate result#def calculate(func, args): result = func(*args) return "%s says that %s%s = %s" % \ (current_process().name, func.__name__, args, result)## Functions referenced by tasks#def mul(a, b): time.sleep(0.5*random.random()) return a * bdef plus(a, b): time.sleep(0.5*random.random()) return a + b###def test(): NUMBER_OF_PROCESSES = 4 TASKS1 = [(mul, (i, 7)) for i in range(20)] TASKS2 = [(plus, (i, 8)) for i in range(10)] # Create queues task_queue = Queue() done_queue = Queue() # Submit tasks for task in TASKS1: task_queue.put(task) # Start worker processes for i in range(NUMBER_OF_PROCESSES): Process(target=worker, args=(task_queue, done_queue)).start() # Get and print results print("Unordered results:") for i in range(len(TASKS1)): print("\t", done_queue.get()) # Add more tasks using `put()` for task in TASKS2: task_queue.put(task) # Get and print some more results for i in range(len(TASKS2)): print("\t", done_queue.get()) # Tell child processes to stop for i in range(NUMBER_OF_PROCESSES): task_queue.put("STOP")if __name__ == "__main__": freeze_support() test()
评论被关闭。