Что такое RabbitMQ, зачем он нужен и как его использовать

Около полугода назад на одном проекте мы с напарником столкнулись с проблемой масштабирования, которая в тот момент внезапно ударила по серверу и весело его уронила. Количество задач, которые ставили пользователи, превысило барьеры вычислительных мощностей. Факторов, которые к этому привели, было несколько:

  1. Во-первых, мы паршиво построили архитектуру: вся работа сервиса была заточена только на 1 сервер и каких-либо заделов к масштабированию на 2 или 3, как ты понимаешь, не было.
  2. Во-вторых, мы — не профессионалы, а самоучки, которые пилили проект на скорость, так как «готово должно быть вчера».
  3. Не идёт речи о грамотном распределение ресурсов — вся многопоточность строилась на инструментах, с которыми умел работать PHP, а PHP, как можно догадаться, работает с ними не особо хорошо.

Проблема была решена — был арендован второй сервер, куда были перенесены соответствующие скрипты и с помощью небольшого API делегирована часть задач. Но в момент, когда мы искали решение и думали, что же лучше: по запросу передавать JSON-массив или что-то другое, нашли много интересной информации. В частности google выдавал что-то про «брокеры сообщений», «очереди», какого-то кролика… Причём тут кролик я тогда не понял, чтиво про брокеры — бросил, подумав, что это слишком сложно. Но через некоторое время появилась задача делегировать уже другие данные на ещё один сервер, и тут, на одном из форумов, мне уже явно посоветовали покурить в сторону злополучного кролика — RabbitMQ. К слову, задачу я решил, RabbitMQ оказался не таким и сложным (вернее — его конфигурация), а решение моей конкретной задачи заняло весьма немного кода. Так что же такое RabbitMQ?

RabbitMQ — это платформа позволяющая обмениваться сообщениями. Что может быть в сообщении — решать только тебе. Обмениваться можно как на одном сервере, так и с одного на другой. Это отличный способ масштабирования, так как с хорошо настроенным RabbitMQ мы можем просто подключать новые сервера, настроив на них нужное ПО и прописав конфигурацию, а RabbitMQ будет сам делегировать работу между всеми серверами. На официальном сайте есть мануалы по основным способам использования кролика — ознакомься. Я не буду описывать основные сущности, которые есть в RabbitMQ и на основе который строится протокол AMQP — это можно легко найти в сети.

Я сегодня рассмотрю, как интегрировать RabbitMQ в Symfony 4.1 на примере двух серверов — один будет отправлять сообщения, а другой получать. Какая актуальность данной статьи, если подобные маны уже есть? Ну по-первых, она на русском. Во-вторых, есть несколько ошибок, которые крайне сложно загуглить и решаются они методом научного тыка — вот их-то я и опишу.

У нас есть 2 сервера (хотя все примеры можно проделать и на одном, честно говоря) — один собирает задачи, другой их выполняет. Задачи могут быть любые — на твой вкус и цвет: от отправки email-сообщений, до обработки изображений. Можешь просто выводить данные на экран. Итак, на одном сервере у тебя идёт постановка задач, а на другом — обработка этих задач. 

Сначала я показываю все действия на сервере, который собирает задачи.

Для начала — нужно установить бандл, который является обёрткой над библиотекой, реализующей протокол AMQP:

composer require php-amqplib/rabbitmq-bundle

Ошибок быть не должно, а если и будут — внимательно читай вывод и ставь библиотеки, которые будут указаны после слов, похожих на слово require. Или пиши в комментариях — я с радостью помогу.

Дальше нужно установить rabbitmq-server. Да, у кролика есть свой полноценный сервер, который разворачивается на одном из серверов, если мы рассматриваем обмен между двумя серверами. Тут нужно подумать, что у тебя будет сервером, а что клиентом.  Так как обрабатывает задачи (а значит и выбрасывает их в очередь) сервер, с которым мы сейчас работаем — будет логично установить rabbitmq-server на него Если я тут не прав — пиши в комментариях. Ставим:

sudo apt-get install rabbitmq-server

И тут у тебя может появится первая проблема, об которую ты не слабо разобьёшь лоб:

rabbitmq/bin/rabbitmq-server-wait (code=exited, status=70)

Спустя почти час гугления я пришёл к выводу, что rabbitmq-server не запускается, собака такая, если в файле /etc/hostname у тебя прописано имя с точками. А мне поставили сервер, где в этом имени были точки… Точки эта штука распознавать не умеет.

Теперь добавляем пользователя и выдаём ему права, с помощью rabbitmqcli:

rabbitmqctl add_user username password
rabbitmqctl set_user_tags username administrator
rabbitmqctl set_permissions -p / username ".*" ".*" ".*"

Первая строка — добавляем пользователя и устанавливаем ему пароль. Главное, измени username и password на свои данные. Далее ставим пользователю категорию «администратор». Затем, выдаём доступ к просмотру всего и вся. Я записал эти параметры в файл config/services.yaml:

parameters:
    locale: 'en'
    rabbitmq_user: 'username'
    rabbitmq_password: 'password'

Теперь нужно добавить сущность, с которой будет общаться rabbitmq-server — виртуальный хост, через который будет настраиваться конкретный обмен между конкретным постановщиком задач — producer (пишет в очередь) и потребителем задач — consumer (читает из очереди). Я не знаю почему, но в мануалах по интеграции Symfony и RabbitMQ этот пункт пропускают, а искать его самому и думать в чём ошибка — это дополнительное время. Добавляем:

rabbitmqctl add_vhost my_project
rabbitmqctl set_permissions -p my_project username ".*" ".*" ".*"

username — это тот самый пользователь, созданный выше. my_project — имя хоста. Добавляем в параметры:

parameters:
    locale: 'en'
    rabbitmq_user: 'username'
    rabbitmq_password: 'password'
    rabbitmq_statuslayer_vhost: 'my_project'

Идём дальше. А дальше нам нужно настроить конфиги. У тебя должен был появиться файл config/packages/old_sound_rabbit_mq.yaml — открывай его и добавляй:

old_sound_rabbit_mq:
    connections:
        default:
            host: localhost
            port: 5672
            user: '%rabbitmq_user%'
            password: '%rabbitmq_password%'
            vhost: '%rabbitmq_statuslayer_vhost%'
    producers:
        my_task:
            connection: default
            exchange_options:
                name: my_task_exchange
                type: direct

Давай разберёмся. Я надеюсь, что ты ознакомился с примерами на официальном сайте и знаешь, что producer — публикует сообщения, exchange — определяет, в какую (если их несколько) очередь сообщение выбросить. port у rabbitmq по-умолчанию 5672 — таким и оставляем. Теперь нам нужно создать exchange, делаем это с помощью команды:

php bin/console rabbitmq:setup-fabric

Всё успешно? Отлично, мы настроили rabbitmq на сервере, который будет писать в очередь. У кролика есть крутой интерфейс — зайди на ip сервера через порт 15672, увидишь нечто следующее:

Входи под данными, которые были созданы чуть выше и увидишь графики, метрики и прочую интересную штуку.

Теперь переходим на сервер, который будет читать сообщения. Тут всё гораздо проще. Сначала ставь бандл, затем открывай файл конфига и пиши:

old_sound_rabbit_mq:
    connections:
        default:
            host: ip_main_server
            port: 5672
            user: '%rabbitmq_user%'
            password: '%rabbitmq_password%'
            vhost: '%rabbitmq_statuslayer_vhost%'
     consumers:
        cons_name:
            connection:       default
            exchange_options: {name: 'my_project', type: direct}
            queue_options:    {name: 'my_task_queue'}
            callback:         callback_service

Ты уже понял, что параметры нужно определить в config/services.yaml в секции параметры. Имя пользователя и пароль указывай те, который были созданы выше. ip_main_server — это ip сервера, с которым мы работали выше.

Далее настраивается consumer. Параметр exchange_options имеет значение, которые мы создавали выше.

Параметр queue_options — это имя очереди. Можешь задать любое, она будет создана автоматически.

callback — это имя сервиса, который будет вызывать класс, обрабатывающий каждое новое сообщение из очереди. Давай его определим. Иди в файл config/services.yaml и добавляй:

callback_service:
  class: App\RabbitMQ\CallbackService

Как ты догадался, нужно создать этот класс. Создаётся он по определённому шаблону — его нужно унаследовать от класса Command, который нам любезно предоставляет RabbitMQ Bundle.

App\RabbitMQ\CallbackService:

<?php
namespace App\RabbitMQ;
use OldSound\RabbitMqBundle\RabbitMq\ConsumerInterface;
use PhpAmqpLib\Message\AMQPMessage;
class CheckInstagramAccount implements ConsumerInterface
{
    public function execute(AMQPMessage $msg)
    {
        print_r($msg->body);
    }
}

Просто выводим тело письма, которое пришло. Я тестировал на массивах, поэтому — print_r. Теперь запускаем consumer в режим ожидания сообщений:

php bin/console rabbitmq:consumer cons_name

И теперь с сервера, где мы устанавливали producer, отправим первое сообщение. Создаём простой контроллер:

/**
 * @Route("/check_data_test", name="check_data_test")
 */
public function check_data_test(Request $request)
{
        $data = [];
        $data["login"] = "login";
        $data["pass"] = "pass";
        $myProducer = $this->get('old_sound_rabbit_mq.my_task_producer');
        $myProducer->setContentType('application/json');
        $myProducer->publish(json_encode($data));
        return new JsonResponse($data);
}

Переходи на /check_data_test и наслаждайся — ты отправил и получил первое сообщение ?