前回のパートでは、Celeryの基本的な必要性とdelay()メソッドの作動原理、そして実際の使用例を通じて、皆さんの非同期作業の理解度を高めました。delay()は簡単にタスクをキューに投げる強力なツールですが、時にはもっと細かい制御が必要になることもあります。まさにその時に力を発揮するのが、本日集中して探求するapply_async()メソッドです。

今日はapply_async()が何か、delay()とはどのように異なるのか、そしてさまざまなオプションを活用して非同期作業をより効果的に管理する方法について深く探っていきます。

Celery cartoon image working hard


1. apply_async()、より多くの可能性を開くツール

apply_async()はCeleryタスクを非同期的に実行するためにスケジューリングするもうひとつの強力なメソッドです。一見するとdelay()と似ていますが、タスク実行時点、方式、そしてキュー設定など、もっと多くのオプションを提供することで、開発者により多くの柔軟性をもたらします。

delay()が単純にapply_async(args=args)の形で呼ばれるラッパー関数であることを覚えておけば、apply_async()delay()のすべての機能を含みつつも、より多くの制御権を提供する拡張版と捉えることができます。


2. apply_async()の作動原理:精巧な調整

apply_async()の基本的な作動方式はdelay()と似ています。タスク名と引数を含むメッセージをメッセージブローカーに送信し、ワーカーがこれを受け取って実行するのが主な流れです。しかし、apply_async()はメッセージ生成や送信の過程でさまざまなパラメータ(parameters)を通じてタスク実行の方式を精巧に調整できるようにしています。

主なパラメータは以下の通りです。

  • args: タスク関数に渡される位置引数(positional arguments)のリストまたはタプルです。
  • kwargs: タスク関数に渡されるキーワード引数(keyword arguments)の辞書です。
  • countdown: タスクを即座に実行するのではなく、指定された秒(seconds)だけ遅延させてから実行します。例えば、countdown=60と設定すると、タスクは呼ばれた時点から60秒後にキューに追加されます。
  • eta (Estimated Time of Arrival): タスクを特定の未来の時点で実行するように予約します。datetimeオブジェクト形式で指定します。countdownetaは相互排他的です。
  • expires: タスクの有効期限を設定します。指定された時間が経過すると、ワーカーはそのタスクを実行せずに無視します。int(秒)、float(秒)、またはdatetimeオブジェクトで指定できます。これは一時的なデータや有効期限のある作業に役立ちます。
  • queue: タスクを特定の名前のキューにルーティングします。Celeryは複数のキューを設定し、タスクの種類や重要度に応じて異なるワーカーグループで処理できるように構成できます。
  • routing_key: メッセージブローカーのルーティング規則に従って、タスクを特定のキューに送ります。RabbitMQなどのメッセージブローカーで高度なルーティング設定を使用する際に利用されます。
  • priority: タスクの優先順位を設定します。ブローカーとワーカーの設定に応じて、優先順位の高いタスクが先に処理される場合があります。一般的に0(最も高い優先順位)から9(最も低い優先順位)までの整数値を持ちます。
  • serializer: タスクメッセージをシリアライズするために使用する方式を指定します('json', 'pickle', 'yaml'など)。
  • compression: タスクメッセージを圧縮する方法を指定します('gzip', 'bzip2'など)。
  • headers: タスクメッセージに追加のヘッダー情報を含めることができます。
  • link: 現在のタスクが成功裏に完了した場合に実行されるコールバックタスク(callback task)を指定します。チェイニング(chaining)を実装するために使用されます。
  • link_error: 現在のタスクが失敗した場合に実行されるエラーコールバックタスク(error callback task)を指定します。

このようにさまざまなパラメータを通じて、apply_async()delay()よりも遥かに細かくタスクの実行時点を制御し、特定のワーカーグループにタスクをルーティングし、タスクの優先順位を管理するなど、複雑な非同期処理要件を満たすことができます。


3. apply_async()の使用例:状況別の活用法

さあ、さまざまなオプションを活用したapply_async()の使用例を通じて、その強力さを直接確認してみましょう。

例1: 特定の時間後に実行する (countdown)

ユーザーが会員登録後3日後に歓迎メールを送る必要がある場合:

send_welcome_email.apply_async(args=[user.email], countdown=3 * 24 * 60 * 60)

