使用 Celery 進行背景任務¶
如果您的應用程式有長時間運行的任務,例如處理一些上傳的資料或發送電子郵件,您不會希望在請求期間等待它完成。相反地,使用任務佇列將必要的資料發送到另一個程序,該程序將在背景執行任務,同時請求立即返回。
Celery 是一個功能強大的任務佇列,可用於簡單的背景任務以及複雜的多階段程式和排程。本指南將向您展示如何使用 Flask 配置 Celery。請閱讀 Celery 的 Celery 入門 指南,以了解如何使用 Celery 本身。
Flask 儲存庫包含 一個範例,基於此頁面上的資訊,該範例也展示了如何使用 JavaScript 提交任務並輪詢進度和結果。
安裝¶
從 PyPI 安裝 Celery,例如使用 pip
$ pip install celery
將 Celery 與 Flask 整合¶
您可以使用 Celery 而無需與 Flask 整合,但是透過 Flask 的配置來配置它,並讓任務存取 Flask 應用程式會更方便。
Celery 使用與 Flask 類似的概念,使用一個 Celery
應用程式物件,該物件具有配置並註冊任務。在建立 Flask 應用程式時,請使用以下程式碼來建立和配置 Celery 應用程式。
from celery import Celery, Task
def celery_init_app(app: Flask) -> Celery:
class FlaskTask(Task):
def __call__(self, *args: object, **kwargs: object) -> object:
with app.app_context():
return self.run(*args, **kwargs)
celery_app = Celery(app.name, task_cls=FlaskTask)
celery_app.config_from_object(app.config["CELERY"])
celery_app.set_default()
app.extensions["celery"] = celery_app
return celery_app
這會建立並返回一個 Celery
應用程式物件。Celery 配置 取自 Flask 配置中的 CELERY
鍵。Celery 應用程式被設定為預設值,以便在每個請求期間都能看到它。Task
子類別會自動在啟用的 Flask 應用程式上下文中執行任務函數,以便可以使用您的資料庫連線等服務。
這是一個基本的 example.py
,它配置 Celery 使用 Redis 進行通訊。我們啟用了一個結果後端,但預設情況下忽略結果。這允許我們僅為我們關心結果的任務儲存結果。
from flask import Flask
app = Flask(__name__)
app.config.from_mapping(
CELERY=dict(
broker_url="redis://127.0.0.1",
result_backend="redis://127.0.0.1",
task_ignore_result=True,
),
)
celery_app = celery_init_app(app)
將 celery worker
命令指向此處,它將找到 celery_app
物件。
$ celery -A example worker --loglevel INFO
您也可以執行 celery beat
命令以按排程執行任務。有關定義排程的更多資訊,請參閱 Celery 的文件。
$ celery -A example beat --loglevel INFO
應用程式工廠¶
當使用 Flask 應用程式工廠模式時,請在工廠內部呼叫 celery_init_app
函數。它將 app.extensions["celery"]
設定為 Celery 應用程式物件,可用於從工廠返回的 Flask 應用程式中取得 Celery 應用程式。
def create_app() -> Flask:
app = Flask(__name__)
app.config.from_mapping(
CELERY=dict(
broker_url="redis://127.0.0.1",
result_backend="redis://127.0.0.1",
task_ignore_result=True,
),
)
app.config.from_prefixed_env()
celery_init_app(app)
return app
為了使用 celery
命令,Celery 需要一個應用程式物件,但該物件不再直接可用。建立一個 make_celery.py
檔案,該檔案呼叫 Flask 應用程式工廠並從返回的 Flask 應用程式中取得 Celery 應用程式。
from example import create_app
flask_app = create_app()
celery_app = flask_app.extensions["celery"]
將 celery
命令指向此檔案。
$ celery -A make_celery worker --loglevel INFO
$ celery -A make_celery beat --loglevel INFO
定義任務¶
使用 @celery_app.task
裝飾器裝飾任務函數需要存取 celery_app
物件,當使用工廠模式時,該物件將不可用。這也意味著裝飾的任務與特定的 Flask 和 Celery 應用程式實例綁定,如果您更改測試的配置,這可能會在測試期間成為問題。
相反地,使用 Celery 的 @shared_task
裝飾器。這會建立任務物件,這些物件將存取「當前應用程式」,這是一個類似於 Flask 的藍圖和應用程式上下文的概念。這就是我們在上面呼叫 celery_app.set_default()
的原因。
這是一個範例任務,它將兩個數字相加並返回結果。
from celery import shared_task
@shared_task(ignore_result=False)
def add_together(a: int, b: int) -> int:
return a + b
稍早,我們將 Celery 配置為預設情況下忽略任務結果。由於我們想知道此任務的傳回值,因此我們設定 ignore_result=False
。另一方面,不需要結果的任務(例如發送電子郵件)不會設定此項。
呼叫任務¶
裝飾的函數變成一個任務物件,其中包含在背景中呼叫它的方法。最簡單的方法是使用 delay(*args, **kwargs)
方法。有關更多方法,請參閱 Celery 的文件。
必須執行 Celery worker 才能執行任務。啟動 worker 在前面的章節中已展示。
from flask import request
@app.post("/add")
def start_add() -> dict[str, object]:
a = request.form.get("a", type=int)
b = request.form.get("b", type=int)
result = add_together.delay(a, b)
return {"result_id": result.id}
路由不會立即取得任務的結果。那會透過封鎖回應來破壞目的。相反地,我們返回正在執行的任務的結果 ID,我們稍後可以使用它來取得結果。
取得結果¶
為了取得我們在上面啟動的任務的結果,我們將新增另一個路由,該路由採用我們之前返回的結果 ID。如果任務已完成,我們將返回任務是否完成(就緒)、是否成功完成以及傳回值(或錯誤)。
from celery.result import AsyncResult
@app.get("/result/<id>")
def task_result(id: str) -> dict[str, object]:
result = AsyncResult(id)
return {
"ready": result.ready(),
"successful": result.successful(),
"value": result.result if result.ready() else None,
}
現在您可以使用第一個路由啟動任務,然後使用第二個路由輪詢結果。這可以防止 Flask 請求 worker 被封鎖而等待任務完成。
Flask 儲存庫包含 一個範例,使用 JavaScript 提交任務並輪詢進度和結果。
傳遞資料至任務¶
上面的 “add” 任務採用兩個整數作為參數。為了將參數傳遞給任務,Celery 必須將它們序列化為它可以傳遞給其他程序的格式。因此,不建議傳遞複雜的物件。例如,不可能傳遞 SQLAlchemy 模型物件,因為該物件可能無法序列化,並且與查詢它的會話綁定。
傳遞最少量的資料,以便在任務中擷取或重新建立任何複雜的資料。考慮一個任務,該任務將在登入的使用者要求其資料封存時執行。Flask 請求知道登入的使用者,並且具有從資料庫查詢的使用者物件。它是透過在資料庫中查詢給定的 ID 來取得的,因此任務可以執行相同的操作。傳遞使用者的 ID 而不是使用者物件。
@shared_task
def generate_user_archive(user_id: str) -> None:
user = db.session.get(User, user_id)
...
generate_user_archive.delay(current_user.id)