Dans la première partie, nous avons examiné les besoins fondamentaux de Celery, le fonctionnement de la méthode delay(), ainsi que des exemples pratiques pour améliorer votre compréhension des tâches asynchrones. delay() est un outil puissant qui vous permet d'ajouter facilement des tâches dans une file d'attente, mais parfois, un contrôle plus précis est nécessaire. C'est précisément dans ces cas que la méthode apply_async(), que nous explorerons aujourd'hui, se révèle très utile.

Aujourd'hui, nous allons approfondir ce qu'est apply_async(), en quoi elle diffère de delay(), et comment utiliser différentes options pour gérer plus efficacement les tâches asynchrones.

Image de dessin animé de Celery travaillant dur


1. apply_async(), un outil ouvrant davantage de possibilités

apply_async() est une autre méthode puissante permettant de planifier l'exécution asynchrone des tâches Celery. À première vue, elle semble similaire à delay(), mais elle offre de nombreuses options supplémentaires concernant le moment et le mode d'exécution des tâches, ainsi que les paramètres de la file d'attente, ce qui confère aux développeurs une plus grande flexibilité.

Il est important de se rappeler que delay() est simplement une fonction d'enveloppement appelée sous la forme apply_async(args=args), ce qui signifie que apply_async() inclut toutes les fonctionnalités de delay() tout en offrant un contrôle accru, comme une version étendue.


2. Le fonctionnement de apply_async(): un ajustement délicat

Le fonctionnement de base de apply_async() est similaire à celui de delay(). L'essentiel est d'envoyer un message contenant le nom de la tâche et ses arguments au courtier de messages, puis le travailleur le récupère et l'exécute. Cependant, apply_async() permet un ajustement plus précis du mode d'exécution de la tâche grâce à divers paramètres lors de la génération et de l'envoi du message.

Les principaux paramètres incluent:

  • args: liste ou tuple des arguments positionnels à transmettre à la fonction de tâche.
  • kwargs: dictionnaire des arguments nommés à transmettre à la fonction de tâche.
  • countdown: exécute la tâche après un délai spécifié en secondes. Par exemple, si countdown=60 est réglé, la tâche sera ajoutée à la file d'attente 60 secondes après son appel.
  • eta (Estimated Time of Arrival): planifie l'exécution de la tâche à un moment futur déterminé, spécifié sous forme d'objet datetime. Les paramètres countdown et eta sont mutuellement exclusifs.
  • expires: définit un temps d'expiration pour la tâche. Après le délai spécifié, le travailleur ignorera la tâche sans l'exécuter. Cela peut être spécifié sous forme d'int (secondes), float (secondes), ou objet datetime. Cela est utile pour des données temporaires ou des tâches avec une date d'expiration.
  • queue: achemine la tâche vers une file de noms spécifiques. Celery peut configurer plusieurs files pour gérer des groupes de travailleurs en fonction du type ou de l'importance des tâches.
  • routing_key: envoie la tâche à une file spécifique selon les règles de routage du courtier de messages. Cela est utilisé lors de la mise en œuvre de configurations de routage avancées dans des courtiers de messages comme RabbitMQ.
  • priority: définit la priorité de la tâche. En fonction des réglages du courtier et du travailleur, les tâches de plus haute priorité peuvent être traitées en premier. Cela a généralement des valeurs entières de 0 (la plus haute priorité) à 9 (la plus basse priorité).
  • serializer: spécifie le mode de sérialisation des messages de tâche ('json', 'pickle', 'yaml', etc.).
  • compression: spécifie le mode de compression des messages de tâche ('gzip', 'bzip2', etc.).
  • headers: permet d'incorporer des informations d'en-tête supplémentaires dans le message de tâche.
  • link: spécifie une tâche de callback qui s'exécutera après l'achèvement réussi de la tâche actuelle. Cela est utilisé pour mettre en œuvre des chaînes.
  • link_error: spécifie une tâche de callback d'erreur qui s'exécutera si la tâche actuelle échoue.

Grâce à cette variété de paramètres, apply_async() permet de contrôler de manière plus précise le moment d'exécution de la tâche par rapport à delay(), de router les tâches vers des groupes de travailleurs spécifiques et de gérer la priorité des tâches, répondant ainsi aux exigences complexes de traitement asynchrone.


3. Exemples d'utilisation de apply_async(): applications selon les situations

Examinons maintenant la puissance de apply_async() à travers divers exemples d'utilisation des options.

Exemple 1: Exécution après un certain temps (countdown)

Si vous devez envoyer un email de bienvenue 3 jours après l'inscription d'un utilisateur :

send_welcome_email.apply_async(args=[user.email], countdown=3 * 24 * 60 * 60)

En utilisant l'option countdown, la tâche n'est pas immédiatement ajoutée à la file, mais est ajoutée après un temps spécifié (3 jours) pour être traitée par le travailleur.

