На протяжении последних нескольких месяцев перед нами стояла сложная задача, заключавшаяся в выполнении множества микро-услуг для тысяч запросов наиболее эффективным способом. Нашей первой мыслью было использовать Очередь (что намного лучше, чем создавать таблицу x-SQL со своими собственными статусами, датами создания, приоритетами и т.д.) — и оказались правы. Однако существует очень много вариантов работы с очередью. Итак, вот наши находки, основанные не столько на их технических характеристиках, сколько на наших личных предпочтениях:
- RabbitMQ: Эффективный, быстрый и надежный инструмент. Но после нескольких месяцев работы с ним мы просто задыхались среди множества вариантов и терминов. Нам пришлось учить много всего, а нам была нужна всего лишь очередь!
- Amazon SQS: Отличная система с ее легендарным «для работы с AWS (Веб-сервисы Amazon) вам необходимо иметь докторскую степень». Но к нашему удивлению, мы смогли создать систему в очень короткие сроки. К сожалению, сообщения приходят с задержкой, а между публикацией и потреблением приходится ждать «до минуты и более».
- Beanstalkd: В результате поисков мы наткнулись на Beanstalkd. До абсурда простой инструмент, быстро устанавливается, понятная терминология, хороший уровень выполнения. Недостатки: Защита полностью отсутствует (остается только заблокировать порты и молиться). И еще мне придется найти нормальный инструмент для отслеживания сообщений.
Я покажу, как мы устанавливали Beanstalkd. Итак, я исхожу из следующего:
- Вы знаете достаточно о PHP
- Вы обладаете базовыми знаниями о компоновщике
- Вы разбираетесь в объектно-ориентированном PHP
- Вы как минимум следуете стандартам PSR-0 или используете какой-нибудь достойный фреймворк.
Установка (Ubuntu)
aptitude install -y beanstalkd
Готово. Налейте себе чашечку кофе. Отличная работа.
Использование
Все по порядку. Сначала нам нужен так называемый издатель, а затем потребители/рабочие.
Разместите эту библиотеку в вашем компоновщике (composer.json):
"pda/pheanstalk": "~3.0.2",
Я создал простой класс под названием Queue\Publisher.php
и Queue\Consumer.php
:
namespace Queue;
use Pheanstalk\Pheanstalk;
class Publisher
{
private $queue;
public function __construct(array $args)
{
$this->config = ['queue' => ['host' => 'yourdomain.com']; // Не обращайте на это внимания. Обычно у менять есть файл конфигурации, но я переместил его сюда специально для целей этой статьи. В качестве хоста используйте IP или Домен.
$this->queue = $args['queue']; // Просто имя очереди. Я передаю его в качестве параметра. Beanstalkd называет очереди «трубами», но вы можете выбрать любое имя. Если его не существует, оно будет создано.
$this->client = new Pheanstalk($this->config['queue']['host']); // Инстанцируйте объект.
}
public function send($request)
{
return $this->client
->useTube($this->queue)
->put(json_encode($request)); // Отправьте что угодно с кодировкой в формате json – и готово!
}
}
И для потребителя (consumer):
namespace Queue;
use Pheanstalk\Pheanstalk;
class Consumer
{
private $queue;
public function __construct(array $args)
{
$this->config = ['queue' => ['host' => 'yourdomain.com']; // Не обращайте на это внимания. Обычно у меня есть файл конфигурации, но я переместил его сюда специально для целей этой статьи. В качестве хоста используйте IP или Домен.
$this->queue = $args['queue'];
$this->client = new Pheanstalk($this->config['queue']['host']);
}
public function listen()
{
$this->client->watch($this->queue); // Снова передайте имя очереди.
while ($job = $this->client->reserve()) { // Продолжайте это делать... чтобы он всегда слушал.
$message = json_decode($job->getData(), true); // Расшифруйте сообщение
$status = $this->process($message);
if ($status)
$this->client->delete($job);
else
$this->client->delete($job); // Удаляйте в любом случае. Позже попытка будет повторена.
}
}
public function process($msg)
{
// Выполните какую-нибудь операцию и возвратите значение `true` в случае успеха или значение `false`
}
Наконец, вам нужно вызвать это:
$publish = new Queues\Publisher(['queue' => "someQueue"]);
$publish->send("Do this..."); // Это может быть совокупность, имя класса и т.д.
А на другой стороне извлечь это:
$consumer = new Queues\Consumer(['queue' => "someQueue"]); // Обычно это выполняется через консоль.
Готово! В нашем приложении мы используем PhalconPHP, и у нас есть такое понятие, как Задачи. Мы используем эти задачи для управления Потребителем, а Cronjob мы используем для «публикации» сообщений. Аналогично, мы можем выполнять большое количество потребительских задач для более быстрого обслуживания очереди.
Для увеличения масштабируемости мы можем настроить любое количество серверов с одним и тем же кодом, направить потребителя на один и тот же IP/Домен Beanstalkd’а и обслуживать еще больше потребителей. Так что возможности для расширения фактически безграничны.
И совет напоследок. Мы выполняем задачи через Supervisor. Скажем, у нас есть 10 рабочих (workers), что позволяет нам очистить очереди где-то за 15 минут. (Задержка происходит во внешних API, к которым мы делаем запрос на каждую тысячу обрабатываемых нами сообщений). Если ваши операции носят местный характер или в состоянии контролировать их выполнение, вы можете обрабатывать сотни сообщений в секунду.