5.Using Multiprocessing and ProcessPoolExecutor

multiprocessingProcessPoolExecutor模块

上一章我们用threading模块演示了两个例子。这一章我们将介绍multiprocessing的用法,实现与threading类似的接口。但是,我们将用进程范式。

本章内容包括以下主题:

  • 理解进程的概念
  • 理解多进程通信
  • multiprocessing实现多请求的Fibonacci数列
  • ProcessPoolExecutor实现并行网络爬虫

理解进程的概念

在操作系统中,进程是程序执行和相关资源的容器。所有与程序执行有关的资源都由进程管理——程序的数据区域,子进程,运行状态,还有与其他进程之间的通信。

理解进程模型

进程离不开对资源和信息的控制与操作。操作系统有一个进程控制块( Process Control Block,PCB),里面存储与进行有关的信息。例如,PCB会存储下面的信息:

  • 进程ID:是唯一的整数值(无符号),表示进程在操作系统中的识别号
  • 程序计数器(Program Counter,PC):这包括下一个要被执行的程序指令的地址
  • I/O信息:与进程相关的打开文件和设备列表
  • 内存配置:这里存储了进程和页表(tables of paging)使用和存储的内存空间
  • CPU进程排序(CPU scheduling):这里存储了进程优先级和蹒跚队列(staggering queues)的信息
  • 优先级:定义CPU执行的进程优先级
  • 当前状态:表示进程状态是就绪,等待或运行
  • CPU注册表:存储栈指针和其他信息
进程状态定义

进程的生命周期有三种状态,如下所示:

  • 运行:进程正在使用CPU
  • 就绪:进程在队列中等待CPU调用
  • 等待:进行等待与任务执行相关的I/O操作完成

实现多进程通信

multiprocessing模块允许两种方式实现进程间通信,均是通过信息传递实现的。如前面章节里介绍的,信息传递范式没有同步机制缺乏,数据通过复制在进程间交换。

使用multiprocessing.Pipe

管道pipe是一种可以建立两个节点(endpoint)(两个进程)之间的通信的机制。这种创建通道(channel)的方式是为了实现进程间信息交换。

Python官方文档推荐使用管道实现两个节点的通信,是因为管道不能同时安全地与第三个节点进行通信。

们将实现一个能够创建两个进程的Python程序(生成消费模型),来演示multiprocessing.Pipe对象的用法,进程A和进程B。进程A随机发送1到10范围内的整数值给进程B,进程B把收到的整数值显示到屏幕上。现在,让我们来看看源代码。

首先我们导入一些需要使用的模块,如下所示:

In [ ]:
import os
import random
from multiprocessing import Process, Pipe

这里os模块可以让我们通过os.getpid()获取进程的PID。这个os.getpid()返回的PID可以让整个例子变得透明可见。producer_taskconsumer_task函数运行时,对应进程的PID都会显示出来。

之后的代码中,我们定义生产者producer_task函数,首先通过random.randint(1,10)生成一串随机数。函数的关键点是调用了conn.send(value),这是用主函数里的Pipe生成的连接对象,将作为函数的参数使用。producer_task函数代码如下所示:

In [ ]:
def producer_task(conn):
    value = random.randint(1, 10)
    conn.send(value)
    print('Value [%d] sent by PID [%d]' % (value, os.getpid()))
    conn.close()

Pipe对象使用send方法发生数据之后,不要忘了用close()方法。在连接的通道不再使用之后释放资源是非常重要的事情。

消费者进程要做的事情很简单,就是把收到的信息打印到屏幕上,并显示进程的PID信息。为了从通信的通道中获取整数值,需要调用conn.recv()consumer_task函数代码如下所示:

In [ ]:
def consumer_task(conn):
    print('Value [%d] received by PID [%d]' % (conn.recv(), os.getpid()))

代码的最后一部分就是调用Pipe()对象创建两个进程表示生产者和消费者进程。之后,在用把producer_taskconsumer_task函数分别发送到两个进程中。代码如下所示:

