Python Event用于线程和进程模块之间的同步。threading模块提供了Event事件类,multiprocessing模块也提供了Event事件类。这两个模块的Event类的原理都是类似的:包含一个全局的flag标识属性,默认值为False,提供set(),clear()和wait()方法供用户使用。set()方法将flag设置为True,clear()方法将flag设置为False,wait()方法用于同步(如果flag标识为False,则线程/进程挂起;反之,则执行wait下一条语句)。
threading模块的Event示例代码如下:
import os, sysimport threadingimport timeevent = threading.Event()def my_thread(): print('[%s][%s] begin' % (time.localtime(), threading.currentThread().getName())) event.wait() print('[%s][%s] end' % (time.localtime(), threading.currentThread().getName()))if __name__ == '__main__': t1 = threading.Thread(target=my_thread) t2 = threading.Thread(target=my_thread) t1.setDaemon(True) t2.setDaemon(True) t1.start() t2.start() time.sleep(5) event.set() t1.join() t2.join() print('[%s][%s] end' % (time.localtime(), threading.currentThread().getName()))
multiprocessing模块的Event示例代码如下:
import os, sysimport multiprocessingimport timeevent = multiprocessing.Event()def my_task(): print('[%s][%s] begin' % (time.localtime(), multiprocessing.current_process().name)) event.wait() print('[%s][%s] end' % (time.localtime(), multiprocessing.current_process().name))if __name__ == '__main__': t1 = multiprocessing.Process(target=my_task) t2 = multiprocessing.Process(target=my_task) t1.start() t2.start() time.sleep(5) event.set() t1.join() t2.join() print('[%s][%s] end' % (time.localtime(), multiprocessing.current_process().name))
运行结果如下:
[time.struct_time(tm_year=2017, tm_mon=12, tm_mday=8, tm_hour=10, tm_min=31, tm_sec=30, tm_wday=4, tm_yday=342, tm_isdst=0)][Process-1] begin
[time.struct_time(tm_year=2017, tm_mon=12, tm_mday=8, tm_hour=10, tm_min=31, tm_sec=30, tm_wday=4, tm_yday=342, tm_isdst=0)][Process-2] begin [time.struct_time(tm_year=2017, tm_mon=12, tm_mday=8, tm_hour=10, tm_min=31, tm_sec=35, tm_wday=4, tm_yday=342, tm_isdst=0)][Process-2] end[time.struct_time(tm_year=2017, tm_mon=12, tm_mday=8, tm_hour=10, tm_min=31, tm_sec=35, tm_wday=4, tm_yday=342, tm_isdst=0)][Process-1] end[time.struct_time(tm_year=2017, tm_mon=12, tm_mday=8, tm_hour=10, tm_min=31, tm_sec=35, tm_wday=4, tm_yday=342, tm_isdst=0)][MainProcess] end
我们再来看一个代码示例:
import os, sysimport multiprocessingimport timeevent = multiprocessing.Event()def my_task(): event.wait() print('[%s]: after wait(_flag=%s)' % (multiprocessing.current_process().name, event._flag)) event.clear() print('[%s]: after clear(_flag=%s)' % (multiprocessing.current_process().name, event._flag))if __name__ == '__main__': t1 = multiprocessing.Process(target=my_task) t2 = multiprocessing.Process(target=my_task) t1.start() t2.start() time.sleep(5) print('[%s]: before set(_flag=%s)' % (multiprocessing.current_process().name, event._flag)) event.set() event.wait() t1.join() t2.join() print('[%s]: after wait(_flag=%s)' % (multiprocessing.current_process().name, event._flag))
运行结果:
[MainProcess]: before set(_flag=<Semaphore(value=0)>)
[Process-2]: after wait(_flag=<Semaphore(value=1)>)[Process-1]: after wait(_flag=<Semaphore(value=1)>)
[Process-2]: after clear(_flag=<Semaphore(value=0)>)
[Process-1]: after clear(_flag=<Semaphore(value=0)>) [MainProcess]: after wait(_flag=<Semaphore(value=0)>)看到这里,我想大家一定觉得很奇怪。根据操作系统原理,全局变量在进程之间是不可共享的(在linux下,可以通过共享内存,systemV等IPC额外机制实现进程间通信),为何在上述代码中,全局的Event对象可以在主进程和子进程间共享?通过查看Event实现源码,发现Event对象内部是通过semphore信号量来实现的。熟悉OS的应该知道semphore是跨进程通信的同步机制,最终是由kernel实现跨进程的原子性:
class Event(object): def __init__(self): self._cond = Condition(Lock()) self._flag = Semaphore(0) ...
总结以下:threading和multiprocessing都提供了Event类来实现多线程/多进程之间事件同步。Event类通过semphore实现多进程间原子操作,因此可以保证全局的Event在多进程间共享。