Celery 是一個支持異步任務處理的強大框架。 @shared_task
裝飾器用於定義任務,特別是使用選項 bind
、autoretry_for
、retry_backoff
和 max_retries
可以大幅提升 任務的穩定性和自動錯誤處理。
在這篇文章中,我們將探討每個選項的運作方式和使用方法,並介紹常見的混淆以及解決這些問題的最佳方法。
1. bind=True
定義
bind=True
會將當前任務作為第一個參數傳遞,以便在任務內部使用 self
。這樣可以訪問任務的狀態、方法、屬性等。
主要功能
- 可訪問任務狀態: 可以訪問任務 ID、請求信息等以檢查狀態或進行日誌記錄。
- 顯式重試邏輯:
可以通過
self.retry()
方法手動實現重試邏輯。
範例
@shared_task(bind=True)
def my_task(self, some_arg):
print(f"任務 ID: {self.request.id}") # 輸出任務 ID
self.retry() # 重試任務
2. autoretry_for=(ExceptionType, ...)
定義
當出現指定的 異常類型 時,設定讓 Celery 自動 重試(retry) 任務。開發者無需顯式調用 self.retry()
來處理異常和自動重試。
注意事項
- 使用
autoretry_for
時: 由於重試是自動進行的,因此要 避免與self.retry()
重複使用。 - 混合使用的問題:
同時使用
autoretry_for
和self.retry()
會對相同的異常產生 重複重試。
範例
推薦方式:僅使用 autoretry_for
import requests
@shared_task(bind=True, autoretry_for=(requests.RequestException,), retry_backoff=True)
def my_task(self, url):
response = requests.get(url)
response.raise_for_status() # 若狀態碼不是 200,則引發異常
僅在特定條件下顯式重試 (self.retry()
) 的情況
import requests
@shared_task(bind=True, retry_backoff=True, max_retries=5)
def my_task(self, url):
try:
response = requests.get(url)
response.raise_for_status()
except requests.RequestException as e:
print(f"因錯誤重試: {e}")
self.retry(exc=e) # 顯式重試
3. retry_backoff=True
定義
啟用 指數退避(Exponential Backoff),使任務重試間隔逐漸增加。第一次重試立即執行,之後的重試間隔為 1 秒、2 秒、4 秒……依此類推。
主要功能
- 減少伺服器負載並有效處理網路故障。
- 退避時間可以透過 Celery 的基本設置自定義。
範例
@shared_task(bind=True, autoretry_for=(requests.RequestException,), retry_backoff=True)
def my_task(self):
# 第一次重試在 1 秒後,第二次重試在 2 秒後...
raise requests.RequestException("模擬失敗")
4. max_retries
定義
限制任務的 最大重試次數。如果任務在指定的次數內仍然未成功,則該任務會被記錄為失敗。
主要功能
- 防止無限重試以限制伺服器資源消耗。
- 根據失敗條件可以記錄任務或執行其他邏輯。
範例
@shared_task(bind=True, autoretry_for=(requests.RequestException,), retry_backoff=True, max_retries=5)
def my_task(self):
raise requests.RequestException("模擬失敗")
5. 混合使用注意事項:autoretry_for
vs self.retry()
正確使用指南
- 使用
autoretry_for
時: 自動重試已設定,因此無需顯式調用self.retry()
。在面對特定異常時,可以編寫簡潔的代碼來重新嘗試任務。 - 使用
self.retry()
時: 當需要在重試前執行額外操作(例如:寫日誌、檢查特定條件)時使用。 注意不要與autoretry_for
重複。
6. 選項摘要
選項 | 說明 |
---|---|
bind=True |
可以通過 self 訪問任務狀態和方法。 |
autoretry_for |
當某個異常發生時,自動重試該任務。 |
retry_backoff |
啟用逐漸增加重試間隔的指數退避(Exponential Backoff)。 |
max_retries |
通過限制最大重試次數來定義任務失敗條件。 |

7. 結論
Celery 的 @shared_task
選項有助於有效處理任務失敗並提高穩定性。
- 使用
autoretry_for
時: 自動化重試邏輯,因此 注意不要與self.retry()
重複使用。 - 如果需要條件邏輯或額外操作,可以使用
self.retry()
。
要穩定地實現基於 Celery 的任務,請組合這些選項並編寫適合場景的優化代碼! 😊
Add a New Comment