How To Use ThreadPoolExecutor in Python 3 (cukw6666's blog)

Tags: python  java  linux  programming languages  artificial intelligence

The author selected the COVID-19 Relief Fund to receive a donation as part of the Write for DOnations program.


Introduction

Python threads are a form of parallelism that allow your program to run multiple procedures at once. Parallelism in Python can also be achieved using multiple processes, but threads are particularly well suited to speeding up applications that involve significant amounts of I/O (input/output).


Example I/O-bound operations include making web requests and reading data from files. In contrast to I/O-bound operations, CPU-bound operations (like performing math with the Python standard library) will not benefit much from Python threads.


Python 3 includes the ThreadPoolExecutor utility for executing code in a thread.


In this tutorial, we will use ThreadPoolExecutor to make network requests expediently. We’ll define a function well suited for invocation within threads, use ThreadPoolExecutor to execute that function, and process results from those executions.


For this tutorial, we’ll make network requests to check for the existence of Wikipedia pages.


Note: The fact that I/O-bound operations benefit more from threads than CPU-bound operations is caused by an idiosyncrasy in Python called the global interpreter lock. If you’d like, you can learn more about Python’s global interpreter lock in the official Python documentation.

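To see why waiting on I/O parallelizes well despite the global interpreter lock, here is a minimal sketch (not part of the original tutorial) that simulates I/O with time.sleep, which, like real blocking I/O, releases the GIL while waiting:

```python
import threading
import time

def fake_io_task():
    # time.sleep releases the GIL while waiting, just as blocking
    # network or file I/O does, so other threads can run meanwhile.
    time.sleep(0.2)

threads = [threading.Thread(target=fake_io_task) for _ in range(4)]
start = time.time()
for t in threads:
    t.start()
for t in threads:
    t.join()
elapsed = time.time() - start
print(f"4 overlapping 0.2s waits took {elapsed:.2f}s")
```

The four waits overlap, so the total is close to 0.2 seconds rather than 0.8. A CPU-bound loop in place of the sleep would show no such speedup, because only one thread can hold the GIL and execute Python bytecode at a time.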

Prerequisites

To get the most out of this tutorial, it is recommended to have some familiarity with programming in Python and a local Python programming environment with requests installed.


You can install the requests package with:

  • pip install --user requests==2.23.0

Step 1 — Defining a Function to Execute in Threads

Let’s start by defining a function that we’d like to execute with the help of threads.


Using nano or your preferred text editor/development environment, you can open this file:


  • nano wiki_page_function.py


For this tutorial, we’ll write a function that determines whether or not a Wikipedia page exists:


wiki_page_function.py
import requests

def get_wiki_page_existence(wiki_page_url, timeout=10):
    response = requests.get(url=wiki_page_url, timeout=timeout)

    page_status = "unknown"
    if response.status_code == 200:
        page_status = "exists"
    elif response.status_code == 404:
        page_status = "does not exist"

    return wiki_page_url + " - " + page_status

The get_wiki_page_existence function accepts two arguments: a URL to a Wikipedia page (wiki_page_url), and a timeout number of seconds to wait for a response from that URL.


get_wiki_page_existence uses the requests package to make a web request to that URL. Depending on the status code of the HTTP response, a string is returned that describes whether or not the page exists. Different status codes represent different outcomes of a HTTP request. This procedure assumes that a 200 “success” status code means the Wikipedia page exists, and a 404 “not found” status code means the Wikipedia page does not exist.


As described in the Prerequisites section, you’ll need the requests package installed to run this function.


Let’s try running the function by adding the url and function call following the get_wiki_page_existence function:


wiki_page_function.py
. . .
url = "https://en.wikipedia.org/wiki/Ocean"
print(get_wiki_page_existence(wiki_page_url=url))

Once you’ve added the code, save and close the file.


If we run this code:


  • python wiki_page_function.py


We’ll see output like the following:



Output
https://en.wikipedia.org/wiki/Ocean - exists

Calling the get_wiki_page_existence function with a valid Wikipedia page returns a string that confirms the page does, in fact, exist.


Warning: In general, it is not safe to share Python objects or state between threads without taking special care to avoid concurrency bugs. When defining a function to execute in a thread, it is best to define a function that performs a single job and does not share or publish state to other threads. get_wiki_page_existence is an example of such a function.

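If a threaded function does need to update shared state, guarding the update with a lock is one safe pattern. The following is a sketch, not part of the original tutorial (the count_up helper is invented for illustration):

```python
import threading
from concurrent.futures import ThreadPoolExecutor

counter = 0
counter_lock = threading.Lock()

def count_up():
    global counter
    for _ in range(10_000):
        # counter += 1 is a read-modify-write; without the lock,
        # interleaved threads could lose increments.
        with counter_lock:
            counter += 1

with ThreadPoolExecutor(max_workers=4) as executor:
    for _ in range(4):
        executor.submit(count_up)

print(counter)  # 40000
```

Exiting the with block waits for every submitted job to finish, so counter is guaranteed to be 40000 here; removing the lock could produce a smaller, nondeterministic total.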

Step 2 — Using ThreadPoolExecutor to Execute a Function in Threads

Now that we have a function well suited to invocation with threads, we can use ThreadPoolExecutor to perform multiple invocations of that function expediently.


Let’s add the following highlighted code to your program in wiki_page_function.py:


wiki_page_function.py
import requests
import concurrent.futures

def get_wiki_page_existence(wiki_page_url, timeout=10):
    response = requests.get(url=wiki_page_url, timeout=timeout)

    page_status = "unknown"
    if response.status_code == 200:
        page_status = "exists"
    elif response.status_code == 404:
        page_status = "does not exist"

    return wiki_page_url + " - " + page_status

wiki_page_urls = [
    "https://en.wikipedia.org/wiki/Ocean",
    "https://en.wikipedia.org/wiki/Island",
    "https://en.wikipedia.org/wiki/this_page_does_not_exist",
    "https://en.wikipedia.org/wiki/Shark",
]
with concurrent.futures.ThreadPoolExecutor() as executor:
    futures = []
    for url in wiki_page_urls:
        futures.append(executor.submit(get_wiki_page_existence, wiki_page_url=url))
    for future in concurrent.futures.as_completed(futures):
        print(future.result())

Let’s take a look at how this code works:


  • concurrent.futures is imported to give us access to ThreadPoolExecutor.


  • A with statement is used to create a ThreadPoolExecutor instance executor that will promptly clean up threads upon completion.


  • Four jobs are submitted to the executor: one for each of the URLs in the wiki_page_urls list.


  • Each call to submit returns a Future instance that is stored in the futures list.


  • The as_completed function waits for each Future get_wiki_page_existence call to complete so we can print its result.

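As a quick illustration of the Future API described above, you can poll a future with done() or block on result(). This is a sketch with an invented slow_double helper, not code from the tutorial:

```python
import time
from concurrent.futures import ThreadPoolExecutor

def slow_double(x):
    time.sleep(0.1)
    return x * 2

with ThreadPoolExecutor() as executor:
    future = executor.submit(slow_double, 21)
    print(future.done())      # may still be False this soon after submission
    result = future.result()  # blocks until the return value is ready
    print(result)             # 42
    print(future.done())      # True once the call has completed
```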

If we run this program again, with the following command:


  • python wiki_page_function.py


We’ll see output like the following:



Output
https://en.wikipedia.org/wiki/Island - exists
https://en.wikipedia.org/wiki/Ocean - exists
https://en.wikipedia.org/wiki/this_page_does_not_exist - does not exist
https://en.wikipedia.org/wiki/Shark - exists

This output makes sense: 3 of the URLs are valid Wikipedia pages, and one of them, this_page_does_not_exist, is not. Note that your output may be ordered differently than this output. The concurrent.futures.as_completed function in this example returns results as soon as they are available, regardless of what order the jobs were submitted in.

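If you need results in submission order rather than completion order, executor.map is an alternative to submit plus as_completed. A minimal sketch (the shout helper is invented for illustration):

```python
from concurrent.futures import ThreadPoolExecutor

def shout(word):
    return word.upper()

words = ["ocean", "island", "shark"]
with ThreadPoolExecutor() as executor:
    # map yields results in input order, even when later
    # calls happen to finish first.
    results = list(executor.map(shout, words))

print(results)  # ['OCEAN', 'ISLAND', 'SHARK']
```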

Step 3 — Processing Exceptions From Functions Run in Threads

In the previous step, get_wiki_page_existence successfully returned a value for all of our invocations. In this step, we’ll see that ThreadPoolExecutor can also raise exceptions generated in threaded function invocations.


Let’s consider the following example code block:


wiki_page_function.py
import requests
import concurrent.futures


def get_wiki_page_existence(wiki_page_url, timeout=10):
    response = requests.get(url=wiki_page_url, timeout=timeout)

    page_status = "unknown"
    if response.status_code == 200:
        page_status = "exists"
    elif response.status_code == 404:
        page_status = "does not exist"

    return wiki_page_url + " - " + page_status


wiki_page_urls = [
    "https://en.wikipedia.org/wiki/Ocean",
    "https://en.wikipedia.org/wiki/Island",
    "https://en.wikipedia.org/wiki/this_page_does_not_exist",
    "https://en.wikipedia.org/wiki/Shark",
]
with concurrent.futures.ThreadPoolExecutor() as executor:
    futures = []
    for url in wiki_page_urls:
        futures.append(
            executor.submit(
                get_wiki_page_existence, wiki_page_url=url, timeout=0.00001
            )
        )
    for future in concurrent.futures.as_completed(futures):
        try:
            print(future.result())
        except requests.ConnectTimeout:
            print("ConnectTimeout.")

This code block is nearly identical to the one we used in Step 2, but it has two key differences:


  • We now pass timeout=0.00001 to get_wiki_page_existence. Since the requests package won’t be able to complete its web request to Wikipedia in 0.00001 seconds, it will raise a ConnectTimeout exception.


  • We catch ConnectTimeout exceptions raised by future.result() and print out a string each time we do so.


If we run the program again, we’ll see the following output:



Output
ConnectTimeout.
ConnectTimeout.
ConnectTimeout.
ConnectTimeout.

Four ConnectTimeout messages are printed—one for each of our four wiki_page_urls, since none of them were able to complete in 0.00001 seconds and each of the four get_wiki_page_existence calls raised the ConnectTimeout exception.


You’ve now seen that if a function call submitted to a ThreadPoolExecutor raises an exception, then that exception can get raised normally by calling Future.result. Calling Future.result on all your submitted invocations ensures that your program won’t miss any exceptions raised from your threaded function.

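If you would rather inspect a failure without re-raising it, Future.exception() is an alternative to wrapping result() in try/except. A sketch with an invented risky_divide function, not code from the tutorial:

```python
from concurrent.futures import ThreadPoolExecutor, as_completed

def risky_divide(x):
    return 10 / x

with ThreadPoolExecutor() as executor:
    futures = [executor.submit(risky_divide, x) for x in (5, 0, 2)]
    for future in as_completed(futures):
        # exception() returns the raised exception (or None if the
        # call succeeded) instead of re-raising it like result() does.
        error = future.exception()
        if error is not None:
            print("failed:", type(error).__name__)
        else:
            print("ok:", future.result())
```

Here the call with x=0 raises ZeroDivisionError, which exception() hands back as a value, while the other two futures hold ordinary results.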

Step 4 — Comparing Execution Time With and Without Threads

Now let’s verify that using ThreadPoolExecutor actually makes your program faster.


First, let’s time get_wiki_page_existence if we run it without threads:


wiki_page_function.py
import time
import requests
import concurrent.futures


def get_wiki_page_existence(wiki_page_url, timeout=10):
    response = requests.get(url=wiki_page_url, timeout=timeout)

    page_status = "unknown"
    if response.status_code == 200:
        page_status = "exists"
    elif response.status_code == 404:
        page_status = "does not exist"

    return wiki_page_url + " - " + page_status

wiki_page_urls = ["https://en.wikipedia.org/wiki/" + str(i) for i in range(50)]

print("Running without threads:")
without_threads_start = time.time()
for url in wiki_page_urls:
    print(get_wiki_page_existence(wiki_page_url=url))
print("Without threads time:", time.time() - without_threads_start)

In the code example we call our get_wiki_page_existence function with fifty different Wikipedia page URLs one by one. We use the time.time() function to print out the number of seconds it takes to run our program.


If we run this code again as before, we’ll see output like the following:



Output
Running without threads:
https://en.wikipedia.org/wiki/0 - exists
https://en.wikipedia.org/wiki/1 - exists
. . .
https://en.wikipedia.org/wiki/48 - exists
https://en.wikipedia.org/wiki/49 - exists
Without threads time: 5.803015232086182

Entries 2–47 in this output have been omitted for brevity.


The number of seconds printed after Without threads time will be different when you run it on your machine—that’s OK, you are just getting a baseline number to compare with a solution that uses ThreadPoolExecutor. In this case, it was ~5.803 seconds.


Let’s run the same fifty Wikipedia URLs through get_wiki_page_existence, but this time using ThreadPoolExecutor:


wiki_page_function.py
import time
import requests
import concurrent.futures


def get_wiki_page_existence(wiki_page_url, timeout=10):
    response = requests.get(url=wiki_page_url, timeout=timeout)

    page_status = "unknown"
    if response.status_code == 200:
        page_status = "exists"
    elif response.status_code == 404:
        page_status = "does not exist"

    return wiki_page_url + " - " + page_status

wiki_page_urls = ["https://en.wikipedia.org/wiki/" + str(i) for i in range(50)]

print("Running threaded:")
threaded_start = time.time()
with concurrent.futures.ThreadPoolExecutor() as executor:
    futures = []
    for url in wiki_page_urls:
        futures.append(executor.submit(get_wiki_page_existence, wiki_page_url=url))
    for future in concurrent.futures.as_completed(futures):
        print(future.result())
print("Threaded time:", time.time() - threaded_start)

The code is the same code we created in Step 2, only with the addition of some print statements that show us the number of seconds it takes to execute our code.


If we run the program again, we’ll see the following:



Output
Running threaded:
https://en.wikipedia.org/wiki/1 - exists
https://en.wikipedia.org/wiki/0 - exists
. . .
https://en.wikipedia.org/wiki/48 - exists
https://en.wikipedia.org/wiki/49 - exists
Threaded time: 1.2201685905456543

Again, the number of seconds printed after Threaded time will be different on your computer (as will the order of your output).


You can now compare the execution time for fetching the fifty Wikipedia page URLs with and without threads.


On the machine used in this tutorial, the run without threads took ~5.803 seconds, and the threaded run took ~1.220 seconds. Our program ran significantly faster with threads.

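How large the speedup is depends on how many requests run at once; ThreadPoolExecutor accepts a max_workers argument to cap that. Here is a sketch, not part of the original tutorial, using time.sleep as a stand-in for network latency (the fake_request helper is invented):

```python
import time
from concurrent.futures import ThreadPoolExecutor

def fake_request(i):
    time.sleep(0.1)  # stand-in for waiting on a network response
    return i

start = time.time()
# With 2 workers, 6 tasks of 0.1s run in three batches of two,
# taking roughly 0.3s rather than about 0.1s with 6 workers.
with ThreadPoolExecutor(max_workers=2) as executor:
    results = list(executor.map(fake_request, range(6)))
elapsed = time.time() - start
print(f"{len(results)} tasks took {elapsed:.2f}s with 2 workers")
```

If max_workers is omitted, recent Python versions pick a default based on the machine's CPU count; for I/O-bound work you may benefit from setting it higher explicitly.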

Conclusion

In this tutorial, you have learned how to use the ThreadPoolExecutor utility in Python 3 to efficiently run code that is I/O bound. You created a function well suited to invocation within threads, learned how to retrieve both output and exceptions from threaded executions of that function, and observed the performance boost gained by using threads.


From here you can learn more about other concurrency functions offered by the concurrent.futures module.


Translated from: https://www.digitalocean.com/community/tutorials/how-to-use-threadpoolexecutor-in-python-3

Copyright notice: This is an original post by the blogger, licensed under CC 4.0 BY-SA; include the original source link and this notice when reposting.
Post link: https://blog.csdn.net/cukw6666/article/details/108080796
