В первой части мы рассмотрели основные требования к Celery и принцип работы метода delay(), а также примеры его использования, чтобы повысить ваше понимание асинхронных задач. delay() - это мощный инструмент, который позволяет просто добавлять задачи в очередь, но иногда требуется более тонкий контроль. Именно в такие моменты всплывает на поверхность метод apply_async(), который мы сегодня будем подробно исследовать.

Сегодня мы глубоко погрузимся в то, что такое apply_async(), чем он отличается от delay(), и как управляющие опции могут помочь в более эффективном управлении асинхронными задачами.

Celery cartoon image working hard


1. apply_async(), инструмент, открывающий больше возможностей

apply_async() - это еще один мощный метод, который позволяет планировать выполнение задач Celery асинхронно. На первый взгляд он похож на delay(), но предоставляет гораздо более разнообразные опции по времени выполнения задачи, способу запуска и настройкам очереди, что дает разработчикам больше гибкости.

Помните, что delay() является простой оберткой, которая вызывается в форме apply_async(args=args), поэтому можно считать apply_async() расширенной версией delay(), которая включает все функции delay(), но предлагает больше контроля.


2. Принцип работы apply_async(): Тонкая настройка

Основной принцип работы apply_async() похож на работу delay(). Он отправляет сообщение с именем задачи и аргументами брокеру сообщений, и работник получает это сообщение для выполнения. Однако apply_async() позволяет тонко настраивать способ выполнения задачи с помощью различных параметров на этапе создания и отправки сообщения.

Основные параметры:

  • args: Список или кортеж позиционных аргументов, передаваемых функции задачи.
  • kwargs: Словарь ключевых аргументов, передаваемых функции задачи.
  • countdown: Задерживает выполнение задачи на указанное количество секунд перед ее запуском. Например, при установке countdown=60 задача будет добавлена в очередь через 60 секунд после вызова.
  • eta (Estimated Time of Arrival): Запланирует выполнение задачи на определенный будущий момент времени. Указывается в виде объекта datetime. countdown и eta взаимно исключают друг друга.
  • expires: Устанавливает время истечения задачи. Если указанный срок истекает, работник игнорирует задачу. Может быть указан в виде int (в секундах), float (в секундах) или объекта datetime. Это полезно для временных данных или задач с ограниченным сроком.
  • queue: Направляет задачу в определенную очередь по названию. Celery может настраивать несколько очередей, чтобы обрабатывать задачи различного типа или важности разными группами работников.
  • routing_key: Направляет задачу в определенную очередь согласно правилам маршрутизации брокера сообщений. Используется при наличии сложных настроек маршрутизации в брокерах, таких как RabbitMQ.
  • priority: Устанавливает приоритет задачи. В зависимости от настроек брокера и работника задачи с высоким приоритетом могут обрабатываться первыми. Обычно принимаются целочисленные значения от 0 (самый высокий приоритет) до 9 (самый низкий приоритет).
  • serializer: Указывает метод сериализации сообщения задачи ('json', 'pickle', 'yaml' и т. д.).
  • compression: Указывает метод сжатия сообщения задачи ('gzip', 'bzip2' и т. д.).
  • headers: Позволяет добавить дополнительные заголовочные данные в сообщение задачи.
  • link: Указывает колбек-задачу, которая будет выполнена после успешного завершения текущей задачи. Используется для реализации цепочек.
  • link_error: Указывает колбек-задачу, которая будет выполнена в случае ошибки текущей задачи.

Таким образом, благодаря различным параметрам apply_async() позволяет более тщательно управлять временем выполнения задач, направлять их в определенные группы работников и управлять приоритетами задач, удовлетворяя сложные требования к асинхронной обработке.


3. Примеры использования apply_async(): Как использовать в разных ситуациях

Теперь давайте рассмотрим примеры использования apply_async() с различными опциями, чтобы на практике убедиться в его мощи.

Пример 1: Выполнение через определенное время (countdown)

Когда нужно отправить приветственное email сообщение через 3 дня после регистрации пользователя:

send_welcome_email.apply_async(args=[user.email], countdown=3 * 24 * 60 * 60)

