Celery 是一個支持異步任務處理的強大框架。 @shared_task 裝飾器用於定義任務,特別是使用選項 bindautoretry_forretry_backoffmax_retries 可以大幅提升 任務的穩定性和自動錯誤處理

在這篇文章中,我們將探討每個選項的運作方式和使用方法,並介紹常見的混淆以及解決這些問題的最佳方法。


1. bind=True

定義

bind=True 會將當前任務作為第一個參數傳遞,以便在任務內部使用 self。這樣可以訪問任務的狀態、方法、屬性等。

主要功能

  • 可訪問任務狀態: 可以訪問任務 ID、請求信息等以檢查狀態或進行日誌記錄。
  • 顯式重試邏輯: 可以通過 self.retry() 方法手動實現重試邏輯。

範例

@shared_task(bind=True)
def my_task(self, some_arg):
    print(f"任務 ID: {self.request.id}")  # 輸出任務 ID
    self.retry()  # 重試任務

2. autoretry_for=(ExceptionType, ...)

定義

當出現指定的 異常類型 時,設定讓 Celery 自動 重試(retry) 任務。開發者無需顯式調用 self.retry() 來處理異常和自動重試。

注意事項

  • 使用 autoretry_for: 由於重試是自動進行的,因此要 避免與 self.retry() 重複使用
  • 混合使用的問題: 同時使用 autoretry_forself.retry() 會對相同的異常產生 重複重試

範例

推薦方式:僅使用 autoretry_for
import requests

@shared_task(bind=True, autoretry_for=(requests.RequestException,), retry_backoff=True)
def my_task(self, url):
    response = requests.get(url)
    response.raise_for_status()  # 若狀態碼不是 200,則引發異常
僅在特定條件下顯式重試 (self.retry()) 的情況
import requests

@shared_task(bind=True, retry_backoff=True, max_retries=5)
def my_task(self, url):
    try:
        response = requests.get(url)
        response.raise_for_status()
    except requests.RequestException as e:
        print(f"因錯誤重試: {e}")
        self.retry(exc=e)  # 顯式重試

3. retry_backoff=True

定義

啟用 指數退避(Exponential Backoff),使任務重試間隔逐漸增加。第一次重試立即執行,之後的重試間隔為 1 秒、2 秒、4 秒……依此類推。

主要功能

  • 減少伺服器負載並有效處理網路故障。
  • 退避時間可以透過 Celery 的基本設置自定義。

範例

@shared_task(bind=True, autoretry_for=(requests.RequestException,), retry_backoff=True)
def my_task(self):
    # 第一次重試在 1 秒後,第二次重試在 2 秒後...
    raise requests.RequestException("模擬失敗")

4. max_retries

定義

限制任務的 最大重試次數。如果任務在指定的次數內仍然未成功,則該任務會被記錄為失敗。

主要功能

  • 防止無限重試以限制伺服器資源消耗。
  • 根據失敗條件可以記錄任務或執行其他邏輯。

範例

@shared_task(bind=True, autoretry_for=(requests.RequestException,), retry_backoff=True, max_retries=5)
def my_task(self):
    raise requests.RequestException("模擬失敗")

5. 混合使用注意事項:autoretry_for vs self.retry()

正確使用指南

  1. 使用 autoretry_for: 自動重試已設定,因此無需顯式調用 self.retry()。在面對特定異常時,可以編寫簡潔的代碼來重新嘗試任務。
  2. 使用 self.retry(): 當需要在重試前執行額外操作(例如:寫日誌、檢查特定條件)時使用。 注意不要與 autoretry_for 重複

6. 選項摘要

選項 說明
bind=True 可以通過 self 訪問任務狀態和方法。
autoretry_for 當某個異常發生時,自動重試該任務。
retry_backoff 啟用逐漸增加重試間隔的指數退避(Exponential Backoff)。
max_retries 通過限制最大重試次數來定義任務失敗條件。
Understanding Celery @shared_task Options

7. 結論

Celery 的 @shared_task 選項有助於有效處理任務失敗並提高穩定性。

  • 使用 autoretry_for: 自動化重試邏輯,因此 注意不要與 self.retry() 重複使用
  • 如果需要條件邏輯或額外操作,可以使用 self.retry()

要穩定地實現基於 Celery 的任務,請組合這些選項並編寫適合場景的優化代碼! 😊