Skip to content

Latest commit

 

History

History
1249 lines (882 loc) · 71.2 KB

File metadata and controls

1249 lines (882 loc) · 71.2 KB

十三、并发

并行处理是软件工程领域最广泛的主题之一。本书中的大部分章节也涵盖了广阔的领域,几乎所有章节都可以作为单独一本书的主题。但是并发本身的主题是如此之大,以至于它可以占据几十个位置,我们仍然无法讨论它的所有重要方面和模型。

这就是为什么我不想愚弄你们,从一开始,我们几乎不会触及这个话题的表面。本章的目的是说明为什么在应用程序中可能需要并发,何时使用它,以及在 Python 中可能使用的最重要的并发模型是什么:

  • 多线程
  • 多处理
  • 异步编程

我们还将讨论一些允许您在代码中实现这些模型的语言特性、内置模块和第三方软件包。但我们不会详细介绍它们。将本章内容作为进一步研究和阅读的切入点。它在这里指导您了解基本思想,并帮助您决定是否确实需要并发,如果需要,哪种方法最适合您的需要。

为什么并发?

在我们回答问题为什么并发之前,我们需要问什么是并发?

第二个问题的答案可能会让一些人感到惊讶,他们过去认为这是并行处理的同义词。但并发性与并行性不同。并发性不是应用程序实现的问题,而是程序、算法或问题的特性。并行性只是解决并发问题的可能方法之一。

莱斯利·兰波特(Leslie Lamport)在 1976 年发表的《分布式系统中的时间、时钟和事件顺序》论文中说:

“如果两个事件都不能对另一个产生因果影响,则两个事件是同时发生的。”

通过将事件外推到程序、算法或问题,我们可以说,如果某个事件可以完全或部分分解为与顺序无关的组件(单元),那么它就是并发的。这些单元可以相互独立地处理,处理的顺序不影响最终结果。这意味着它们也可以同时或并行处理。如果我们以这种方式处理信息,那么我们实际上是在处理并行处理。但这仍然不是强制性的。

以分布式方式工作,最好使用多核处理器或计算集群的功能,是并发问题的自然结果。无论如何,这并不意味着这是有效处理并发的唯一方法。在许多用例中,并发问题可以通过同步方式以外的方式处理,但不需要并行执行。

因此,一旦我们知道了并发的真正含义,就应该解释一下这是怎么回事了。当问题同时发生时,它给你机会以一种特殊的、最好是更有效的方式来处理它。

我们通常习惯于通过执行一系列步骤以经典方式处理问题。这就是我们大多数人使用同步算法思考和处理信息的方式,同步算法一次只做一件事,一步一步。但这种处理信息的方式并不适合解决大规模问题,或者当您需要同时满足多个用户或软件代理的需求时:

  • 处理作业的时间受单个处理单元(单机、CPU 核心等)的性能限制
  • 在程序完成前一个输入的处理之前,您无法接受和处理新的输入

因此,一般来说,在以下情况下,同时处理并发问题是最好的方法:

  • 问题的规模如此之大,以至于在可接受的时间内或在可用资源范围内处理它们的唯一方法是将执行分配给多个可以并行处理工作的处理单元
  • 您的应用程序需要保持响应能力(接受新输入),即使它尚未完成对旧输入的处理

这涵盖了并发处理是合理选择的大多数情况。第一组问题肯定需要并行处理解决方案,因此通常使用多线程和多处理模型来解决。第二组不一定需要并行处理,因此实际的解决方案实际上取决于问题的细节。请注意,此组还包括应用程序需要独立地为多个客户端(用户或软件代理)提供服务,而无需等待其他客户端成功提供服务的情况。

另一件值得一提的事情是,前面两组并不是排他性的。通常,您需要保持应用程序的响应能力,同时无法在单个处理单元上处理输入。这就是为什么可能同时使用不同的、看似可选的或冲突的并发方法的原因。这在 web 服务器的开发中尤其常见,在 web 服务器的开发中,可能需要使用异步事件循环或多个进程结合的线程,以便利用所有可用资源并在高负载下保持低延迟。

多线程

开发人员通常认为线程是一个复杂的主题。虽然这句话完全正确,但 Python 提供了高级类和函数,可以简化线程的使用。CPython 的线程实现附带了一些不方便的细节,使得它们不如其他语言有用。对于一些您可能想要解决的问题集,它们仍然是完全可以解决的,但对于 C 或 Java 中的问题集来说,它们就没有那么多了。在本节中,我们将讨论 CPython 中多线程的局限性,以及 Python 线程是可行解决方案的常见并发问题。

什么是多线程?

Thread 是执行线程的缩写。程序员可以将他或她的工作拆分为同时运行并共享相同内存上下文的线程。除非您的代码依赖于第三方资源,否则多线程处理不会在单核处理器上加速,甚至会增加线程管理的开销。多线程将受益于多处理器或多核机器,并将在每个 CPU 核上并行执行每个线程,从而使程序更快。请注意,这是一条适用于大多数编程语言的一般规则。在 Python 中,多核 CPU 上多线程的性能优势有一些限制,但我们将在后面讨论。为了简单起见,现在让我们假设这句话是真的。

线程之间共享相同的上下文这一事实意味着您必须保护数据不受并发访问。如果两个线程在没有任何保护的情况下更新相同的数据,则会发生争用情况。这被称为竞争危险,由于每个线程运行的代码对数据状态做出错误假设,因此可能会出现意外结果。

锁机制有助于保护数据,线程编程始终是确保线程以安全的方式访问资源的问题。这可能非常困难,线程编程通常会导致难以调试的 bug,因为它们很难重现。最糟糕的问题是,由于糟糕的代码设计,两个线程锁定了一个资源,并试图获取另一个线程锁定的资源。他们将永远等待对方。这就是所谓的死锁,很难调试。可重入锁定帮助确保线程不会通过两次尝试锁定资源而被锁定。

然而,当线程使用为其构建的工具来满足孤立的需求时,它们可能会提高程序的速度。

多线程通常在系统内核级别得到支持。当机器只有一个单核处理器时,系统使用时间选择机制。在这里,CPU 从一个线程切换到另一个线程的速度如此之快,以至于出现了线程同时运行的错觉。这也是在处理级别完成的。没有多个处理单元的并行显然是虚拟的,在这样的硬件上运行多个线程不会带来性能提升。无论如何,有时即使代码必须在单个内核上执行,用线程实现代码仍然很有用,我们稍后将看到一个可能的用例。

当您的执行环境有多个处理器或多个 CPU 核供其处置时,一切都会发生变化。即使使用了 timeslicing,进程和线程也分布在 CPU 之间,从而能够更快地运行程序。

Python 如何处理线程

与其他一些语言不同,Python 使用多个内核级线程,每个线程都可以运行任何解释器级线程。但是 CPython 语言的标准实现有一个主要的限制,这使得线程在许多上下文中不太可用。访问 Python 对象的所有线程都由一个全局锁序列化。之所以这样做,是因为许多解释器内部结构以及第三方 C 代码都不是线程安全的,需要加以保护。

该机制称为全局解释器锁GIL),其在 Python/C API 级别的实现细节已经在第 7 章其他语言的 Python 扩展发布 GIL一节中讨论过。删除 GIL 是一个偶尔出现在 python 开发人员电子邮件列表上的主题,开发人员多次假定了这个主题。不幸的是,直到现在,还没有人能够提供一个合理而简单的解决方案,让我们摆脱这一限制。我们很快就不可能在这方面看到任何进展。更安全的假设是吉尔将永远留在 CPython。因此,我们需要学会如何面对它。

那么,Python 中的多线程有什么意义呢?

当线程只包含纯 Python 代码时,使用线程来加速程序没有什么意义,因为 GIL 将序列化它。但请记住,GIL 只是强制要求在任何时候只有一个线程可以执行 Python 代码。实际上,全局解释器锁在许多阻塞系统调用上被释放,并且可以在不使用任何 Python/capi 函数的 C 扩展部分中释放。这意味着,多个线程可以在某些第三方扩展中并行执行 I/O 操作或 C 代码。

对于使用外部资源或涉及 C 代码的非纯代码块,多线程在等待第三方资源返回结果时非常有用。这是因为明确释放 GIL 的休眠线程可以在结果返回时待机并醒来。最后,每当程序需要提供响应接口时,多线程就是答案,即使它使用了 timeslice。该程序可以与用户交互,同时在所谓的后台进行一些繁重的计算。

