欧美三区_成人在线免费观看视频_欧美极品少妇xxxxⅹ免费视频_a级毛片免费播放_鲁一鲁中文字幕久久_亚洲一级特黄

2種方式解決Python執行卡頓問題

系統 1756 0

2種方式解決Python執行卡頓問題_第1張圖片

轉載:hackpython

簡介

Flask 是 Python 中有名的輕量級同步 web 框架,在一些開發中,可能會遇到需要長時間處理的任務,此時就需要使用異步的方式來實現,讓長時間任務在后臺運行,先將本次請求的響應狀態返回給前端,不讓前端界面「卡頓」,當異步任務處理好后,如果需要返回狀態,再將狀態返回。

怎么實現呢?

使用線程的方式

當要執行耗時任務時,直接開啟一個新的線程來執行任務,這種方式最為簡單快速。

通過 ThreadPoolExecutor 來實現

              
                from flask import Flask	
from time import sleep	
from concurrent.futures import ThreadPoolExecutor	
# DOCS https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.ThreadPoolExecutor	
# 創建線程池執行器	
executor = ThreadPoolExecutor(2)	
app = Flask(__name__)	
@app.route('/jobs')	
def run_jobs():	
    # 交由線程去執行耗時任務	
    executor.submit(long_task, 'hello', 123)	
    return 'long task running.'	
# 耗時任務	
def long_task(arg1, arg2):	
    print("args: %s %s!" % (arg1, arg2))	
    sleep(5)	
    print("Task is done!")	
if __name__ == '__main__':	
    app.run()
              
            

當要執行一些比較簡單的耗時任務時就可以使用這種方式,如發郵件、發短信驗證碼等。

但這種方式有個問題,就是前端無法得知任務執行狀態。

如果想要前端知道,就需要設計一些邏輯,比如將任務執行狀態存儲到 redis 中,通過唯一的任務 id 進行標識,然后再寫一個接口,通過任務 id 去獲取任務的狀態,然后讓前端定時去請求該接口,從而獲得任務狀態信息。

全部自己實現就顯得有些麻煩了,而 Celery 剛好實現了這樣的邏輯,來使用一下。

使用 Celery

為了滿足前端可以獲得任務狀態的需求,可以使用 Celery。

Celery 是實時任務處理與調度的分布式任務隊列,它常用于 web 異步任務、定時任務等,后面單獨寫一篇文章描述 Celery 的架構,這里不深入討論。

現在我想讓前端可以通過一個進度條來判斷后端任務的執行情況。使用 Celery 就很容易實現,首先通過 pip 安裝 Celery 與 redis,之所以要安裝 redis,是因為讓 Celery 選擇 redis 作為「消息代理 / 消息中間件」。

              
                pip install celery	
pip install redis
              
            

在 Flask 中使用 Celery 其實很簡單,這里先簡單的過一下 Flask 中使用 Celery 的整體流程,然后再去實現具體的項目

  • 1. 在 Flask 中初始化 Celery

              
                from flask import Flask	
from celery import Celery	
app = Flask(__name__)	
# 配置	
# 配置消息代理的路徑,如果是在遠程服務器上,則配置遠程服務器中redis的URL	
app.config['CELERY_BROKER_URL'] = 'redis://localhost:6379/0'	
# 要存儲 Celery 任務的狀態或運行結果時就必須要配置	
app.config['CELERY_RESULT_BACKEND'] = 'redis://localhost:6379/0'	
# 初始化Celery	
celery = Celery(app.name, broker=app.config['CELERY_BROKER_URL'])	
# 將Flask中的配置直接傳遞給Celery	
celery.conf.update(app.config)
              
            

上述代碼中,通過 Celery 類初始化 celery 對象,傳入的應用名稱與消息代理的連接 URL。

  • 2. 通過 celery.task 裝飾器裝飾耗時任務對應的函數

              
                @celery.task	
def long_task(arg1, arg2):	
    # 耗時任務的邏輯	
    return result
              
            
  • 3.Flask 中定義接口通過異步的方式執行耗時任務

              
                @app.route('/', methods=['GET', 'POST'])	
def index():	
    task = long_task.delay(1, 2)
              
            

delay () 方法是 apply async () 方法的快捷方式,apply async () 參數更多,可以更加細致的控制耗時任務,比如想要 long_task () 在一分鐘后再執行

              
                @app.route('/', methods=['GET', 'POST'])	
def index():	
    task = long_task.apply_async(args=[1, 2], countdown=60)
              
            

delay () 與 apply_async () 會返回一個任務對象,該對象可以獲取任務的狀態與各種相關信息。

通過這 3 步就可以使用 Celery 了。

接著就具體來實現「讓前端可以通過一個進度條來判斷后端任務的執行情況」的需求。

              
                # bind為True,會傳入self給被裝飾的方法	
