在第一篇文章中,我們通過介紹 Celery 的基本需求以及 delay()
方法的工作原理和實際使用示例,提高了大家對於異步任務的理解。 delay()
是一個簡便地將任務投遞到隊列中的強大工具,但有時需要更細緻的控制。這就是今天我們要重點探討的apply_async()
方法的用武之地。
今天我們將深入了解 apply_async()
是什麼,它與 delay()
的不同之處,以及如何利用各種選項更有效地管理異步任務。
1. apply_async()
,開啟更多可能性的工具
apply_async()
是用於異步執行 Celery 任務的另一個強大方法。它在表面上似乎與 delay()
相似,但提供了任務執行的時間、方式,以及隊列設置等多種選項,為開發者提供了更多的靈活性。
delay()
簡單地作為 apply_async(args=args)
的調用包裝器函數,記住這一點,我們可以認為 apply_async()
是一個包含 delay()
所有功能的擴展版本,同時提供了更多的控制權。
2. apply_async()
的工作原理:精緻的調整
apply_async()
的基本工作方式與 delay()
相似。核心流程是將包含任務名稱和參數的消息發送到消息代理,然後工作進程接收並執行它。但是 apply_async()
允許通過多種參數(parameters) 精細調整任務執行方式。
主要參數如下:
args
:將傳遞給任務函數的位置參數列表或元組。kwargs
:將傳遞給任務函數的關鍵字參數的字典。countdown
:將任務推遲指定的秒數後執行,而不是立即執行。例如,設置countdown=60
將使任務在調用後 60 秒加入隊列。eta
(預計到達時間):將任務安排在特定未來時間執行。以datetime
對象的形式指定。countdown
與eta
互斥。expires
:設置任務的到期時間。在指定時間後,工作進程將忽略該任務而不執行它。可以指定為int
(秒)、float
(秒)或datetime
對象。這在處理暫時數據或有截止日期的任務時非常有用。queue
:將任務路由到特定名稱的隊列。 Celery 可以設置多個隊列,以根據任務的種類或重要性在不同的工作組中進行處理。routing_key
:根據消息代理的路由規則將任務發送到特定隊列。當使用 RabbitMQ 等消息代理的高級路由設置時會使用到此參數。priority
:設置任務的優先級。根據代理和工作進程的設置,高優先級的任務可以先被處理。通常,優先級取值從 0(最高優先級)到 9(最低優先級)的整數。serializer
:指定用於序列化任務消息的方式('json', 'pickle', 'yaml' 等)。compression
:指定用於壓縮任務消息的方式('gzip', 'bzip2' 等)。headers
:可選擇在任務消息中包含額外的頭部信息。link
:指定當當前任務成功完成後要執行的回調任務(callback task)。用於實現鏈接(chaining)。link_error
:指定當前任務失敗時要執行的錯誤回調任務(error callback task)。
通過這些各種參數,apply_async()
可以比 delay()
更精細地控制任務執行的時間,路由到特定工作組,以及管理任務優先級,以滿足複雜的異步處理需求。
3. apply_async()
使用示例:情境專用
現在讓我們通過多種選項的使用示例來親自確認 apply_async()
的強大。
示例 1:在特定時間後執行 (countdown
)
當用戶在註冊後 3 天發送歡迎郵件的情況:
send_welcome_email.apply_async(args=[user.email], countdown=3 * 24 * 60 * 60)
使用 countdown
選項使任務不會立即加入隊列,而是在指定的時間(3天)後加入隊列,然後由工作進程處理。
示例 2:在特定時間執行 (eta
)
當需要每天凌晨 3 點生成日報的任務:
from datetime import datetime, timedelta
target_time = datetime.now().replace(hour=3, minute=0, second=0, microsecond=0) + timedelta(days=1) # 明天凌晨 3 點
generate_daily_report.apply_async(eta=target_time)
使用 eta
選項將任務準確地安排在指定時間執行。
示例 3:設置到期時間 (expires
)
當外部 API 調用的結果僅在 10 分鐘內有效,之後不需要執行任務:
from datetime import datetime, timedelta
expires_at = datetime.now() + timedelta(minutes=10)
fetch_external_data.apply_async(args=[api_url], expires=expires_at)
通過 expires
選項,可以防止任務不必要地執行,從而節約資源。
示例 4:路由到特定隊列 (queue
)
當希望將大量使用 CPU 的圖像處理任務分配給單獨的工作組處理時:
process_image.apply_async(args=[image_id], queue='image_processing')
通過 Celery 設置,可以單獨構建監控 image_processing
隊列的工作進程,這樣只有該工作組才能處理圖像處理任務。
示例 5:連接回調任務 (link
)
當希望在支付處理任務成功完成後自動執行發送通知郵件的任務時:
如果不使用 apply_async
,則可能會像這樣直接在函數中實現支付處理邏輯,然後在內部調用 .delay()
發送郵件。
def process_payment(order_id):
# 編寫支付處理邏輯
send_notification_email.delay(order_id)
但如果使用 apply_async()
,則可以這樣編寫。
@shared_task
def process_payment(order_id):
# 支付處理邏輯
return order_id
@shared_task
def send_notification_email(order_id):
# 發送通知郵件的邏輯
payment_result = process_payment.apply_async(args=[order_id], link=send_notification_email.s())
這裡的 .s()
是為 send_notification_email
任務創建的簽名(signature)。一旦 process_payment
任務成功完成,其返回值(在這裡是 order_id
)將作為參數傳遞給 send_notification_email
任務以執行。
4. delay()
與 apply_async()
:何時選擇什麼?
現在,您應該明確理解 delay()
和 apply_async()
的區別及各自的優勢。那么,實際上在什麼情況下選擇哪個方法呢?
-
delay()
:- 當最簡單地執行異步任務時
- 當不需要特別控制任務執行時機時
- 當認為代碼的簡潔性很重要時
-
apply_async()
:- 當需要將任務執行安排在特定時間之後或特定時刻時(
countdown
、eta
) - 當需要設置任務的到期時間以防止不必要的執行時(
expires
) - 當希望將任務路由到特定隊列從而控制工作進程時(
queue
) - 當需要管理任務的優先級時(
priority
) - 當需要明確指定任務的序列化或壓縮方式時(
serializer
、compression
) - 當希望設置成功或失敗時自動執行其他任務的回調時(
link
、link_error
)
- 當需要將任務執行安排在特定時間之後或特定時刻時(
通常,對於簡單的異步處理,使用 delay()
更為便捷,而對於更複雜和需要細緻控制的情況,推薦使用 apply_async()
。 delay()
是 apply_async()
的簡化版本,因此在任何情況下都可以使用 apply_async()
實現相同(或更多)的功能。
結語
今天我們深入探討了 Celery 的強大工具 apply_async()
方法,並檢視了它與 delay()
的關係及各種使用法。 apply_async() 使異步任務的管理變得更加精緻,使 Celery 能夠有效地滿足服務的需求。
在接下來的最後一篇中,我們將比較分析定義 Celery 任務的核心裝飾器 @app.task
和 @shared_task
,幫助您選擇最適合項目結構和開發理念的任務定義方式。感謝您一直陪伴 Celery 的旅程!
未閱讀之前的文章的朋友們,請查看下面的鏈接!
目前沒有評論。