python

超轻量级php框架startmvc

Python多进程multiprocessing用法实例分析

更新时间:2020-05-05 21:06:01 作者:startmvc
本文实例讲述了Python多进程multiprocessing用法。分享给大家供大家参考,具体如下:mutilprocess

本文实例讲述了Python多进程multiprocessing用法。分享给大家供大家参考,具体如下:

mutilprocess简介

像线程一样管理进程,这个是mutilprocess的核心,他与threading很是相像,对多核CPU的利用率会比threading好的多。

简单的创建进程:


import multiprocessing
def worker(num):
 """thread worker function"""
 print 'Worker:', num
 return
if __name__ == '__main__':
 jobs = []
 for i in range(5):
 p = multiprocessing.Process(target=worker, args=(i,))
 jobs.append(p)
 p.start()

确定当前的进程,即是给进程命名,方便标识区分,跟踪


import multiprocessing
import time
def worker():
 name = multiprocessing.current_process().name
 print name, 'Starting'
 time.sleep(2)
 print name, 'Exiting'
def my_service():
 name = multiprocessing.current_process().name
 print name, 'Starting'
 time.sleep(3)
 print name, 'Exiting'
if __name__ == '__main__':
 service = multiprocessing.Process(name='my_service',
 target=my_service)
 worker_1 = multiprocessing.Process(name='worker 1',
 target=worker)
 worker_2 = multiprocessing.Process(target=worker) # default name
 worker_1.start()
 worker_2.start()
 service.start()

守护进程就是不阻挡主程序退出,自己干自己的 mutilprocess.setDaemon(True)就这句等待守护进程退出,要加上join,join可以传入浮点数值,等待n久就不等了

守护进程:


import multiprocessing
import time
import sys
def daemon():
 name = multiprocessing.current_process().name
 print 'Starting:', name
 time.sleep(2)
 print 'Exiting :', name
def non_daemon():
 name = multiprocessing.current_process().name
 print 'Starting:', name
 print 'Exiting :', name
if __name__ == '__main__':
 d = multiprocessing.Process(name='daemon',
 target=daemon)
 d.daemon = True
 n = multiprocessing.Process(name='non-daemon',
 target=non_daemon)
 n.daemon = False
 d.start()
 n.start()
 d.join(1)
 print 'd.is_alive()', d.is_alive()
 n.join()

最好使用 poison pill,强制的使用terminate()注意 terminate之后要join,使其可以更新状态

终止进程:


import multiprocessing
import time
def slow_worker():
 print 'Starting worker'
 time.sleep(0.1)
 print 'Finished worker'
if __name__ == '__main__':
 p = multiprocessing.Process(target=slow_worker)
 print 'BEFORE:', p, p.is_alive()
 p.start()
 print 'DURING:', p, p.is_alive()
 p.terminate()
 print 'TERMINATED:', p, p.is_alive()
 p.join()
 print 'JOINED:', p, p.is_alive()

①. == 0 未生成任何错误  ②. 0 进程有一个错误,并以该错误码退出 ③. < 0 进程由一个-1 * exitcode信号结束

进程的退出状态:


import multiprocessing
import sys
import time
def exit_error():
 sys.exit(1)
def exit_ok():
 return
def return_value():
 return 1
def raises():
 raise RuntimeError('There was an error!')
def terminated():
 time.sleep(3)
if __name__ == '__main__':
 jobs = []
 for f in [exit_error, exit_ok, return_value, raises, terminated]:
 print 'Starting process for', f.func_name
 j = multiprocessing.Process(target=f, name=f.func_name)
 jobs.append(j)
 j.start()
 jobs[-1].terminate()
 for j in jobs:
 j.join()
 print '%15s.exitcode = %s' % (j.name, j.exitcode)

方便的调试,可以用logging

日志:


import multiprocessing
import logging
import sys
def worker():
 print 'Doing some work'
 sys.stdout.flush()
if __name__ == '__main__':
 multiprocessing.log_to_stderr()
 logger = multiprocessing.get_logger()
 logger.setLevel(logging.INFO)
 p = multiprocessing.Process(target=worker)
 p.start()
 p.join()

利用class来创建进程,定制子类

派生进程:


import multiprocessing
class Worker(multiprocessing.Process):
 def run(self):
 print 'In %s' % self.name
 return
if __name__ == '__main__':
 jobs = []
 for i in range(5):
 p = Worker()
 jobs.append(p)
 p.start()
 for j in jobs:
 j.join()

python进程间传递消息:


import multiprocessing
class MyFancyClass(object):
 def __init__(self, name):
 self.name = name
 def do_something(self):
 proc_name = multiprocessing.current_process().name
 print 'Doing something fancy in %s for %s!' % \
 (proc_name, self.name)
def worker(q):
 obj = q.get()
 obj.do_something()
if __name__ == '__main__':
 queue = multiprocessing.Queue()
 p = multiprocessing.Process(target=worker, args=(queue,))
 p.start()
 queue.put(MyFancyClass('Fancy Dan'))
 # Wait for the worker to finish
 queue.close()
 queue.join_thread()
 p.join()
