期望实现效果: 某一任务,需要定时执行插入数据操作,或者是当存储的数据到达某一限度后也需要插入数据操作。 case: 数据处理归类后需要放入数据库,想要每满10条后进行数据库写入操作,或者是,10条一直没有存储够,但是等了很久,想要没满10条,但是到达一定时间后也要进行数据库写入操作。
每隔多少时间执行一次更新操作
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 import timeimport threadingimport randominsert_data = [] def createtimer (): t = threading.Timer(3 , repeat) t.start() def repeat (): global insert_data print ('Now:' , time.strftime('%H:%M:%S' , time.localtime())) temp_insert_data = insert_data[:] if temp_insert_data: print ("repeat" , temp_insert_data) insert_data = insert_data[len (temp_insert_data):] createtimer() def test (): global insert_data createtimer() num = 0 while True : for i in range (5 ): time.sleep(random.randint(0 ,2 )) insert_data.append(str (num)+'-' +str (i)) num+=1 test()
代码仅为实现方式之一,演示demo,为轻量型实现方式之一,非最优代码。 定时任务可考虑使用较为成熟框架,如apscheduler等。 执行效果:
每多少数据为一批进行写入,当长时间凑不出一批后每隔多少时间执行一次更新操作
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 import timeimport threadingimport randominsert_data = [] q = threading.RLock() def createtimer (): t = threading.Timer(3 , repeat) t.start() def repeat (): global insert_data global q print ('Now:' , time.strftime('%H:%M:%S' , time.localtime())) q.acquire() temp_insert_data = insert_data[:] if temp_insert_data: print ("repeat" , temp_insert_data) insert_data = insert_data[len (temp_insert_data):] q.release() createtimer() def test (): global insert_data global q createtimer() num = 0 while True : for i in range (5 ): time.sleep(random.randint(0 ,2 )) insert_data.append(str (num)+'-' +str (i)) q.acquire() temp_insert_data = insert_data[:] print ("RangeInsert: " , insert_data) insert_data = insert_data[len (temp_insert_data):] q.release() num+=1 test()
由于批量和定时操作同一数据,所以方式数据重复或丢失,在操作insert_data数据时需要进行加锁q.acquire()。 repeat为定时执行的数据库写入操作。 RangeInsert为到达一定批次后执行,如代码中循环超过5次后,无论insert_data有多少数据都进行数据库写入操作。 执行效果: