En la primera parte, exploramos la necesidad básica de Celery y el funcionamiento del método delay(), además de ejemplos de uso práctico para mejorar tu comprensión sobre tareas asíncronas. delay() es una herramienta poderosa para agregar tareas a la cola de forma sencilla, pero a veces se requiere un control más detallado. Es en esos momentos cuando brilla el método apply_async(), que hoy vamos a explorar a fondo.

Hoy profundizaremos en qué es apply_async(), cómo se diferencia de delay(), y cómo gestionar tareas asíncronas de manera más efectiva utilizando diversas opciones.

Imagen de dibujo animado de Celery trabajando duro


1. apply_async(), una herramienta que abre más posibilidades

apply_async() es otro método poderoso que permite programar la ejecución asíncrona de tareas de Celery. A primera vista, se asemeja a delay(), pero ofrece opciones mucho más variadas sobre el momento de ejecución de la tarea, el método y la configuración de la cola, brindando mayor flexibilidad al desarrollador.

Recuerda que delay() es simplemente una función envolvente que se invoca como apply_async(args=args), por lo que puedes pensar que apply_async() es una versión ampliada que incluye todas las funciones de delay() mientras ofrece más control.


2. El funcionamiento de apply_async(): ajustes finos

El funcionamiento básico de apply_async() es similar al de delay(). La clave está en enviar un mensaje que contenga el nombre de la tarea y los argumentos a un intermediario de mensajes, donde un trabajador lo recibe y lo ejecuta. Sin embargo, apply_async() permite ajustar finamente el método de ejecución de la tarea a través de diversos parámetros durante el proceso de creación y envío del mensaje.

Los principales parámetros son los siguientes:

  • args: Lista o tupla de argumentos posicionales que se pasarán a la función de la tarea.
  • kwargs: Diccionario de argumentos de palabra clave que se pasarán a la función de la tarea.
  • countdown: Retrasa la ejecución de la tarea por un número específico de segundos antes de ejecutarla. Por ejemplo, si se establece countdown=60, la tarea se añadirá a la cola 60 segundos después del momento de llamada.
  • eta (Hora Estimada de Llegada): Programa la tarea para que se ejecute en un punto futuro específico. Se especifica en forma de objeto datetime. countdown y eta son mutuamente exclusivos.
  • expires: Establece el tiempo de caducidad de la tarea. Una vez pasado el tiempo especificado, el trabajador ignorará y no ejecutará dicha tarea. Se puede especificar como int (segundos), float (segundos) o como objeto datetime. Esto es útil para datos temporales o tareas con plazos de caducidad.
  • queue: Rutea la tarea a una cola de nombre específico. Celery permite configurar varias colas para procesar las tareas en diferentes grupos de trabajadores según el tipo o la importancia de la tarea.
  • routing_key: Envía la tarea a una cola específica según las reglas de enrutamiento del intermediario de mensajes. Se utiliza cuando se emplean configuraciones de enrutamiento avanzadas en intermediarios de mensajes como RabbitMQ.
  • priority: Establece la prioridad de la tarea. Dependiendo de la configuración del intermediario y los trabajadores, las tareas de mayor prioridad pueden ser procesadas primero. Generalmente, tiene un valor entero que va de 0 (la prioridad más alta) a 9 (la prioridad más baja).
  • serializer: Especifica el método a usar para serializar el mensaje de la tarea ('json', 'pickle', 'yaml', etc.).
  • compression: Especifica el método para comprimir el mensaje de la tarea ('gzip', 'bzip2', etc.).
  • headers: Puede incluir información adicional en el encabezado del mensaje de la tarea.
  • link: Especifica un tarea de callback que se ejecutará si la tarea actual se completa con éxito. Esto es utilizado para implementar encadenamiento.
  • link_error: Especifica una tarea de callback de error que se ejecutará si la tarea actual falla.

Así, gracias a la variada gama de parámetros, apply_async() puede controlar de manera mucho más detallada el momento de ejecución de las tareas que delay(), rutar las tareas a grupos de trabajadores específicos y gestionar prioridades, satisfaciendo así requisitos complejos de procesamiento asíncrono.


3. Ejemplos de uso de apply_async(): Cómo utilizarlo en diferentes situaciones

Ahora veamos ejemplos de uso de apply_async() utilizando diversas opciones para comprobar su potencia.

Ejemplo 1: Ejecutar después de un tiempo específico (countdown)

Si un usuario necesita enviar un correo de bienvenida tres días después de registrarse:

send_welcome_email.apply_async(args=[user.email], countdown=3 * 24 * 60 * 60)

