使用 loop.run_in_executor 进行函数的异步包装

Python 3.7(其实早先版本也有原型)实装了 async 关键字,也算是名正言顺的支持了异步。由于全局锁(GIL)的存在,Python 实现真正的多线程略显乏力,而多进程又缺少了一定的灵活性,因此将现有函数包装为异步执行,可以在需求并发时派上极大用场,并且重点是极易实现。

吐槽

请先允许我轻微吐槽一下。

恕我直言,如今写教程也成了一种流水线。国内各类专栏乃至 XXDN 都没有关于包装这一概念的详细讨论。对于异步的介绍基本都是拿 asyncio.get_event_loop 里面裹个 CPU 操作的函数(没错,就是 hello world)加个 asyncio 库里原生提供的 sleep 方法来给你人工阻塞一下,然后丢进 run_until_complete里一执行:“看呐!异步了!”问题是,这点小规模操作也用不着异步,卡 IO 也不是因为你往里扔个原生支持异步asyncio.sleep造成的。真正阻塞的东西(比如网络读取,看清楚,是比如而不是唯一)是一个字都没提过。

哦我忘了网络读取是有提到的嗯,怎么实现的?aiohttp拿原生的支持异步的 http 库来演示怎么异步包装 http IO?? 那我如果想异步读取文件呢?我想异步 OCR 呢?单纯把函数扔进 loop 里没有任何作用,一个 run_until_complete 直接就又是一个阻塞。难不成我还要再找一个 aiofileaioocr

这样的内容基本上会占满你想要搜索如何进行异步包装时填满你的浏览器,要知道国内的各大网站最擅长的就是我等最擅长的 CV 战士必杀技,你写一篇这样的无用文章放到网上被爬虫检索到就会变成五篇。看过的人把它再“总结”一下再写一篇换汤不换药的放出来就又是五篇。百度难用一方面在于某候选院士缺德,另一方面…… CSDN 的站内搜索结果也没好到哪去。

当然,不是说相关的文章就没有(但我也就找到这么一篇),来自苏明知乎专栏的 Asyncio 使用经验,里面部分描述了如何使用接下来我要叙述的内容进行包装,但也不是完美的……比如我看到最后也没明白他那个方法到底该咋用……当然可能是因为我太菜了。

吐槽完毕,本文结束

正文

还是声明一下最准确 (也最有逼格) 的参考资料:官方文档

由于太简单我选择直接搬原文的代码来……

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
import asyncio
import concurrent.futures

def blocking_io():
# 文件操作(如日志记录)会造成阻塞,在线程池中操作。
with open('/dev/urandom', 'rb') as f:
return f.read(100)

def cpu_bound():
# CPU 操作同样会阻塞,不同的是要在进程池中运行。
return sum(i * i for i in range(10 ** 7))

async def main():
loop = asyncio.get_running_loop()

## Options:

# 1. Run in the default loop's executor:
result = await loop.run_in_executor(
None, blocking_io)
print('default thread pool', result)

# 2. Run in a custom thread pool:
with concurrent.futures.ThreadPoolExecutor() as pool:
result = await loop.run_in_executor(
pool, blocking_io)
print('custom thread pool', result)

# 3. Run in a custom process pool:
with concurrent.futures.ProcessPoolExecutor() as pool:
result = await loop.run_in_executor(
pool, cpu_bound)
print('custom process pool', result)

asyncio.run(main())

最后一句 asyncio.run(main()) 会返回一个 Future 对象,你可以用 await 来“运行”它。另外注意 IO 和 CPU 运算使用的池还是有不同的。

上文中给出了简单函数的包装,若要给函数传递参数,建议的方式是用 functools.partial() 来将函数对象化,下面是一个简单的例子(没错,requests 君临)。

1
2
3
4
5
6
7
8
import requests
from concurrent.futures import ThreadPoolExecutor
from functools import partial

get_partial = partial(requests.get, *args, **kwargs)
executor = ThreadPoolExecutor()
# ......
await asyncio.get_event_loop().run_in_executor(executor, get_partial)

这里有一个重点就是,如果不向 run_in_executor 传递第一个参数(缺省为 None),你的函数在多重异步调用(异步函数调用其他的异步函数)时依旧会造成阻塞(跟直接写原函数没有区别)

另外如果真的要对 requests 进行包装还需要注意:请求后返回的对象有一部分属性是惰性求值,因此单纯封装本体并不能完全的做到异步化,具体可以参考某 rc 的 aki 项目。

Addon

如果你喜欢 asyncio 且大量应用,有个东西你应该会喜欢:uvloop。这个东西只需要简单的几行代码就可以让你的 asyncio 性能大大提升。当然,首先不要忘了 pip install uvloop

1
2
3
4
5
6
7
8
9
import asyncio
import uvloop

async def main():
# Main entry-point.
...

uvloop.install()
asyncio.run(main())

结语

请允许我对于现在所谓的教程完全复制粘贴甚至不考虑是否可行表示敬意。无论是哪一类技术,记住官方文档永远是个好东西