6.Utilizing Parallel Python

用Parallel Python模块

上一章我们用multiprocessingProcessPoolExecutor模块演示了两个例子。这一章我们将介绍命名队列(named pipe)的用法,以及如何用 Parallel Python (PP)模块的进程解决问题。

本章内容包括以下主题:

  • 理解进程间通信概念
  • 介绍PP模块
  • 用PP在SMP架上计算Fibonacci数列
  • 用PP实现并行网络爬虫

理解进程间通信

进程间通信(Interprocess communication,IPC)实现了进程之间的信息交换机制。

IPC的实现方式有一些,通常它们都需要根据系统运行时环境选择架构。例如,有时所有进程都运行在同一个机器上,我们可以使用不同的通信方式,比如共享内存,消息队列和管道。如果进程运行在分布式集群环境中,我们可以用套接字和远程过程调用(Remote Procedure Call,RPC)。

第五章,用multiprocessing和ProcessPoolExecutor模块里面,我们用普通管道实现进程通信。我们还介绍了有共同父进程的进程间通信。但是,有时候无关进程(非相同父进程)之间的通信也有需求。那么,有没有一种方法可以像之前那样利用进程的地址空间实现彼此间的通信呢?然而,进程是不可能直接连接无关进程的地址空间的。因此,我们必须引入一个命名管道(named pipe)的机制。

命名管道简介

在像Linux这样的POSIX系统中,我们知道几乎任何内容都可以当作文件。我们要处理每个任务都可以看成是一个文件,我们还可以用一个文件描述器(file descriptor)来操作文件。

文件描述器是一种允许用户对文件读/写操作进行编程的机制。通常一个文件对应唯一的文件描述器。具体请查看文件描述器文档

用Python的命名管道

在Python用命名管道非常简单,我们将用两个示例实现命名管道的单向通信方式。第一个程序是write_to_named_pipe.py,其功能是在管道里写入22字节的消息,内容是一个进程PID的字符串。第二个程序是read_from_named_pipe.py,可以完成信息读取并显示信息的内容和进程的PID。

运行read_from_named_pipe.py之后,命令行会显示如下结果:

I pid [<The PID of reader process>] received a message => Hello from pid [the PID of writer process].
写入命名管道

在Python里面,命名管道通过系统调用实现。我们将在下面对的write_to_named_pipe.py代码进行逐行分析。

首先我们导入系统调用的模块:

In [ ]:
import os

我们在主函数里创建了一个命名管道和一个特殊文件,FIFO,用来存储信息。在第一行代码我们把命名管道名称设置为:

In [ ]:
named_pipe = "my_pipe"

然后,我们验证命名管道是否已经存在。如果不存在我们就用os.mkfifo来创建一个:

In [ ]:
if not os.path.exists(named_pipe):
    os.mkfifo(named_pipe)

这里os.mkfifo实现了一个具有FIFO功能的特殊文件,用来向命名管道读写信息。

现在,我们在调用write_message函数传递named_pipe参数和Hello from pid [%d]信息。这个函数将信息写到文件里,这个文件将作为参数被命名管道接收。write_message函数定义如下:

In [ ]:
def write_message(input_pipe, message):
    fd = os.open(input_pipe, os.O_WRONLY)
    os.write(fd, (message % str(os.getpid())))
    os.close(fd)

我们观察函数的第一行会看到,这里使用了os.open系统命令,当操作成功后,会返回一个文件描述器,允许我们对FIFO文件里的数据进行读写。我们还可以通过标记flag对FIFO文件的编辑模式进行控制。如下所示:

In [ ]:
fd = os.open(input_pipe, os.O_WRONLY)

命名管道成功打开之后,就可以向里面写信息了,我们把进程的PID作为信息写进去:

In [ ]:
os.write(fd, (message % os.getpid()))

最后记得用os.close()把通信通道关闭。这样使用的计算机资源就释放了:

In [ ]:
os.close(fd)
读取命名管道

我们用程序read_from_named_pipe.py实现命名管道的信息读取,同样适用os模块进行操作。在主函数里触发进程,过程很简单。首先定义一个命名管道的名称,如下所示:

In [ ]:
named_pipe = "my_pipe"

然后,我们调用read_message函数,会读取write_to_named_pipe.py里写入命名管道的信息。代码如下所示:

In [ ]:
def read_message(input_type):
    fd = os.open(input_pipe, os_RONLY)
    message = ("I pid [%d] received a message => %s" % (os.getpid(), os.read(fd, 22))
    os.close(fd)
    return message

os.open和前面用法相同。这里的新用法是os.read,按指定字节读取信息。本例中使用的是22个字节。信息读取之后,函数就会返回信息。最后记得用os.close关闭信道,释放资源。

文件描述器是否可以打开是需要检验的。开发者可以根据自己需求,对文件描述器和命名管道的相关异常进行处理。

最后,我们可以看到两个程序的输入结果,如下图所示:

探索Parallel Python库

前面的例子直接利用系统调用用一种底层机制实现了进程间通信。这在Linux和Unix环境下处理进程间通信的必然手段。现在,我们将用一个Python模块PP来建立IPC,不仅是同一个机器上的进程,还包括分布式计算网络中的不同机器IPC。

PP模块文档不太丰富,可以在其官网的FAQ里查看信息。API中介绍了许多关于此模块具体用法。

用PP最大的优势是模块提供了高层的抽象。主要特性如下所示:

  • 自动发现进程数量实现负载均衡
  • 在运行阶段可以分配处理器
  • 运行阶段可以负载均衡
  • 可以通过网络自动寻找资源

PP模块通过两种方式实现了并行。第一种方式是在一个机器上有多CPU或多核心时,利用SMP架构。第二种方式是通过网络把任务分配到各个机器中,形成云计算模式。这两种情形,进程间的信息交换通过调用高度抽象函数实现,这样我们就不用担心管道和套接字的底层细节了。通过参数和函数就可以简单地实现交换信息,具体在下面的示例中介绍。

在PP模块里有一个Server类,我们可以用它来封装和发放本地与远程进程间的任务。在初始化时(__init__)有一些重要的参数需要注意,如下所示:

  • ncpus:这个参数允许我们设置worker进程的数量。如果这个值没设置,默认就会查看本机CPU/核心数量,然后创建对应数量的worker进程执行任务。
  • ppservers:这个参数是一个包含机器名称或IP地址的元组,并行Python执行服务器(Parallel Python Execution Servers,PPES)。PPES由网络中具有ppservers.py功能的机器构成,运行并等待任务执行。相关的参数信息请见文档

Server类的实例有一些方法,submit方法可以向目标机器发放任务。submit函数签名如下所示:

submit(self, func, args=(), depfuncs=(), modules=(),
    callback=None, callbackargs=(), group='default',
        globals=None)

submit方法主要参数介绍如下:

  • func:本机CPU或远程服务器要执行的函数
  • argsfunc函数的参数
  • modules:函数执行需要导入的远程代码或func函数执行需要导入的进程。例如,如果任务分配函数用了time模块,那么参数就要设置为modules=('time', )
  • callback:这是我们后面要用的回调函数。当func参数的函数获取进程结果时,回调函数就是对结果进行处理。

其他参数将在后面的内容里进一步介绍。

用PP在SMP架构实现多输入的Fibonacci数列

现在让我们开始动手吧!让我们用PP模块在SMP上架构实现多输入的Fibonacci数列。我将用一个双核四线程的笔记本来运行程序。

这里需要导入的模块只有两个osppos仅用来获取进程的PID。定义一个input_list模拟多个输入,一个result_dict字段存放最终结果。代码如下所示:

In [ ]:
import os
import pp

input_list = [4, 3, 8, 6, 10]
result_dict = {}

之后,我们定义一个函数fibo_task来并行执行进程。它将作为Server类里submit方法的func参数。这个函数和上一章的版本没太多变化,唯一不同的时返回值现在是一个元组,封装了两个元素,一个是输入参数,另一个是包含进程PID和Fibonacci计算值的字符串。函数定义如下:

In [ ]:
def fibo_task(value):
    a, b = 0, 1
    for item in range(value):
        a, b = b, a + b
    message = "the fibonacci calculated by pid %d was %d" \
        % (os.getpid(), a)
    return (value, message)

下一步是定义callback回调函数,我们定义成aggregate_results。这个回调函数会在fibo_task完成任务时执行。其实现非常简单,就是显示进程运行的状态信息,把fibo_task运行的结果作为输入对应的值写入字典result_dict里。代码如下所示:

In [ ]:
def aggregate_results(result):
    print "Computing results with PID [%d]" % os.getpid()
    result_dict[result[0]] = result[1]

现在,我们定义了好两个函数,就创建一个Server类的实例来分配任务。

In [ ]:
job_server = pp.Server()

在之前的例子中,我们用的都是数值参数。下面我们将用另一种参数。

有了Server实例,我们就对input_list进行迭代,然后通过submit分配fibo_task任务,把input_list的输入值传入args的元组中,modules参数设置为需要导入的os模块,callback参数设置为aggregate_results。代码如下所示:

In [ ]:
for item in input_list:
    job_server.submit(
        fibo_task, (item,), modules=('os',), callback=aggregate_results)

最后,我们需要等待所有被分配的任务运行完毕。调用wait方法即可:

In [ ]:
job_server.wait()

还有一种方式,不需要用callback函数也可以获取执行函数。submit方法返回一个pp._Task对象类型,里面包含了进程运行完成后的结果。

通过打印result_dict字典显示结果:

In [ ]:
print "Main process PID [%d]" % os.getpid()
for key, value in result_dict.items():
    print "For input %d, %s" % (key, value)

最终结果如下图所示:

源代码

用PP实现分布式网络爬虫

用PP实现了本地进程的任务分配之后,我们再来看看分布式并行的方法。下面我们用三个机器来执行:

  • Iceman-Thinkad-X220: Ubuntu 13.10
  • Iceman-Q47OC-500P4C: Ubuntu 12.04 LTS
  • Asgard-desktop: Elementary OS

我们将用PP把任务分配到三台电脑上运行。这里还用之前的网络爬虫来演示。代码在web_crawler_pp_cluster.py文件中,我们把待处理的URL放在input_list里面,然后分配一个本地或远程的进程执行任务,最后用callback回调函数把每个URL的抓取的前三个链接保存起来。

下面我们一步步分析解决问题的过程。首先,我们导入必要的模块,定义几个数据结构。和上一节类似,我们新建一个input_list列表存放URL,一个字典result_dict存放最终抓取结果。

In [ ]:
import os
import re
import requests
import pp

url_list = ['http://www.google.com/', 'http://gizmodo.uol.com.br/',
            'https://github.com/', 'http://br.search.yahoo.com/',
            'http://www.python.org/', 'http://www.python.org/psf/']

result_dict = {}

现在,我们定义回调函数aggregate_results,只要把上一节的显示Fibonacci计算结果的回调函数稍作修改就可以了。我们只改变了字典保存信息的组织结构,里面包含进程PID,进程所在电脑的名称,以及抓取的前三个链接。代码如下所示:

In [ ]:
def aggregate_results(result):
    print "Computing results in main process PID [%d]" % os.getpid()
    message = "PID %d in hostname [%s] the following links were found: %s"\
        % (result[2], result[3], result[1])
    result_dict[result[0]] = message

紧接着,我们定义crawl_task函数,后面分配到Server类的实例中。和上一节的任务函数类似,其目的就是为了从URL对应的页面中抓取所有链接的前三个。唯一不同的是返回元组的结构,如下所示:

In [ ]:
def crawl_task(url):
    html_link_regex = \
        re.compile('<a\s(?:.*?\s)*?href=[\'"](.*?)[\'"].*?>')

    request_data = requests.get(url)
    # limit to the first 03 links
    links = html_link_regex.findall(request_data.text)[:3]
    return (url, links, os.getpid(), os.uname()[1])

在任务函数和回调函数写完之后,我们就应该用Server实例向网络中的电脑分配任务了。我们将在Server类初始化阶段定义一些参数。首先就是网络中准备运行任务的电脑IP地址。在我们的例子中,本机之外的两台电脑IP地址用元组封装成一个ppservers变量:

In [ ]:
ppservers = ("192.168.25.21", "192.168.25.9")

如果你不想用具体的IP地址,或者电脑太多写得麻烦,你可以在ppservers元组中使用*通配符。

定义了ppservers元组之后,我们创建Server实例:

In [ ]:
job_dispatcher = pp.Server(ncpus=1, ppservers=ppservers, socket_timeout=60000)

这里和上一节的设置有些差异。首先,我们把ncpus参数设置成1。这样PP模块在本机上分配任务只用一个进程,其他任务都分配给网络中的另外两台电脑。第二个参数ppservers是之前创建的IP地址元组。最后一个参数socket_timeout是进程运行等待时限(按秒计算),这里设置成60000,是为了演示过程中不会因为长时间未完成而关闭通道。

Server实例创建之后,我们来分配任务。用一个循环遍历每个URL,通过Server实例的submit方法把URL分配给每个机器:

In [ ]:
for url in url_list:
    job_dispatcher.submit(crawl_task, (url,),
                          modules=('os', 're', 'requests',),
                          callback=aggregate_results)

这里和前面Fibonacci数列的submit方法最大的不同,就是进程执行需要导入的模块。

你可能会问为什么PP模块不需要放在modules参数里。其实,PP运行环境已经默认帮我们导入了pp。毕竟,远程节点还是需要的。

分配完任务,我们就用wait方法等待任务完成。这里使用了Server类的一个方法print_stats,会显示一些有趣的统计信息。代码如下所示:

In [ ]:
job_dispatcher.wait()

for key, value in result_dict.items():
    print "** For url %s, %s\n" % (key, value)

print job_dispatcher.print_stats()

运行程序之前,我们还需要在远程机器上配置ppserver.py功能,执行ppserver.py -a -d命令即可,其中-a表示自动发现选项,允许服务器发现那些IP地址没有在ppservers元组中设置的客户端机器。-d参数是调试模式,可以显示服务器运行过程中的所有日志文件。

下面让我们看看运行的结果:

  • 首先,主节点创建和分配任务,运行结果如下所示。这里会看到里面有一些有趣的统计信息,比如分配到每个节点上的任务数量,完成任务使用的总时间,每个任务使用的平均时间,以及对应节点的IP地址和等待的时限。还有一个有意思的地方是,回调函数只在主节点进程上运行。因此,需要注意的是,不要把回调函数做得太复杂,否则会占用主节点过多的资源。
  • 然后,对两台机器进行ppserver.py初始化并处理任务,如下面截图所示。

    • 在机器iceman-Q47OC-500P4C上运行的结果如下所示:

    • 在机器asgard-desktop上运行的结果如下所示:

源代码

本章小结

本章我们首先通过底层的命名管道实现了IPC,然后使用PP模块演示了两个问题的处理方法,它提供了高层的抽象,让IPC问题和分布式进程处理都更加简单。PP非常适合建立简单、小型、并行、分布式的Python应用。

下一章我们将使用大名鼎鼎的Celery模块来实现并行分布式任务。