APScheduler (advanceded python scheduler)是一款Python開發的定時任務工具。
文檔地址?apscheduler.readthedocs.io/en/latest/u…
特點:
- 不依賴于Linux系統的crontab系統定時,獨立運行
- 可以?動態添加 新的定時任務,如下單后30分鐘內必須支付,否則取消訂單,就可以借助此工具(每下一單就要添加此訂單的定時任務)
- 對添加的定時任務可以做持久保存
1 安裝
pip install apscheduler
2 組成
- APScheduler 由以下四部分組成:
- triggers 觸發器 指定定時任務執行的時機
- job stores 存儲器 可以將定時持久存儲
- executors 執行器 在定時任務該執行時,以進程或線程方式執行任務
- schedulers 調度器 常用的有BackgroundScheduler(?后臺運行 )和BlockingScheduler(?阻塞式 )
3 使用方式
from apscheduler.schedulers.background import BlockingScheduler # 創建定時任務的調度器對象 scheduler = BlockingScheduler() # 創建執行器 executors = { 'default': ThreadPoolExecutor(20), } # 定義定時任務 def my_job(param1, param2): # 參數通過add_job()args傳遞傳遞過來 print(param1) # 100 print(param2) # python # 向調度器中添加定時任務 scheduler.add_job(my_job, 'date', args=[100, 'python'], executors=executors) # 啟動定時任務調度器工作 scheduler.start()
4 調度器 Scheduler
負責管理定時任務
BlockingScheduler : 作為獨立進程時使用
from apscheduler.schedulers.blocking import BlockingScheduler scheduler = BlockingScheduler() scheduler.start() # 此處程序會發生阻塞 BackgroundScheduler : 在框架程序(如Django、Flask)中使用. from apscheduler.schedulers.background import BackgroundScheduler scheduler = BackgroundScheduler() scheduler.start() # 此處程序不會發生阻塞
- AsyncIOScheduler : 當你的程序使用了asyncio的時候使用。
- GeventScheduler : 當你的程序使用了gevent的時候使用。
- TornadoScheduler : 當你的程序基于Tornado的時候使用。
- TwistedScheduler : 當你的程序使用了Twisted的時候使用
- QtScheduler : 如果你的應用是一個Qt應用的時候可以使用。
4 執行器 executors
在定時任務該執行時,以進程或線程方式執行任務
ThreadPoolExecutor from apscheduler.executors.pool import ThreadPoolExecutor ThreadPoolExecutor(max_workers)
使用方法
from apscheduler.executors.pool import ThreadPoolExecutor executors = { 'default': ThreadPoolExecutor(20) # 最多20個線程同時執行 } scheduler = BackgroundScheduler(executors=executors) ProcessPoolExecutor from apscheduler.executors.pool import ProcessPoolExecutor ProcessPoolExecutor(max_workers)
使用方法
from apscheduler.executors.pool import ProcessPoolExecutor executors = { 'default': ProcessPoolExecutor(5) # 最多5個進程同時執行 } scheduler = BackgroundScheduler(executors=executors)
5 觸發器 Trigger
指定定時任務執行的時機。
1) date 在特定的時間日期執行
from datetime import date # 在2019年11月6日00:00:00執行 sched.add_job(my_job, 'date', run_date=date(2019, 11, 6)) # 在2019年11月6日16:30:05 sched.add_job(my_job, 'date', run_date=datetime(2009, 11, 6, 16, 30, 5)) sched.add_job(my_job, 'date', run_date='2009-11-06 16:30:05') # 立即執行 sched.add_job(my_job, 'date') sched.start()
2) interval 經過指定的時間間隔執行
weeks (int) ?C number of weeks to wait days (int) ?C number of days to wait hours (int) ?C number of hours to wait minutes (int) ?C number of minutes to wait seconds (int) ?C number of seconds to wait start_date (datetime|str) ?C starting point for the interval calculation end_date (datetime|str) ?C latest possible date/time to trigger on timezone (datetime.tzinfo|str) ?C time zone to use for the date/time calculations from datetime import datetime # 每兩小時執行一次 sched.add_job(job_function, 'interval', hours=2) # 在2012年10月10日09:30:00 到2014年6月15日11:00:00的時間內,每兩小時執行一次 sched.add_job(job_function, 'interval', hours=2, start_date='2012-10-10 09:30:00', end_date='2014-06-15 11:00:00')
3) cron 按指定的周期執行
year (int|str) ?C 4-digit year month (int|str) ?C month (1-12) day (int|str) ?C day of the (1-31) week (int|str) ?C ISO week (1-53) day_of_week (int|str) ?C number or name of weekday (0-6 or mon,tue,wed,thu,fri,sat,sun) hour (int|str) ?C hour (0-23) minute (int|str) ?C minute (0-59) second (int|str) ?C second (0-59) start_date (datetime|str) ?C earliest possible date/time to trigger on (inclusive) end_date (datetime|str) ?C latest possible date/time to trigger on (inclusive) timezone (datetime.tzinfo|str) ?C time zone to use for the date/time calculations (defaults to scheduler timezone) # 在6、7、8、11、12月的第三個周五的00:00, 01:00, 02:00和03:00 執行 sched.add_job(job_function, 'cron', month='6-8,11-12', day='3rd fri', hour='0-3') # 在2014年5月30日前的周一到周五的5:30執行 sched.add_job(job_function, 'cron', day_of_week='mon-fri', hour=5, minute=30, end_date='2014-05-30')
6.任務存儲
MemoryJobStore 默認內存存儲 MongoDBJobStore 任務保存到MongoDB from apscheduler.jobstores.mongodb import MongoDB JobStoreMongoDBJobStore()復制代碼 RedisJobStore 任務保存到redis from apscheduler.jobstores.redis import RedisJobStore RedisJobStore()
7 配置方法
方法1
from apscheduler.schedulers.background import BackgroundScheduler from apscheduler.executors.pool import ThreadPoolExecutor executors = { 'default': ThreadPoolExecutor(20), } conf = { # redis配置 "host":127.0.0.1, "port":6379, "db":15, # 連接15號數據庫 "max_connections":10 # redis最大支持300個連接數 } scheduler = BackgroundScheduler(executors=executors) scheduler.add_jobstore(jobstore='redis', **conf) # 添加任務持久化存儲方式,如果未安裝redis可省略此步驟
方法2
from pytz import utc from apscheduler.schedulers.background import BackgroundScheduler from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore from apscheduler.executors.pool import ProcessPoolExecutor executors = { 'default': {'type': 'threadpool', 'max_workers': 20}, 'processpool': ProcessPoolExecutor(max_workers=5) } scheduler = BackgroundScheduler() # .. 此處可以編寫其他代碼 # 使用configure方法進行配置 scheduler.configure(executors=executors)
8 啟動
scheduler.start()
對于BlockingScheduler ,程序會阻塞在這,防止退出,作為獨立進程時使用。(可以用來生成靜態頁面)
對于BackgroundScheduler,可以在應用程序中使用。不再以單獨的進程使用。(如30分鐘內取消訂單)
9 擴展
任務管理
方式1
job = scheduler.add_job(myfunc, 'interval', minutes=2) # 添加任務 job.remove() # 刪除任務 job.pause() # 暫定任務 job.resume() # 恢復任務
方式2
scheduler.add_job(myfunc, 'interval', minutes=2, id='my_job_id') # 添加任務 scheduler.remove_job('my_job_id') # 刪除任務 scheduler.pause_job('my_job_id') # 暫定任務 scheduler.resume_job('my_job_id') # 恢復任務
調整任務調度周期
job.modify(max_instances=6, name='Alternate name') scheduler.reschedule_job('my_job_id', trigger='cron', minute='*/5')復制代碼 停止APScheduler運行 scheduler.shutdown()
10 綜合使用
這里提供30分鐘取消訂單支付的思路,可以使用Flask或者Django程序都能實現,這里是在django應用中動態的添加一個定時任務,調度器需要使用BackgroundScheduler。下面先定義執行訂單取消的任務。
from apscheduler.executors.pool import ThreadPoolExecutor from datetime import datetime, timedelta from apscheduler.schedulers.blocking import BackgroundScheduler from goods.models import SKU from orders.models import OrderGoods def cancel_order_job(order_id, sku_id, stock, sales): # 將訂單商品和訂單信息篩選出來 order_goods = OrderGoods.objects.filter( order_id=order_id, sku_id=sku_id) order_goods.delete() # 刪除訂單 try: sku = SKU.objects.get(id=sku_id) sku.stock += stock # 訂單刪掉后商品表里的庫存恢復 sku.sales -= sales # 商品表里銷量還原 sku.save() except Exception as e: print(e)
具體操作哪些表要根據自身表的設計來定,大致是上面的思路。然后在生成訂單的視圖中同時生成取消訂單的任務。然后將取消訂單
cancel_order_job()
需要的參數傳遞過去,注意要判定當前訂單的狀態為未支付狀態。
from datetime import datetime, timedelta class OrderCommitView(View): def post(self, request): # ... 此處省略生成訂單相關邏輯 if status == OrderInfo.STATUS.UNPADED: # 待支付狀態 executors = { 'default': ThreadPoolExecutor(10) } now = datetime.now() delay = now + timedelta(minutes=30) # 從當前下訂單延時30分鐘后 scheduler = BackgroundScheduler(executors=executors) # 添加定時任務 scheduler.add_job(cancel_order_job, 'date', run_date=delay, args=[order_id, sku.id, sku.stock, sku.sales]) scheduler.start() # ....省略其他業務及返回
注意: 如果需要周期性的執行一個定時任務,如果用到了django中模型類或者Flask的配置信息等相關信息,需要將框架的配置信息導入。
總結
以上所述是小編給大家介紹的Python定時任務工具之APScheduler詳解,希望對大家有所幫助,如果大家有任何疑問請給我留言,小編會及時回復大家的。在此也非常感謝大家對腳本之家網站的支持!
如果你覺得本文對你有幫助,歡迎轉載,煩請注明出處,謝謝!
更多文章、技術交流、商務合作、聯系博主
微信掃碼或搜索:z360901061

微信掃一掃加我為好友
QQ號聯系: 360901061
您的支持是博主寫作最大的動力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描下面二維碼支持博主2元、5元、10元、20元等您想捐的金額吧,狠狠點擊下面給點支持吧,站長非常感激您!手機微信長按不能支付解決辦法:請將微信支付二維碼保存到相冊,切換到微信,然后點擊微信右上角掃一掃功能,選擇支付二維碼完成支付。
【本文對您有幫助就好】元
