Очередь сообщений и асинхронные задачи с помощью Celery и RabbitMQ

Асинхронное или неблокирующее выполнение операций — это такой метод исполнения, при котором некоторые задачи выполняются отдельно от основного потока программы. Такой подход даёт несколько преимуществ, одно из которых — непрерывная работа кода на стороне пользователя.

Передача сообщений — это метод, при помощи которого компоненты программы могут взаимодействовать друг с другом и передавать информацию. Она может быть выполнена как синхронно, так и асинхронно, а так же она обеспечивает процесс коммуникации между отдельными частями программы. Передача сообщений часто применяется в качестве альтернативы базам данных, причем такой метод обеспечивает дополнительный функционал как улучшение производительности и работа в оперативной памяти.

Celery — очередь задач, построенная на системе асинхронной передачи сообщений. В программировании, Celery можно использовать в качестве хранилища для отложенных задач. Программа, передавшая задачу, может беспрепятственно продолжать работать, а впоследствии она обращается к celery для подтверждения окончания процесса вычисления, чтобы получить необходимые данные.

Несмотря на то, что celery написана на Python, работать с ней можно при использовании любого языка программирования при помощи webhooks.

Внедрение очереди задач в свое приложение позволит вам выгружать эти задачи и продолжать работать с пользовательским интерфейсом без каких-либо задержек. Таким образом, вы легко избегаете блокировки GUI при запуске долгих и сложных вычислений.

В этой статье мы установим и настроим очередь задач celery при помощи RabbitMQ в качестве системы сообщений на Ubuntu сервере.

Установка компонентов

Установка Celery

Так как Celery написана на Python, устанавливается она, как и любой другой пакет для Python. Сделаем как положено, а именно создадим виртуальную среду, в которую установим систему сообщений. Таким образом, мы избежим возможных конфликтов с другими проектами на Python, которые могут быть в вашей системе.

Установим пакет для виртуальной среды Python из стандартного репозитория пакетов Ubuntu:

sudo apt-get update
sudo apt-get install python-virtualenv

Создадим отдельный каталог для системы сообщений:

mkdir ~/messaging
cd ~/messaging

Теперь мы готовы создать виртуальную среду для установки celery:

virtualenv --no-site-packages venv

Активируем только что созданную среду:

source venv/bin/activate

Приглашение к вводу в командной строке должно измениться, тем самым показав, что вы работаете в виртуальной среде. То есть все пакеты, которые мы будет устанавливать, находясь в виртуальной среде, будут действовать только в её пределах.

В любой момент вы можете выйти из виртуальной среды:

deactivate

Убедившись, что мы работаем в виртуальной среде, можно установить celery при помощи pip:

pip install celery

Установка RabbitMQ

Для celery требуется менеджер сообщений для обработки сообщений поступающих извне. Обычно такого менеджера называют брокером. Существует целый ряд брокеров — реляционные БД, NoSQL БД, хранилища типа ключ-значение и готовые приложения системы сообщений.

Мы воспользуемся системой сообщений RabbitMQ, так как она отлично взаимодействует с celery и может похвастаться стабильной работой. Нам такое решение подходит как нельзя лучше, так как она уже включает в себя необходимый нам функционал.

Установим RabbitMQ из стандартных репозиториев Ubuntu:

sudo apt-get install rabbitmq-server

После установки RabbitMQ будет автоматически запущен.

Создание экземпляра Celery

Чтобы начать работать с Celery нам потребуется создать его экземпляр. Этот процесс включает в себя импорт необходимого пакета, создания «приложения» и настройки задач, которые celery будет запускать в фоновом режиме.

Создадим скрипт Python tasks.py в каталоге для системы сообщений, в котором мы будем устанавливать необходимые задачи.

sudo nano ~/messaging/tasks.py

Первое что необходимо сделать — импортировать функцию Celery из пакета celery:

from celery import Celery

После чего создадим объект Celery, который подключится к службе RabbitMQ:

from celery import Celery
app = Celery('tasks', backend='amqp', broker='amqp://')

Первый аргумент класса Celery — приставка, которая будет добавлена ко всем задачам. Параметр backendнеобязательный, его задают при необходимости получения статуса выполнения задачи в фоновом режиме или получения результата её выполнения.

Если ваши задачи просто выполняют серию операций и не возвращают ничего в программу, то можно опустить этот параметр. Если некоторые из них все-таки что-то возвращают, то лучше активировать этот параметр и при необходимости отключать его для конкретного случая.

Параметр broker указывает URL для соединения с брокером. В нашем случае это служба RabbitMQ, запущенная на сервере. RabbitMQ работает по протоколу amqp. Если вы не вносили изменений в настройку RabbitMQ, то введенного значения достаточно для подключения celery.

Построение задач Celery

Теперь добавим несколько задач.

Каждая задача Celery должна начинаться с объявления декоратора @app.task. Таким образом, celery определяет функции, к которым она еще может добавить свои функции связанные с построением очереди. После каждого декоратора мы создаем простую функцию для запуска.

В качестве первой задачи создадим функцию, которая будет выводить сообщение в консоль.

from celery import Celery
app = Celery('tasks', backend='amqp', broker='amqp://')
@app.task
def print_hello():
    print 'hello there'

Так как эта функция ничего не возвращает (она просто выводит сообщение в консоль), то мы можем указать celery, чтобы та не хранила информацию о ходе выполнения этой задачи.

