4.Using the threading and concurrent.futures Modules

4.用threadingconcurrent.futures模块

在上一章,我们总结了并行思想可以解决的一些问题。这一章,我们将用Python的threading模块实现每个问题的解决方案。

本章内容包括以下主题:

  • 线程的定义
  • threading_thread的选择
  • threading实现多请求的Fibonacci数列
  • concurrent.futures模块实现网络爬虫

线程的定义

线程(thread)是一个进程(process)中不同执行单元(execution line)。如果把一个程序比喻成蜂巢(hive),那么向蜂巢里收集花粉(pollen)这件事就是一个进程。这个收集花粉的进程是由若干只为了解决花粉不足问题的工蜂(worker bee)同时工作来完成。这些工蜂扮演的角色就是线程,在进程内部执行并共享资源以完成各自的任务。

线程属于同一个进程并共享同一块内存。因此,开发者的任务就是控制和使用这块内存。

使用线程的优缺点

使用线程之前,需要考虑线程具有的一些优缺点,而且线程与具体的编程语言和操作系统有密切关联。

线程的优点主要有:

  • 线程在同一个进程内的通信速度,数据定位速度,和信息共享速度都是非常快的
  • 创建线程的成本比创建进程低,因为它不需要复制主进程上下文所包含的全部信息
  • 通过处理器缓存优化内存接入速度可以高效使用数据局部存储(data locality,thread local storage,TLS)

线程的缺点主要有:

  • 数据共享允许快速通信。但是,有时会让经验不足的开发者犯错。
  • 数据共享限制了解决方案的灵活性。比如,迁移到分布性架构时,线程是十分令人头疼的事情。通常,线程会限制算法的扩展性。

在Python程序语言里,由于GIL的存在,使用带有CPU限制的线程可能影响程序的性能。

线程类型

用两种线程:内核线程(kernel thread)与用户线程(user thread)。内核线程是由操作系统创建和管理的线程。线程上下文的切换(Context Switch),调度,和结束都是通过操作系统的内核管理的。而用户线程的所有状态是由开发者控制的。

我们可以对比两者的优缺点。

内核线程的优点主要有:

  • 一个内核线程常只轻量进程(lightweight processes)。因此如果一个内核线程阻塞,其他内核线程还可以运行
  • 内核线程可以在不同的CPU上运行

内核线程的缺点主要有:

  • 创建与同步操作开销都较大
  • 实现方式与平台有关

用户线程的优点主要有:

  • 创建与同步操作开销都较小
  • 实现方式与平台无关

用户线程的缺点主要有:

  • 一个进程内的所有用户线程只能与一个内核线程关联。因此,如果一个用户线程阻塞,其他用户线程都不能执行
  • 用户线程不运行在不同的CPU上

线程状态定义

线程生命周期中一共五个状态,如下所示:

  • 创建:主进程创建一个线程,创建之后,会被发送到线程队列准备执行
  • 执行:这个阶段,线程使用CPU
  • 就绪:这个阶段,线程在线程队列里准备就绪即将执行
  • 阻塞:这个阶段,线程被阻塞,可能是等待I/O操作完成,这个阶段不使用CPU
  • 结束:这个阶段,释放执行时占用的资源,结束线程的生命周期

threading_thread的选择