@celery.task(bind=True)	
def long_task(self):	
    verb = ['Starting up', 'Booting', 'Repairing', 'Loading', 'Checking']	
    adjective = ['master', 'radiant', 'silent', 'harmonic', 'fast']	
    noun = ['solar array', 'particle reshaper', 'cosmic ray', 'orbiter', 'bit']	
    message = ''	
    total = random.randint(10, 50)	
    for i in range(total):	
        if not message or random.random() < 0.25:	
            # 隨機的獲取一些信息	
            message = '{0} {1} {2}...'.format(random.choice(verb),	
                                              random.choice(adjective),	
                                              random.choice(noun))	
        # 更新Celery任務狀態	
        self.update_state(state='PROGRESS',	
                          meta={'current': i, 'total': total,	
                                'status': message})	
        time.sleep(1)	
    # 返回字典	
    return {'current': 100, 'total': 100, 'status': 'Task completed!',	
            'result': 42}
              
            

上述代碼中,celery.task () 裝飾器使用了 bind=True 參數,這個參數會讓 Celery 將 Celery 本身傳入,可以用于記錄與更新任務狀態。

然后就是一個 for 迭代,迭代的邏輯沒什么意義,就是隨機從 list 中抽取一些詞匯來模擬一些邏輯的運行,為了表示這是耗時邏輯,通過 time.sleep (1) 休眠一秒。

每次獲取一次詞匯,就通過 self.update_state () 更新 Celery 任務的狀態,Celery 包含一些內置狀態,如 SUCCESS、STARTED 等等,這里使用了自定義狀態「PROGRESS」,除了狀態外,還將本次循環的一些信息通過 meta 參數 (元數據) 以字典的形式存儲起來。有了這些數據,前端就可以顯示進度條了。

定義好耗時方法后,再定義一個 Flask 接口方法來調用該耗時方法

              
                @app.route('/longtask', methods=['POST'])	
def longtask():	
    # 異步調用	
    task = long_task.apply_async()	
    # 返回 202,與Location頭	
    return jsonify({}), 202, {'Location': url_for('taskstatus',	
                                                  task_id=task.id)}
              
            

簡單而言,前端通過 POST 請求到 /longtask,讓后端開始去執行耗時任務。

返回的狀態碼為 202,202 通常表示一個請求正在進行中,然后還在返回數據包的包頭 (Header) 中添加了 Location 頭信息,前端可以通過讀取數據包中 Header 中的 Location 的信息來獲取任務 id 對應的完整 url。