import multiprocessing
import time
class Consumer(multiprocessing.Process):
 def __init__(self, task_queue, result_queue):
 multiprocessing.Process.__init__(self)
 self.task_queue = task_queue
 self.result_queue = result_queue
 def run(self):
 proc_name = self.name
 while True:
 next_task = self.task_queue.get()
 if next_task is None:
 # Poison pill means shutdown
 print '%s: Exiting' % proc_name
 self.task_queue.task_done()
 break
 print '%s: %s' % (proc_name, next_task)
 answer = next_task()
 self.task_queue.task_done()
 self.result_queue.put(answer)
 return
class Task(object):
 def __init__(self, a, b):
 self.a = a
 self.b = b
 def __call__(self):
 time.sleep(0.1) # pretend to take some time to do the work
 return '%s * %s = %s' % (self.a, self.b, self.a * self.b)
 def __str__(self):
 return '%s * %s' % (self.a, self.b)
if __name__ == '__main__':
 # Establish communication queues
 tasks = multiprocessing.JoinableQueue()
 results = multiprocessing.Queue()
 # Start consumers
 num_consumers = multiprocessing.cpu_count() * 2
 print 'Creating %d consumers' % num_consumers
 consumers = [ Consumer(tasks, results)
 for i in xrange(num_consumers) ]
 for w in consumers:
 w.start()
 # Enqueue jobs
 num_jobs = 10
 for i in xrange(num_jobs):
 tasks.put(Task(i, i))
 # Add a poison pill for each consumer
 for i in xrange(num_consumers):
 tasks.put(None)
 # Wait for all of the tasks to finish
 tasks.join()
 # Start printing results
 while num_jobs:
 result = results.get()
 print 'Result:', result
 num_jobs -= 1

Event提供一种简单的方法,可以在进程间传递状态信息。事件可以切换设置和未设置状态。通过使用一个可选的超时值,时间对象的用户可以等待其状态从未设置变为设置。

进程间信号传递:


import multiprocessing
import time
def wait_for_event(e):
 """Wait for the event to be set before doing anything"""
 print 'wait_for_event: starting'
 e.wait()
 print 'wait_for_event: e.is_set()->', e.is_set()
def wait_for_event_timeout(e, t):
 """Wait t seconds and then timeout"""
 print 'wait_for_event_timeout: starting'
 e.wait(t)
 print 'wait_for_event_timeout: e.is_set()->', e.is_set()
if __name__ == '__main__':
 e = multiprocessing.Event()
 w1 = multiprocessing.Process(name='block', 
 target=wait_for_event,
 args=(e,))
 w1.start()
 w2 = multiprocessing.Process(name='nonblock', 
 target=wait_for_event_timeout, 
 args=(e, 2))
 w2.start()
 print 'main: waiting before calling Event.set()'
 time.sleep(3)
 e.set()
 print 'main: event is set'

Python多进程,一般的情况是Queue来传递。

Queue:


from multiprocessing import Process, Queue
def f(q):
 q.put([42, None, 'hello'])
if __name__ == '__main__':
 q = Queue()
 p = Process(target=f, args=(q,))
 p.start()
 print q.get() # prints "[42, None, 'hello']"
 p.join()

多线程优先队列Queue:


import Queue
import threading
import time
exitFlag = 0
class myThread (threading.Thread):
 def __init__(self, threadID, name, q):
 threading.Thread.__init__(self)
 self.threadID = threadID
 self.name = name
 self.q = q
 def run(self):
 print "Starting " + self.name
 process_data(self.name, self.q)
 print "Exiting " + self.name
def process_data(threadName, q):
 while not exitFlag:
 queueLock.acquire()
 if not workQueue.empty():
 data = q.get()
 queueLock.release()
 print "%s processing %s" % (threadName, data)
 else:
 queueLock.release()
 time.sleep(1)
threadList = ["Thread-1", "Thread-2", "Thread-3"]
nameList = ["One", "Two", "Three", "Four", "Five"]
queueLock = threading.Lock()
workQueue = Queue.Queue(10)
threads = []
threadID = 1
# Create new threads
for tName in threadList:
 thread = myThread(threadID, tName, workQueue)
 thread.start()
 threads.append(thread)
 threadID += 1
# Fill the queue
queueLock.acquire()
for word in nameList:
 workQueue.put(word)
queueLock.release()
# Wait for queue to empty
while not workQueue.empty():
 pass
# Notify threads it's time to exit
exitFlag = 1
# Wait for all threads to complete
for t in threads:
 t.join()
print "Exiting Main Thread"

多进程使用Queue通信的例子


import time
from multiprocessing import Process,Queue
MSG_QUEUE = Queue(5)
def startA(msgQueue):
 while True:
 if msgQueue.empty() > 0:
 print ('queue is empty %d' % (msgQueue.qsize()))
 else:
 msg = msgQueue.get()
 print( 'get msg %s' % (msg,))
 time.sleep(1)
def startB(msgQueue):
 while True:
 msgQueue.put('hello world')
 print( 'put hello world queue size is %d' % (msgQueue.qsize(),))
 time.sleep(3)
if __name__ == '__main__':
 processA = Process(target=startA,args=(MSG_QUEUE,))
 processB = Process(target=startB,args=(MSG_QUEUE,))
 processA.start()
 print( 'processA start..')

主进程定义了一个Queue类型的变量,并作为Process的args参数传给子进程processA和processB,两个进程一个向队列中写数据,一个读数据。

Python 多进程 multiprocessing