在第一篇文章中,我們通過介紹 Celery 的基本需求以及 delay() 方法的工作原理和實際使用示例,提高了大家對於異步任務的理解。 delay() 是一個簡便地將任務投遞到隊列中的強大工具,但有時需要更細緻的控制。這就是今天我們要重點探討的apply_async() 方法的用武之地。

今天我們將深入了解 apply_async() 是什麼,它與 delay() 的不同之處,以及如何利用各種選項更有效地管理異步任務。

Celery cartoon image working hard


1. apply_async(),開啟更多可能性的工具

apply_async() 是用於異步執行 Celery 任務的另一個強大方法。它在表面上似乎與 delay() 相似,但提供了任務執行的時間、方式,以及隊列設置等多種選項,為開發者提供了更多的靈活性。

delay() 簡單地作為 apply_async(args=args) 的調用包裝器函數,記住這一點,我們可以認為 apply_async() 是一個包含 delay() 所有功能的擴展版本,同時提供了更多的控制權。


2. apply_async() 的工作原理:精緻的調整

apply_async() 的基本工作方式與 delay() 相似。核心流程是將包含任務名稱和參數的消息發送到消息代理,然後工作進程接收並執行它。但是 apply_async() 允許通過多種參數(parameters) 精細調整任務執行方式。

主要參數如下:

  • args:將傳遞給任務函數的位置參數列表或元組。
  • kwargs:將傳遞給任務函數的關鍵字參數的字典。
  • countdown:將任務推遲指定的秒數後執行,而不是立即執行。例如,設置 countdown=60 將使任務在調用後 60 秒加入隊列。
  • eta (預計到達時間):將任務安排在特定未來時間執行。以 datetime 對象的形式指定。 countdowneta 互斥。
  • expires:設置任務的到期時間。在指定時間後,工作進程將忽略該任務而不執行它。可以指定為 int(秒)、float(秒)或 datetime 對象。這在處理暫時數據或有截止日期的任務時非常有用。
  • queue:將任務路由到特定名稱的隊列。 Celery 可以設置多個隊列,以根據任務的種類或重要性在不同的工作組中進行處理。
  • routing_key:根據消息代理的路由規則將任務發送到特定隊列。當使用 RabbitMQ 等消息代理的高級路由設置時會使用到此參數。
  • priority:設置任務的優先級。根據代理和工作進程的設置,高優先級的任務可以先被處理。通常,優先級取值從 0(最高優先級)到 9(最低優先級)的整數。
  • serializer:指定用於序列化任務消息的方式('json', 'pickle', 'yaml' 等)。
  • compression:指定用於壓縮任務消息的方式('gzip', 'bzip2' 等)。
  • headers:可選擇在任務消息中包含額外的頭部信息。
  • link:指定當當前任務成功完成後要執行的回調任務(callback task)。用於實現鏈接(chaining)。
  • link_error:指定當前任務失敗時要執行的錯誤回調任務(error callback task)

通過這些各種參數,apply_async() 可以比 delay() 更精細地控制任務執行的時間,路由到特定工作組,以及管理任務優先級,以滿足複雜的異步處理需求。


3. apply_async() 使用示例:情境專用

現在讓我們通過多種選項的使用示例來親自確認 apply_async() 的強大。

示例 1:在特定時間後執行 (countdown)

當用戶在註冊後 3 天發送歡迎郵件的情況:

send_welcome_email.apply_async(args=[user.email], countdown=3 * 24 * 60 * 60)

使用 countdown 選項使任務不會立即加入隊列,而是在指定的時間(3天)後加入隊列,然後由工作進程處理。

示例 2:在特定時間執行 (eta)

當需要每天凌晨 3 點生成日報的任務:

from datetime import datetime, timedelta

target_time = datetime.now().replace(hour=3, minute=0, second=0, microsecond=0) + timedelta(days=1) # 明天凌晨 3 點

generate_daily_report.apply_async(eta=target_time)

使用 eta 選項將任務準確地安排在指定時間執行。

示例 3:設置到期時間 (expires)

當外部 API 調用的結果僅在 10 分鐘內有效,之後不需要執行任務:

from datetime import datetime, timedelta

expires_at = datetime.now() + timedelta(minutes=10)

fetch_external_data.apply_async(args=[api_url], expires=expires_at)

通過 expires 選項,可以防止任務不必要地執行,從而節約資源。

示例 4:路由到特定隊列 (queue)

當希望將大量使用 CPU 的圖像處理任務分配給單獨的工作組處理時:

process_image.apply_async(args=[image_id], queue='image_processing')

通過 Celery 設置,可以單獨構建監控 image_processing 隊列的工作進程,這樣只有該工作組才能處理圖像處理任務。

示例 5:連接回調任務 (link)

當希望在支付處理任務成功完成後自動執行發送通知郵件的任務時: 如果不使用 apply_async,則可能會像這樣直接在函數中實現支付處理邏輯,然後在內部調用 .delay() 發送郵件。

def process_payment(order_id):
    # 編寫支付處理邏輯
    send_notification_email.delay(order_id)

但如果使用 apply_async(),則可以這樣編寫。

@shared_task
def process_payment(order_id):
    # 支付處理邏輯
    return order_id

@shared_task
def send_notification_email(order_id):
    # 發送通知郵件的邏輯

payment_result = process_payment.apply_async(args=[order_id], link=send_notification_email.s())

這裡的 .s() 是為 send_notification_email 任務創建的簽名(signature)。一旦 process_payment 任務成功完成,其返回值(在這裡是 order_id)將作為參數傳遞給 send_notification_email 任務以執行。


4. delay()apply_async():何時選擇什麼?

現在,您應該明確理解 delay()apply_async() 的區別及各自的優勢。那么,實際上在什麼情況下選擇哪個方法呢?

  • delay()

    • 當最簡單地執行異步任務時
    • 當不需要特別控制任務執行時機時
    • 當認為代碼的簡潔性很重要時
  • apply_async()

    • 當需要將任務執行安排在特定時間之後或特定時刻時(countdowneta
    • 當需要設置任務的到期時間以防止不必要的執行時(expires
    • 當希望將任務路由到特定隊列從而控制工作進程時(queue
    • 當需要管理任務的優先級時(priority
    • 當需要明確指定任務的序列化或壓縮方式時(serializercompression
    • 當希望設置成功或失敗時自動執行其他任務的回調時(linklink_error

通常,對於簡單的異步處理,使用 delay() 更為便捷,而對於更複雜和需要細緻控制的情況,推薦使用 apply_async()delay()apply_async() 的簡化版本,因此在任何情況下都可以使用 apply_async() 實現相同(或更多)的功能。


結語

今天我們深入探討了 Celery 的強大工具 apply_async() 方法,並檢視了它與 delay() 的關係及各種使用法。 apply_async() 使異步任務的管理變得更加精緻,使 Celery 能夠有效地滿足服務的需求。

在接下來的最後一篇中,我們將比較分析定義 Celery 任務的核心裝飾器 @app.task@shared_task,幫助您選擇最適合項目結構和開發理念的任務定義方式。感謝您一直陪伴 Celery 的旅程!

未閱讀之前的文章的朋友們,請查看下面的鏈接!

# Celery 的魔法_ 探討 delay() 的表面與內涵