Exemple 2: Exécution à un moment précis (eta)

Pour exécuter une tâche créant le rapport quotidien tous les jours à 3 heures du matin :

from datetime import datetime, timedelta

target_time = datetime.now().replace(hour=3, minute=0, second=0, microsecond=0) + timedelta(days=1) # 3 heures du matin le lendemain

generate_daily_report.apply_async(eta=target_time)

L'option eta permet de planifier l'exécution de la tâche à un moment précis.

Exemple 3: Définir un temps d'expiration (expires)

Si le résultat d'un appel API externe n'est valide que pendant 10 minutes, il n'est pas nécessaire d'exécuter la tâche après cela :

from datetime import datetime, timedelta

expires_at = datetime.now() + timedelta(minutes=10)

fetch_external_data.apply_async(args=[api_url], expires=expires_at)

Avec l'option expires, vous pouvez éviter l'exécution inutile de la tâche, économisant ainsi des ressources.

Exemple 4: Acheminer vers une file spécifique (queue)

Si vous souhaitez que des tâches de traitement d'image, qui consomment beaucoup de CPU, soient traitées par un groupe de travailleurs distinct :

process_image.apply_async(args=[image_id], queue='image_processing')

En configurant les travailleurs pour surveiller la file image_processing, les tâches de traitement d'image ne seront traitées que par ce groupe de travailleurs.

Exemple 5: Chaîner des tâches callback (link)

Pour exécuter automatiquement une tâche d'envoi d'email de notification après l'achèvement réussi d'une tâche de traitement de paiement : si vous n'utilisez pas apply_async, vous vous retrouveriez probablement à implémenter directement la logique de traitement du paiement dans une fonction et à appeler .delay() pour envoyer l'email.

def process_payment(order_id):
    # Logique de traitement du paiement
    send_notification_email.delay(order_id)

Cependant, en utilisant apply_async(), vous pourriez écrire ceci :

@shared_task
def process_payment(order_id):
    # Logique de traitement du paiement
    return order_id

@shared_task
def send_notification_email(order_id):
    # Logique d'envoi de l'email de notification

payment_result = process_payment.apply_async(args=[order_id], link=send_notification_email.s())

Ici, .s() génère la signature de la tâche send_notification_email. Une fois la tâche process_payment terminée avec succès, sa valeur de retour (ici order_id) sera transmise en tant qu'argument à la tâche send_notification_email.


4. Quand choisir delay() ou apply_async()?

Vous devez maintenant bien comprendre les différences entre delay() et apply_async(), ainsi que les avantages de chacune. Alors, dans quelles situations devez-vous choisir l'une ou l'autre méthode ?

  • delay():

    • Lorsque vous souhaitez exécuter une tâche asynchrone de la manière la plus simple possible.
    • Lorsque vous n'avez pas besoin de contrôle particulier sur le moment d'exécution de la tâche.
    • Lorsque la concision du code est importante.
  • apply_async():

    • Lorsque vous devez programmer l'exécution d'une tâche à un moment ultérieur ou à un moment précis (countdown, eta).
    • Lorsque vous souhaitez définir le temps d'expiration d'une tâche pour éviter son exécution inutile (expires).
    • Lorsque vous souhaitez router des tâches vers une file spécifique pour contrôler le groupe de travailleurs (queue).
    • Lorsque vous devez gérer la priorité des tâches (priority).
    • Lorsque vous devez spécifier explicitement le mode de sérialisation ou de compression des tâches (serializer, compression).
    • Lorsque vous souhaitez configurer des callbacks pour exécuter automatiquement d'autres tâches lors de la réussite ou de l'échec d'une tâche (link, link_error).

En général, il est pratique d'utiliser delay() pour un traitement asynchrone simple, et il est préférable d'utiliser apply_async() lorsque vous avez besoin d'un contrôle plus complexe et précis. delay() est une version simplifiée de apply_async(), donc dans tous les cas, vous pouvez utiliser apply_async() pour implémenter des fonctionnalités identiques (ou plus nombreuses).


En conclusion

Aujourd'hui, nous avons exploré en profondeur la puissante méthode apply_async() de Celery, en examinant sa relation avec delay() et ses différentes applications. apply_async() permet de gérer les tâches asynchrones de manière plus raffinée et d'exploiter Celery efficacement selon les besoins du service.

Dans la prochaine et dernière partie, nous comparerons les décorateurs essentiels pour définir les tâches Celery, @app.task et @shared_task, pour vous aider à choisir la méthode de définition des tâches qui convient à votre structure de projet et à votre philosophie de développement. Merci de continuer cette aventure avec Celery !

Pour ceux qui n'ont pas vu les articles précédents, veuillez consulter le lien ci-dessous !

# La magie de Celery: déchiffrer le fonctionnement de delay()