Al usar la opción countdown, la tarea no se añadirá de inmediato a la cola, sino que se añadirá después de un tiempo específico (3 días) para ser procesada por el trabajador.

Ejemplo 2: Ejecutar a una hora específica (eta)

Si necesitas ejecutar una tarea para generar un informe diario a las 3 de la mañana:

from datetime import datetime, timedelta

target_time = datetime.now().replace(hour=3, minute=0, second=0, microsecond=0) + timedelta(days=1) # 3 de la mañana del día siguiente

generate_daily_report.apply_async(eta=target_time)

La opción eta te permite programar la tarea para que se ejecute exactamente a la hora designada.

Ejemplo 3: Establecer un tiempo de caducidad (expires)

Si el resultado de una llamada a una API externa es válido solo por 10 minutos, y no tiene sentido ejecutar la tarea después de eso:

from datetime import datetime, timedelta

expires_at = datetime.now() + timedelta(minutes=10)

fetch_external_data.apply_async(args=[api_url], expires=expires_at)

La opción expires ayuda a evitar que la tarea se ejecute innecesariamente, ahorrando recursos.

Ejemplo 4: Rutar a una cola específica (queue)

Si deseas que una tarea de procesamiento de imágenes que consume mucho CPU sea gestionada por un grupo de trabajadores diferente:

process_image.apply_async(args=[image_id], queue='image_processing')

Configurando Celery, puedes tener trabajadores dedicados que vigilan la cola image_processing, asegurando que las tareas de procesamiento de imágenes solo sean manejadas por ese grupo de trabajadores.

Ejemplo 5: Conectar tareas de callback (link)

Si deseas que una tarea que procesa un pago ejecute automáticamente otra tarea que envía un correo de notificación cuando se complete con éxito: Sin apply_async, lo más seguro es que implementes la lógica de procesamiento de pagos como una función que llamaría a .delay() para enviar el correo dentro de dicha función.

def process_payment(order_id):
    # Lógica de procesamiento del pago
    send_notification_email.delay(order_id)

Pero usando apply_async(), puedes escribirlo así:

@shared_task
def process_payment(order_id):
    # Lógica de procesamiento del pago
    return order_id

@shared_task
def send_notification_email(order_id):
    # Lógica para enviar el correo de notificación

payment_result = process_payment.apply_async(args=[order_id], link=send_notification_email.s())

Aquí, .s() crea la firma de la tarea send_notification_email. Cuando la tarea process_payment se completa correctamente, su valor de retorno (en este caso, order_id) se pasa como argumento a la tarea send_notification_email para que se ejecute.


4. delay() vs apply_async(): ¿Cuándo elegir cada uno?

Ahora que has entendido claramente las diferencias y las ventajas de delay() y apply_async(), ¿en qué situaciones deberías elegir cada método?

  • delay():

    • Cuando solo quieras ejecutar una tarea de la forma más sencilla posible de manera asíncrona
    • Cuando no requieras control especial sobre el momento de ejecución de la tarea
    • Cuando valores la claridad y brevedad del código
  • apply_async():

    • Cuando necesites programar la ejecución de la tarea para un tiempo específico o un momento determinado (countdown, eta)
    • Cuando quieras establecer un tiempo de caducidad para prevenir ejecuciones innecesarias (expires)
    • Cuando quieras rutar tareas a una cola específica para controlar grupos de trabajadores (queue)
    • Cuando necesites gestionar la prioridad de las tareas (priority)
    • Cuando necesites especificar explícitamente el método de serialización o compresión de las tareas (serializer, compression)
    • Cuando quieras establecer callbacks automáticos para ejecutar otras tareas en caso de éxito o error de una tarea (link, link_error)

En general, para un procesamiento asíncrono simple, usar delay() es conveniente, mientras que para situaciones que requieren un control más complejo y detallado, es mejor utilizar apply_async(). Dado que delay() es una forma simplificada de apply_async(), en cualquier caso siempre podrás utilizar apply_async() para implementar la misma (o más) funcionalidad.


Conclusión

Hoy hemos explorado en profundidad el poderoso método apply_async() de Celery, viendo su relación con delay() y diversas formas de utilizarlo. apply_async() permite gestionar tareas asíncronas de manera más meticulosa y aprovechar Celery de manera efectiva según las necesidades del servicio.

En la próxima y última parte, compararemos los decoradores clave para definir tareas de Celery, @app.task y @shared_task, para ayudar a elegir el enfoque correcto de definición de tareas según la estructura del proyecto y la filosofía de desarrollo. ¡Gracias por acompañarnos en este viaje con Celery!

Para aquellos que no han visto las publicaciones anteriores, revisen el siguiente enlace:

# La magia de Celery: exploración profunda de delay()