Im ersten Teil haben wir die grundlegenden Anforderungen an Celery sowie die Funktionsweise der delay()
-Methode untersucht und anhand von praktischen Beispielen Ihr Verständnis von asynchronen Aufgaben vertieft. delay()
ist ein leistungsstarkes Werkzeug, um einfach Aufgaben in die Warteschlange zu stellen, aber manchmal benötigen wir eine genauere Kontrolle. Genau dann kommt die apply_async()
-Methode ins Spiel, die wir heute eingehend erkunden werden.
Heute werden wir herausfinden, was apply_async()
ist, wie es sich von delay()
unterscheidet und wie man verschiedene Optionen nutzt, um asynchrone Aufgaben effektiver zu verwalten.
1. apply_async()
: Ein Werkzeug, das mehr Möglichkeiten eröffnet
apply_async()
ist eine weitere leistungsstarke Methode, um Celery-Aufgaben asynchron auszuführen. Auf den ersten Blick sieht es delay()
ähnlich aus, bietet jedoch weit mehr Optionen für den Zeitpunkt, die Art der Ausführung und die Warteschlangeneinstellungen, die den Entwicklern mehr Flexibilität bieten.
Wenn Sie sich daran erinnern, dass delay()
einfach eine Wrapper-Funktion ist, die in der Form apply_async(args=args)
aufgerufen wird, können Sie apply_async()
als erweiterte Version von delay()
betrachten, die alle Funktionen von delay()
enthält und dabei mehr Kontrolle bietet.
2. Die Funktionsweise von apply_async()
: Feine Steuerung
Die grundlegende Funktionsweise von apply_async()
ähnelt der von delay()
. Der Hauptfluss besteht darin, eine Nachricht mit dem Namen der Aufgabe und den Argumenten an den Nachrichtenbroker zu senden, den der Worker dann empfängt und ausführt. Aber apply_async()
ermöglicht es, die Ausführungsweise der Aufgabe durch verschiedene Parameter während des Prozesses der Nachrichtenverarbeitung und -übertragung präzise zu steuern.
Die Hauptparameter sind:
args
: Eine Liste oder ein Tupel von Positionsargumenten, die an die Aufgabenfunktion übergeben werden.kwargs
: Ein Wörterbuch von Schlüsselwortargumenten, die an die Aufgabenfunktion übergeben werden.countdown
: Führt die Aufgabe nicht sofort aus, sondern wartet eine angegebene Anzahl von Sekunden. Wenn zum Beispielcountdown=60
gesetzt ist, wird die Aufgabe 60 Sekunden nach dem Aufruf zur Warteschlange hinzugefügt.eta
(Estimated Time of Arrival): Plant die Aufgabe für einen bestimmten Zukunftspunkt. Wird in Form einesdatetime
-Objekts angegeben.countdown
undeta
sind gegenseitig ausschließend.expires
: Setzt eine Ablaufzeit für die Aufgabe. Nach Ablauf dieser Zeit ignoriert der Worker die Aufgabe und führt sie nicht aus. Kann alsint
(Sekunden),float
(Sekunden) oderdatetime
-Objekt angegeben werden. Nützlich für temporäre Daten oder Aufgaben mit einer Frist.queue
: Leitet die Aufgabe an eine bestimmte Warteschlange mit diesem Namen weiter. Celery kann mehrere Warteschlangen einrichten, um Aufgaben je nach Typ oder Wichtigkeit von verschiedenen Worker-Gruppen bearbeiten zu lassen.routing_key
: Sendet die Aufgabe an eine bestimmte Warteschlange entsprechend den Routing-Regeln des Nachrichtenbrokers. Wird verwendet, wenn fortgeschrittene Routing-Einstellungen in einem Nachrichtenbroker wie RabbitMQ verwendet werden.priority
: Setzt die Priorität der Aufgabe. Je nach Broker- und Worker-Einstellungen kann die Aufgabe mit höherer Priorität zuerst bearbeitet werden. In der Regel hat sie ganzzahlige Werte von 0 (höchste Priorität) bis 9 (niedrigste Priorität).serializer
: Gibt an, welche Methode zum Serialisieren der Aufgabenmeldung verwendet werden soll ('json', 'pickle', 'yaml' usw.).compression
: Gibt die Methode an, die zum Komprimieren der Aufgabenmeldung verwendet wird ('gzip', 'bzip2' usw.).headers
: Ermöglicht das Hinzufügen von weiteren Header-Informationen zur Aufgabenmeldung.link
: Gibt an, welche Callback-Aufgabe ausgeführt wird, wenn die aktuelle Aufgabe erfolgreich abgeschlossen ist. Wird zur Implementierung von Chaining verwendet.link_error
: Gibt die Fehler-Callback-Aufgabe an, die ausgeführt wird, wenn die aktuelle Aufgabe fehlschlägt.
Durch diese Vielzahl von Parametern ermöglicht apply_async()
, den Ausführungszeitpunkt von Aufgaben viel feiner zu steuern, die Aufgaben an bestimmte Worker-Gruppen weiterzuleiten und die Priorität der Aufgaben zu verwalten, um so komplexen Anforderungen an die asynchrone Verarbeitung gerecht zu werden.
3. Beispiele für die Verwendung von apply_async()
: Situationsspezifische Anwendungen
Jetzt werden wir die Kraft von apply_async()
anhand von verschiedenen Beispielen nutzen, die die Nutzung der verschiedenen Optionen demonstrieren.
Beispiel 1: Ausführung nach einer bestimmten Zeit (countdown
)
Wenn ein Benutzer nach der Anmeldung 3 Tage später eine Willkommens-E-Mail erhalten soll:
send_welcome_email.apply_async(args=[user.email], countdown=3 * 24 * 60 * 60)
Mit der countdown
-Option wird die Aufgabe nicht sofort zur Warteschlange hinzugefügt, sondern nach der angegebenen Zeit (3 Tage) hinzugefügt und von einem Worker bearbeitet.
Beispiel 2: Ausführung zu einem bestimmten Zeitpunkt (eta
)
Wenn eine Aufgabe jeden Tag um 3 Uhr morgens einen täglichen Bericht erstellen soll:
from datetime import datetime, timedelta
target_time = datetime.now().replace(hour=3, minute=0, second=0, microsecond=0) + timedelta(days=1) # Nächster Tag um 3 Uhr morgens
generate_daily_report.apply_async(eta=target_time)
Mit der eta
-Option kann die Aufgabe genau zu dem angegebenen Zeitpunkt ausgeführt werden.
Beispiel 3: Ablaufzeit festlegen (expires
)
Wenn das Ergebnis eines externen API-Aufrufs nur für 10 Minuten gültig ist und die Aufgabe danach nicht mehr ausgeführt werden muss:
from datetime import datetime, timedelta
expires_at = datetime.now() + timedelta(minutes=10)
fetch_external_data.apply_async(args=[api_url], expires=expires_at)
Mit der expires
-Option kann verhindert werden, dass die Aufgabe unnötig ausgeführt wird, sodass Ressourcen gespart werden.
Beispiel 4: Routing zu einer bestimmten Warteschlange (queue
)
Wenn Sie eine CPU-intensive Bildverarbeitungsaufgabe in einer separaten Worker-Gruppe bearbeiten möchten:
process_image.apply_async(args=[image_id], queue='image_processing')
Wenn Sie in Ihrer Celery-Konfiguration Worker speziell für die image_processing
-Warteschlange einrichten, werden Bildverarbeitungsaufgaben ausschließlich von dieser Worker-Gruppe bearbeitet.
Beispiel 5: Verknüpfung von Callback-Aufgaben (link
)
Wenn Sie möchten, dass eine Aufgabe zur Benachrichtigung per E-Mail automatisch ausgeführt wird, sobald die Zahlung erfolgreich verarbeitet wurde:
Ohne apply_async
würde dies wahrscheinlich so aussehen, dass Sie die Zahlungsabwicklung in eine Funktion implementieren und innerhalb dieser Funktion .delay()
aufrufen, um die E-Mail zu senden.
def process_payment(order_id):
# Zahlungsabwicklungslogik
send_notification_email.delay(order_id)
Mit apply_async()
könnte dies jedoch so aussehen:
@shared_task
def process_payment(order_id):
# Zahlungsabwicklungslogik
return order_id
@shared_task
def send_notification_email(order_id):
# Logik zum Senden der Benachrichtigungs-E-Mail
payment_result = process_payment.apply_async(args=[order_id], link=send_notification_email.s())
Hierbei generiert .s()
die Signatur der send_notification_email
-Aufgabe. Wenn die process_payment
-Aufgabe erfolgreich abgeschlossen ist, wird der Rückgabewert (in diesem Fall order_id
) als Argument an die send_notification_email
-Aufgabe übergeben.
4. delay()
und apply_async()
: Wann sollte was gewählt werden?
Jetzt haben Sie die Unterschiede zwischen delay()
und apply_async()
sowie die Vorteile beider Methoden klar verstanden. In welchen Situationen sollten Sie also welche Methode wählen?
-
delay()
:- Wenn Sie die Aufgabe am einfachsten asynchron ausführen möchten
- Wenn keine besondere Kontrolle über den Zeitpunkt der Ausführung der Aufgabe erforderlich ist
- Wenn Sie auf die Klarheit des Codes Wert legen
-
apply_async()
:- Wenn die Ausführung der Aufgabe für einen bestimmten Zeitpunkt oder nach einer bestimmten Zeit geplant werden muss (
countdown
,eta
) - Wenn Sie die Ablaufzeit der Aufgabe festlegen möchten, um unnötige Ausführungen zu verhindern (
expires
) - Wenn Sie die Aufgabe an eine bestimmte Warteschlange weiterleiten möchten, um die Gruppe von Workern zu steuern (
queue
) - Wenn Sie die Priorität der Aufgaben verwalten müssen (
priority
) - Wenn Sie die Serialisierungs- oder Komprimierungsverfahren explizit angeben müssen (
serializer
,compression
) - Wenn Sie Callbacks für das automatische Ausführen anderer Aufgaben bei Erfolg oder Misserfolg der Aufgabe einrichten möchten (
link
,link_error
)
- Wenn die Ausführung der Aufgabe für einen bestimmten Zeitpunkt oder nach einer bestimmten Zeit geplant werden muss (
Im Allgemeinen ist es am bequemsten, delay()
für einfache asynchrone Aufgaben zu verwenden, während apply_async()
für komplexere und genauere Anforderungen empfehlenswert ist. Da delay()
eine vereinfachte Version von apply_async()
ist, können Sie in jedem Fall apply_async()
verwenden, um dasselbe (oder mehr) zu erreichen.
Abschließend
Heute haben wir das mächtige Werkzeug apply_async()
von Celery eingehend untersucht und die Beziehung zu delay()
sowie verschiedene Anwendungsmöglichkeiten betrachtet. apply_async()
ermöglicht eine präzisere Verwaltung von asynchronen Aufgaben und eine effektive Nutzung von Celery, angepasst an die Anforderungen des Dienstes.
Im nächsten und letzten Teil werden wir den wichtigen Dekorator @app.task
und @shared_task
zur Definition von Celery-Aufgaben vergleichen, um Ihnen bei der Auswahl der geeigneten Methode zur Aufgabenbeschreibung in Bezug auf die Projektstruktur und Entwicklungskultur zu helfen. Vielen Dank, dass Sie uns auf dieser Celery-Reise begleiten!
Wenn Sie die vorherigen Artikel nicht gesehen haben, überprüfen Sie bitte den folgenden Link!
# Die Magie von Celery: Die Geheimnisse von delay() enthüllen
Es sind keine Kommentare vorhanden.