Python标准库里提供了两个线程模块:_thread模块(这个Python模块提供了一个线程的底层API,具体文档见https://docs.python.org/3.5/library/_thread.html#module-_thread)和threading模块(这个Python模块提供了一个线程的高层API,具体文档见https://docs.python.org/3.5/library/threading.html#module-threading)。这个threading模块通过了比_thread模块更友好的接口,使用起来更方便。开发者根据自己口味选择。如果开发者觉得用线程的底层API更舒服,可以实现自己的线程池(thread pool),互斥锁和其他原始特征,那么他可以选择_thread模块。否则,还是用threading模块最方便。

threading实现多请求的Fibonacci数列

现在是时候回到现实。我们的任务是在Fibonacci数列有多个输入请求时并行的执行。处于教学的目的,我们把输入值设置成四个,每个输入值分配一个线程运行函数,让worker与要执行的任务(task)完美地一一对应。算法如下所示:

  1. 首先,用列表储存需要计算的四个值,然后把值发送到一个允许线程同步使用的结构中。
  2. 当等待计算的值被发送到可同步使用的结构中之后,计算Fibonacci数列的线程需要一个信号告诉它们数据已经准备就绪了。我们用一个叫Condition的线程同步机制(Thread synchronization mechanism)。(Condition机制是一个Python对象,用来提供多个线程之间数据使用的同步机制,具体文档查看https://docs.python.org/3/library/threading.html#condition-objects。)
  3. 当每个线程完成对应Fibonacci数列的计算之后,把结果保存到词典中。

那么,现在让我们演示最有趣的代码和注释内容。

代码开始的时候,我们使用Unicode编码格式,然后导入loggingthreadingQueue模块。另外,我们在例子中已经定义了主要的数据结构。我们定义了一个字典fibo_dict,把每个整数(作为用户输入)存储为键,对应Fibonacci数列的计算作为字典的值。我们还用queue模块中的Queue模块声明了一个Queue对象shared_queue,用来存储计算Fibonacci数列线程之间的共享数据,线程把数据插入Queue对象。最后,我们定义最后一个数据结构——一个带四个元素的Python的list对象,模拟程序即将接收到的值。代码如下所示:

In [ ]:
#coding: utf-8
import logging, threading

from queue import Queue

logger = logging.getLogger()
logger.setLevel(logging.DEBUG)
formatter = logging.Formatter('%(asctime)s - %(message)s')

ch = logging.StreamHandler()
ch.setLevel(logging.DEBUG)
ch.setFormatter(formatter)
logger.addHandler(ch)

fibo_dict = {}
shared_queue = Queue()
input_list = [3, 10, 5, 7]

下载源代码

你可以在Packt网站https://www.packtpub.com/,用你的账号下载所有书籍的示例代码。如果你从其他地方买的书,你可以先用https://www.packtpub.com/books/content/support搜索书籍名称,然后用你的邮箱地址注册一个账号下载代码。

下面一行代码,我们将定义一个threading模块里的Condition对象。这个对象可以通过一个具体的条件来同步资源使用的状态。

In [ ]:
queue_condition = threading.Condition()

使用Condition对象是为了控制队列的创建,以及里面发生的过程。

代码的另一个部分是定义线程即将执行的函数fibonacci_taskfibonacci_task函数的参数是condition对象,用来控制fibonacci_task使用shared_queue。我们使用with语句(关于with语句更多的信息,请参考https://docs.python.org/3/reference/compound_stmts.html#the-with-statement)来简化上下文的形式。如果不使用with语句,我们就需要显式地对线程锁进行获取和释放。使用with语句,我们就可以在程序一开始就获取线程锁,然后在程序结束的时候自动退出线程锁。在fibonacci_task函数的with语句后面是一个逻辑表达式,告诉当前的线程,“当shared_queue为空时,一直等待”。等待就是用condition对象的wait()方法是实现的。线程会等待,直到它被告知shared_queue可以使用时才停止。一旦条件得到满足,就会立即计算Fibonacci数列的值并存储到字典fibo_dict中。最后,我们调用task_done()方法,确认某一个排队的任务已经被抽取出来执行了。代码如下所示:

In [ ]:
def fibonacci_task(condition):
    with condition:
        while shared_queue.empty():
            logger.info("[%s] - waiting for elements in queue.."
                % threading.current_thread().name)
            condition.wait()
        else:
            value = shared_queue.get()
            a, b = 0, 1
            for item in range(value):
                a, b = b, a + b
                fibo_dict[value] = a
        shared_queue.task_done()
        logger.debug("[%s] fibonacci of key [%d] with result [%d]" %
            (threading.current_thread().name, value, fibo_dict[value]))

我们定义的第二个函数是queue_task是由线程执行的,把线程计算的结果添加到shared_queue里。我们会发现函数把condition作为线程同步接入shared_queue队列的参数。线程会把input_list中的每个元素都存储到shared_queue队列中。

当元素都存储到shared_queue队列之后,函数会告诉线程负责计算Fibonacci数列的队列已经准备好了。这个函数用condition.notifyAll()完成,如下所示:

In [ ]:
def queue_task(condition):
    logging.debug('Starting queue_task...')
    with condition:
        for item in input_list:
            shared_queue.put(item)
        logging.debug("Notifying fibonacci_task threads that the queue is ready to consume..")
        condition.notifyAll()

在后面的代码里,我们创建一个shared_queue准备条件已经满足的四线程组。然后我们使用一个允许自定义函数的thread类构造器(constructor),这个线程将用target参数设置要执行函数,在args里面放入要执行函数的参数,如下所示:

In [ ]:
threads = [threading.Thread(daemon=True, target=fibonacci_task,
                            args=(queue_condition,)) for i in range(4)]

然后,我们开启准备计算Fibonacci数列的线程。代码如下所示:

In [ ]:
[thread.start() for thread in threads]

紧接着,我们再创建并执行一个控制shared_queue的线程。代码如下所示:

In [ ]:
prod = threading.Thread(name='queue_task_thread', daemon=True, 
                        target=queue_task, args=(queue_condition,))
prod.start()

最后,我们对所有线程调用join()方法来计算Fibonacci数列。这么做的目的是让main线程等待这些线程全部执行完Fibonacci数列,这样它就不会在其他线程还没有结束之前就停止。代码如下所示:

In [ ]:
[thread.join() for thread in threads]

程序的执行结果如下图所示: 可以看出,一开始fibonacci_task线程被创建并初始化,紧接着进入等待状态。同时,queue_task被创建,然后生成shared_queue。最后,queue_task告诉fibonacci_task线程可以执行任务了。

还会看到,fibonacci_task线程执行并不是自然顺序,每次运行线程执行顺序都会变化。这就是线程的特点:不确定性。

源代码

concurrent.futures模块并行抓取网页

下面我们将用代码来实现一个并行网络爬虫。这里我们会用一个非常有意思的Python功能,concurrent.futures模块中的ThreadPoolExecutor。在前面的例子中,我们用非常原始的线程功能实现了parallel_fibonacci.py。而且我们是通过手工操作创建和初始化多个线程。在线程较多的项目中,这些做很难管理。通常,都会使用线程池解决线程的状态管理问题。线程池是一种用来保存若干线程的结构,在进程里使用之前创建。其目的是重用线程,这样可以避免重复创建线程——降低资源消耗。

下面的内容基本和上一节一样,我们需要一个通过多个阶段来执行不同任务的算法,这些任务是彼此相关的。我们将演示并行网络爬虫的代码。

导入模块并设置日志文件之后,我们用Python的re模块新建一个正则表达式,我们将在抓取阶段用这个表达式从网页源代码中筛选出所有链接。代码如下所示:

In [ ]:
html_link_regex = re.compile('<a\s(?:.*?\s)*?href=[\'"](.*?)[\'"].*?>')

紧接着,我们用一个同步队列来模拟多个输入URL任务。然后,新建一个字典result_dict来存储结果,把URL和对应页面的链接列表存在字典中。代码如下所示:

In [ ]:
urls = queue.Queue()
urls.put('http://www.sina.com')
urls.put('http://cn.bing.com/')
urls.put('https://coding.net/')
urls.put('http://github.com/')
urls.put('http://mail.126.com/')

result_dict = {}

之后我们定义一个函数group_urls_task,从同步队列中抽取URL加入到字典result_dict里,URL作为字典result_dict的键。还有一个细节要注意是同步队列的get()函数用了两个参数。一个参数是True用来阻塞同步队列接入。另一个参数是0.05秒,避免在同步队列中没有任务时等待太长时间。有时候,你不会愿意花太多时间阻塞线程来等待任务出现。代码如下所示:

In [ ]:
def group_urls_task(urls):
    try:
        url = urls.get(True, 0.05)
        result_dict[url] = None
        logger.info("[%s] putting url [%s] in dictionary..." % (
            threading.current_thread().name, url))
    except queue.Empty:
        logging.error('Nothing to be done, queue is empty')

现在,我们用函数crawl_task实现每个URL的页面的抓取。基本上抓取阶段就是获取URL对应页面上的所有链接。函数返回值是一个元组,第一个元素是URL,第二个元素是URL对应页面上的所有链接构成的列表。这里我们用requests模块获取URL对应页面的内容。代码如下所示:

In [ ]:
def crawl_task(url):
    links = []
    try:
        request_data = requests.get(url)
        logger.info("[%s] crawling url [%s] ..." % (
            threading.current_thread().name, url))
        links = html_link_regex.findall(request_data.text)
    except:
        logger.error(sys.exc_info()[0])
        raise
    finally:
        return (url, links)

进一步分析代码,我们会看到里面创建了一个concurrent.futures模块中的ThreadPoolExecutor对象。在ThreadPoolExecutor对象的构造器里,设置了max_workers参数。这个参数在线程池里定义了供执行器(executor)使用的线程数量。在前面将URL从同步队列移除并作为键增加到字典result_dict这个阶段,都是用这个三个worker线程完成的。这个数量可以根据问题的需要进行设置。完成ThreadPoolExecutor对象的定义之后,我们用with语句保证线程的结束例程(ending routine),这些例程将会在with语句范围结束后立即执行。在ThreadPoolExecutor对象的范围内,我们在同步队列中重复它,以submit方法对同步队列中的URL执行一个引用。总之,submit方法为执行安排了一个可调用函数,然后返回一个带有创建执行的例程安排(scheduling)的Future对象。submit方法接收一个可调用函数和函数的参数;在我们的例子里,可调用函数就是group_urls_task,函数的参数就是同步队列urls。当这些参数被调用之后,线程池中的worker线程会以一种并行,异步的方式执行函数预订(booking)。代码如下所示:

In [ ]:
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as group_link_threads:
    for i in range(urls.qsize()):
        group_link_threads.submit(group_urls_task, urls)

之后我们又定义了一个新的ThreadPoolExecutor对象;但是这一次我们想用前面group_urls_task函数生成的字典result_dict的键来执行抓取阶段的任务。代码会有些不同:

In [ ]:
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as crawler_link_threads:
    future_tasks = {crawler_link_threads.submit(crawl_task, url): url for url in result_dict.keys()}
    for future in concurrent.futures.as_completed(future_tasks):
        result_dict[future.result()[0]] = future.result()[1]

我们映射了一个临时字典future_tasks。里面包含了submit方法进行的函数预订,参数是result_dict的每个URL键。也就是说,我们用future_tasks为每个键创建了一个入口(entry)。映射之后,我们需通过concurrent.futures.as_completed(fs, timeout=None)方法来寻找future_tasks里入口,通过一个循环来获取前面函数预订的结果。这个调用会返回一个由Future对象实例组成的迭代器(iterator)。因此,我们可以迭代出每一个已经被处理好的预订结果。在ThreadPoolExecutor对象的最后,我们用Future对象result()方法获取抓取线程的结果。在这个例子的抓取阶段,Future对象返回的结果是元组。用这种方式我们获得的future_tasks最终结果如下图所示: 我们再一次看到线程池中的线程出现的顺序并不是自然顺序,这是由于线程具有不确定性。重要的是通过打印result_dict的项目来展示最终结果。

源代码

本章小结

在这一章里,我们重点介绍了线程的概念。我们通过threadingconcurrent.futures模块实现了上一章里提到的两个问题。通过问题的解决展示了两个模块的原理和灵活性。

下一章,我们将重点介绍如何用multiprocessingProcessPoolExecutor解决这两个问题。