このようにcountdownオプションを使用すると、タスクは即座にキューに追加されるのではなく、指定された時間(3日)後にキューに追加され、ワーカーによって処理されます。

例2: 特定の時刻に実行する (eta)

毎日午前3時に日報を生成するタスクを実行する必要がある場合:

from datetime import datetime, timedelta

target_time = datetime.now().replace(hour=3, minute=0, second=0, microsecond=0) + timedelta(days=1) # 次の日の午前3時

generate_daily_report.apply_async(eta=target_time)

etaオプションを使用すると、タスクが正確に指定された時間に実行されるように予約できます。

例3: 有効期限を設定する (expires)

外部API呼び出しの結果が10分間のみ有効な場合、その後にタスクを実行する必要がないとき:

from datetime import datetime, timedelta

expires_at = datetime.now() + timedelta(minutes=10)

fetch_external_data.apply_async(args=[api_url], expires=expires_at)

expiresオプションを通じて、タスクが不必要に実行されるのを防ぎ、リソースを節約できます。

例4: 特定のキューにルーティングする (queue)

CPUを多く使用する画像処理タスクを別のワーカーグループで処理したい場合:

process_image.apply_async(args=[image_id], queue='image_processing')

Celeryの設定を通じてimage_processingキューを監視するワーカーを別途構成すれば、画像処理タスクはそのワーカーグループでのみ処理されます。

例5: コールバックタスクを連結する (link)

決済処理タスクが成功裏に完了したら、通知メールを送るタスクを自動的に実行したい場合: apply_asyncを活用しない場合、恐らく以下のように直接決済処理ロジックを関数として実装し、関数内で.delay()を呼び出してメールを送り続けることでしょう。

def process_payment(order_id):
    # 決済処理ロジックを作成
    send_notification_email.delay(order_id)

ですが、apply_async()を活用すると、以下のように書くことができます。

@shared_task
def process_payment(order_id):
    # 決済処理ロジック
    return order_id

@shared_task
def send_notification_email(order_id):
    # 通知メール送信ロジック

payment_result = process_payment.apply_async(args=[order_id], link=send_notification_email.s())

ここで.s()send_notification_emailタスクのシグネチャ(signature)を生成します。process_paymentタスクが成功裏に完了すると、その返り値(ここではorder_id)がsend_notification_emailタスクの引数として渡されて実行されます。


4. delay()apply_async():いつ何を選択すべきか?

これでdelay()apply_async()の違いとそれぞれの利点を明確に理解したことでしょう。それでは実際にどの状況でどのメソッドを選択すべきですか?

  • delay():

    • 最も簡単にタスクを非同期的に実行したい場合
    • タスク実行時点に対する特別な制御が必要ない場合
    • コードの簡潔さを重要視する場合
  • apply_async():

    • タスク実行を特定の時間以降や特定の時点に予約する必要がある場合(countdowneta
    • タスクの有効期限を設定して不要な実行を防ぎたい場合(expires
    • タスクを特定のキューにルーティングしてワーカーグループを制御したい場合(queue
    • タスクの優先順位を管理する必要がある場合(priority
    • タスクのシリアライズ方式や圧縮方式を明示的に指定する必要がある場合(serializercompression
    • タスクの成功または失敗時に他のタスクを自動的に実行するコールバックを設定したい場合(linklink_error

一般的には簡単な非同期処理にはdelay()を使用するのが便利であり、さらに複雑で細かい制御が必要な場合にはapply_async()を利用する方が良いでしょう。delay()apply_async()の簡略化された形なので、どのような場合でもapply_async()を使用して同じ(またはより多くの)機能を実装できます。


まとめ

今日はCeleryの強力なツールであるapply_async()メソッドを深く探求し、delay()との関係やさまざまな活用法を見てきました。apply_async()は非同期作業をより精緻に管理し、サービスの要件に合わせてCeleryを効果的に活用できるようにします。

次回の最終回では、Celeryタスクを定義するための重要なデコレーターである@app.task@shared_taskを比較分析し、プロジェクトの構造や開発哲学に適したタスク定義の方法を選ぶ手助けをします。引き続きCeleryの旅にお付き合いくださり、ありがとうございます!

以前の投稿を見逃した方は、以下のリンクをご確認ください!

# Celeryの魔法_ delay()の表と裏を探る