Как запустить асинхронную задачу в Celery

Ответ

Для асинхронного запуска задачи в Celery необходимо определить ее с помощью декоратора @app.task и вызвать, используя метод .delay() или .apply_async().

Пример определения и вызова задачи

  1. Определите задачу в вашем файле tasks.py:

    from celery import Celery
    import time
    
    # Инициализация приложения Celery
    app = Celery('tasks', broker='redis://localhost:6379/0')
    
    @app.task
    def process_data(x, y):
        time.sleep(5)  # Имитация долгой операции
        return x + y
  2. Вызовите задачу из другого модуля вашего приложения:

    from .tasks import process_data
    
    # Задача отправляется в очередь на выполнение, 
    # основной поток не блокируется.
    task_result = process_data.delay(10, 25)
    
    # task_result - это объект AsyncResult, который можно использовать
    # для получения статуса и результата задачи.
    print(f"Задача отправлена, ID: {task_result.id}")

Способы вызова

  • *`.delay(args, kwargs)` Это удобный шорткат для .apply_async(). Он принимает только аргументы для самой задачи.

  • .apply_async(args=[...], kwargs={...}, ...) Более гибкий метод, позволяющий передавать дополнительные параметры выполнения, такие как:

    • countdown: отложить выполнение на N секунд.
    • eta: выполнить задачу в конкретное время (объект datetime).
    • queue: отправить задачу в определенную очередь.
    # Запустить задачу через 60 секунд
    process_data.apply_async(args=[10, 25], countdown=60)

Запуск воркера

Важно: Чтобы задачи начали выполняться, необходимо запустить хотя бы один воркер Celery. Он будет слушать очередь сообщений и выполнять поступающие задачи.

# В терминале, в директории проекта
celery -A your_project.tasks worker --loglevel=info