注意,GIL 并不存在于 Python 语言的每个实现中。它是 CPython、Stackless Python 和 PyPy 的限制,但 Jython 和 IronPython 中不存在(参见第 1 章Python 的当前状态。虽然 PyPy 的无 GIL 版本有一些发展,但在编写本书时,它仍处于实验阶段,缺乏文档。它基于软件事务内存,称为 PyPy STM。真的很难说它何时(或是否)将作为一个生产就绪的解释器正式发布。一切似乎都表明这不会很快发生。

什么时候使用螺纹?

尽管存在 GIL 限制,线程在某些情况下还是非常有用的。他们可以在以下方面提供帮助:

  • 构建响应接口
  • 授权工作
  • 构建多用户应用程序

建立响应性接口

假设您要求系统通过图形用户界面将文件从一个文件夹复制到另一个文件夹。任务可能会被推到后台,主线程会不断刷新界面窗口。通过这种方式,您可以获得整个流程进展的实时反馈。您还可以取消该操作。这比原始的cpcopyshell 命令更令人不快,在所有工作完成之前不会提供任何反馈。

响应界面还允许用户同时处理多个任务。例如,Gimp 将允许您在过滤另一张图片时处理一张图片,因为这两个任务是独立的。

当试图实现这样的响应界面时,一个好的方法是尝试将长时间运行的任务推到后台,或者至少尝试向用户提供持续的反馈。实现这一点的最简单方法是使用线程。在这种情况下,它们不是为了提高性能,只是为了确保用户仍然可以操作界面,即使它需要处理一些数据更长的时间。

如果这样的后台任务执行大量 I/O 操作,您仍然可以从多核 CPU 中获得一些好处。这是一个双赢的局面。

委托工作

如果您的进程依赖于第三方资源,那么线程可能真的会加速一切。

让我们考虑一个函数,该函数在文件夹中索引文件,并将生成的索引推送到数据库中。根据文件的类型,该函数调用不同的外部程序。例如,一个专用于 PDF,另一个专用于 OpenOffice 文件。

通过执行正确的程序,然后将结果存储到数据库中,您的函数可以为每个转换器设置一个线程,并通过队列将要执行的作业推送到每个转换器,而不是按顺序处理每个文件。函数所花费的总时间将更接近最慢转换器的处理时间,而不是所有工作的总和。

转换器线程可以从一开始就初始化,负责将结果推送到数据库的代码也可以是使用队列中可用结果的线程。

请注意,这种方法在某种程度上是多线程和多处理的混合。如果您将工作委托给外部进程(例如,使用subprocess模块中的run()函数),您实际上是在多个进程中工作,因此这具有多进程的症状。但是在我们的场景中,我们正在等待单独线程中的处理结果,因此从 Python 代码的角度来看,它仍然主要是多线程的。

线程的另一个常见用例是对外部服务执行多个 HTTP 请求。例如,如果您想从一个远程 web API 获取多个结果,那么同步获取可能需要很多时间。如果您在发出新请求之前等待以前的每个响应,那么您将花费大量时间等待外部服务响应,并且每个此类请求都会增加额外的往返时间延迟。如果您正在与一个高效的服务(例如 Google Maps API)进行通信,那么它很可能会同时为您的大多数请求提供服务,而不会影响单独请求的响应时间。然后在单独的线程中执行多个查询是合理的。请记住,在执行 HTTP 请求时,大部分时间都花在从 TCP 套接字读取数据上。这是一个阻塞 I/O 操作,因此 CPython 将在执行recv()C 功能时释放 GIL。这样可以大大提高应用程序的性能。

多用户应用

线程还用作多用户应用程序的并发基础。例如,web 服务器会将用户请求推送到一个新线程中,然后变为空闲状态,等待新的请求。为每个请求指定一个专用线程简化了大量工作,但需要开发人员负责锁定资源。但是,当所有共享数据都被推送到处理并发问题的关系数据库中时,这不是问题。因此,多用户应用程序中的线程的行为几乎类似于独立的进程。它们处于相同的过程中,只是为了简化它们在应用程序级别的管理。

例如,web 服务器将能够将所有请求放入一个队列中,并等待线程向其发送工作。此外,它允许内存共享,这可以提高一些工作并减少内存负载。两个非常流行的 Python WSGI 兼容 Web 服务器:Gunicorn(参考http://gunicorn.org/uWSGI(参考https://uwsgi-docs.readthedocs.org ),允许您以一种通常遵循此原则的方式使用线程工作程序服务 HTTP 请求。

在多用户应用程序中使用多线程实现并发比使用多处理成本更低。单独的进程需要花费更多的资源,因为每个进程都需要加载一个新的解释器。另一方面,拥有太多的线程也很昂贵。我们知道 GIL 对于 I/O 广泛的应用程序来说并不是一个问题,但总有一段时间需要执行 Python 代码。由于无法使用裸线程并行所有应用程序部分,因此永远无法利用具有多核 CPU 和单个 Python 进程的计算机上的所有资源。这就是为什么最佳解决方案通常是多处理和多线程的混合,多个工作线程(进程)与多个线程一起运行。幸运的是,许多符合 WSGI 的 web 服务器都允许这样的设置。

但是在多线程处理多线程之前,请考虑这样的方法是否真的值得所有的成本。这种方法使用多处理来提高资源利用率,另外使用多线程来提高并发性,这应该比运行多个进程轻。但这并不一定是真的。也许摆脱线程和增加进程数量并不像你想象的那么昂贵?在选择最佳设置时,始终需要对应用程序进行负载测试(请参见第 10 章测试驱动开发中的负载和性能测试部分)。另外,使用多线程的一个副作用是,您会得到一个不太安全的环境,共享内存会造成数据损坏或可怕的死锁风险。也许更好的替代方法是使用一些异步方法,包括事件循环、绿色线程或协同路由。我们稍后将在异步编程一节中介绍此类解决方案。同样,如果没有合理的负载测试和实验,您无法真正判断哪种方法在您的环境中最有效。

线程应用程序的示例

为了了解 Python 线程在实践中是如何工作的,让我们构造一个示例应用程序,它可以从实现多线程中获得一些好处。我们将讨论一个简单的问题,您在专业实践中进行多个并行 HTTP 查询时可能会遇到这个问题。这个问题已经作为多线程的常见用例提到过。

比方说,我们需要使用无法批量处理为单个大型 HTTP 请求的多个查询从某个 web 服务获取数据。作为一个现实的例子,我们将使用 GoogleMapsAPI 中的地理编码端点。作出这一选择的原因如下:

  • 它非常受欢迎,而且是一项记录良好的服务
  • 此 API 的一个免费层不需要任何身份验证密钥
  • PyPI 上提供了一个python-gmaps包,允许您与各种 Google Maps API 端点交互,并且非常易于使用

地理编码只是将地址或地点转换成坐标。我们将尝试将各个城市的预定义列表地理编码为纬度/经度元组,并在标准输出上用python-gmaps显示结果。它非常简单,如下代码所示:

>>> from gmaps import Geocoding
>>> api = Geocoding()
>>> geocoded = api.geocode('Warsaw')[0]
>>> print("{:>25s}, {:6.2f}, {:6.2f}".format(
...         geocoded['formatted_address'],
...         geocoded['geometry']['location']['lat'],
...         geocoded['geometry']['location']['lng'],
...     ))
Warsaw, Poland,  52.23,  21.01

由于我们的目标是展示并发问题的多线程解决方案与标准同步解决方案的比较,因此我们将从一个根本不使用线程的实现开始。以下是一个程序的代码,该程序在城市列表上循环,查询 Google Maps API,并在文本格式的表格中显示有关城市地址和坐标的信息:

import time

from gmaps import Geocoding

api = Geocoding()

PLACES = (
    'Reykjavik', 'Vien', 'Zadar', 'Venice',
    'Wrocław', 'Bolognia', 'Berlin', 'Słubice',
    'New York', 'Dehli',
)

def fetch_place(place):
    geocoded = api.geocode(place)[0]

    print("{:>25s}, {:6.2f}, {:6.2f}".format(
        geocoded['formatted_address'],
        geocoded['geometry']['location']['lat'],
        geocoded['geometry']['location']['lng'],
    ))

def main():
    for place in PLACES:
        fetch_place(place)

if __name__ == "__main__":
    started = time.time()
    main()
    elapsed = time.time() - started

    print()
    print("time elapsed: {:.2f}s".format(elapsed))

main()函数的执行过程中,我们添加了一些语句,用于测量完成作业所需的时间。在我的计算机上,此程序通常需要 2 到 3 秒来完成其任务:

$ python3 synchronous.py
 Reykjavík, Iceland,  64.13, -21.82
 Vienna, Austria,  48.21,  16.37
 Zadar, Croatia,  44.12,  15.23
 Venice, Italy,  45.44,  12.32
 Wrocław, Poland,  51.11,  17.04
 Bologna, Italy,  44.49,  11.34
 Berlin, Germany,  52.52,  13.40
 Slubice, Poland,  52.35,  14.56
 New York, NY, USA,  40.71, -74.01
 Dehli, Gujarat, India,  21.57,  73.22

time elapsed: 2.79s

我们脚本的每次运行都将花费不同的时间,因为它主要依赖于通过网络连接访问的远程服务。因此,有许多不确定因素影响最终结果。最好的方法是进行更长的测试,重复多次,并根据测量结果计算一些平均值。但为了简单起见,我们不会这么做。稍后您将看到,这种简化的方法仅用于说明目的。

每个项目使用一根螺纹

现在是改进的时候了。我们在 Python 中没有做很多处理,执行时间长是由于与外部服务的通信造成的。我们向服务器发送一个 HTTP 请求,它计算出答案,然后我们等待响应被传输回来。涉及到很多 I/O,因此多线程似乎是一个可行的选择。我们可以在单独的线程中一次启动所有请求,然后等待它们收到数据。如果我们正在与之通信的服务能够同时处理我们的请求,那么我们肯定会看到性能的提高。

让我们从最简单的方法开始。Python 通过threading模块在系统线程上提供了干净且易于使用的抽象。这个标准库的核心是表示单线程实例的Thread类。下面是main()函数的修改版本,它为每个要进行地理编码的地方创建并启动一个新线程,然后等待所有线程完成:

from threading import Thread

def main():
    threads = []
    for place in PLACES:
        thread = Thread(target=fetch_place, args=[place])
        thread.start()
        threads.append(thread)

    while threads:
        threads.pop().join()

这是一个快速而肮脏的变化,它有一些严重的问题,我们将在稍后尝试解决。它以一种有点轻浮的方式来处理问题,并且它不是一种编写可靠的软件来服务成千上万或数百万用户的方式。但是,嘿,它起作用了:

$ python3 threaded.py
 Wrocław, Poland,  51.11,  17.04
 Vienna, Austria,  48.21,  16.37
 Dehli, Gujarat, India,  21.57,  73.22
 New York, NY, USA,  40.71, -74.01
 Bologna, Italy,  44.49,  11.34
 Reykjavík, Iceland,  64.13, -21.82
 Zadar, Croatia,  44.12,  15.23
 Berlin, Germany,  52.52,  13.40
 Slubice, Poland,  52.35,  14.56
 Venice, Italy,  45.44,  12.32

time elapsed: 1.05s

因此,当我们知道线程对我们的应用程序有着有益的影响时,是时候以稍微理智的方式使用它们了。首先,我们需要确定前面代码中的问题:

  • 我们为每个参数启动一个新线程。线程初始化也需要一些时间,但这种较小的开销不是唯一的问题。线程还消耗其他资源,如内存和文件描述符。我们的示例输入有严格定义的项数,如果没有呢?您肯定不希望运行依赖于任意大小的数据输入的无限数量的线程。
  • 在线程中执行的fetch_place()函数调用内置print()函数,实际上,您不太可能希望在主应用程序线程之外执行该函数。首先,这是由于标准输出在 Python 中是如何缓冲的。当对该函数的多个调用在线程之间交错时,可能会遇到格式错误的输出。此外,print()功能被认为是缓慢的。如果在多线程中鲁莽使用,可能会导致序列化,这将取消多线程的所有好处。
  • 最后但并非最不重要的一点是,通过将每个函数调用委托给一个单独的线程,我们很难控制输入的处理速度。是的,我们希望尽可能快地完成这项工作,但外部服务通常会对来自单个客户机的请求速率施加硬限制。有时,合理地设计程序,使您能够限制处理速度,这样您的应用程序就不会因为滥用其使用限制而被外部 API 列入黑名单。

使用线程池

我们将尝试解决的第一个问题是程序运行的线程的未绑定限制。一个好的解决方案是构建一个线程工作池,该池具有严格定义的大小,可以处理所有并行工作,并通过一些线程安全的数据结构与工作池通信。通过使用这个线程池方法,我们还可以更容易地解决刚才提到的另外两个问题。

因此,一般的想法是启动一些预定义数量的线程,这些线程将使用队列中的工作项,直到完成为止。当没有其他工作要做时,线程将返回,我们将能够退出程序。用于与工作人员通信的结构的一个很好的候选者是内置queue模块中的Queue类。它是一个 FIFO(先进先出)队列实现,与来自collections模块的deque集合非常相似,专门设计用于处理线程间通信。以下是main()函数的一个修改版本,它仅启动有限数量的工作线程,以新的worker()函数为目标,并使用线程安全队列与它们通信:

from queue import Queue, Empty
from threading import Thread

THREAD_POOL_SIZE = 4

def worker(work_queue):
    while not work_queue.empty():
        try:
            item = work_queue.get(block=False)
        except Empty:
            break
        else:
            fetch_place(item)
            work_queue.task_done()

def main():
    work_queue = Queue()

    for place in PLACES:
        work_queue.put(place)

    threads = [
        Thread(target=worker, args=(work_queue,))
        for _ in range(THREAD_POOL_SIZE)
    ]

    for thread in threads:
        thread.start()

    work_queue.join()

    while threads:
        threads.pop().join()

运行我们程序的修改版本的结果与前一版本类似:

$ python threadpool.py 
 Reykjavík, Iceland,  64.13, -21.82
 Venice, Italy,  45.44,  12.32
 Vienna, Austria,  48.21,  16.37
 Zadar, Croatia,  44.12,  15.23
 Wrocław, Poland,  51.11,  17.04
 Bologna, Italy,  44.49,  11.34
 Slubice, Poland,  52.35,  14.56
 Berlin, Germany,  52.52,  13.40
 New York, NY, USA,  40.71, -74.01
 Dehli, Gujarat, India,  21.57,  73.22

time elapsed: 1.20s

运行时间将比每个参数只有一个线程的情况慢,但至少现在不可能用任意长的输入耗尽所有计算资源。此外,我们还可以调整THREAD_POOL_SIZE参数 a,以实现更好的资源/时间平衡。

使用双向队列

我们现在能够解决的另一个问题是在线程中打印输出的潜在问题。最好将这样的责任留给启动其他线程的主线程。我们可以通过提供另一个队列来处理这个问题,该队列将负责从我们的工作人员那里收集结果。以下是完整的代码,将所有内容与突出显示的主要更改放在一起:

import time
from queue import Queue, Empty
from threading import Thread

from gmaps import Geocoding

api = Geocoding()

PLACES = (
    'Reykjavik', 'Vien', 'Zadar', 'Venice',
    'Wrocław', 'Bolognia', 'Berlin', 'Słubice',
    'New York', 'Dehli',
)

THREAD_POOL_SIZE = 4

def fetch_place(place):
    return api.geocode(place)[0]

def present_result(geocoded):
 print("{:>25s}, {:6.2f}, {:6.2f}".format(
 geocoded['formatted_address'],
 geocoded['geometry']['location']['lat'],
 geocoded['geometry']['location']['lng'],
 ))

def worker(work_queue, results_queue):
    while not work_queue.empty():
        try:
            item = work_queue.get(block=False)
        except Empty:
            break
        else:
 results_queue.put(
 fetch_place(item)
 )
            work_queue.task_done()

def main():
    work_queue = Queue()
 results_queue = Queue()

    for place in PLACES:
        work_queue.put(place)

    threads = [
 Thread(target=worker, args=(work_queue, results_queue))
        for _ in range(THREAD_POOL_SIZE)
    ]

    for thread in threads:
        thread.start()

    work_queue.join()

    while threads:
        threads.pop().join()

 while not results_queue.empty():
 present_result(results_queue.get())

if __name__ == "__main__":
    started = time.time()
    main()
    elapsed = time.time() - started

    print()
    print("time elapsed: {:.2f}s".format(elapsed))

这消除了输出格式错误的风险,如果present_result()函数执行更多print()语句或执行一些额外的计算,我们可能会遇到这种情况。我们不期望这种方法在小输入的情况下有任何性能改进,但事实上,我们也降低了由于print()执行缓慢而导致的线程序列化风险。以下是我们的最终输出:

$ python threadpool_with_results.py 
 Vienna, Austria,  48.21,  16.37
 Reykjavík, Iceland,  64.13, -21.82
 Zadar, Croatia,  44.12,  15.23
 Venice, Italy,  45.44,  12.32
 Wrocław, Poland,  51.11,  17.04
 Bologna, Italy,  44.49,  11.34
 Slubice, Poland,  52.35,  14.56
 Berlin, Germany,  52.52,  13.40
 New York, NY, USA,  40.71, -74.01
 Dehli, Gujarat, India,  21.57,  73.22

time elapsed: 1.30s

处理错误和限速

前面提到的中,您在处理此类问题时可能会遇到的最后一个问题是外部服务提供商施加的费率限制。就谷歌地图 API 而言,在撰写本书时,免费和未经验证请求的官方费率限制为每秒 10 个请求,每天 2500 个请求。当使用多线程时,很容易耗尽这样的限制。这个问题甚至更严重,因为我们还没有讨论任何故障场景,而在多线程 Python 代码中处理异常比通常情况要复杂一些。

当客户端超过谷歌的速率时,api.geocode()函数将引发异常,这是个好消息。但此异常是单独引发的,不会使整个程序崩溃。工作线程当然会立即退出,但主线程将等待work_queue上存储的所有任务完成(使用work_queue.join()调用)。这意味着我们的工作线程应该优雅地处理可能的异常,并确保处理队列中的所有项目。如果没有进一步的改进,我们最终可能会遇到一些工作线程崩溃,程序永远不会退出的情况。

让我们对代码做一些小改动,以便为可能出现的任何问题做好准备。在工作线程中出现异常的情况下,我们可以在 results_queue队列中放置一个错误实例,并将当前任务标记为已完成,这与没有错误时的做法相同。这样我们可以确保主线程在等待work_queue.join()时不会无限期锁定。然后,主线程可能会检查结果并重新引发在结果队列中发现的任何异常。以下是worker()main()功能的改进版本,它们可以更安全地处理异常:

def worker(work_queue, results_queue):
    while True:
        try:
            item = work_queue.get(block=False)
        except Empty:
            break
        else:
 try:
 result = fetch_place(item)
 except Exception as err:
 results_queue.put(err)
 else:
 results_queue.put(result)
 finally:
 work_queue.task_done()

def main():
    work_queue = Queue()
    results_queue = Queue()

    for place in PLACES:
        work_queue.put(place)

    threads = [
        Thread(target=worker, args=(work_queue, results_queue))
        for _ in range(THREAD_POOL_SIZE)
    ]

    for thread in threads:
        thread.start()

    work_queue.join()

    while threads:
        threads.pop().join()

 while not results_queue.empty():
 result = results_queue.get()

 if isinstance(result, Exception):
 raise result

        present_result(result)

当我们准备好处理异常时,是时候打破我们的代码并超过速率限制。通过修改一些初始条件,我们可以很容易地做到这一点。让我们增加地理编码的位置数量和线程池的大小:

PLACES = (
    'Reykjavik', 'Vien', 'Zadar', 'Venice',
    'Wrocław', 'Bolognia', 'Berlin', 'Słubice',
    'New York', 'Dehli',
) * 10

THREAD_POOL_SIZE = 10

如果您的执行环境足够快,您很快就会遇到类似的错误:

$ python3 threadpool_with_errors.py
 New York, NY, USA,  40.71, -74.01
 Berlin, Germany,  52.52,  13.40
 Wrocław, Poland,  51.11,  17.04
 Zadar, Croatia,  44.12,  15.23
 Vienna, Austria,  48.21,  16.37
 Bologna, Italy,  44.49,  11.34
 Reykjavík, Iceland,  64.13, -21.82
 Venice, Italy,  45.44,  12.32
 Dehli, Gujarat, India,  21.57,  73.22
 Slubice, Poland,  52.35,  14.56
 Vienna, Austria,  48.21,  16.37
 Zadar, Croatia,  44.12,  15.23
 Venice, Italy,  45.44,  12.32
 Reykjavík, Iceland,  64.13, -21.82
Traceback (most recent call last):
 File "threadpool_with_errors.py", line 83, in <module>
 main()
 File "threadpool_with_errors.py", line 76, in main
 raise result
 File "threadpool_with_errors.py", line 43, in worker
 result = fetch_place(item)
 File "threadpool_with_errors.py", line 23, in fetch_place
 return api.geocode(place)[0]
 File "...\site-packages\gmaps\geocoding.py", line 37, in geocode
 return self._make_request(self.GEOCODE_URL, parameters, "results")
 File "...\site-packages\gmaps\client.py", line 89, in _make_request
 )(response)
gmaps.errors.RateLimitExceeded: {'status': 'OVER_QUERY_LIMIT', 'results': [], 'error_message': 'You have exceeded your rate-limit for this API.', 'url': 'https://maps.googleapis.com/maps/api/geocode/json?address=Wroc%C5%82aw&sensor=false'}

前面的异常当然不是错误代码的结果。这个程序对于这个免费服务来说太快了一点。它会产生太多并发请求,为了正确工作,我们需要有一种方法来限制它们的速率。

限制工作节奏通常被称为节流。PyPI 上有几个软件包,允许您限制任何类型工作的速率,并且非常易于使用。但我们不会在这里使用任何外部代码。节流是为线程引入一些锁定原语的好机会,因此我们将尝试从头构建解决方案。

我们将使用的算法有时称为令牌桶,非常简单:

  1. 有一个包含预定义数量令牌的 bucket。
  2. 每个令牌响应处理一项工作的单个权限。
  3. 每次工作人员请求一个或多个令牌时(权限):
    • 我们测量上次重新装满水桶的时间
    • 如果时差允许,我们用响应时差的令牌量重新填充桶
    • 如果存储令牌的数量大于或等于请求的数量,我们将减少存储令牌的数量并返回该值
    • 如果存储的令牌数量小于请求的数量,则返回零

两件重要的事情是始终使用零令牌初始化令牌桶,并且根据我们的标准时间量,绝不允许它填充更多可用的令牌,以令牌表示。如果我们不遵守这些预防措施,我们可以在超过速率限制的情况下释放令牌。因为在我们的情况下,速率限制是以每秒请求数表示的,所以我们不需要处理任意数量的时间。我们假设度量的基础是 1 秒,因此我们存储的令牌永远不会超过该时间段内允许的请求数。下面是允许使用令牌桶算法进行节流的类的示例实现:

From threading import Lock

class Throttle:
    def __init__(self, rate):
        self._consume_lock = Lock()
        self.rate = rate
        self.tokens = 0
        self.last = 0

    def consume(self, amount=1):
        with self._consume_lock:
            now = time.time()

            # time measument is initialized on first
            # token request to avoid initial bursts
            if self.last == 0:
                self.last = now

            elapsed = now - self.last

            # make sure that quant of passed time is big
            # enough to add new tokens
            if int(elapsed * self.rate):
                self.tokens += int(elapsed * self.rate)
                self.last = now

            # never over-fill the bucket
            self.tokens = (
                self.rate
                if self.tokens > self.rate
                else self.tokens
            )

            # finally dispatch tokens if available
            if self.tokens >= amount:
                self.tokens -= amount
            else:
                amount = 0

            return amount

这个类的用法非常简单。假设我们在主线程中只创建了一个Throttle(以Throttle(10)为例)实例,并将其作为位置参数传递给每个工作线程。在不同的线程中使用相同的数据结构是安全的,因为我们使用threading模块中的Lock类实例保护对其内部状态的操作。我们现在可以更新worker()函数实现,以等待每个项目,直到 throttle 释放一个新令牌:

def worker(work_queue, results_queue, throttle):
    while True:
        try:
            item = work_queue.get(block=False)
        except Empty:
            break
        else:
 while not throttle.consume():
 pass

            try:
                result = fetch_place(item)
            except Exception as err:
                results_queue.put(err)
            else:
                results_queue.put(result)
            finally:
                work_queue.task_done()

多处理

老实说,多线程是一项挑战,我们在上一节已经看到了这一点。事实上,解决这个问题最简单的方法只需要很少的努力。但是,以理智和安全的方式处理线程需要大量的代码。

我们必须设置线程池和通信队列,优雅地处理来自线程的异常,并且在尝试提供速率限制功能时还要考虑线程安全。数十行代码,仅用于并行执行外部库中的一个函数!我们只假设这是生产就绪,因为外部包创建者承诺他的库是线程安全的。对于一个实际上只适用于执行 I/O 绑定任务的解决方案来说,这似乎是一个很高的价格。

另一种允许您实现并行性的方法是多处理。不使用 GIL 相互约束的单独 Python 进程允许更好的资源利用率。这对于运行在多核处理器上的应用程序来说尤其重要,因为这些应用程序执行的是真正需要大量 CPU 的任务。目前,这是 Python 开发人员(使用 CPython 解释器)可以使用的唯一内置并发解决方案,它允许您从多处理器内核中获益。

使用多个进程的另一个优点是它们不共享内存上下文。因此,破坏数据和在应用程序中引入死锁会更加困难。不共享内存上下文意味着您需要一些额外的工作来在不同的进程之间传递数据,但幸运的是,有许多好方法可以实现可靠的进程间通信。事实上,Python 提供了一些原语,使进程之间的通信在线程之间尽可能容易。

在任何编程语言中,启动新进程的最基本方式通常是在某个点上通过分叉程序。在 POSIX 系统(Unix、Mac OS 和 Linux)上,fork 是 Python 中通过os.fork()函数公开的系统调用,它将创建一个新的子进程。然后,这两个进程在分叉后立即继续自己的程序。下面是一个只分叉一次的示例脚本:

import os

pid_list = []

def main():
    pid_list.append(os.getpid())
    child_pid = os.fork()

    if child_pid == 0:
        pid_list.append(os.getpid())
        print()
        print("CHLD: hey, I am the child process")
        print("CHLD: all the pids i know %s" % pid_list)

    else:
        pid_list.append(os.getpid())
        print()
        print("PRNT: hey, I am the parent")
        print("PRNT: the child is pid %d" % child_pid)
        print("PRNT: all the pids i know %s" % pid_list)

if __name__ == "__main__":
    main()

下面是一个在终端中运行它的示例:

$ python3 forks.py

PRNT: hey, I am the parent
PRNT: the child is pid 21916
PRNT: all the pids i know [21915, 21915]

CHLD: hey, I am the child process
CHLD: all the pids i know [21915, 21916]

注意在os.fork()调用之前,两个进程的数据初始状态完全相同。它们都具有与pid_list集合的第一个值相同的 PID 编号(进程标识符)。后来,两种状态都出现了分歧,我们可以看到子进程添加了21916值,而父进程复制了其21915PID。这是因为这两个进程的内存上下文不共享。它们具有相同的初始条件,但在os.fork()呼叫后不会相互影响。

将 fork 内存上下文复制到子进程后,每个进程处理自己的地址空间。为了进行通信,流程需要使用系统范围的资源或使用低级工具,例如信号

不幸的是,os.fork在 Windows 下不可用,需要生成一个新的解释器来模拟 fork 特性。因此,它需要根据平台的不同而有所不同。os模块还公开了允许您在 Windows 下生成新进程的函数,但最终您将很少使用它们。os.fork()也是如此。Python 提供了一个很棒的multiprocessing模块,它为多处理创建了一个高级接口。这个模块的最大优点是它提供了一些抽象,我们必须在一个线程应用程序部分的示例中从头开始编写这些抽象。它允许您限制样板代码的数量,从而提高应用程序的可维护性并降低其复杂性。令人惊讶的是,尽管名称不同,multiprocessing模块也为线程公开了类似的接口,因此您可能希望对这两种方法使用相同的接口。

内置多处理模块

multiprocessing提供了一种可移植的方式,可以像处理线程一样处理进程。

此模块包含一个与Thread类非常相似的Process类,可以在任何平台上使用:

from multiprocessing import Process
import os

def work(identifier):
    print(
        'hey, i am a process {}, pid: {}'
        ''.format(identifier, os.getpid())
    )

def main():
    processes = [
        Process(target=work, args=(number,))
        for number in range(5)
    ]
    for process in processes:
        process.start()

    while processes:
        processes.pop().join()

if __name__ == "__main__":
    main()

前面的脚本在执行时给出以下结果:

$ python3 processing.py
hey, i am a process 1, pid: 9196
hey, i am a process 0, pid: 8356
hey, i am a process 3, pid: 9524
hey, i am a process 2, pid: 3456
hey, i am a process 4, pid: 6576

创建进程时,内存被分叉(在 POSIX 系统上)。进程的最有效使用是让它们在创建后自行工作,以避免开销,并从主线程检查它们的状态。除了复制的内存状态外,Process类还在其构造函数中提供了一个额外的args参数,以便可以传递数据。

进程模块之间的通信需要一些额外的工作,因为它们的本地内存在默认情况下是不共享的。为简化此过程,多处理模块提供了进程间通信的几种方式:

  • 使用multiprocessing.Queue类,这是queue.Queue的一个近似克隆,之前用于线程之间的通信
  • 使用multiprocessing.Pipe,这是一个类似套接字的双向通信通道
  • 使用multiprocessing.sharedctypes模块,它允许您在进程之间共享的专用内存池中创建任意 C 类型(来自ctypes模块)

multiprocessing.Queue类和queue.Queue类具有相同的接口。唯一的区别是,第一种设计用于多进程环境,而不是多线程,因此它使用不同的内部传输和锁定原语。我们已经在一个线程化应用程序部分的示例中看到了如何将队列与多线程一起使用,因此我们不会对多处理执行相同的操作。用法保持完全相同,因此这样的示例不会带来任何新内容。

Pipe类现在提供了一个更有趣的模式。它是一个双工(双向)通信通道,在概念上与 Unix 管道非常相似。管道接口也非常类似于内置socket模块的简单插座。与原始系统管道和套接字的区别在于,它允许您发送任何可拾取的对象(使用pickle模块),而不仅仅是原始字节。这使得进程之间的通信更加容易,因为您可以发送几乎任何基本的 Python 类型:

from multiprocessing import Process, Pipe

class CustomClass:
    pass

def work(connection):
    while True:
        instance = connection.recv()

        if instance:
            print("CHLD: {}".format(instance))

        else:
            return

def main():
    parent_conn, child_conn = Pipe()

    child = Process(target=work, args=(child_conn,))

    for item in (
        42,
        'some string',
        {'one': 1},
        CustomClass(),
        None,
    ):
        print("PRNT: send {}:".format(item))
        parent_conn.send(item)

    child.start()
    child.join()

if __name__ == "__main__":
    main()

当查看前面脚本的示例输出时,您将看到您可以轻松地传递自定义类实例,并且它们具有不同的地址,具体取决于进程:

PRNT: send: 42
PRNT: send: some string
PRNT: send: {'one': 1}
PRNT: send: <__main__.CustomClass object at 0x101cb5b00>
PRNT: send: None
CHLD: recv: 42
CHLD: recv: some string
CHLD: recv: {'one': 1}
CHLD: recv: <__main__.CustomClass object at 0x101cba400>

另一种在进程之间共享状态的方法是使用共享内存池中的原始类型和multiprocessing.sharedctypes中提供的类。最基本的是ValueArray。以下是multiprocessing模块官方文档中的示例代码:

from multiprocessing import Process, Value, Array

def f(n, a):
    n.value = 3.1415927
    for i in range(len(a)):
        a[i] = -a[i]

if __name__ == '__main__':
    num = Value('d', 0.0)
    arr = Array('i', range(10))

    p = Process(target=f, args=(num, arr))
    p.start()
    p.join()

    print(num.value)
    print(arr[:])

此示例将打印以下输出:

3.1415927
[0, -1, -2, -3, -4, -5, -6, -7, -8, -9]

在使用multiprocessing.sharedctypes时,您需要记住您正在处理共享内存,因此为了避免数据损坏的风险,您需要使用锁定原语。多处理提供了线程中可用的一些类,如LockRLockSemaphore来实现这一点。sharedctypes类的缺点是,它们只允许您共享ctypes模块中的基本 C 类型。如果需要传递更复杂的结构或类实例,则需要使用队列、管道或其他进程间通信通道。在大多数情况下,避免使用sharedctypes中的类型是合理的,因为它们增加了代码复杂性,并带来了多线程中已知的所有危险。

使用流程池

使用多个进程而不是线程会增加大量开销。通常,它会增加内存占用,因为每个进程都有自己独立的内存上下文。这意味着,与多线程应用程序相比,允许子进程的数量不受限制是一个更大的问题。

在依赖多处理以提高资源利用率的应用程序中,控制资源使用的最佳模式是以类似于使用线程池一节中描述的线程的方式构建进程池。

multiprocessing模块最好的地方在于它提供了一个随时可用的Pool类,可以为您处理管理多个流程工作人员的所有复杂性。此池实现大大减少了所需的样板文件数量和与双向通信相关的问题数量。您也不需要手动使用join()方法,因为池可以用作上下文管理器(使用with语句)。下面是我们先前编写的线程示例之一,它使用了multiprocessing模块中的Pool类:

from multiprocessing import Pool

from gmaps import Geocoding

api = Geocoding()

PLACES = (
    'Reykjavik', 'Vien', 'Zadar', 'Venice',
    'Wrocław', 'Bolognia', 'Berlin', 'Słubice',
    'New York', 'Dehli',
)

POOL_SIZE = 4

def fetch_place(place):
    return api.geocode(place)[0]

def present_result(geocoded):
    print("{:>25s}, {:6.2f}, {:6.2f}".format(
        geocoded['formatted_address'],
        geocoded['geometry']['location']['lat'],
        geocoded['geometry']['location']['lng'],
    ))

def main():
    with Pool(POOL_SIZE) as pool:
        results = pool.map(fetch_place, PLACES)

    for result in results:
        present_result(result)

if __name__ == "__main__":
    main()

如您所见,代码现在要短得多。这意味着在出现问题时,现在更容易维护和调试。实际上,现在只有两行代码显式地处理多处理。与我们必须从头构建处理池的情况相比,这是一个很大的改进。现在我们甚至不需要关心通信通道,因为它们是在Pool实现内部隐式创建的。

使用 multiprocessing.dummy 作为多线程接口

来自multiprocessing模块的高级抽象,例如Pool类,比threading模块中提供的简单工具有很大的优势。但不,这并不意味着多处理总是比多线程更好的方法。在许多用例中,线程可能是比进程更好的解决方案。对于需要低延迟和/或高资源效率的情况尤其如此。

但这并不意味着,只要您想使用线程而不是进程,就需要牺牲multiprocessing模块中所有有用的抽象。还有一个multiprocessing.dummy模块,它复制multiprocessingAPI,但使用多个线程,而不是分叉/生成新进程。

这使您可以减少代码中的样板文件数量,并使接口更易于插拔。例如,让我们再看看前面示例中的main()函数。如果我们想让用户控制他想要使用哪个处理后端(进程或线程),我们可以简单地通过替换Pool类:

from multiprocessing import Pool as ProcessPool
from multiprocessing.dummy import Pool as ThreadPool

def main(use_threads=False):
    if use_threads:
        pool_cls = ThreadPool
    else:
        pool_cls = ProcessPool

    with pool_cls(POOL_SIZE) as pool:
        results = pool.map(fetch_place, PLACES)

    for result in results:
        present_result(result)

异步编程

异步编程近年来得到了广泛的关注。在 Python3.5 中,它最终获得了一些语法特性,这些特性巩固了异步执行的概念。但这并不意味着异步编程只能从 Python 3.5 开始。很多库和框架都是在更早的时候提供的,其中大多数都起源于 Python2 的旧版本。甚至还有一个完整的 Python 替代实现称为 Stackless(参见第 1 章Python 的当前状态),它集中于这种单一的编程方法。其中一些解决方案,如 Twisted、Tornado 或 Eventlet,仍然拥有庞大而活跃的社区,非常值得了解。无论如何,从 Python3.5 开始,异步编程比以往任何时候都容易。因此,预计其内置的异步特性将取代旧工具的更大部分,或者外部项目将逐渐转变为一种基于 Python 内置的高级框架。

当试图解释什么是异步编程时,最简单的方法是将此方法视为类似于线程但不涉及系统调度的方法。这意味着异步程序可以并发处理问题,但其上下文是在内部切换的,而不是由系统调度器切换的。

但是,当然,在异步程序中,我们不使用线程来并发处理工作。大多数解决方案使用不同的概念,并且根据实现的不同,其名称也不同。用于描述此类并发程序实体的一些示例名称包括:

  • Green 线程或 greenlet(greenlet、gevent 或 eventlet 项目)
  • 协同程序(Python 3.5 本机异步编程)
  • 微线程(无堆栈 Python)

这些基本上是相同的概念,但通常以稍微不同的方式实现。出于显而易见的原因,在本节中,我们将只关注 Python 从 3.5 版开始本机支持的协同路由。

协同多任务和异步 I/O

协同多任务是异步编程的核心。在这种类型的计算机多任务处理中,启动上下文切换(到另一个进程或线程)不是操作系统的责任,而是每个进程在空闲时自动释放控制,以允许同时执行多个程序。这就是为什么它被称为合作社。所有流程都需要协作才能顺利完成多任务。

这种多任务模型有时在操作系统中使用,但现在几乎没有发现它是一种系统级解决方案。这是因为存在一种风险,即一个设计糟糕的服务很容易破坏整个系统的稳定性。使用操作系统直接管理的上下文开关进行线程和进程调度现在是系统级并发的主要方法。但在应用程序级别上,协作多任务仍然是一个很好的并发工具。

当谈到应用程序级别的协作多任务时,我们不处理需要释放控制的线程或进程,因为所有执行都包含在单个进程和线程中。相反,我们有多个任务(协程、微线程和绿色线程),它们将控制权释放给处理任务协调的单个函数。此函数通常是某种事件循环。

为了避免以后的混淆(由于 Python 术语),从现在开始,我们将引用诸如协程之类的并发任务。协作多任务处理中最重要的问题是何时释放控制。在大多数异步应用程序中,控制权在 I/O 操作时释放给调度程序或事件循环。无论程序是从文件系统读取数据还是通过套接字进行通信,这样的 I/O 操作总是与进程空闲时的等待时间有关。等待时间取决于外部资源,因此这是一个释放控制的好机会,以便其他协程可以完成它们的工作,直到它们也需要等待。

这使得这种方法在行为上与 Python 中实现多线程的方式有些相似。我们知道 GIL 序列化 Python 线程,但它也在每个 I/O 操作中发布。主要区别在于 Python 中的线程是作为系统级线程实现的,因此操作系统可以抢占当前正在运行的线程,并在任何时间点将控制权交给另一个线程。在异步编程中,任务永远不会被主事件循环抢占。这就是为什么这种类型的多任务处理也被称为非先发制人的多任务处理。

当然,每个 Python 应用程序都运行在一个操作系统上,在这个操作系统中,还有其他进程在争夺资源。这意味着操作系统始终有权抢占整个进程,并将控制权交给另一个进程。但是,当异步应用程序返回运行时,它会从系统调度器介入时暂停的位置继续运行。这就是为什么协同程序仍然被认为是非强制性的。

Python 异步和等待关键字

asyncawait关键字是 Python 异步编程的主要构建块。

def语句定义新的协同程序之前使用的async关键字。在严格定义的情况下,可以暂停和恢复协同程序功能的执行。它的语法和行为与生成器非常相似(请参阅第 2 章语法最佳实践–低于类级别),事实上,生成器需要在 Python 的旧版本中使用,以实现协同路由。下面是使用async关键字的函数声明示例:

async def async_hello():
    print("hello, world!")

使用async关键字定义的函数是特殊的。调用时,它们不会在内部执行代码,而是返回一个协程对象:

>>> async def async_hello():
...     print("hello, world!")
... 
>>> async_hello()
<coroutine object async_hello at 0x1014129e8>

在事件循环中计划执行之前,协程对象不会执行任何操作。asyncio模块可用于提供基本的事件循环实现,以及许多其他异步实用程序:

>>> import asyncio
>>> async def async_hello():
...     print("hello, world!")
... 
>>> loop = asyncio.get_event_loop()
>>> loop.run_until_complete(async_hello())
hello, world!
>>> loop.close()

显然,由于我们只创建了一个简单的协程,所以我们的程序中不涉及并发性。为了看到真正并发的东西,我们需要创建更多将由事件循环执行的任务。

通过调用loop.create_task()方法或提供另一个对象以等待使用asyncio.wait()函数,可以将新任务添加到循环中。我们将使用后一种方法,尝试异步打印由range()函数生成的数字序列:

import asyncio

async def print_number(number):
    print(number)

if __name__ == "__main__":
    loop = asyncio.get_event_loop()

    loop.run_until_complete(
        asyncio.wait([
            print_number(number)
            for number in range(10)
        ])
    )
    loop.close()

asyncio.wait()函数接受一系列协同程序对象并立即返回。结果是一个生成器,它生成表示未来结果(未来)的对象。顾名思义,它用于等待所提供的所有协同路由完成。它返回生成器而不是协程对象的原因是与 Python 的早期版本向后兼容,这将在后面解释。运行此脚本的结果可能如下所示:

$ python asyncprint.py 
0
7
8
3
9
4
1
5
2
6

正如我们所看到的,数字的打印顺序与我们创建协同程序的顺序不同。但这正是我们想要实现的。

Python 3.5 中添加的第二个重要关键字是await。它用于等待协程或未来(稍后解释)的结果,并将对执行的控制释放给事件循环。为了更好地理解它是如何工作的,我们需要查看一个更复杂的代码示例。

假设我们想要创建两个协同程序,它们将在一个循环中执行一些简单的任务:

  • 等待随机数秒
  • 打印一些作为参数提供的文本和睡眠时间

让我们从一个简单的实现开始,该实现存在一些并发性问题,稍后我们将尝试通过额外的await使用来改进这些问题:

import time
import random
import asyncio

async def waiter(name):
    for _ in range(4):
        time_to_sleep = random.randint(1, 3) / 4
        time.sleep(time_to_sleep)
        print(
            "{} waited {} seconds"
            "".format(name, time_to_sleep)
        )

async def main():
    await asyncio.wait([waiter("foo"), waiter("bar")])

if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
    loop.close()

在终端执行时(使用time命令测量时间),可能会给出以下输出:

$ time python corowait.py 
bar waited 0.25 seconds
bar waited 0.25 seconds
bar waited 0.5 seconds
bar waited 0.5 seconds
foo waited 0.75 seconds
foo waited 0.75 seconds
foo waited 0.25 seconds
foo waited 0.25 seconds

real	0m3.734s
user	0m0.153s
sys	0m0.028s

正如我们所看到的,这两个协同路由都完成了它们的执行,但不是以异步方式完成的。原因是它们都使用time.sleep()功能,该功能阻止但不将控件释放到事件循环。这在多线程设置中会更好,但我们现在不想使用线程。那么我们如何解决这个问题呢?

答案是使用asyncio.sleep(),这是time.sleep()的异步版本,并使用await关键字等待其结果。我们已经在main()函数的第一个版本中使用了这个语句,但它只是为了提高代码的清晰度。它显然没有使我们的实现更加并行。让我们来看一个使用await asyncio.sleep()waiter()协同程序的改进版本:

async def waiter(name):
    for _ in range(4):
        time_to_sleep = random.randint(1, 3) / 4
        await asyncio.sleep(time_to_sleep)
        print(
            "{} waited {} seconds"
            "".format(name, time_to_sleep)
        )

如果我们运行更新后的脚本,我们可以看到两个函数的输出是如何相互交错的:

$ time python corowait_improved.py 
bar waited 0.25 seconds
foo waited 0.25 seconds
bar waited 0.25 seconds
foo waited 0.5 seconds
foo waited 0.25 seconds
bar waited 0.75 seconds
foo waited 0.25 seconds
bar waited 0.5 seconds

real  0m1.953s
user  0m0.149s
sys   0m0.026s

这种简单改进的另一个优点是代码运行速度更快。总的执行时间小于所有睡眠时间的总和,因为协同程序协同释放控制。

旧版本 Python 中的异步 IO

asyncio模块出现在 Python 3.4 中。因此,在 Python3.5 之前,它是唯一一个严重支持异步编程的 Python 版本。不幸的是,这两个后续版本似乎足以引入兼容性问题。

不管你喜欢与否,Python 中异步编程的核心比支持此模式的语法元素引入得更早。迟做总比不做好,但这造成了一种情况,即有两种语法可用于协同程序。

从 Python 3.5 开始,您可以使用asyncawait

async def main():
    await asyncio.sleep(0)

但是对于 Python 3.4,您需要使用asyncio.coroutine装饰器和yield from语句:

@asyncio.couroutine
def main():
    yield from asyncio.sleep(0)

另一个有用的事实是,yield from语句是在 Python3.3 中引入的,PyPI 上有一个asyncio后端口。这意味着您也可以在 Python3.3 中使用这种协作多任务的实现。

异步编程的一个实例

正如本章已经多次提到的,异步编程是处理 I/O 绑定操作的一个很好的工具。因此,是时候构建比简单的序列打印或异步等待更实用的东西了。

为了保持一致性,我们将尝试处理通过多线程和多处理解决的相同问题。因此,我们将尝试通过网络连接从外部资源异步获取一些数据。如果我们能使用与前面章节相同的python-gmaps包,那就太好了。不幸的是,我们不能。

python-gmaps的创建者有点懒,走了一条捷径。为了简化开发,他选择了一个requests包作为他的 HTTP 客户端库。遗憾的是,requests不支持与asyncawait的异步 I/O。还有一些其他项目旨在为requests项目提供一些并发性,但它们要么依赖 Gevent(grequests,请参考https://github.com/kennethreitz/grequests 或线程/进程池执行(requests-futures,参见https://github.com/ross/requests-futures 。这两者都不能解决我们的问题。

在你因为我在责骂一个无辜的开源开发人员而感到不安之前,请冷静下来。python-gmaps包背后的人是我。依赖项选择不当是本项目的问题之一。我只是喜欢不时地公开批评自己。这对我来说应该是一个痛苦的教训,因为python-gmaps在其最新版本(在编写本书时为 0.3.1)中无法轻松地与 Python 的异步 I/O 集成。无论如何,这在将来可能会改变,因此不会丢失任何东西。

在前面的例子中,我们知道库的局限性非常容易使用,因此我们需要构建一些东西来填补这一空白。GoogleMapsAPI 的使用非常简单,因此我们将构建一个快速而肮脏的异步实用程序,仅用于演示目的。版本 3.5 中 Python 的标准库仍然缺少一个库,该库可以使异步 HTTP 请求像调用urllib.urlopen()一样简单。我们肯定不想从头开始构建整个协议支持,因此我们将使用 PyPI 上提供的aiohttp包中的一些帮助。这是一个非常有前途的库,它为异步 HTTP 添加了客户端和服务器实现。这是一个构建在aiohttp之上的小模块,它创建了一个geocode()助手函数,该函数向 Google Maps API 服务发出地理编码请求:

import aiohttp

session = aiohttp.ClientSession()

async def geocode(place):
    params = {
        'sensor': 'false',
        'address': place
    }
    async with session.get(
        'https://maps.googleapis.com/maps/api/geocode/json',
        params=params
    ) as response:
        result = await response.json()
        return result['results']

假设此代码存储在名为asyncgmaps的模块中,我们稍后将使用该模块。现在我们准备重写讨论多线程和多处理时使用的示例。之前,我们将整个操作分为两个单独的步骤:

  1. 使用fetch_place()功能并行执行对外部服务的所有请求。
  2. 使用present_result()功能在循环中显示所有结果。

但是,由于协作多任务处理与使用多个进程或线程完全不同,我们可以稍微修改我们的方法。在中提出的大多数问题一项使用一个线程不再是我们关注的问题。协同路由是非抢先的,因此我们可以在等待 HTTP 响应后立即轻松显示结果。这将简化我们的代码并使其更清晰:

import asyncio
# note: local module introduced earlier
from asyncgmaps import geocode, session

PLACES = (
    'Reykjavik', 'Vien', 'Zadar', 'Venice',
    'Wrocław', 'Bolognia', 'Berlin', 'Słubice',
    'New York', 'Dehli',
)

async def fetch_place(place):
    return (await geocode(place))[0]

async def present_result(result):
    geocoded = await result
    print("{:>25s}, {:6.2f}, {:6.2f}".format(
        geocoded['formatted_address'],
        geocoded['geometry']['location']['lat'],
        geocoded['geometry']['location']['lng'],
    ))

async def main():
    await asyncio.wait([
        present_result(fetch_place(place))
        for place in PLACES
    ])

if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

    # aiohttp will raise issue about unclosed
    # ClientSession so we perform cleanup manually
    loop.run_until_complete(session.close())
    loop.close()

使用 futures 将非同步代码与异步代码集成

异步编程非常好,特别是对于对构建可伸缩的应用程序感兴趣的后端开发人员。实际上,它是构建高度并发服务器的最重要工具之一。

但现实是痛苦的。许多处理 I/O 绑定问题的流行包并不打算与异步代码一起使用。主要原因是:

  • Python 3 及其一些高级功能的采用率仍然很低
  • Python 初学者对各种并发概念的理解程度较低

这意味着,现有同步多线程应用程序和包的迁移通常是不可能的(由于体系结构的限制),或者过于昂贵。许多项目可以从合并异步风格的多任务处理中获益匪浅,但最终只有少数项目能够做到这一点。

这意味着现在,在尝试从一开始构建异步应用程序时,您将遇到很多困难。在大多数情况下,这将类似于异步编程部分中提到的问题—接口不兼容和 I/O 操作的非同步阻塞。

当然,当您遇到这种不兼容时,您有时可以退出await,而只是同步获取所需的资源。但是,在等待结果时,这将阻止其他所有协同程序执行其代码。它在技术上是可行的,但也破坏了异步编程的所有收益。因此,最终,将异步 I/O 与同步 I/O 结合起来不是一种选择。这是一种“要么全有要么全无”的游戏。

另一个问题是长时间运行 CPU 限制的操作。在执行 I/O 操作时,从协同程序中释放控制不是问题。当从文件系统或套接字进行写/读操作时,您最终会等待,因此使用await调用是最好的方法。但是,当你需要实际计算一些东西,并且你知道这需要一段时间时,该怎么做呢?当然,您可以将问题分割成多个部分,并在每次将工作向前推进一点时释放控制。但是你很快就会发现这不是一个好的模式。这样的事情可能会把代码弄得一团糟,也不能保证好的结果。时间选择应由解释器或操作系统负责。

那么,如果您有一些无法或不愿重写的长时间同步 I/O 操作代码,该怎么办呢。或者,在一个主要考虑异步 I/O 的应用程序中,当您不得不执行一些繁重的 CPU 限制操作时,该怎么办?好你需要使用一种变通方法。我所说的变通方法是指多线程或多处理。

这听起来可能不太好,但有时最好的解决方案可能是我们试图逃避的方案。在 Python 中,使用多处理总是可以更好地并行处理 CPU 扩展任务。如果设置正确并小心处理,多线程可以处理与asyncawait一样好的 I/O 操作(快速且无大量资源开销)。

因此,有时当您不知道该做什么时,当某些事情根本不适合您的异步应用程序时,使用一段代码将其延迟到单独的线程或进程。您可以假设这是一个协同程序,将控件释放到事件循环,并最终在结果准备就绪时处理结果。幸运的是,Python 标准库提供了concurrent.futures模块,该模块也与asyncio模块集成。这两个模块一起允许您调度在线程或其他进程中执行的阻塞函数,因为它是异步非阻塞协程。

执行人与期货

在我们了解如何将线程或进程注入异步事件循环之前,我们将仔细了解concurrent.futures模块,该模块稍后将成为我们所谓变通方法的主要组成部分。

concurrent.futures模块中最重要的类是ExecutorFuture

Executor表示可以并行处理工作项的资源池。这在用途上可能与来自multiprocessing模块Pooldummy.Pool的类非常相似,但具有完全不同的接口和语义。它是一个不用于实例化的基类,有两个具体实现:

  • ThreadPoolExecutor:这是表示线程池的线程
  • ProcessPoolExecutor:这是一个表示流程池的流程

每个执行人提供三种方法:

  • submit(fn, *args, **kwargs):调度fn函数在资源池上执行,并返回表示可调用函数执行的Future对象
  • map(func, *iterables, timeout=None, chunksize=1):这将以与multiprocessing.Pool.map()方法类似的方式在一个 iterable 上执行 func 函数
  • shutdown(wait=True):这将关闭执行器并释放其所有资源

最有趣的方法是submit(),因为它返回Future对象。它表示可调用对象的异步执行,并且仅间接表示其结果。为了获得提交的 callable 的实际返回值,需要调用Future.result()方法。如果调用已经完成,result()方法不会阻止它,只会返回函数输出。如果它不是真的,它将阻止它,直到结果准备就绪。将其视为对结果的承诺(实际上,它与 JavaScript 中的承诺是相同的概念)。您不需要在收到后立即打开包装(使用result()方法),但如果您尝试这样做,则保证最终会返回某些内容:

>>> def loudy_return():
...     print("processing")
...     return 42
... 
>>> from concurrent.futures import ThreadPoolExecutor
>>> with ThreadPoolExecutor(1) as executor:
...     future = executor.submit(loudy_return)
... 
processing
>>> future
<Future at 0x33cbf98 state=finished returned int>
>>> future.result()
42

如果您想使用Executor.map()方法,它的用法与multiprocessing模块中池类的Pool.map()方法没有区别:

def main():
    with ThreadPoolExecutor(POOL_SIZE) as pool:
        results = pool.map(fetch_place, PLACES)

    for result in results:
        present_result(result)

在事件循环中使用执行器

Executor.submit()方法返回的Future类实例在概念上非常接近异步编程中使用的协程。这就是为什么我们可以使用执行器在协作多任务和多处理或多线程之间进行混合。

此解决方案的核心是事件循环类的 BaseEventLoop.run_in_executor(executor, func, *args)方法。它允许您在由executor参数表示的进程或线程池中调度func函数的执行。该方法最重要的一点是,它返回一个新的等待(一个可以通过await语句等待的对象)。因此,多亏了这一点,您可以执行一个阻塞函数,它不是一个完全相同的协程,并且无论需要多长时间来完成,它都不会阻塞。它将仅停止等待此类调用结果的函数,但整个事件循环仍将继续旋转。

一个有用的事实是,您甚至不需要创建 executor 实例。如果将None作为 executor 参数传递,ThreadPoolExecutor类将与默认线程数一起使用(对于 Python 3.5,它是处理器数乘以 5)。

因此,让我们假设我们不想重写python-gmaps包中引起我们头痛的问题部分。我们可以通过loop.run_in_executor()调用轻松地将阻塞调用推迟到一个单独的线程,同时仍然将fetch_place()函数作为一个可等待的协程:

async def fetch_place(place):
    coro = loop.run_in_executor(None, api.geocode, place)
    result = await coro
    return result[0]

这样的解决方案不如拥有一个完全异步的库来完成这项工作,但您知道半块面包总比没有面包好

总结

这是一个漫长的旅程,但我们成功地克服了 Python 程序员可用的最基本的并发编程方法。

在解释了什么是并发之后,我们开始行动,借助多线程技术剖析了一个典型的并发问题。在识别代码的基本缺陷并修复它们之后,我们转向多处理,以了解它在我们的案例中如何工作。

我们发现,使用multiprocessing模块比使用threading模块的基本线程更容易使用多个进程。但就在那之后,我们意识到,由于multiprocessing.dummy,我们也可以对线程使用相同的 API。因此,在多处理和多线程之间的选择现在只是哪个解决方案更适合这个问题,而不是哪个解决方案有更好的接口。

说到问题匹配,我们最终尝试了异步编程,这应该是 I/O 绑定应用程序的最佳解决方案,但我们意识到我们不能完全忘记线程和进程。于是我们绕了一圈,回到起点!

这就引出了本章的最终结论。没有银弹。您可能更喜欢或更喜欢一些方法。有一些方法可能更适合于一组给定的问题,但为了成功,您需要了解所有这些方法。在现实场景中,您可能会发现自己在单个应用程序中使用了所有并发工具和样式,这并不少见。

前面的结论是对下一章的主题第 14 章有用的设计模式的一个很好的介绍。这是因为没有一种模式可以解决所有问题。你应该知道尽可能多的,因为最终你会每天都使用它们。