На данном этапе это сделать довольно просто:

from celery import Celery
app = Celery('tasks', backend='amqp', broker='amqp://')
@app.task(ignore_result=True)
def print_hello():
    print 'hello there'

Теперь создадим задачу для генерации целых чисел (взято из RossettaCode). Работа этой задачи может занять определенное время, поэтому она является хорошим примером работы с системой асинхронных сообщений.

from celery import Celery
app = Celery('tasks', backend='amqp', broker='amqp://')
@app.task(ignore_result=True)
def print_hello():
    print 'hello there'
@app.task
def gen_prime(x):
    multiples = []
    results = []
    for i in xrange(2, x+1):
        if i not in multiples:
            results.append(i)
            for j in xrange(i*i, x+1, i):
                multiples.append(j)
  return results

Так как нам важен результат, который вернет функция, и время когда она закончит свою работу (чтобы мы могли использовать результат её выполнения), мы не будем добавлять параметр ignore_result для этой задачи.

Сохраните и закройте файл.

Запуск рабочих процессов Celery

Теперь мы готовы запустить процессы Celery, чтобы те начали принимать соединения от приложений. Система будет использовать только что созданный нами файл, для получения списка задач.

Запуск рабочего процесса включает в себя вызов самого приложения с командой celery. Укажем знак &для запуска задачи в фоновом режиме:

celery worker -A tasks &

Таким образом, приложение будет запущено вне терминала, что позволит нам дальше работать с ним. Если вам требуется запустить несколько процессов, то можно вызвать их несколько раз, указав имя для каждого при помощи ключа -n:

celery worker -A tasks -n one.%h &
celery worker -A tasks -n two.%h &

%h автоматически изменится на имя хоста.

Для остановки процессов можно воспользоваться командой kill. Мы получим id процессов и с помощью них закроем работающие процессы.

ps auxww | grep 'celery worker' | awk '{print $2}' | xargs kill

Таким образом, процесс закончит текущую задачу перед завершением. Если же вы хотите закрыть процессы не дожидаясь окончания выполнения задачи, то воспользуйтесь следующей командой:

ps auxww | grep 'celery worker' | awk '{print $2}' | xargs kill -9

Использование очереди

Мы можем использовать рабочие процессы для выполнения задач программы в фоновом режиме. Не будем создавать целые программы чтобы продемонстрировать это. Воспользуемся возможностями интерпретатора Python:

python

Импортируем наши функции в командной строке:

from tasks import print_hello
from tasks import gen_prime

Как вы сами видите, ничего особенного в них нет. Первая выводит строку:

print_hello()
# hello there

А вторая возвращает список целых чисел:

primes = gen_prime(1000)
print primes

Если мы увеличим второй параметр функции get_prime, то её выполнение затянется.

primes = get_primes(50000)

Остановить выполнение функции можно нажав CTRL+C. Таким образом, очевидно, что процесс не работает в фоновом режиме.

Чтобы воспользоваться фоновым обработчиком, необходимо использовать метод .delay. При помощи этого метода мы передаем выполнение функции обработчику celery и функция должна сразу же вернуть нас в консоль без каких-либо задержек:

primes = gen_prime.delay(50000)

Теперь выполнение задачи взяли на себя рабочие процессы, которые мы создали ранее. Так как мы задали параметр backend, мы можем проверить состояние выполнения задачи и получить результат её выполнения.

Для проверки выполнения задачи используйте метод .ready:

primes.ready()
# False

Значение False говорит о том, что задача еще не закончена и результат недоступен. Если мы получили True в ответ, то можем получить резульат.

primes.ready()
# True

Получить результат мы можем при помощи метода .get. Сначала надо проверить, что задача выполнена, а потом использовать метод .get для получения результата задачи:

print primes.get()
# результат
'''
[2, 3, 5, 7, 11, 13, 17, 19, 23, 29, 31, 37, 41, 43, 47, 53, 59, 61, 67, 71, 73, 79, 83, 89, 97, 101, 103, 107, 109, 113, 127, 131, 137, 139, 149, 151, 157, 163, 167, 173, 179, 181, 191, 193, 197, 199, 211, 223, 227, 229, 233, 239, 241, 251, 257, 263, 269, 271, 277, 281, 283, 293, 307, 311, 313, 317, 331, 337, 347, 349, 353, 359, 367, 373, 379, 383, 389, 397, 401, 409, 419, 421, 431, 433, 439, 443, 449, 457, 461, 463, 467, 479, 487, 491, 499, 503, 509, 521, 523,
. . .
'''

Если же вы не использовали метод ready перед вызовом get, то стоит добавить параметр timeout, чтобы программа не зависла в ожидании результата, иначе мы лишаемся всех преимуществ работы с системой сообщений:

print primes.get(timeout=2)

Если по истечению timeout результат не будет получен, то будет создано исключение, которое вы уже обработаете в своей программе.

Заключение

Конечно того, что я рассказал, уже достаточно для начала работы с celery, но это только верхушка айсберга. Celery позволяет связывать задачи между собой, группировать и комбинировать их.

Несмотря на то, что Celery написана на Python, с ней можно работать на любом языке программирования благодаря webhooks. Таким образом, задачи можно переносить в фоновую обработку, не ограничивая себя одним языком.