python如何实现一个协程的框架(为何你还不懂得如何使用Python协程)
python如何实现一个协程的框架(为何你还不懂得如何使用Python协程)0x00 何为协程(Coroutine)注:我使用的Python环境是3.7。在前一篇《一文彻底搞懂Python可迭代(Iterable)、迭代器(Iterator)和生成器(Generator)的概念》 的文中,知道生成器(Generator)可由以下两种方式定义:在Python早期的版本中协程也是通过生成器来实现的,也就是基于生成器的协程(Generator-based Coroutines)。在前一篇介绍生成器的文章末尾举了一个生产者-消费者的例子,就是基于生成器的协程来实现的。def producer(c): n = 0 while n < 5: n = 1 print('producer {}'.format(n)) r = c.send(n) print('consumer return {}'.format(r)) def consumer(): r =
关于我
一个有思想的程序猿,终身学习实践者,目前在一个创业团队任team lead,技术栈涉及Android、python、Java和Go,这个也是我们团队的主要技术栈。
Github:https://github.com/hylinux1024
终身开发者(angrycode)
在前一篇《一文彻底搞懂Python可迭代(Iterable)、迭代器(Iterator)和生成器(Generator)的概念》 的文中,知道生成器(Generator)可由以下两种方式定义:
- 列表生成器
- 使用yield定义的函数
在Python早期的版本中协程也是通过生成器来实现的,也就是基于生成器的协程(Generator-based Coroutines)。在前一篇介绍生成器的文章末尾举了一个生产者-消费者的例子,就是基于生成器的协程来实现的。
def producer(c): n = 0 while n < 5: n = 1 print('producer {}'.format(n)) r = c.send(n) print('consumer return {}'.format(r)) def consumer(): r = '' while True: n = yield r if not n: return print('consumer {} '.format(n)) r = 'ok' if __name__ == '__main__': c = consumer() next(c) # 启动consumer producer(c)
看了这段代码,相信很多初学者和我一样对基于生成器的协程实现其实很难马上就能够根据业务写出自己的协程代码。Python实现者们也注意到这个问题,因为它太不Pythonic了。而基于生成器的协程也将被废弃,因此本文将重点介绍asyncio包的使用,以及涉及到的一些相关类概念。
注:我使用的Python环境是3.7。
0x00 何为协程(Coroutine)
协程(Coroutine)是在线程中执行的,可理解为微线程,但协程的切换没有上下文的消耗,它比线程更加轻量些。一个协程可以随时中断自己让另一个协程开始执行,也可以从中断处恢复并继续执行,它们之间的调度是由程序员来控制的(可以看本文开篇处生产者-消费者的代码)。
定义一个协程
在Python3.5 版本新增了aysnc和await关键字,这两个语法糖让我们非常方便地定义和使用协程。
在函数定义时用async声明就定义了一个协程。
import asyncio # 定义了一个简单的协程 async def simple_async(): print('hello') await asyncio.sleep(1) # 休眠1秒 print('python') # 使用asynio中run方法运行一个协程 asyncio.run(simple_async()) # 执行结果为 # hello # python
在协程中如果要调用另一个协程就使用await。要注意await关键字要在async定义的函数中使用,而反过来async函数可以不出现await
# 定义了一个简单的协程 async def simple_async(): print('hello') asyncio.run(simple_async()) # 执行结果 # hello
asyncio.run()将运行传入的协程,负责管理asyncio事件循环。
除了run()方法可直接执行协程外,还可以使用事件循环loop
async def do_something(index): print(f'start {time.strftime("%X")}' index) await asyncio.sleep(1) print(f'finished at {time.strftime("%X")}' index) def test_do_something(): # 生成器产生多个协程对象 task = [do_something(i) for i in range(5)] # 获取一个事件循环对象 loop = asyncio.get_event_loop() # 在事件循环中执行task列表 loop.run_until_complete(asyncio.wait(task)) loop.close() test_do_something() # 运行结果 # start 00:04:03 3 # start 00:04:03 4 # start 00:04:03 1 # start 00:04:03 2 # start 00:04:03 0 # finished at 00:04:04 3 # finished at 00:04:04 4 # finished at 00:04:04 1 # finished at 00:04:04 2 # finished at 00:04:04 0
可以看出几乎同时启动了所有的协程。
其实翻阅源码可知asyncio.run()的实现也是封装了loop对象及其调用。而asyncio.run()每次都会创建一个新的事件循环对象用于执行协程。
0x01 Awaitable对象
在Python中可等待(Awaitable)对象有:协程(corountine)、任务(Task)、Future。即这些对象可以使用await关键字进行调用
await awaitable_object
1. 协程(Coroutine)
协程由async def声明定义,一个协程可由另一个协程使用await进行调用
async def nested(): print('in nested func') return 13 async def outer(): # 要使用await 关键字 才会执行一个协程函数返回的协程对象 print(await nested()) asyncio.run(outer()) # 执行结果 # in nested func # 13
如果在outer()方法中直接调用nested()而不使用await 将抛出一个RuntimeWarning
async def outer(): # 直接调用协程函数不会发生执行,只是返回一个 coroutine 对象 nested() asyncio.run(outer())
运行程序,控制台将输出以下信息
RuntimeWarning: coroutine 'nested' was never awaited nested() RuntimeWarning: Enable tracemalloc to get the object allocation traceback
2. 任务(Task)
任务(Task)是可以用来并发地执行协程。可以使用asyncio.create_task()将一个协程对象封装成任务,该任务将很快被排入调度队列并执行。
async def nested(): print('in nested func') return 13 async def create_task(): # create_task 将一个协程对象打包成一个 任务时,该协程就会被自动调度运行 task = asyncio.create_task(nested()) # 如果要看到task的执行结果 # 可以使用await等待协程执行完成,并返回结果 ret = await task print(f'nested return {ret}') asyncio.run(create_task()) # 运行结果 # in nested func # nested return 13
注:关于并发下文还会详细说明。
3. Future
Future是一种特殊的低层级(low-level)对象,它是异步操作的最终结果(eventual result)。 当一个 Future 对象 被等待,这意味着协程将保持等待直到该 Future 对象在其他地方操作完毕。
通常在应用层代码不会直接创建Future对象。在某些库和asyncio模块中的会使用到该对象。
async def used_future_func(): await function_that_returns_a_future_object()
0x02 并发
1. Task
前面我们知道Task可以并发地执行。 asyncio.create_task()就是一个把协程封装成Task的方法。
async def do_after(what delay): await asyncio.sleep(delay) print(what) # 利用asyncio.create_task创建并行任务 async def corun(): task1 = asyncio.create_task(do_after('hello' 1)) # 模拟执行1秒的任务 task2 = asyncio.create_task(do_after('python' 2)) # 模拟执行2秒的任务 print(f'started at {time.strftime("%X")}') # 等待两个任务都完成,两个任务是并行的,所以总时间两个任务中最大的执行时间 await task1 await task2 print(f'finished at {time.strftime("%X")}') asyncio.run(corun()) # 运行结果 # started at 23:41:08 # hello # python # finished at 23:41:10
task1是一个执行1秒的任务,task2是一个执行2秒的任务,两个任务并发的执行,总共消耗2秒。
2. gather
除了使用asyncio.create_task()外还可以使用asyncio.gather(),这个方法接收协程参数列表
async def do_after(what delay): await asyncio.sleep(delay) print(what) async def gather(): print(f'started at {time.strftime("%X")}') # 使用gather可将多个协程传入 await asyncio.gather( do_after('hello' 1) do_after('python' 2) ) print(f'finished at {time.strftime("%X")}') asyncio.run(gather()) # 运行结果 # started at 23:47:50 # hello # python # finished at 23:47:52
两个任务消耗的时间为其中消耗时间最长的任务。
0x03 引用
- https://docs.python.org/3/library/asyncio-task.html