前端有了任務 id 對應的 url 后,還需要提供一個接口給前端,讓前端可以通過任務 id 去獲取當前時刻任務的具體狀態。

              
                @app.route('/status/
                
                  ')	
def taskstatus(task_id):	
    task = long_task.AsyncResult(task_id)	
    if task.state == 'PENDING': # 在等待	
        response = {	
            'state': task.state,	
            'current': 0,	
            'total': 1,	
            'status': 'Pending...'	
        }	
    elif task.state != 'FAILURE': # 沒有失敗	
        response = {	
            'state': task.state, # 狀態	
            # meta中的數據,通過task.info.get()可以獲得	
            'current': task.info.get('current', 0), # 當前循環進度	
            'total': task.info.get('total', 1), # 總循環進度	
            'status': task.info.get('status', '')	
        }	
        if 'result' in task.info: 	
            response['result'] = task.info['result']	
    else:	
        # 后端執行任務出現了一些問題	
        response = {	
            'state': task.state,	
            'current': 1,	
            'total': 1,	
            'status': str(task.info),  # 報錯的具體異常	
        }	
    return jsonify(response)
                
              
            

為了可以獲得任務對象中的信息,使用任務 id 初始化 AsyncResult 類,獲得任務對象,然后就可以從任務對象中獲得當前任務的信息。

該方法會返回一個 JSON,其中包含了任務狀態以及 meta 中指定的信息,前端可以利用這些信息構建一個進度條。

如果任務在 PENDING 狀態,表示該任務還沒有開始,在這種狀態下,任務中是沒有什么信息的,這里人為的返回一些數據。如果任務執行失敗,就返回 task.info 中包含的異常信息,此外就是正常執行了,正常執行可以通 task.info 獲得任務中具體的信息。

這樣,后端的邏輯就處理完成了,接著就來實現前端的邏輯,要實現圖形進度條,可以直接使用 nanobar.js,簡單兩句話就可以實現一個進度條,其官網例子如下:

              
                var options = {	
    classname: 'my-class',	
  id: 'my-id',	
  // 進度條要出現的位置	
    target: document.getElementById('myDivId')	
};	
// 初始化進度條對象	
var nanobar = new Nanobar( options );	
nanobar.go( 30 ); // 30% 進度條	
nanobar.go( 76 ); // 76% 進度條	
// 100% 進度條,進度條結束	
nanobar.go(100);
              
            

有了 nanobar.js 就非常簡單了。

先定義一個簡單的 HTML 界面

              
                

Long running task with progress updates



通過 JavaScript 實現對后臺的請求

              
                // 按鈕點擊事件	
$(function() {	
       $('#start-bg-job').click(start_long_task);	
   });	
// 請求 longtask 接口	
function start_long_task() {	
            // 添加元素在html中	
            div = $('
                
0%
...
?

'); $('#progress').append(div); // 創建進度條對象 var nanobar = new Nanobar({ bg: '#44f', target: div[0].childNodes[0] }); // ajax請求longtask $.ajax({ type: 'POST', url: '/longtask', // 獲得數據,從響應頭中獲取Location success: function(data, status, request) { status_url = request.getResponseHeader('Location'); // 調用 update_progress() 方法更新進度條 update_progress(status_url, nanobar, div[0]); }, error: function() { alert('Unexpected error'); } }); } // 更新進度條 function update_progress(status_url, nanobar, status_div) { // getJSON()方法是JQuery內置方法,這里向Location中對應的url發起請求,即請求「/status/ 」 $.getJSON(status_url, function(data) { // 計算進度 percent = parseInt(data['current'] * 100 / data['total']); // 更新進度條 nanobar.go(percent); // 更新文字 $(status_div.childNodes[1]).text(percent + '%'); $(status_div.childNodes[2]).text(data['status']); if (data['state'] != 'PENDING' && data['state'] != 'PROGRESS') { if ('result' in data) { // 展示結果 $(status_div.childNodes[3]).text('Result: ' + data['result']); } else { // 意料之外的事情發生 $(status_div.childNodes[3]).text('Result: ' + data['state']); } } else { // 2秒后再次運行 setTimeout(function() { update_progress(status_url, nanobar, status_div); }, 2000); } }); }

可以通過注釋閱讀代碼整體邏輯。

至此,需求實現完了,運行一下。

首先運行 Redis

              
                redis-server
              
            

然后運行 celery

              
                celery worker -A app.celery --loglevel=info
              
            

最后運行 Flask 項目

              
                python app.py
              
            

效果如下:

2種方式解決Python執行卡頓問題_第2張圖片


Flask 異步運行任務的常見方式就介紹完了,因為本人在開發一個用于自動生成字幕的小玩具,其中視頻上傳以及字幕生成都是耗時任務,所以就單獨寫一篇文章來介紹一下這部分的內容,后面會將小玩具的代碼開源讓大家學習一下,先一睹其真容:

2種方式解決Python執行卡頓問題_第3張圖片

往期閱讀


更多文章、技術交流、商務合作、聯系博主

微信掃碼或搜索:z360901061

微信掃一掃加我為好友

QQ號聯系: 360901061

您的支持是博主寫作最大的動力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描下面二維碼支持博主2元、5元、10元、20元等您想捐的金額吧,狠狠點擊下面給點支持吧,站長非常感激您!手機微信長按不能支付解決辦法:請將微信支付二維碼保存到相冊,切換到微信,然后點擊微信右上角掃一掃功能,選擇支付二維碼完成支付。

【本文對您有幫助就好】

您的支持是博主寫作最大的動力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描上面二維碼支持博主2元、5元、10元、自定義金額等您想捐的金額吧,站長會非常 感謝您的哦!!!

發表我的評論
最新評論 總共0條評論
主站蜘蛛池模板: 亚洲欧美日韩精品久久奇米色影视 | 成年视频网站免费观看 | 激情五月婷婷色 | 天堂色综合 | 久久色网 | 黄色小视频在线观看 | 久在草视频| 欧美久久亚洲精品 | 亚洲精品乱码久久久久久9色 | 国产丝袜在线 | 亚洲国产日韩欧美综合久久 | 久草 在线| 中文字幕在线视频日本 | 五月婷婷婷婷 | 久久99精品久久久97夜夜嗨 | 成人黄性视频 | 午夜成人在线视频 | 日本大片久久久高清免费看 | 精品日本一区二区 | 天天射天天怕 | 国内精品视频区在线2021 | 午夜在线精品偷拍 | 久久久蜜桃 | 中国美女撒尿txxxxx视频 | 99在线视频精品 | 91在线亚洲精品专区 | 波多野结衣日韩 | 逼逼网 | 一个人看aaaa免费中文 | 亚洲午夜精品一区二区蜜桃 | 午夜色站 | 极品丝袜高跟91极品系列 | 久久青 | 日本高清天码一区在线播放 | 国产第一亚洲 | 91精品国产色综合久久 | 国产精品最新 | 中文字幕网在线 | 成人免费影 | 国产拳头交一区二区 | 亚洲精品中文字幕大岛优香 |