文章名字起的是基于协程的异步任务实现,可能不是特别准确,不过直观像表达的意思就是这样的。
应用场景:对于高io场景,通常会使用进程或线程来实现异步的操作,但是进程和线程的调度过程也会浪费很多资源,由此协程更能高效的利用系统的资源。
除此之外第三方模块为了提高cpu利用率和性能有已经编写好的async函数时,我们也可直接利用。
实现的一个简单封装,demo代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83
| import asyncio import time
from threading import Thread
class CreateLoop: """ 创建loop """ @classmethod def new(cls): _new_loop = asyncio.new_event_loop() _thread_t = Thread(target=cls._start_loop, args=(_new_loop,)) _thread_t.start() return _new_loop
@classmethod def _start_loop(cls, loop): asyncio.set_event_loop(loop) print(f'Start_Loop time {time.time()}') loop.run_forever()
class CoroutineWorks: """ 协程任务 """ @staticmethod async def do_some_work1(*args, **kwargs): print(f'Start {kwargs.get("Name")}') await asyncio.sleep(int(kwargs.get("Time"))) return f'Finish {kwargs.get("Name")}'
@staticmethod async def do_some_work2(): ...
class CoroutineCallBack: """ 协程异步处理完毕后回调函数 """ @staticmethod def call_back_func(obj): try: print(f'C_CallBack, {obj.result()} time {time.time()}') except Exception as e: print("[ Warning ]:Task cancelled OR Other reasons | "+str(e))
@staticmethod def call_back_func2(): ...
if __name__ == '__main__': new_loop = CreateLoop.new()
result1 = asyncio.run_coroutine_threadsafe(CoroutineWorks.do_some_work1(Name="work1", Time=5), new_loop) result1.add_done_callback(CoroutineCallBack.call_back_func) result2 = asyncio.run_coroutine_threadsafe(CoroutineWorks.do_some_work1(Name="work2", Time=3), new_loop) result2.add_done_callback(CoroutineCallBack.call_back_func) result3 = asyncio.run_coroutine_threadsafe(CoroutineWorks.do_some_work1(Name="work3", Time=7), new_loop) result3.add_done_callback(CoroutineCallBack.call_back_func)
result4 = asyncio.run_coroutine_threadsafe(CoroutineWorks.do_some_work1(Name="work4", Time=4), new_loop) result4.add_done_callback(CoroutineCallBack.call_back_func) result4.cancel()
while True: name_i, time_i = input().split(' ') result = asyncio.run_coroutine_threadsafe(CoroutineWorks.do_some_work1(Name=name_i, Time=time_i), new_loop) result.add_done_callback(CoroutineCallBack.call_back_func) print(name_i, time_i)
|
执行效果如下:
work1-4四个任务启动后,任务4由于启动后直接cancel()取消了,因此被捕获了异常,1-3任务正常执行。
各任务结束时间减去Loop开始时间则为每个任务大概执行时间,如下:
work1:1646218902 - 1646218897 = 5s
work2:1646218900 - 1646218897 = 3s
work3:1646218904 - 1646218897 = 7s
符合预期
work_custom 3 绿色的为输入的自定义任务,可以直接传入执行。(模拟新建一个异步的任务)