您现在的位置是:网站首页> 编程资料编程资料

Python线程池的实现浅析_python_

2023-05-26 420人已围观

简介 Python线程池的实现浅析_python_

雷猴啊,兄弟们!今天来展示一下如何用Python快速实现一个线程池。

一、序言

当有多个 IO 密集型的任务要被处理时,我们自然而然会想到多线程。但如果任务非常多,我们不可能每一个任务都启动一个线程去处理,这个时候最好的办法就是实现一个线程池,至于池子里面的线程数量可以根据业务场景进行设置。

比如我们实现一个有 10 个线程的线程池,这样可以并发地处理 10 个任务,每个线程将任务执行完之后,便去执行下一个任务。通过使用线程池,可以避免因线程创建过多而导致资源耗尽,而且任务在执行时的生命周期也可以很好地把控。

而线程池的实现方式也很简单,但这里我们不打算手动实现,因为 Python 提供了一个标准库 concurrent.futures,已经内置了对线程池的支持。所以本篇文章,我们就来详细介绍一下该模块的用法。

二、正文

1、Future 对象

当我们往线程池里面提交一个函数时,会分配一个线程去执行,同时立即返回一个 Future 对象。通过 Future 对象可以监控函数的执行状态,有没有出现异常,以及有没有执行完毕等等。如果函数执行完毕,内部便会调用 future.set_result 将返回值设置到 future 里面,然后外界便可调用 future.result 拿到返回值。

除此之外 future 还可以绑定回调,一旦函数执行完毕,就会以 future 为参数,自动触发回调。所以 future 被称为未来对象,可以把它理解为函数的一个容器,当我们往线程池提交一个函数时,会立即创建相应的 future 然后返回。函数的执行状态什么的,都通过 future 来查看,当然也可以给它绑定一个回调,在函数执行完毕时自动触发。

那么下面我们就来看一下 future 的用法,文字的话理解起来可能有点枯燥。

将函数提交到线程池里面运行时,会立即返回一个对象

这个对象就叫做 Future 对象,里面包含了函数的执行状态等等

当然我们也可以手动创建一个Future对象。

from concurrent.futures import Future # 创建 Future 对象 future future = Future() # 给 future 绑定回调 def callback(f: Future): print("当set_result的时候会执行回调,result:", f.result()) future.add_done_callback(callback) # 通过 add_done_callback 方法即可给 future 绑定回调 # 调用的时候会自动将 future 作为参数 # 如果需要多个参数,那么就使用偏函数 # 回调函数什么时候执行呢? # 显然是当 future 执行 set_result 的时候 # 如果 future 是向线程池提交函数时返回的 # 那么当函数执行完毕时会自动执行 future.set_result(xx) # 并将自身的返回设置进去 # 而这里的 future 是我们手动创建的,因此需要手动执行 future.set_result("嘿嘿")

当set_result的时候会执行回调,result: 嘿嘿

需要注意的是:只能执行一次 set_result,但是可以多次调用 result 获取结果。

from concurrent.futures import Future future = Future() future.set_result("哼哼") print(future.result()) # 哼哼 print(future.result()) # 哼哼 print(future.result()) # 哼哼

执行 future.result() 之前一定要先 set_result,否则会一直处于阻塞状态。当然 result 方法还可以接收一个 timeout 参数,表示超时时间,如果在指定时间内没有获取到值就会抛出异常。

2、提交函数自动创建 Future 对象

我们上面是手动创建的 Future 对象,但工作中很少会手动创建。我们将函数提交到线程池里面运行的时候,会自动创建 Future 对象并返回。这个 Future 对象里面就包含了函数的执行状态,比如此时是处于暂停、运行中还是完成等等,并且函数在执行完毕之后,还会调用 future.set_result 将自身的返回值设置进去。

