지난 1편에서는 Celery의 기본적인 필요성과 delay()
메서드의 작동 원리, 그리고 실전 사용 예시를 통해 여러분의 비동기 작업 이해도를 높여보았습니다. delay()
는 간편하게 태스크를 큐에 던지는 강력한 도구였지만, 때로는 더 세밀한 제어가 필요할 때가 있습니다. 바로 그럴 때 빛을 발하는 것이 오늘 우리가 집중적으로 탐구할 apply_async()
메서드입니다.
오늘은 apply_async()
가 무엇인지, delay()
와는 어떻게 다른지, 그리고 다양한 옵션을 활용하여 비동기 작업을 더욱 효과적으로 관리하는 방법에 대해 심층적으로 알아보겠습니다.
1. apply_async()
, 더 많은 가능성을 열어주는 도구
apply_async()
는 Celery 태스크를 비동기적으로 실행하도록 스케줄링하는 또 다른 강력한 메서드입니다. 겉보기에는 delay()
와 유사하지만, 태스크 실행 시점, 방식, 그리고 큐 설정 등 훨씬 더 다양한 옵션을 제공하여 개발자에게 더 많은 유연성을 선사합니다.
delay()
가 단순하게 apply_async(args=args)
의 형태로 호출되는 래퍼 함수라는 점을 기억하면, apply_async()
가 delay()
의 모든 기능을 포함하면서도 더 많은 제어권을 제공하는 확장판이라고 생각할 수 있습니다.
2. apply_async()
의 작동 원리: 섬세한 조율
apply_async()
의 기본적인 작동 방식은 delay()
와 유사합니다. 태스크 이름과 인자를 담은 메시지를 메시지 브로커로 전송하고, 워커가 이를 받아 실행하는 것이 핵심 흐름입니다. 하지만 apply_async()
는 메시지 생성 및 전송 과정에서 다양한 파라미터(parameters) 를 통해 태스크 실행 방식을 정교하게 조율할 수 있도록 합니다.
주요 파라미터들은 다음과 같습니다.
args
: 태스크 함수에 전달될 위치 인자(positional arguments)의 리스트 또는 튜플입니다.kwargs
: 태스크 함수에 전달될 키워드 인자(keyword arguments)의 딕셔너리입니다.countdown
: 태스크를 즉시 실행하지 않고 지정된 초(seconds)만큼 지연시킨 후 실행합니다. 예를 들어,countdown=60
으로 설정하면 태스크는 호출된 시점으로부터 60초 후에 큐에 추가됩니다.eta
(Estimated Time of Arrival): 태스크를 특정 미래 시점에 실행하도록 예약합니다.datetime
객체 형태로 지정합니다.countdown
과eta
는 상호 배타적입니다.expires
: 태스크의 만료 시간을 설정합니다. 지정된 시간이 지나면 워커는 해당 태스크를 실행하지 않고 무시합니다.int
(초),float
(초), 또는datetime
객체로 지정할 수 있습니다. 이는 일시적인 데이터나 만료 기한이 있는 작업에 유용합니다.queue
: 태스크를 특정 이름의 큐로 라우팅합니다. Celery는 여러 개의 큐를 설정하여 태스크의 종류나 중요도에 따라 다른 워커 그룹에서 처리하도록 구성할 수 있습니다.routing_key
: 메시지 브로커의 라우팅 규칙에 따라 태스크를 특정 큐로 보냅니다. RabbitMQ와 같은 메시지 브로커에서 고급 라우팅 설정을 사용할 때 활용됩니다.priority
: 태스크의 우선순위를 설정합니다. 브로커와 워커 설정에 따라 우선순위가 높은 태스크가 먼저 처리될 수 있습니다. 일반적으로 0 (가장 높은 우선순위)부터 9 (가장 낮은 우선순위)까지의 정수 값을 가집니다.serializer
: 태스크 메시지를 직렬화하는 데 사용할 방식을 지정합니다 ('json', 'pickle', 'yaml' 등).compression
: 태스크 메시지를 압축하는 방식을 지정합니다 ('gzip', 'bzip2' 등).headers
: 태스크 메시지에 추가적인 헤더 정보를 포함시킬 수 있습니다.link
: 현재 태스크가 성공적으로 완료되면 실행될 콜백 태스크(callback task) 를 지정합니다. 체이닝(chaining)을 구현하는 데 사용됩니다.link_error
: 현재 태스크가 실패하면 실행될 에러 콜백 태스크(error callback task) 를 지정합니다.
이처럼 다양한 파라미터를 통해 apply_async()
는 delay()
보다 훨씬 세밀하게 태스크의 실행 시점을 제어하고, 특정 워커 그룹으로 태스크를 라우팅하며, 태스크의 우선순위를 관리하는 등 복잡한 비동기 처리 요구 사항을 충족시킬 수 있습니다.
3. apply_async()
사용 예시: 상황별 활용법
이제 다양한 옵션을 활용한 apply_async()
의 사용 예시를 통해 그 강력함을 직접 확인해 보겠습니다.
예시 1: 특정 시간 이후에 실행하기 (countdown
)
사용자가 회원가입 후 3일 뒤에 환영 이메일을 보내야 하는 경우:
send_welcome_email.apply_async(args=[user.email], countdown=3 * 24 * 60 * 60)
이렇게 countdown
옵션을 사용하면 태스크가 즉시 큐에 추가되는 것이 아니라, 지정된 시간(3일) 이후에 큐에 추가되어 워커에 의해 처리됩니다.
예시 2: 특정 시각에 실행하기 (eta
)
매일 새벽 3시에 일일 보고서를 생성하는 태스크를 실행해야 하는 경우:
from datetime import datetime, timedelta
target_time = datetime.now().replace(hour=3, minute=0, second=0, microsecond=0) + timedelta(days=1) # 다음 날 새벽 3시
generate_daily_report.apply_async(eta=target_time)
eta
옵션을 사용하면 태스크가 정확히 지정된 시간에 실행되도록 예약할 수 있습니다.
예시 3: 만료 시간 설정 (expires
)
외부 API 호출 결과가 10분 동안만 유효한 경우, 그 이후에는 태스크를 실행할 필요가 없을 때:
from datetime import datetime, timedelta
expires_at = datetime.now() + timedelta(minutes=10)
fetch_external_data.apply_async(args=[api_url], expires=expires_at)
expires
옵션을 통해 태스크가 불필요하게 실행되는 것을 방지하여 리소스를 절약할 수 있습니다.
예시 4: 특정 큐로 라우팅 (queue
)
CPU를 많이 사용하는 이미지 처리 태스크를 별도의 워커 그룹에서 처리하고 싶을 때:
process_image.apply_async(args=[image_id], queue='image_processing')
Celery 설정을 통해 image_processing
큐를 감시하는 워커들을 별도로 구성하면, 이미지 처리 태스크는 해당 워커 그룹에서만 처리됩니다.
예시 5: 콜백 태스크 연결 (link
)
결제 처리 태스크가 성공적으로 완료되면 알림 이메일을 보내는 태스크를 자동으로 실행하고 싶을 때:
apply_async
를 활용하지 않는다면 아마도 아래와 같이 직접 결제 처리로직을 함수로 구현하고 함수 내부에 .delay()
를 호출하여 메일을 보내게 될 것입니다.
def process_payment(order_id):
# 결제 처리 로직을 작성
send_notification_email.delay(order_id)
하지만 apply_async()
를 활용하면 아래와 같이 작성할 수 있습니다.
@shared_task
def process_payment(order_id):
# 결제 처리 로직
return order_id
@shared_task
def send_notification_email(order_id):
# 알림 이메일 전송 로직
payment_result = process_payment.apply_async(args=[order_id], link=send_notification_email.s())
여기서 .s()
는 send_notification_email
태스크의 시그니처(signature) 를 생성합니다. process_payment
태스크가 성공적으로 완료되면, 그 반환값(여기서는 order_id
)이 send_notification_email
태스크의 인자로 전달되어 실행됩니다.
4. delay()
와 apply_async()
: 언제 무엇을 선택해야 할까?
이제 delay()
와 apply_async()
의 차이점과 각각의 장점을 명확히 이해했을 것입니다. 그렇다면 실제로 어떤 상황에서 어떤 메서드를 선택해야 할까요?
-
delay()
:- 가장 간단하게 태스크를 비동기적으로 실행하고 싶을 때
- 태스크 실행 시점에 대한 특별한 제어가 필요 없을 때
- 코드의 간결성을 중요하게 생각할 때
-
apply_async()
:- 태스크 실행을 특정 시간 이후나 특정 시점에 예약해야 할 때 (
countdown
,eta
) - 태스크의 만료 시간을 설정하여 불필요한 실행을 방지하고 싶을 때 (
expires
) - 태스크를 특정 큐로 라우팅하여 워커 그룹을 제어하고 싶을 때 (
queue
) - 태스크의 우선순위를 관리해야 할 때 (
priority
) - 태스크의 직렬화 방식이나 압축 방식을 명시적으로 지정해야 할 때 (
serializer
,compression
) - 태스크 성공 또는 실패 시 다른 태스크를 자동으로 실행하는 콜백을 설정하고 싶을 때 (
link
,link_error
)
- 태스크 실행을 특정 시간 이후나 특정 시점에 예약해야 할 때 (
일반적으로 간단한 비동기 처리에는 delay()
를 사용하는 것이 편리하며, 더 복잡하고 세밀한 제어가 필요한 경우에는 apply_async()
를 활용하는 것이 좋습니다. delay()
는 apply_async()
의 단순화된 형태이므로, 어떤 경우에도 apply_async()
를 사용하여 동일한 (혹은 더 많은) 기능을 구현할 수 있습니다.
마무리하며
오늘은 Celery의 강력한 도구인 apply_async()
메서드를 깊이 있게 탐구하고, delay()
와의 관계 및 다양한 활용법을 살펴보았습니다. apply_async()는 비동기 작업을 더욱 정교하게 관리하고, 서비스의 요구 사항에 맞게 Celery를 효과적으로 활용할 수 있게 합니다.
다음 마지막 편에서는 Celery 태스크를 정의하는 핵심 데코레이터인 @app.task
와 @shared_task
를 비교 분석하여, 프로젝트 구조와 개발 철학에 맞는 태스크 정의 방식을 선택하는 데 도움을 드리겠습니다. 꾸준히 Celery 여정을 함께 해주셔서 감사합니다!
이전의 글을 보지 못하신 분들은 아래의 링크를 확인해주세요!
댓글이 없습니다.