In the previous part, we covered why Celery is needed for asynchronous tasks and how the delay() method works, using practical examples. While delay() is a convenient way to push tasks onto the queue, there are times when finer control is needed. This is where the apply_async() method, today's focus, truly shines.
Today, we will explore what apply_async() is, how it differs from delay(), and how to use its options to manage asynchronous tasks more effectively.
1. apply_async(): A Tool That Opens Up More Possibilities
apply_async() is the more general method for scheduling Celery tasks asynchronously. On the surface it looks similar to delay(), but it accepts far more options, covering when and how a task is executed and which queue it is sent to, which gives developers greater flexibility.
Keep in mind that delay() is simply a shortcut: delay(*args, **kwargs) calls apply_async(args, kwargs). You can therefore think of apply_async() as an extended version of delay() that does everything delay() does while offering more control.
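The relationship between the two methods can be sketched with a toy class. This is a simplified model for illustration, not Celery's source code: SketchTask and the "tasks.add" name are made up, and the "message" is just a dictionary kept in memory.

```python
class SketchTask:
    """Toy stand-in for a Celery task; it records calls instead of publishing them."""

    def __init__(self, name):
        self.name = name
        self.sent = []  # every "message" that would have gone to the broker

    def apply_async(self, args=None, kwargs=None, **options):
        # Real Celery would serialize this and publish it to the broker.
        message = {
            "task": self.name,
            "args": tuple(args or ()),
            "kwargs": dict(kwargs or {}),
            "options": options,
        }
        self.sent.append(message)
        return message

    def delay(self, *args, **kwargs):
        # The shortcut: star-arguments in, a plain apply_async() call out.
        return self.apply_async(args, kwargs)


add = SketchTask("tasks.add")
via_delay = add.delay(2, 3, label="sum")
via_apply_async = add.apply_async(args=(2, 3), kwargs={"label": "sum"})
```

Both calls produce the same message. The difference is that only apply_async() leaves room for extra options such as countdown or queue.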
2. The Mechanism of apply_async(): Subtle Adjustments
The basic operation of apply_async() is similar to that of delay(). The core flow involves sending a message containing the task name and arguments to a message broker, which a worker then retrieves and executes. However, apply_async() allows for detailed adjustments to the task execution process through various parameters.
The main parameters include:
- args: A list or tuple of positional arguments to be passed to the task function.
- kwargs: A dictionary of keyword arguments to be passed to the task function.
- countdown: Delays the task's execution by the specified number of seconds. For example, with countdown=60 the message is still sent to the broker right away, but the worker will not run the task until at least 60 seconds after the call.
- eta (Estimated Time of Arrival): Schedules the task for a specific future time, given as a datetime object (preferably timezone-aware). countdown and eta are two ways of saying the same thing, so pass only one of them.
- expires: Sets an expiration time for the task. If that time has already passed when a worker picks the task up, the worker discards it instead of running it. It can be given as an int or float (seconds from now) or as a datetime object, which is useful for ephemeral data or time-sensitive tasks.
- queue: Routes the task to a specific queue by name. Celery can be configured with multiple queues so that different worker groups handle tasks based on type or importance.
- routing_key: Sets the routing key the broker uses to deliver the message according to its routing rules, particularly useful for advanced routing configurations in brokers like RabbitMQ.
- priority: Sets the priority of the task. Whether priorities are honored, and what the number means, depends on the broker: with RabbitMQ a larger number means higher priority, while with Redis the order is reversed and 0 is the highest.
- serializer: Specifies the method used to serialize the task message (e.g., 'json', 'pickle', 'yaml').
- compression: Specifies the method used to compress the task message (e.g., 'gzip', 'bzip2').
- headers: Allows additional header information to be included in the task message.
- link: Specifies a callback task to be executed upon the successful completion of the current task, enabling chaining.
- link_error: Specifies an error callback task to be executed if the current task fails.
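Several of these options are often combined in one call. The helper below is a hypothetical convenience, not part of Celery: build_options and the "emails" queue name are assumptions made for this sketch. It simply gathers the options into a dictionary and guards against passing countdown and eta together.

```python
def build_options(*, countdown=None, eta=None, expires=None, queue=None, priority=None):
    """Collect keyword options for apply_async(), leaving out anything unset."""
    if countdown is not None and eta is not None:
        raise ValueError("pass either countdown or eta, not both")
    if eta is not None and eta.tzinfo is None:
        raise ValueError("eta should be a timezone-aware datetime")
    candidates = {
        "countdown": countdown,
        "eta": eta,
        "expires": expires,
        "queue": queue,
        "priority": priority,
    }
    return {name: value for name, value in candidates.items() if value is not None}


# Run in one minute, give up after five minutes, and use the "emails" queue.
options = build_options(countdown=60, expires=300, queue="emails")
# With a real task: send_welcome_email.apply_async(args=[user.email], **options)
```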
Through these parameters, apply_async() can control the execution timing of tasks more finely than delay(), route tasks to specific worker groups, and manage task priorities to meet complex asynchronous processing requirements.
3. Examples of apply_async() Usage: Application in Different Scenarios
Now, let's look at some examples of apply_async() with various options to see what it can do.
Example 1: Executing After a Certain Time (countdown)
If you need to send a welcome email to a user three days after they sign up:
send_welcome_email.apply_async(args=[user.email], countdown=3 * 24 * 60 * 60)
With the countdown option, the message is sent to the broker right away, but the worker holds the task and runs it only once the specified time (3 days) has passed.
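A countdown is just a relative way of naming an absolute time. The sketch below shows that arithmetic in plain Python; countdown_to_eta is a made-up helper that models the idea, and the fixed called_at timestamp is only there to make the result reproducible.

```python
from datetime import datetime, timedelta, timezone

# Spelling the delay with timedelta reads better than 3 * 24 * 60 * 60.
THREE_DAYS = int(timedelta(days=3).total_seconds())


def countdown_to_eta(countdown, now):
    """Model: a countdown in seconds corresponds to an ETA of now + countdown."""
    return now + timedelta(seconds=countdown)


called_at = datetime(2024, 5, 1, 12, 0, tzinfo=timezone.utc)
eta = countdown_to_eta(THREE_DAYS, called_at)
# With a real task: send_welcome_email.apply_async(args=[user.email], countdown=THREE_DAYS)
```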
Example 2: Executing at a Specific Time (eta)
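If a report needs to be generated at 3 AM tomorrow (eta schedules a single run; a report that repeats every day is a job for a periodic scheduler such as Celery beat):
from datetime import datetime, timedelta
# 3 AM on the next day, as a timezone-aware local datetime
target_time = datetime.now().astimezone().replace(hour=3, minute=0, second=0, microsecond=0) + timedelta(days=1)
generate_daily_report.apply_async(eta=target_time)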
With the eta option, the task runs at the specified time or shortly after it. The worker will not start it earlier, but the exact moment depends on how busy the workers are.
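Computing the target time deserves a little care: always adding one day skips today's 3 AM when the code runs shortly after midnight. A minimal sketch of a helper that picks the next occurrence, assuming a timezone-aware clock (next_run_at is a hypothetical name, not a Celery function):

```python
from datetime import datetime, time, timedelta, timezone


def next_run_at(hour, now=None, tz=timezone.utc):
    """Return the next timezone-aware datetime at which the clock in tz reads hour:00."""
    # `now` is expected to be timezone-aware; it defaults to the current time in tz.
    now = now.astimezone(tz) if now is not None else datetime.now(tz)
    candidate = datetime.combine(now.date(), time(hour), tzinfo=tz)
    if candidate <= now:
        # Today's slot has already passed, so move to tomorrow.
        candidate += timedelta(days=1)
    return candidate


early = next_run_at(3, now=datetime(2024, 5, 1, 2, 0, tzinfo=timezone.utc))
late = next_run_at(3, now=datetime(2024, 5, 1, 23, 30, tzinfo=timezone.utc))
# With a real task: generate_daily_report.apply_async(eta=next_run_at(3))
```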
Example 3: Setting Expiration Time (expires)
If the result of an external API call is only valid for 10 minutes, and there's no need to execute the task afterward:
from datetime import datetime, timedelta
expires_at = datetime.now().astimezone() + timedelta(minutes=10)  # timezone-aware; expires=600 expresses the same limit in seconds
fetch_external_data.apply_async(args=[api_url], expires=expires_at)
The expires option prevents the task from running once it is no longer useful, saving resources.
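The rule behind expires can be modeled in a few lines of plain Python. This is an illustrative model only: expiry_deadline and should_run are made-up names, and a real worker performs its own check when it receives the message.

```python
from datetime import datetime, timedelta, timezone


def expiry_deadline(expires, sent_at):
    """Turn an expires value (seconds or datetime) into an absolute deadline."""
    if isinstance(expires, (int, float)):
        return sent_at + timedelta(seconds=expires)
    return expires


def should_run(deadline, now):
    """A task is still worth running only while the deadline has not passed."""
    return now <= deadline


sent_at = datetime(2024, 5, 1, 12, 0, tzinfo=timezone.utc)
deadline = expiry_deadline(600, sent_at)  # 600 seconds = 10 minutes after sending
```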
Example 4: Routing to a Specific Queue (queue)
If you want CPU-intensive image processing tasks to be handled by a separate worker group:
process_image.apply_async(args=[image_id], queue='image_processing')
If separate workers are configured to consume only the image_processing queue, image processing tasks are handled by that group alone.
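Routing can also be declared once in configuration instead of on every call. The fragment below is a sketch of a Celery configuration module using the task_routes setting; the module name celeryconfig.py, the project name proj, and the task path proj.tasks.process_image are assumptions for illustration.

```python
# celeryconfig.py (hypothetical module; adjust the task path to your project)

# Send every call of this task to the image_processing queue,
# even when it is triggered with a plain delay().
task_routes = {
    "proj.tasks.process_image": {"queue": "image_processing"},
}

# A dedicated worker that consumes only that queue is started from the shell:
#   celery -A proj worker -Q image_processing
```

With a route like this in place, the queue argument in apply_async() is only needed when a single call should go somewhere other than its default destination.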
Example 5: Linking Callback Tasks (link)
If you want to automatically execute a task sending a notification email upon the successful completion of a payment processing task:
Without apply_async(), you might implement the payment processing logic in a function and call .delay() from within that function:
def process_payment(order_id):
    # Payment processing logic here
    send_notification_email.delay(order_id)
However, by using apply_async(), you can write it like this:
from celery import shared_task

@shared_task
def process_payment(order_id):
    # Payment processing logic
    return order_id

@shared_task
def send_notification_email(order_id):
    # Notification email sending logic
    ...

payment_result = process_payment.apply_async(args=[order_id], link=send_notification_email.s())
Here, .s() creates a signature for the send_notification_email task. When the process_payment task completes successfully, its return value (in this case, order_id) is passed as the first argument to the send_notification_email task.
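The flow of data through link and link_error can be pictured with a plain-Python model. It is only an analogy: run_with_callbacks is a made-up function, everything runs in one process, and in real Celery the callbacks are separate tasks executed by workers, with error callbacks receiving details about the failed task rather than just the exception.

```python
def run_with_callbacks(task, args, link=None, link_error=None):
    """Run task, then hand off to a success callback or an error callback."""
    try:
        result = task(*args)
    except Exception as exc:
        if link_error is not None:
            link_error(exc)
        return None
    if link is not None:
        link(result)  # the parent's return value becomes the first argument
    return result


notified = []  # order ids a notification would be sent for
failures = []  # errors an error callback would be told about


def process_payment(order_id):
    if order_id < 0:
        raise ValueError("invalid order id")
    return order_id


run_with_callbacks(process_payment, [42], link=notified.append, link_error=failures.append)
run_with_callbacks(process_payment, [-1], link=notified.append, link_error=failures.append)
```

The successful run passes 42 on to the success callback, while the failed run reaches only the error callback.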
4. Choosing Between delay() and apply_async(): When to Use What?
By now, you should have a clear understanding of the differences between delay() and apply_async(), along with their respective advantages. So, when should you choose which method in practice?
- delay():
  - When you want to execute a task asynchronously in the simplest way
  - When you don't need special control over the timing of task execution
  - When simplicity of code is a priority
- apply_async():
  - When you need to schedule task execution after a specific time or at a specific point (countdown, eta)
  - When you want to set an expiration time for the task to prevent unnecessary execution (expires)
  - When you want to route tasks to specific queues to control worker groups (queue)
  - When you need to manage task priorities (priority)
  - When you need to explicitly specify serialization or compression methods for tasks (serializer, compression)
  - When you want to set up callbacks to automatically execute other tasks upon success or failure of tasks (link, link_error)
Generally, for simple asynchronous processing, delay() is convenient, while apply_async() is advisable for more complex scenarios requiring finer control. Since delay() is a simplified form of apply_async(), anything you can do with delay() can also be done with apply_async().
Conclusion
Today, we took a close look at Celery's apply_async() method, its relationship with delay(), and its various applications. apply_async() allows finer management of asynchronous tasks, so that Celery can be tuned to your service's requirements.
In the next and final part, we will compare the core decorators that define Celery tasks, namely @app.task and @shared_task, to help you choose the appropriate task definition method based on your project structure and development philosophy. Thank you for joining me on this Celery journey!
If you missed the previous post, please check the link below!
# The Magic of Celery: Unveiling the Inside and Out of delay()