In het eerste deel hebben we de basisbehoeften van Celery en de werking van de delay() methode bekeken, en via praktische voorbeelden uw begrip van asynchrone taken vergroot. delay() is een krachtig hulpmiddel om taken eenvoudig naar de wachtrij te sturen, maar soms is er behoefte aan meer gedetailleerde controle. Dit is het moment waarop de methode apply_async() zijn waarde bewijst, waar we vandaag dieper op in zullen gaan.

Vandaag zullen we kijken naar wat apply_async() is, hoe het verschilt van delay(), en hoe we met verschillende opties asynchrone taken effectiever kunnen beheren.

Celery cartoon image working hard


1. apply_async(), een hulpmiddel dat meer mogelijkheden biedt

apply_async() is een andere krachtige methode die het mogelijk maakt om Celery taken asynchroon uit te voeren. Het lijkt op delay(), maar biedt veel meer opties zoals het exacte moment van uitvoering, de methode en queuinstellingen, wat ontwikkelaars meer flexibiliteit geeft.

Houd er rekening mee dat delay() een eenvoudige wrapper functie is die apply_async(args=args) aanroept, waardoor apply_async() een uitgebreide versie is van delay() die alle functies van delay() bevat, maar met meer controle.


2. De werking van apply_async(): verfijnde afstemming

De basiswerking van apply_async() is vergelijkbaar met die van delay(). Het belangrijkste proces is het verzenden van een boodschap met de taaknaam en argumenten naar de berichtenbroker, waarbij de werkers deze ontvangen en uitvoeren. Wat apply_async() echter uniek maakt, is dat het meer parameters biedt die de uitvoeringswijze van de taak kunnen verfijnen tijdens het maken en verzenden van berichten.

Belangrijke parameters zijn:

  • args: Een lijst of tuple van positionele argumenten die aan de taakfunctie worden doorgegeven.
  • kwargs: Een woordenboek van keyword argumenten die aan de taakfunctie worden doorgegeven.
  • countdown: Voert de taak uit na een opgegeven aantal seconden vertraging. Als u bijvoorbeeld countdown=60 instelt, wordt de taak 60 seconden na het aanroepen aan de wachtrij toegevoegd.
  • eta (Geschatte Tijd van Aankomst): Plant de uitvoering van de taak op een bepaalde toekomstige tijd. Dit wordt opgegeven als een datetime object. countdown en eta zijn wederzijds exclusief.
  • expires: Stelt een vervaldatum voor de taak in. Na de opgegeven tijd zal de werker deze taak negeren en niet uitvoeren. Dit kan worden ingesteld als int (in seconden), float (in seconden) of een datetime object. Dit is handig voor tijdelijke gegevens of taken met een deadline.
  • queue: Routert de taak naar een specifieke naam van een wachtrij. Celery kan meerdere wachtrijen instellen zodat taken door verschillende werkgroepen op basis van soort of belangrijkheid kunnen worden behandeld.
  • routing_key: Verzendt de taak naar een specifieke wachtrij volgens de routeringsregels van de berichtenbroker. Dit wordt gebruikt bij het toepassen van geavanceerde routeringsinstellingen in brokers zoals RabbitMQ.
  • priority: Stelt de prioriteit van de taak in. De prioriteiten kunnen met de instellingen van de broker en de werker variëren, waarbij hogere prioriteiten eerder kunnen worden verwerkt. Gewoonlijk zijn dit gehele getallen van 0 (de hoogste prioriteit) tot 9 (de laagste prioriteit).
  • serializer: Bepaalt de manier waarop de taakberichten worden geserialiseerd ('json', 'pickle', 'yaml', enz.).
  • compression: Bepaalt de wijze van compressie voor taakberichten ('gzip', 'bzip2', enz.).
  • headers: Hiermee kunnen extra header-informatie aan taakberichten worden toegevoegd.
  • link: Specificeert een callback-taak die wordt uitgevoerd wanneer de huidige taak succesvol is voltooid. Dit wordt gebruikt om chaining te implementeren.
  • link_error: Specificeert een foutcallback-taak die wordt uitgevoerd als de huidige taak faalt.

Door deze diverse parameters kunnen we met apply_async() de uitvoeringstijden van taken veel nauwkeuriger beheren dan met delay(), taken routeren naar specifieke werkgroepen, de prioriteit van taken beheren en voldoen aan complexe eisen voor asynchrone verwerking.


3. Voorbeelden van het gebruik van apply_async(): situatie-specifieke toepassingen

Laten we de kracht van apply_async() zelf aan de hand van verschillende opties verkennen.

Voorbeeld 1: Uitvoeren na een bepaalde tijd (countdown)

Als een gebruiker welkomse-mails moet ontvangen 3 dagen na registratie:

send_welcome_email.apply_async(args=[user.email], countdown=3 * 24 * 60 * 60)

