En la primera parte, exploramos la necesidad básica de Celery y el funcionamiento del método delay()
, además de ejemplos de uso práctico para mejorar tu comprensión sobre tareas asíncronas. delay()
es una herramienta poderosa para agregar tareas a la cola de forma sencilla, pero a veces se requiere un control más detallado. Es en esos momentos cuando brilla el método apply_async()
, que hoy vamos a explorar a fondo.
Hoy profundizaremos en qué es apply_async()
, cómo se diferencia de delay()
, y cómo gestionar tareas asíncronas de manera más efectiva utilizando diversas opciones.
1. apply_async()
, una herramienta que abre más posibilidades
apply_async()
es otro método poderoso que permite programar la ejecución asíncrona de tareas de Celery. A primera vista, se asemeja a delay()
, pero ofrece opciones mucho más variadas sobre el momento de ejecución de la tarea, el método y la configuración de la cola, brindando mayor flexibilidad al desarrollador.
Recuerda que delay()
es simplemente una función envolvente que se invoca como apply_async(args=args)
, por lo que puedes pensar que apply_async()
es una versión ampliada que incluye todas las funciones de delay()
mientras ofrece más control.
2. El funcionamiento de apply_async()
: ajustes finos
El funcionamiento básico de apply_async()
es similar al de delay()
. La clave está en enviar un mensaje que contenga el nombre de la tarea y los argumentos a un intermediario de mensajes, donde un trabajador lo recibe y lo ejecuta. Sin embargo, apply_async()
permite ajustar finamente el método de ejecución de la tarea a través de diversos parámetros durante el proceso de creación y envío del mensaje.
Los principales parámetros son los siguientes:
args
: Lista o tupla de argumentos posicionales que se pasarán a la función de la tarea.kwargs
: Diccionario de argumentos de palabra clave que se pasarán a la función de la tarea.countdown
: Retrasa la ejecución de la tarea por un número específico de segundos antes de ejecutarla. Por ejemplo, si se establececountdown=60
, la tarea se añadirá a la cola 60 segundos después del momento de llamada.eta
(Hora Estimada de Llegada): Programa la tarea para que se ejecute en un punto futuro específico. Se especifica en forma de objetodatetime
.countdown
yeta
son mutuamente exclusivos.expires
: Establece el tiempo de caducidad de la tarea. Una vez pasado el tiempo especificado, el trabajador ignorará y no ejecutará dicha tarea. Se puede especificar comoint
(segundos),float
(segundos) o como objetodatetime
. Esto es útil para datos temporales o tareas con plazos de caducidad.queue
: Rutea la tarea a una cola de nombre específico. Celery permite configurar varias colas para procesar las tareas en diferentes grupos de trabajadores según el tipo o la importancia de la tarea.routing_key
: Envía la tarea a una cola específica según las reglas de enrutamiento del intermediario de mensajes. Se utiliza cuando se emplean configuraciones de enrutamiento avanzadas en intermediarios de mensajes como RabbitMQ.priority
: Establece la prioridad de la tarea. Dependiendo de la configuración del intermediario y los trabajadores, las tareas de mayor prioridad pueden ser procesadas primero. Generalmente, tiene un valor entero que va de 0 (la prioridad más alta) a 9 (la prioridad más baja).serializer
: Especifica el método a usar para serializar el mensaje de la tarea ('json', 'pickle', 'yaml', etc.).compression
: Especifica el método para comprimir el mensaje de la tarea ('gzip', 'bzip2', etc.).headers
: Puede incluir información adicional en el encabezado del mensaje de la tarea.link
: Especifica un tarea de callback que se ejecutará si la tarea actual se completa con éxito. Esto es utilizado para implementar encadenamiento.link_error
: Especifica una tarea de callback de error que se ejecutará si la tarea actual falla.
Así, gracias a la variada gama de parámetros, apply_async()
puede controlar de manera mucho más detallada el momento de ejecución de las tareas que delay()
, rutar las tareas a grupos de trabajadores específicos y gestionar prioridades, satisfaciendo así requisitos complejos de procesamiento asíncrono.
3. Ejemplos de uso de apply_async()
: Cómo utilizarlo en diferentes situaciones
Ahora veamos ejemplos de uso de apply_async()
utilizando diversas opciones para comprobar su potencia.
Ejemplo 1: Ejecutar después de un tiempo específico (countdown
)
Si un usuario necesita enviar un correo de bienvenida tres días después de registrarse:
send_welcome_email.apply_async(args=[user.email], countdown=3 * 24 * 60 * 60)
Al usar la opción countdown
, la tarea no se añadirá de inmediato a la cola, sino que se añadirá después de un tiempo específico (3 días) para ser procesada por el trabajador.
Ejemplo 2: Ejecutar a una hora específica (eta
)
Si necesitas ejecutar una tarea para generar un informe diario a las 3 de la mañana:
from datetime import datetime, timedelta
target_time = datetime.now().replace(hour=3, minute=0, second=0, microsecond=0) + timedelta(days=1) # 3 de la mañana del día siguiente
generate_daily_report.apply_async(eta=target_time)
La opción eta
te permite programar la tarea para que se ejecute exactamente a la hora designada.
Ejemplo 3: Establecer un tiempo de caducidad (expires
)
Si el resultado de una llamada a una API externa es válido solo por 10 minutos, y no tiene sentido ejecutar la tarea después de eso:
from datetime import datetime, timedelta
expires_at = datetime.now() + timedelta(minutes=10)
fetch_external_data.apply_async(args=[api_url], expires=expires_at)
La opción expires
ayuda a evitar que la tarea se ejecute innecesariamente, ahorrando recursos.
Ejemplo 4: Rutar a una cola específica (queue
)
Si deseas que una tarea de procesamiento de imágenes que consume mucho CPU sea gestionada por un grupo de trabajadores diferente:
process_image.apply_async(args=[image_id], queue='image_processing')
Configurando Celery, puedes tener trabajadores dedicados que vigilan la cola image_processing
, asegurando que las tareas de procesamiento de imágenes solo sean manejadas por ese grupo de trabajadores.
Ejemplo 5: Conectar tareas de callback (link
)
Si deseas que una tarea que procesa un pago ejecute automáticamente otra tarea que envía un correo de notificación cuando se complete con éxito:
Sin apply_async
, lo más seguro es que implementes la lógica de procesamiento de pagos como una función que llamaría a .delay()
para enviar el correo dentro de dicha función.
def process_payment(order_id):
# Lógica de procesamiento del pago
send_notification_email.delay(order_id)
Pero usando apply_async()
, puedes escribirlo así:
@shared_task
def process_payment(order_id):
# Lógica de procesamiento del pago
return order_id
@shared_task
def send_notification_email(order_id):
# Lógica para enviar el correo de notificación
payment_result = process_payment.apply_async(args=[order_id], link=send_notification_email.s())
Aquí, .s()
crea la firma de la tarea send_notification_email
. Cuando la tarea process_payment
se completa correctamente, su valor de retorno (en este caso, order_id
) se pasa como argumento a la tarea send_notification_email
para que se ejecute.
4. delay()
vs apply_async()
: ¿Cuándo elegir cada uno?
Ahora que has entendido claramente las diferencias y las ventajas de delay()
y apply_async()
, ¿en qué situaciones deberías elegir cada método?
-
delay()
:- Cuando solo quieras ejecutar una tarea de la forma más sencilla posible de manera asíncrona
- Cuando no requieras control especial sobre el momento de ejecución de la tarea
- Cuando valores la claridad y brevedad del código
-
apply_async()
:- Cuando necesites programar la ejecución de la tarea para un tiempo específico o un momento determinado (
countdown
,eta
) - Cuando quieras establecer un tiempo de caducidad para prevenir ejecuciones innecesarias (
expires
) - Cuando quieras rutar tareas a una cola específica para controlar grupos de trabajadores (
queue
) - Cuando necesites gestionar la prioridad de las tareas (
priority
) - Cuando necesites especificar explícitamente el método de serialización o compresión de las tareas (
serializer
,compression
) - Cuando quieras establecer callbacks automáticos para ejecutar otras tareas en caso de éxito o error de una tarea (
link
,link_error
)
- Cuando necesites programar la ejecución de la tarea para un tiempo específico o un momento determinado (
En general, para un procesamiento asíncrono simple, usar delay()
es conveniente, mientras que para situaciones que requieren un control más complejo y detallado, es mejor utilizar apply_async()
. Dado que delay()
es una forma simplificada de apply_async()
, en cualquier caso siempre podrás utilizar apply_async()
para implementar la misma (o más) funcionalidad.
Conclusión
Hoy hemos explorado en profundidad el poderoso método apply_async()
de Celery, viendo su relación con delay()
y diversas formas de utilizarlo. apply_async()
permite gestionar tareas asíncronas de manera más meticulosa y aprovechar Celery de manera efectiva según las necesidades del servicio.
En la próxima y última parte, compararemos los decoradores clave para definir tareas de Celery, @app.task
y @shared_task
, para ayudar a elegir el enfoque correcto de definición de tareas según la estructura del proyecto y la filosofía de desarrollo. ¡Gracias por acompañarnos en este viaje con Celery!
Para aquellos que no han visto las publicaciones anteriores, revisen el siguiente enlace:
No hay comentarios.