文章名字起的是基于协程的异步任务实现,可能不是特别准确,不过直观像表达的意思就是这样的。
应用场景:对于高io场景,通常会使用进程或线程来实现异步的操作,但是进程和线程的调度过程也会浪费很多资源,由此协程更能高效的利用系统的资源。
除此之外第三方模块为了提高cpu利用率和性能有已经编写好的async函数时,我们也可直接利用。

实现的一个简单封装,demo代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
import asyncio
import time

from threading import Thread


class CreateLoop:
"""
创建loop
"""
@classmethod
def new(cls):
_new_loop = asyncio.new_event_loop()
_thread_t = Thread(target=cls._start_loop, args=(_new_loop,)) # 单线程创建一个loop,模拟一个线程中的调度
_thread_t.start()
return _new_loop

@classmethod
def _start_loop(cls, loop):
asyncio.set_event_loop(loop)
print(f'Start_Loop time {time.time()}')
loop.run_forever()


class CoroutineWorks:
"""
协程任务
"""
@staticmethod
async def do_some_work1(*args, **kwargs):
print(f'Start {kwargs.get("Name")}')
await asyncio.sleep(int(kwargs.get("Time"))) # 协程调度
# time.sleep(int(kwargs.get("Time"))) # 阻塞
return f'Finish {kwargs.get("Name")}'

@staticmethod
async def do_some_work2():
...


class CoroutineCallBack:
"""
协程异步处理完毕后回调函数
"""
@staticmethod
def call_back_func(obj):
try:
print(f'C_CallBack, {obj.result()} time {time.time()}')
except Exception as e:
print("[ Warning ]:Task cancelled OR Other reasons | "+str(e))

@staticmethod
def call_back_func2():
...


if __name__ == '__main__':
new_loop = CreateLoop.new()

# 异步任务测试
# 任务一
result1 = asyncio.run_coroutine_threadsafe(CoroutineWorks.do_some_work1(Name="work1", Time=5), new_loop)
result1.add_done_callback(CoroutineCallBack.call_back_func)
# 任务二
result2 = asyncio.run_coroutine_threadsafe(CoroutineWorks.do_some_work1(Name="work2", Time=3), new_loop)
result2.add_done_callback(CoroutineCallBack.call_back_func)
# 任务三
result3 = asyncio.run_coroutine_threadsafe(CoroutineWorks.do_some_work1(Name="work3", Time=7), new_loop)
result3.add_done_callback(CoroutineCallBack.call_back_func)

# 任务取消测试
# 手动取消任务
result4 = asyncio.run_coroutine_threadsafe(CoroutineWorks.do_some_work1(Name="work4", Time=4), new_loop)
result4.add_done_callback(CoroutineCallBack.call_back_func)
result4.cancel()

# 任务添加测试
# 手动添加任务name和time测试
while True:
name_i, time_i = input().split(' ')
result = asyncio.run_coroutine_threadsafe(CoroutineWorks.do_some_work1(Name=name_i, Time=time_i), new_loop)
result.add_done_callback(CoroutineCallBack.call_back_func)
print(name_i, time_i)

执行效果如下:

work1-4四个任务启动后,任务4由于启动后直接cancel()取消了,因此被捕获了异常,1-3任务正常执行。
各任务结束时间减去Loop开始时间则为每个任务大概执行时间,如下:
work1:1646218902 - 1646218897 = 5s
work2:1646218900 - 1646218897 = 3s
work3:1646218904 - 1646218897 = 7s
符合预期
work_custom 3 绿色的为输入的自定义任务,可以直接传入执行。(模拟新建一个异步的任务)