В первой части мы рассмотрели основные требования к Celery и принцип работы метода delay()
, а также примеры его использования, чтобы повысить ваше понимание асинхронных задач. delay()
- это мощный инструмент, который позволяет просто добавлять задачи в очередь, но иногда требуется более тонкий контроль. Именно в такие моменты всплывает на поверхность метод apply_async()
, который мы сегодня будем подробно исследовать.
Сегодня мы глубоко погрузимся в то, что такое apply_async()
, чем он отличается от delay()
, и как управляющие опции могут помочь в более эффективном управлении асинхронными задачами.
1. apply_async()
, инструмент, открывающий больше возможностей
apply_async()
- это еще один мощный метод, который позволяет планировать выполнение задач Celery асинхронно. На первый взгляд он похож на delay()
, но предоставляет гораздо более разнообразные опции по времени выполнения задачи, способу запуска и настройкам очереди, что дает разработчикам больше гибкости.
Помните, что delay()
является простой оберткой, которая вызывается в форме apply_async(args=args)
, поэтому можно считать apply_async()
расширенной версией delay()
, которая включает все функции delay()
, но предлагает больше контроля.
2. Принцип работы apply_async()
: Тонкая настройка
Основной принцип работы apply_async()
похож на работу delay()
. Он отправляет сообщение с именем задачи и аргументами брокеру сообщений, и работник получает это сообщение для выполнения. Однако apply_async()
позволяет тонко настраивать способ выполнения задачи с помощью различных параметров на этапе создания и отправки сообщения.
Основные параметры:
args
: Список или кортеж позиционных аргументов, передаваемых функции задачи.kwargs
: Словарь ключевых аргументов, передаваемых функции задачи.countdown
: Задерживает выполнение задачи на указанное количество секунд перед ее запуском. Например, при установкеcountdown=60
задача будет добавлена в очередь через 60 секунд после вызова.eta
(Estimated Time of Arrival): Запланирует выполнение задачи на определенный будущий момент времени. Указывается в виде объектаdatetime
.countdown
иeta
взаимно исключают друг друга.expires
: Устанавливает время истечения задачи. Если указанный срок истекает, работник игнорирует задачу. Может быть указан в видеint
(в секундах),float
(в секундах) или объектаdatetime
. Это полезно для временных данных или задач с ограниченным сроком.queue
: Направляет задачу в определенную очередь по названию. Celery может настраивать несколько очередей, чтобы обрабатывать задачи различного типа или важности разными группами работников.routing_key
: Направляет задачу в определенную очередь согласно правилам маршрутизации брокера сообщений. Используется при наличии сложных настроек маршрутизации в брокерах, таких как RabbitMQ.priority
: Устанавливает приоритет задачи. В зависимости от настроек брокера и работника задачи с высоким приоритетом могут обрабатываться первыми. Обычно принимаются целочисленные значения от 0 (самый высокий приоритет) до 9 (самый низкий приоритет).serializer
: Указывает метод сериализации сообщения задачи ('json', 'pickle', 'yaml' и т. д.).compression
: Указывает метод сжатия сообщения задачи ('gzip', 'bzip2' и т. д.).headers
: Позволяет добавить дополнительные заголовочные данные в сообщение задачи.link
: Указывает колбек-задачу, которая будет выполнена после успешного завершения текущей задачи. Используется для реализации цепочек.link_error
: Указывает колбек-задачу, которая будет выполнена в случае ошибки текущей задачи.
Таким образом, благодаря различным параметрам apply_async()
позволяет более тщательно управлять временем выполнения задач, направлять их в определенные группы работников и управлять приоритетами задач, удовлетворяя сложные требования к асинхронной обработке.
3. Примеры использования apply_async()
: Как использовать в разных ситуациях
Теперь давайте рассмотрим примеры использования apply_async()
с различными опциями, чтобы на практике убедиться в его мощи.
Пример 1: Выполнение через определенное время (countdown
)
Когда нужно отправить приветственное email сообщение через 3 дня после регистрации пользователя:
send_welcome_email.apply_async(args=[user.email], countdown=3 * 24 * 60 * 60)
Используя опцию countdown
, задача не добавляется мгновенно в очередь, а добавляется через указанное время (3 дня) для обработки работником.
Пример 2: Выполнение в конкретное время (eta
)
Когда необходимо создать ежедневный отчет в 3 часа ночи:
from datetime import datetime, timedelta
target_time = datetime.now().replace(hour=3, minute=0, second=0, microsecond=0) + timedelta(days=1) # Следующее утро в 3 часа
generate_daily_report.apply_async(eta=target_time)
Используя опцию eta
, задачу можно запланировать на точное время выполнения.
Пример 3: Установка времени истечения (expires
)
Когда результат вызова внешнего API действителен только в течение 10 минут, и не нужно выполнять задачу позже:
from datetime import datetime, timedelta
expires_at = datetime.now() + timedelta(minutes=10)
fetch_external_data.apply_async(args=[api_url], expires=expires_at)
Опция expires
позволит избежать ненужных запусков задачи и сэкономить ресурсы.
Пример 4: Маршрутизация в конкретную очередь (queue
)
Когда нужно обработать ресурсоемкие задачи по обработке изображений отдельной группой работников:
process_image.apply_async(args=[image_id], queue='image_processing')
Настроив очередь image_processing
и назначив для нее отдельных работников в настройках Celery, задачи обработки изображений будут обрабатываться только этой группой работников.
Пример 5: Соединение колбек задач (link
)
Когда необходимо автоматически отправлять уведомления по email после успешной обработки платежа:
Без использования apply_async
, вероятно, вам пришлось бы реализовать логику обработки платежа как функцию и вызывать .delay()
внутри функции для отправки email.
def process_payment(order_id):
# Логика обработки платежа
send_notification_email.delay(order_id)
Но используя apply_async()
, можно поступить так:
@shared_task
def process_payment(order_id):
# Логика обработки платежа
return order_id
@shared_task
def send_notification_email(order_id):
# Логика отправки уведомления по email
payment_result = process_payment.apply_async(args=[order_id], link=send_notification_email.s())
Здесь .s()
генерирует подпись задачи send_notification_email
. После успешного завершения задачи process_payment
ее возвращаемое значение (в данном случае order_id
) передается как аргумент для выполнения задачи send_notification_email
.
4. delay()
и apply_async()
: Когда что выбирать?
Теперь вы наверняка четко понимаете разницу между delay()
и apply_async()
и преимущества каждого из них. Так в каких ситуациях следует использовать тот или иной метод?
-
delay()
:- Когда нужно просто запустить задачу асинхронно
- Когда нет особых требований к времени выполнения задачи
- Когда важна простота кода
-
apply_async()
:- Когда нужно запланировать выполнение задачи на определенное время или в определенный момент (
countdown
,eta
) - Когда необходимо установить время истечения для предотвращения ненужных запусков (
expires
) - Когда задачи нужно маршрутизировать в определенные очереди для управления группами работников (
queue
) - Когда нужно управлять приоритетами задач (
priority
) - Когда необходимо явно указать методы сериализации или сжатия задач (
serializer
,compression
) - Когда нужно настроить автоматическое выполнение других задач при успешном или неудачном завершении текущей задачи (
link
,link_error
)
- Когда нужно запланировать выполнение задачи на определенное время или в определенный момент (
В общем, для простых асинхронных процессов удобно использовать delay()
, в то время как для более сложных и точных процессов лучше использовать apply_async()
. delay()
является упрощенной версией apply_async()
, поэтому в любом случае можно реализовать одинаковые (или более сложные) функции с помощью apply_async()
.
В заключение
Сегодня мы глубоко исследовали мощный инструмент apply_async()
в Celery, его связь с delay()
и различные способы его использования. apply_async()
помогает более точно управлять асинхронными задачами и эффективно использовать Celery в соответствии с требованиями сервиса.
В следующей, финальной части мы сравним ключевые декораторы для определения задач в Celery: @app.task
и @shared_task
, чтобы помочь вам выбрать способ определения задач в соответствии со структурой проекта и философией разработки. Спасибо, что сопровождали меня в этом путешествии по Celery!
Тем, кто не видел предыдущие записи, рекомендую ознакомиться с ними по следующей ссылке!
Комментариев нет.