目的:很多時候,我們需要多個不同策略去完成一個任務,那個插件架構是很好一個選擇。而每一個策略,不希望一個個去執行,還能實時上報數據,所以每一個插件都是異步執行,把每一個插件結果回調方法實時上報,然后寫入數據。
?
線程工具類
#app/MyThread.py
import threading
class MyThread(threading.Thread):
def __init__(self, func, **kwargs):
threading.Thread.__init__(self)
self.func = func
self.kwargs = kwargs
self.thread_stop = False
def run(self):
self.func(self, self.kwargs)
def stop(self):
self.thread_stop = True
插件管理中心
#app/platform.py
#coding=utf-8
from .MyThread import MyThread
# 注冊加載插件
# 多線程運行插件
# 回調返回數據,放入寫入隊列
class DataManagerProcessor(object):
PLUGINS = {}
def run_plugin(self,parent,args):
self.PLUGINS[args["plugin_name"]]().run(parent,args["callback"])
def process(self, callback):
threads=[]
for plugin_name in self.PLUGINS.keys():
print(plugin_name)
dt = MyThread(self.run_plugin,plugin_name=plugin_name,callback=callback)
dt.start()
dt.setName(plugin_name)
threads.append(dt)
return threads
@classmethod
def plugin_register(cls, plugin_name):
def wrapper(plugin):
cls.PLUGINS.update({plugin_name:plugin})
return plugin
return wrapper
#app/main.py
#coding=utf-8
import queue
from .MyThread import MyThread
from .platform import DataManagerProcessor
result_queue = queue.Queue()
#異步寫入隊列
def updateDB(result):
result_queue.put(result)
#判斷插件任務線程是否結束
def isAllTasksCompleted(threads):
for t in threads:
if t.is_alive():
return False
return True
def syncDB(parent,args):
threads=args["threads"]
while True:
if not result_queue.empty():
data=result_queue.get()
print("*************************")
print(data)
print("*************************")
result_queue.task_done()
elif isAllTasksCompleted(threads):
break
def main():
processor = DataManagerProcessor()
threads=processor.process(updateDB)
# for t in threads:
# t.join()
syncThread=MyThread(syncDB,threads=threads)
syncThread.start()
syncThread.join()
print("Done")
#app/__init__.py
from .plugins import *
#run.py
from app.main import main
main()
插件編寫
#app/plugins/plugin1.py
from app.platform import DataManagerProcessor
@DataManagerProcessor.plugin_register('plugin1')
class plugin1(object):
def crawl_data(self,callback):
callback(1)#實時上報數據
#parent 是線程對象,可以隨時停止任務
def run(self, parent,callback):
print(parent.thread_stop)
self.crawl_data(callback)
if __name__ == '__main__':
def test(result):
print(result)
plugin1().crawl_data(result)
#app/plugins/plugin2.py
from app.platform import DataManagerProcessor
@DataManagerProcessor.plugin_register('plugin1')
class plugin2(object):
def crawl_data(self,callback):
callback(2)#實時上報數據
#parent 是線程對象,可以隨時停止任務
def run(self, parent,callback):
print(parent.thread_stop)
self.crawl_data(callback)
if __name__ == '__main__':
def test(result):
print(result)
plugin2().crawl_data(result)
#app/plugins/__init__.py
__all__ = ['plugin1', 'plugin2']
?
更多文章、技術交流、商務合作、聯系博主
微信掃碼或搜索:z360901061

微信掃一掃加我為好友
QQ號聯系: 360901061
您的支持是博主寫作最大的動力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描下面二維碼支持博主2元、5元、10元、20元等您想捐的金額吧,狠狠點擊下面給點支持吧,站長非常感激您!手機微信長按不能支付解決辦法:請將微信支付二維碼保存到相冊,切換到微信,然后點擊微信右上角掃一掃功能,選擇支付二維碼完成支付。
【本文對您有幫助就好】元