In [ ]:
if __name__ == '__main__':
    producer_conn, consumer_conn = Pipe()
    consumer = Process(target=consumer_task, args=(consumer_conn,))
    producer = Process(target=producer_task, args=(producer_conn,))

    consumer.start()
    producer.start()

    consumer.join()
    producer.join()

进程定义之后,再使用start()方法初始化执行,然后用join()方法使得主函数进程等待生产者和消费者进程执行完毕后再结束。

运行multiprocessing_pipe.py程序,结果如下图所示: 源代码

理解multiprocessing.Queue

上一节我们分析了管道的概念,通过创建通道来建立进程间的通信。现在,我们将分析如何高效地实现这种通信,这就要用到multiprocessing中的Queue对象。multiprocessing.Queue的接口与queue.Queue非常相似。但是,multiprocessing.Queue的内部实现机制不一样,有一个内部线程叫供给线程(feeder thread),通过把队列的数据缓存传输数据到目标进程相关的管道中。PipeQueue对象机制都是有消息传递范式,用户不用考虑同步机制。

虽然使用multiprocessing.Queue不用考虑同步机制,比如Locks互斥锁,但是这些机制在进程内部的缓存和管道之间进行通信的时候还是会用到。

multiprocessing实现多请求的Fibonacci数列

让我们用进程来替代线程实现多请求的Fibonacci数列。

multiprocessing_fibonacci.py代码用了multiprocessing模块,首先我们导入必要的模块,代码如下所示:

In [ ]:
import sys
import logging
import time
import os
import random
from multiprocessing import Process, Queue, Pool, cpu_count, current_process, Manager

这里导入的模块很多在上一章都用过,但是有几个模块需要说明一下:

  • cpu_count:这个函数获取电脑的CPU的数量
  • current_process:这个函数可以获取当前进程的信息,比如进程名称
  • Manager:这个对象提供代理的形式在不同的进程之间共享Python对象。

后面的代码,我们会看到第一个函数有点不一样:它是随机从1到20的整数中随机抽取15个数。这些值将作为键插入到fibo_dict,一个Manager对象生成的字典。

虽然消息传递的方式更常用。但是,有时候,我们需要在不同进程之间共享一些数据,就像我们在fibo_dict字典里看到的一样。

producer_task函数代码如下所示:

In [ ]:
def producer_task(q, fibo_dict):
    for i in range(15):
        value = random.randint(1, 20)
        fibo_dict[value] = None
        logger.info("Producer [%s] putting value [%d] into queue.. "
                    % (current_process().name, value))
        q.put(value)

然后我们定义一个函数为fibo_dict中的每个键计算Fibonacci数列。和上一章线程函数最明显的不同就是这里把fibo_dict作为函数的参数使用。

consumer_task函数代码如下所示:

In [ ]:
def consumer_task(q, fibo_dict):
    while not q.empty():
        value = q.get(True, 0.05)
        a, b = 0, 1
        for item in range(value):
            a, b = b, a + b
            fibo_dict[value] = a
        logger.info("consumer [%s] getting value [%d] from queue..."
                    % (current_process().name, value))

继续看后面的代码,我们进入了主函数部分。这里定义了几个变量:

  • data_queue:根据Python的标准可以安全处理进程的multiprocessing.Queue
  • number_of_cpus:就是前面用过的multiprocessing.cpu_count函数
  • fibo_dictManager对象生成的字典,进程的最终结果会插入到里面
In [ ]:
if __name__ == '__main__':
    data_queue = Queue()
    number_of_cpus = cpu_count()
    manager = Manager()
    fibo_dict = manager.dict()

然后,我们新建一个producer对象,用producer_task函数处理data_queue中的随机数,代码如下所示:

In [ ]:
    producer = Process(target=producer_task, args=(data_queue, fibo_dict))
    producer.start()
    producer.join()

我们会看到Process类初始化的签名与threading模块的Thread类的签名一样。它会接受由worker进程并行地执行目标函数,和函数对应的参数。然后,我们开启进程并调用join()函数,让主进程在producer进程执行完毕后再结束。

后面的代码是定义一个consumer_list,里面保存一个已经初始化的消费者进程列表。创建这个列表的目的是为了在所有的worker进程启动后对这列进程调用join()函数。如果直接在循环体中对每个项目调用join()函数,那么可能只有第一个worker会执行任务,因为下一个迭代会被阻塞等待这个worker进程运行完毕,最终可能下一个worker进程永远不会执行。代码如下所示:

In [ ]:
    consumer_list = []
    for i in range(number_of_cpus):
        consumer = Process(target=consumer_task, args=(data_queue, fibo_dict))
        consumer.start()
        consumer_list.append(consumer)

    [consumer.join() for consumer in consumer_list]

    logger.info(fibo_dict)

运行代码,fibo_dict的最终结果如下图所示: 源代码

ProcessPoolExecutor实现并行网络爬虫

就像concurrent.futures模块提供了ThreadPoolExecutor,也用创建和控制多进程的手段,那就是ProcessPoolExecutor类。ProcessPoolExecutor类,也是在concurrent.futures里面,将被用来实现我们并行网络爬虫。示例代码在process_pool_executor_web_crawler.py里面。

这个代码导入的模块前面的例子类似,比如requestsManager模块。与上一章多线程爬虫的例子些许不同是,我们用函数参数的形式发送任务数据;具体函数签名如下所示:

group_urls_task函数定义如下所示:

In [ ]:
def group_urls_task(urls, result_dict, html_link_regex)

crawl_task函数定义如下所示:

In [ ]:
def crawl_task(url, html_link_regex)

然我们再看一点儿有差异的代码。在主函数里面,我们定义了一个Manager类型,这样就可以共享队列,而不只是字段里的进程处理结果了。我们使用Manager.Queue对象来定义一个队列urls,存放需要被抓取的URL链接。result_dict字典,我们将用Manager.dict()对象,目的是要通过代理来管理字典。

In [ ]:
if __name__ == '__main__':
    manager = Manager()
    urls = manager.Queue()
    urls.put('http://www.sina.com')
    urls.put('http://cn.bing.com/')
    urls.put('https://coding.net/')
    urls.put('http://github.com/')
    urls.put('http://mail.126.com/')
    result_dict = manager.dict()

然后,我们定义一个正则表达式来过滤页面中的链接,获取电脑CPU的数量,代码如下所示:

In [ ]:
    html_link_regex = \
        re.compile('<a\s(?:.*?\s)*?href=[\'"](.*?)[\'"].*?>')

    number_of_cpus = cpu_count()

在最后一部分,我们会看到连续使用concurrent.futures模块的API。和我们在上一章里使用的ThreadPoolExecutor完全一样。但是,ProcessPoolExecutor通过内部机制的改变避开了GIL对CPU的限制,而且也不需要对代码做过多的调整。两个ProcessPoolExecutor的worker进程数量都等于电脑CPU的数量。第一个ProcessPoolExecutor把URL任务并行插入字典result_dict,并把对应的字典值设置为None。代码如下所示:

In [ ]:
    with concurrent.futures.ProcessPoolExecutor(max_workers=number_of_cpus) as group_link_processes:
        for i in range(urls.qsize()):
            group_link_processes.submit(group_urls_task, urls, result_dict, html_link_regex)

第二个ProcessPoolExecutor执行网页链接抓取。代码如下所示:

In [ ]:
    with concurrent.futures.ProcessPoolExecutor(max_workers=number_of_cpus) as crawler_link_processes:
        future_tasks = {crawler_link_processes.submit(crawl_task, url, html_link_regex): url for url in result_dict.keys()}
        for future in concurrent.futures.as_completed(future_tasks):
            result_dict[future.result()[0]] = future.result()[1]

concurrent.futures实现多线程范式到多进程会更简单。

运行代码,结果如下图所示: 源代码

本章小结

在这一章,我们介绍了进程的概念,并用多进程方法实现多请求的Fibonacci数列和并行网络爬虫。

下一章,我们将用著名的parallel python模块研究多进程问题,这个模块不是Python自带的模块。我们将介绍进程间通信的概念,以及如果用管道实现进程间通信。