Используя опцию countdown, задача не добавляется мгновенно в очередь, а добавляется через указанное время (3 дня) для обработки работником.

Пример 2: Выполнение в конкретное время (eta)

Когда необходимо создать ежедневный отчет в 3 часа ночи:

from datetime import datetime, timedelta

target_time = datetime.now().replace(hour=3, minute=0, second=0, microsecond=0) + timedelta(days=1) # Следующее утро в 3 часа

generate_daily_report.apply_async(eta=target_time)

Используя опцию eta, задачу можно запланировать на точное время выполнения.

Пример 3: Установка времени истечения (expires)

Когда результат вызова внешнего API действителен только в течение 10 минут, и не нужно выполнять задачу позже:

from datetime import datetime, timedelta

expires_at = datetime.now() + timedelta(minutes=10)

fetch_external_data.apply_async(args=[api_url], expires=expires_at)

Опция expires позволит избежать ненужных запусков задачи и сэкономить ресурсы.

Пример 4: Маршрутизация в конкретную очередь (queue)

Когда нужно обработать ресурсоемкие задачи по обработке изображений отдельной группой работников:

process_image.apply_async(args=[image_id], queue='image_processing')

Настроив очередь image_processing и назначив для нее отдельных работников в настройках Celery, задачи обработки изображений будут обрабатываться только этой группой работников.

Пример 5: Соединение колбек задач (link)

Когда необходимо автоматически отправлять уведомления по email после успешной обработки платежа:

Без использования apply_async, вероятно, вам пришлось бы реализовать логику обработки платежа как функцию и вызывать .delay() внутри функции для отправки email.

def process_payment(order_id):
    # Логика обработки платежа
    send_notification_email.delay(order_id)

Но используя apply_async(), можно поступить так:

@shared_task
def process_payment(order_id):
    # Логика обработки платежа
    return order_id

@shared_task
def send_notification_email(order_id):
    # Логика отправки уведомления по email

payment_result = process_payment.apply_async(args=[order_id], link=send_notification_email.s())

Здесь .s() генерирует подпись задачи send_notification_email. После успешного завершения задачи process_payment ее возвращаемое значение (в данном случае order_id) передается как аргумент для выполнения задачи send_notification_email.


4. delay() и apply_async(): Когда что выбирать?

Теперь вы наверняка четко понимаете разницу между delay() и apply_async() и преимущества каждого из них. Так в каких ситуациях следует использовать тот или иной метод?

  • delay():

    • Когда нужно просто запустить задачу асинхронно
    • Когда нет особых требований к времени выполнения задачи
    • Когда важна простота кода
  • apply_async():

    • Когда нужно запланировать выполнение задачи на определенное время или в определенный момент (countdown, eta)
    • Когда необходимо установить время истечения для предотвращения ненужных запусков (expires)
    • Когда задачи нужно маршрутизировать в определенные очереди для управления группами работников (queue)
    • Когда нужно управлять приоритетами задач (priority)
    • Когда необходимо явно указать методы сериализации или сжатия задач (serializer, compression)
    • Когда нужно настроить автоматическое выполнение других задач при успешном или неудачном завершении текущей задачи (link, link_error)

В общем, для простых асинхронных процессов удобно использовать delay(), в то время как для более сложных и точных процессов лучше использовать apply_async(). delay() является упрощенной версией apply_async(), поэтому в любом случае можно реализовать одинаковые (или более сложные) функции с помощью apply_async().


В заключение

Сегодня мы глубоко исследовали мощный инструмент apply_async() в Celery, его связь с delay() и различные способы его использования. apply_async() помогает более точно управлять асинхронными задачами и эффективно использовать Celery в соответствии с требованиями сервиса.

В следующей, финальной части мы сравним ключевые декораторы для определения задач в Celery: @app.task и @shared_task, чтобы помочь вам выбрать способ определения задач в соответствии со структурой проекта и философией разработки. Спасибо, что сопровождали меня в этом путешествии по Celery!

Тем, кто не видел предыдущие записи, рекомендую ознакомиться с ними по следующей ссылке!

# Магия Celery: За кулисами delay()