from concurrent.futures import ThreadPoolExecutor import time def task(name, n): time.sleep(n) return f"{name} 睡了 {n} 秒" # 创建一个线程池 # 里面还可以指定 max_workers 参数,表示最多创建多少个线程 # Python学习交流裙279199867 # 如果不指定,那么每提交一个函数,都会为其创建一个线程 executor = ThreadPoolExecutor() # 通过 submit 即可将函数提交到线程池,一旦提交,就会立刻运行 # 因为开启了一个新的线程,主线程会继续往下执行 # 至于 submit 的参数,按照函数名,对应参数提交即可 # 切记不可写成task("古明地觉", 3),这样就变成调用了 future = executor.submit(task, "屏幕前的你", 3) # 由于函数里面出现了 time.sleep,并且指定的 n 是 3 # 所以函数内部会休眠 3 秒,显然此时处于运行状态 print(future) """  """ # 我们说 future 相当于一个容器,包含了内部函数的执行状态 # 函数是否正在运行中 print(future.running()) """ True """ # 函数是否执行完毕 print(future.done()) """ False """ # 主程序也 sleep 3 秒 time.sleep(3) # 显然此时函数已经执行完毕了 # 并且打印结果还告诉我们返回值类型是 str print(future) """  """ print(future.running()) """ False """ print(future.done()) """ True """ # 函数执行完毕时,会将返回值设置在 future 里 # 也就是说一旦执行了 future.set_result # 那么就表示函数执行完毕了,然后外界可以调用 result 拿到返回值 print(future.result()) """ 屏幕前的你 睡了 3 秒 """

这里再强调一下 future.result(),这一步是会阻塞的,举个例子:

# 提交函数 future = executor.submit(task, "屏幕前的你", 3) start = time.perf_counter() future.result() end = time.perf_counter() print(end - start) # 3.00331525

可以看到,future.result() 这一步花了将近 3s。其实也不难理解,future.result() 是干嘛的?就是为了获取函数的返回值,可函数都还没有执行完毕,它又从哪里获取呢?所以只能先等待函数执行完毕,将返回值通过 set_result 设置到 future 里面之后,外界才能调用 future.result() 获取到值。

如果不想一直等待的话,那么在获取值的时候可以传入一个超时时间。

from concurrent.futures import ( ThreadPoolExecutor, TimeoutError ) import time def task(name, n): time.sleep(n) return f"{name} 睡了 {n} 秒" executor = ThreadPoolExecutor() future = executor.submit(task, "屏幕前的你", 3) try: # 1 秒之内获取不到值,抛出 TimeoutError res = future.result(1) except TimeoutError: pass # 再 sleep 2 秒,显然函数执行完毕了 time.sleep(2) # 获取返回值 print(future.result()) """ 屏幕前的你 睡了 3 秒 """

当然啦,这么做其实还不够智能,因为我们不知道函数什么时候执行完毕。所以最好的办法还是绑定一个回调,当函数执行完毕时,自动触发回调。

from concurrent.futures import ThreadPoolExecutor import time def task(name, n): time.sleep(n) return f"{name} 睡了 {n} 秒" def callback(f): print(f.result()) executor = ThreadPoolExecutor() future = executor.submit(task, "屏幕前的你", 3) # 绑定回调,3 秒之后自动调用 future.add_done_callback(callback) """ 屏幕前的你 睡了 3 秒 """

需要注意的是,在调用 submit 方法之后,提交到线程池的函数就已经开始执行了。而不管函数有没有执行完毕,我们都可以给对应的 future 绑定回调。

如果函数完成之前添加回调,那么会在函数完成后触发回调。如果函数完成之后添加回调,由于函数已经完成,代表此时的 future 已经有值了,或者说已经 set_result 了,那么会立即触发回调。

3、future.set_result 到底干了什么事情

当函数执行完毕之后,会执行 set_result,那么这个方法到底干了什么事情呢?

我们看到 future 有两个被保护的属性,分别是 _result 和 _state。显然 _result 用于保存函数的返回值,而 future.result() 本质上也是返回 _result 属性的值。而 _state 属性则用于表示函数的执行状态,初始为 PENDING,执行中为 RUNING,执行完毕时被设置为 FINISHED。

调用 future.result() 的时候,会判断 _state 的属性,如果还在执行中就一直等待。当 _state 为 FINISHED 的时候,就返回 _result 属性的值。

4、提交多个函数

我们上面每次只提交了一个函数,但其实可以提交任意多个,我们来看一下:

from concurrent.futures import ThreadPoolExecutor import time def task(name, n): time.sleep(n) return f"{name} 睡了 {n} 秒" executor = ThreadPoolExecutor() futures = [executor.submit(task, "屏幕前的你", 3), executor.submit(task, "屏幕前的你", 4), executor.submit(task, "屏幕前的你", 1)] # 此时都处于running print(futures) """ [, , ] """ time.sleep(3) # 主程序 sleep 3s 后 # futures[0]和futures[2]处于 finished # futures[1]仍处于 running print(futures) """ [, , ] """

如果是多个函数,要如何拿到返回值呢?很简单,遍历 futures 即可。