Met de countdown optie voegt de taak zich niet onmiddellijk toe aan de wachtrij, maar wordt deze toegevoegd na de opgegeven tijd (3 dagen) en verwerkt door de werker.

Voorbeeld 2: Uitvoeren op een specifieke tijd (eta)

Als we een taak moeten uitvoeren die dagelijks om 3 uur 's nachts een rapport genereert:

from datetime import datetime, timedelta

target_time = datetime.now().replace(hour=3, minute=0, second=0, microsecond=0) + timedelta(days=1) # Volgende ochtend 3 uur

generate_daily_report.apply_async(eta=target_time)

Met de eta optie kan de taak Exact op het opgegeven tijdstip worden uitgevoerd.

Voorbeeld 3: Instellen van een vervaldatum (expires)

Als het resultaat van een externe API-aanroep slechts 10 minuten geldig is, is het niet nodig de taak uit te voeren daarna:

from datetime import datetime, timedelta

expires_at = datetime.now() + timedelta(minutes=10)

fetch_external_data.apply_async(args=[api_url], expires=expires_at)

Met de expires optie kan de uitvoering van onnodige taken worden voorkomen, wat helpt om middelen te besparen.

Voorbeeld 4: Routeren naar een specifieke wachtrij (queue)

Als we een taak voor beeldverwerking die veel CPU verbruikt, door een aparte werkgroep willen laten uitvoeren:

process_image.apply_async(args=[image_id], queue='image_processing')

Door de Celery-instellingen te gebruiken om de werklieden specifiek voor de image_processing wachtrij in te stellen, worden beeldverwerkingstaken uitsluitend door die werkgroep verwerkt.

Voorbeeld 5: Verbind een callback-taak (link)

Als we een taak voor betalingsverwerking hebben die na succesvolle voltooiing automatisch een notificatiemail wil sturen: als we apply_async niet gebruiken, zou de logica voor betalingsverwerking wellicht in de functie zelf zijn geïmplementeerd, en zou de functie .delay() voor het verzenden van e-mails worden aangeroepen:

def process_payment(order_id):
    # Logica voor het verwerken van de betaling
    send_notification_email.delay(order_id)

Met apply_async() kunnen we dit echter als volgt schrijven:

@shared_task
def process_payment(order_id):
    # Logica voor het verwerken van de betaling
    return order_id

@shared_task
def send_notification_email(order_id):
    # Logica voor het verzenden van de notificatiemail

payment_result = process_payment.apply_async(args=[order_id], link=send_notification_email.s())

Hier genereert .s() de handtekening van de send_notification_email taak. Wanneer de process_payment taak succesvol is voltooid, wordt de teruggegeven waarde (hier order_id) als argument doorgegeven aan de send_notification_email taak.


4. delay() en apply_async(): wanneer moet welke worden gekozen?

U heeft nu een duidelijk begrip van de verschillen en voordelen van delay() en apply_async(). In welke situaties moeten we welke methode gebruiken?

  • delay():

    • Als u de taak eenvoudig asynchroon wilt uitvoeren
    • Als er geen speciale controle over het moment van uitvoering van taken nodig is
    • Als u waarde hecht aan de beknoptheid van de code
  • apply_async():

    • Als u de uitvoering van taken moet plannen voor een moment na een bepaalde tijd of op een bepaald tijdstip (countdown, eta)
    • Als u een vervaldatum voor de taak wilt instellen om onnodige uitvoering te voorkomen (expires)
    • Als u taken naar specifieke wachtrijen wilt routeren om werkgroepen te beheren (queue)
    • Als u prioriteiten voor taken moet beheren (priority)
    • Als u expliciet de serializer of compressiemethode van de taak moet specificeren (serializer, compression)
    • Als u callback-instellingen wilt automatiseren om andere taken uit te voeren bij succesvolle of mislukte taakuitvoering (link, link_error)

Over het algemeen is het handig om delay() te gebruiken voor eenvoudige asynchrone verwerkingen, terwijl apply_async() beter is voor complexere en gedetailleerde controlebehoeften. delay() is een vereenvoudigde versie van apply_async(), waardoor de functionaliteit van apply_async() altijd kan worden benut, ongeacht de situatie.


Concluderend

Vandaag hebben we diepgaand gekeken naar de krachtige apply_async() methode van Celery, de relatie met delay() en verschillende gebruiksmethoden verkend. apply_async() stelt ons in staat asynchrone taken verfijnder te beheren en Celery effectief aan te passen aan de vereisten van een dienst.

In het laatste deel zullen we de kerndecorators voor het definiëren van Celery-taken, namelijk @app.task en @shared_task, vergelijken en analyseren, om u te helpen de meest passende manier van het definiëren van taken te kiezen op basis van uw projectstructuur en ontwikkelingsfilosofie. Bedankt dat u deze Celery-reis met ons heeft gemaakt!

Voor degenen die eerdere artikelen hebben gemist, bekijk de onderstaande link!

# De magie van Celery: de binnen- en buitenkant van delay() ontrafelen