Как запустить асинхронную задачу в Celery

Ответ

Для асинхронного запуска задачи в Celery необходимо определить ее с помощью декоратора @app.task и вызвать, используя метод .delay() или .apply_async().

Пример определения и вызова задачи

  1. Определите задачу в вашем файле tasks.py:

    from celery import Celery
    import time
    
    # Инициализация приложения Celery
    app = Celery('tasks', broker='redis://localhost:6379/0')
    
    @app.task
    def process_data(x, y):
        time.sleep(5)  # Имитация долгой операции
        return x + y
  2. Вызовите задачу из другого модуля вашего приложения:

    from .tasks import process_data
    
    # Задача отправляется в очередь на выполнение, 
    # основной поток не блокируется.
    task_result = process_data.delay(10, 25)
    
    # task_result - это объект AsyncResult, который можно использовать
    # для получения статуса и результата задачи.
    print(f"Задача отправлена, ID: {task_result.id}")

Способы вызова

  • *`.delay(args, kwargs)` Это удобный шорткат для .apply_async(). Он принимает только аргументы для самой задачи.

  • .apply_async(args=[...], kwargs={...}, ...) Более гибкий метод, позволяющий передавать дополнительные параметры выполнения, такие как:

    • countdown: отложить выполнение на N секунд.
    • eta: выполнить задачу в конкретное время (объект datetime).
    • queue: отправить задачу в определенную очередь.
    # Запустить задачу через 60 секунд
    process_data.apply_async(args=[10, 25], countdown=60)

Запуск воркера

Важно: Чтобы задачи начали выполняться, необходимо запустить хотя бы один воркер Celery. Он будет слушать очередь сообщений и выполнять поступающие задачи.

# В терминале, в директории проекта
celery -A your_project.tasks worker --loglevel=info

Ответ 18+ 🔞

Да ты посмотри, какой народ-то безмозглый, блядь! Сидят, думают, как асинхронно задачу в Celery запустить, а там всё проще, чем кажется, ёпта!

Смотри сюда, распиздяй. Чтобы твоя функция стала задачей, её надо просто пометить, как падлу на районе. Декоратором @app.task. Вот и всё, блядь. После этого она не просто функция, а уже полноценная задача, которую можно в очередь пнуть.

Шаг первый, объявляем задачу. Пишем в tasks.py:

from celery import Celery
import time

# Это наш главный начальник, брокер. Он раздаёт поручения. Обычно Redis.
app = Celery('tasks', broker='redis://localhost:6379/0')

@app.task
def process_data(x, y):
    time.sleep(5)  # Представь, что тут какая-то тяжёлая хуйня, типа отчёта сводного
    return x + y

Вот, функция process_data теперь не просто так, а @app.task. Она готова к отправке на каторгу.

Шаг второй, вызываем эту падлу. Из любого другого места в коде:

from .tasks import process_data

# Всё, хуяк — и задача улетела в очередь. Главный поток даже не чихнул.
task_result = process_data.delay(10, 25)

# task_result — это как квиток из гардероба. По нему потом результат заберёшь.
print(f"Задача отправлена, ID: {task_result.id}")

Вот и вся магия, блядь. Не надо ждать, пока она там пять секунд спать будет. Отправил и пошёл дальше пиво пить.

А есть же ещё два способа её вызвать, хитрая жопа:

  • .delay(10, 25) — это как шорткат, для ленивых. Просто аргументы суёшь и всё.
  • .apply_async(args=[10, 25], countdown=60) — а это уже для продвинутых распиздяев. Тут можно и отложить выполнение (countdown), и в конкретную очередь отправить, и время точное указать. Гибкость — пиздец.

И главное, ёпта, не забудь! Задачи сами не выполнятся, как по волшебству. Надо запустить воркера — это такой раб на галерах, который из очереди задания забирает и выполняет. Без него нихуя не сдвинется.

Запускаешь в терминале и наблюдаешь, как он трудится:

celery -A your_project.tasks worker --loglevel=info

Вот теперь всё, блядь. Отправил задачу, воркер её подхватил, выполнил, а ты в это время можешь дальше свой интерфейс рисовать. Красота, ёпта!