問題概述

DRF 伺服器向 Django 應用伺服器發送 webhook POST 請求,
Django 在接收請求並創建 Post 模型實例的過程中出現了以下問題。

  1. Webhook 回應超時
  2. 在創建 Post 對象後,由於 ManyToMany 欄位 categoriestags 的處理需要耗時
  3. 這導致 webhook 回應延遲,DRF 將此視為失敗

  4. 數據一致性未得到保障

  5. 如果在後續處理中立即調用 Celery 任務,則會在 tags.add(...)categories.add(...) 工作尚未完成的情況下發出調用
  6. 這將導致 Celery 處理不完整的數據的問題

  7. on_commit() 之後 Celery 接收到空數據

  8. 儘管使用 on_commit() 確保數據的一致性後調度了 Celery 任務,但在任務內部 tags.all() 的查詢結果仍然顯示為空列表
  9. 這是因為 Celery 將讀取操作發送到了副本 DB,而主從同步過程中存在延遲

解決策略

  1. Post 創建後立即返回回應
  2. 僅創建 Post 實例並立即向 DRF 返回 202 Accepted 回應,以避免超時問題

  3. 在單獨的線程中處理後續工作

  4. 利用 threading.Thread() 將關聯處理和 Celery 的調用與主流程分隔開來
  5. 設計避免 webhook 回應變慢

  6. 在 post_process 內保證事務

  7. 使用 transaction.atomic()ManyToMany 欄位處理的整個過程捆綁為事務
  8. 任務完成後使用 transaction.on_commit() 調度 Celery 任務
  9. 確保在關聯處理完成後,Celery 得以執行

  10. Celery 內部直接指定主 DB 進行查詢

  11. 為了防止由於副本延遲所引起的一致性問題,在 Celery 任務內明確使用 using('default')
post = Post.objects.using('default').get(id=post_id)
tags = post.tags.using('default').all()
categories = post.categories.using('default').all()

也可以利用 DB Router 使 Celery 請求始終查詢主 DB


最終結構總結

  1. DRF 伺服器 → Django webhook 請求
  2. Django:
    • 創建 Post 對象後立即返回回應
    • 後續工作在單獨的線程中執行
  3. 線程內部:
    • 使用 atomic() 進行關聯整理
    • 使用 on_commit() 調度 Celery 任務
  4. Celery:
    • 從主 DB(default)查詢數據

結論

所有組件在單獨的情況下都是正常的,但為了解決分散式架構環境中出現的延遲和數據一致性問題,
不僅需要考慮代碼結構,還需要進行 數據流和時機以及 DB 副本延遲的系統設計