executor = ThreadPoolExecutor() futures = [executor.submit(task, "屏幕前的你", 5), executor.submit(task, "屏幕前的你", 2), executor.submit(task, "屏幕前的你", 4), executor.submit(task, "屏幕前的你", 3), executor.submit(task, "屏幕前的你", 6)] for future in futures: print(future.result()) """ 屏幕前的你 睡了 5 秒 屏幕前的你 睡了 2 秒 屏幕前的你 睡了 4 秒 屏幕前的你 睡了 3 秒 屏幕前的你 睡了 6 秒 """

这里面有一些值得说一说的地方,首先 futures 里面有 5 个 future,记做 future1, future2, future3, future4, future5。

当使用 for 循环遍历的时候,实际上会依次遍历这 5 个 future,所以返回值的顺序就是我们添加的函数的顺序。由于 future1 对应的函数休眠了 5s,那么必须等到 5s 后,future1 里面才会有值。

但这五个函数是并发执行的,future2, future3, future4 由于只休眠了 2s, 4s, 3s,所以肯定会先执行完毕,然后执行 set_result,将返回值设置到对应的 future 里。

但 Python 的 for 循环不可能在第一次迭代还没有结束,就去执行第二次迭代。因为 futures 里面的几个 future 的顺序已经一开始就被定好了,只有当第一个 future.result() 执行完成之后,才会执行第二个 future.result(),以及第三个、第四个。

因此即便后面的函数已经执行完毕,但由于 for 循环的顺序,也只能等着,直到前面的 future.result() 执行完毕。所以当第一个 future.result() 结束时,后面三个 future.result() 会立刻输出,因为它们内部的函数已经执行结束了。

而最后一个 future,由于内部函数 sleep 了 6 秒,因此要再等待 1 秒,才会打印 future.result()。

5、使用 map 来提交多个函数

使用 submit 提交函数会返回一个 future,并且还可以给 future 绑定一个回调。但如果不关心回调的话,那么还可以使用 map 进行提交。

executor = ThreadPoolExecutor() # map 内部也是使用了 submit results = executor.map(task, ["屏幕前的你"] * 3, [3, 1, 2]) # 并且返回的是迭代器 print(results) """  """ # 此时遍历得到的是不再是 future # 而是 future.result() for result in results: print(result) """ 屏幕前的你 睡了 3 秒 屏幕前的你 睡了 1 秒 屏幕前的你 睡了 2 秒 """

可以看到,当使用for循环的时候,map 执行的逻辑和 submit 是一样的。唯一的区别是,此时不需要再调用 result 了,因为返回的就是函数的返回值。

或者我们直接调用 list 也行。

executor = ThreadPoolExecutor() results = executor.map(task, ["屏幕前的你"] * 3, [3, 1, 2]) print(list(results)) """ ['屏幕前的你 睡了 3 秒', '屏幕前的你 睡了 1 秒', '屏幕前的你 睡了 2 秒'] """

results 是一个生成器,调用 list 的时候会将里面的值全部产出。由于 map 内部还是使用的 submit,然后通过 future.result() 拿到返回值,而耗时最长的函数需要 3 秒,因此这一步会阻塞 3 秒。3 秒过后,会打印所有函数的返回值。

6、按照顺序等待执行

上面在获取返回值的时候,是按照函数的提交顺序获取的。如果我希望哪个函数先执行完毕,就先获取哪个函数的返回值,该怎么做呢?

from concurrent.futures import ( ThreadPoolExecutor, as_completed ) import time def task(name, n): time.sleep(n) return f"{name} 睡了 {n} 秒" executor = ThreadPoolExecutor() futures = [executor.submit(task, "屏幕前的你", 5), executor.submit(task, "屏幕前的你", 2), executor.submit(task, "屏幕前的你", 1), executor.submit(task, "屏幕前的你", 3), executor.submit(task, "屏幕前的你", 4)] for future in as_completed(futures): print(future.result()) """ 屏幕前的你 睡了 1 秒 屏幕前的你 睡了 2 秒 屏幕前的你 睡了 3 秒 屏幕前的你 睡了 4 秒 屏幕前的你 睡了 5 秒 """

此时谁先完成,谁先返回。

7、取消一个函数的执行

我们通过 submit 可以将函数提交到线程池中执行,但如果我们想取消该怎么办呢?

executor = ThreadPoolExecutor() future1 = executor.submit(task, "屏幕前的你", 1) futu
                
                

-六神源码网