Как установить RabbitMQ?
Для windows очень просто - скачать с официального сайта инсталлятор и установить. :)
Плюс нужно прописать в PATH путь к консольным скриптам (c:\Program Files\RabbitMQ Server\rabbitmq_server-{VERSION}\sbin\
).
И добавить в .bashrc: alias rabbitmqctl='rabbitmqctl.bat'
, чтобы удобнее было запускать консольные команды.
Для установки на Linux и macOS инструкции есть на официальном сайте.
Как проверить запущен ли сервис?
Выполнить в консоли:
rabbitmqctl status
Как посмотреть список сообщений и количество сообщений в них?
Выполнить в консоли:
rabbitmqctl list_queues
Как добавить сообщение в очередь из php?
Устанавливаем библиотеку php-amqplib
:
composer install php-amqplib/php-amqplib
Создаём объект соединения:
$connection = new \PhpAmqpLib\Connection\AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
Получаем канал:
$channel = $connection->channel();
Декларируем точку доступа (будет создана при первом запуске, при последующих запусках проверит соответствие флагов):
$channel->exchange_declare(
// любое название точки доступа
'hobocta_demo_exchange',
// тип точки доступа,
// "fanout" отправляет сообщения во все очереди, которые доступны этой точке доступа
'fanout',
// passive:
//
// в пассивном режиме точка доступа не будет создаваться;
// если такой точки доступа не было создано ранее,
// то будет выведена ошибка;
// если она была создана ранее,
// то всё ok - она будет использована;
//
// а если поставить false, то это активный режим,
// точка доступа будет создана при её отсутствии
false,
// durable: true - долговечный, точка доступа не пропадёт при перезагрузке сервера
true,
// auto_delete:
// false - не удалять точку доступа даже когда все очереди завершат её использование
false,
// internal:
// false - чтобы точка доступа была видна publisher'ам;
// если true, то точка доступа будем видна только другим точкам доступа
false,
);
Декларируем очередь (будет создана при первом запуске, при последующих запусках проверит соответствие флагов):
$this->channel->queue_declare(
// любое название очереди
'hobocta_demo',
// passive: означает тоже самое, что и для точки доступа, только для очереди
false,
// durable: долговечная, очередь не пропадёт при перезагрузке сервера
true,
// exclusive:
// если true, то это эксклюзивная очередь,
// и она будет доступна только текущему сообщению,
// и она будет удалена, когда соединение закроется;
// нам это не нужно, поэтому ставим false
false,
// auto_delete:
// false - не удалять даже после отключения всех подписчиков
false
);
Привязываем очередь к точке доступа:
$this->channel->queue_bind('hobocta_demo', 'hobocta_demo_exchange');
Создаём тестовое сообщение со случайным содержимым:
// любое содержимое сообщения в виде строки, например json
$messageString = json_encode(array(
'title' => 'Random title ' . uniqid(),
'timestamp' => time(),
));
$message = new AMQPMessage(
$messageString,
array('delivery_mode' => 2)
);
Устанавливаем delivery_mode=2
, чтобы пометить сообщение как устойчивое, снижая тем самым вероятность потери сообщения.
Добавляем сообщение в очередь:
$channel->basic_publish($message, 'hobocta_demo_exchange');
Закрываем соединение:
$channel->close();
$connection->close();
Отправка сообщений готова.
Как прочитать очередь из php?
Чтобы прочитать сообщения (в воркере) нужно для начала сделать ровно тоже самое, что и перед публикацией сообщения: создать \PhpAmqpLib\Connection\AMQPStreamConnection
, получить канал, декларировать точку доступа, декларировать очередь, привязать очередь к точке доступа.
После этого нужно повесить закрытие очереди на shutdown функцию, на случай прекращения работы воркера в результате ошибки:
/**
* @param \PhpAmqpLib\Channel\AMQPChannel $channel
* @param \PhpAmqpLib\Connection\AbstractConnection $connection
*/
function shutdown($channel, $connection)
{
$channel->close();
$connection->close();
}
register_shutdown_function('shutdown', $channel, $connection);
Настраиваем QoS (quality of service - качество обслуживания) - порядок передачи сообщений подписчикам.
$channel->basic_qos(
// prefetch_size - размер предварительной выборки, т. е.количество возможных обработчиков:
// ставим null - без ограничения, т. к. "RabbitMQ doesn't implement prefetch_size"
null,
// prefetch_count - количество предварительных выборок,
// определяет сколько сообщений мы можем отдать одному подписчику за раз;
// ставим единицу, чтобы следующее сообщение было отправлено подписчику только тогда,
// когда подписчик подтвердит прочтение предыдущего сообщения с помощью вызва basic_ack в callback
1,
// a_global - глобальный:
// null - настройки QoS должны применяться для получателей
// true - настройки QoS должны применяться к каналу
null
);
Создаём функцию, которая обработает каждое сообщение. Например, просто выведет содержимое сообщения в консоль:
$callback = function (AMQPMessage $message) {
echo sprintf(
"Body: '%s'. Properties: '%s'" . PHP_EOL,
$message->getBody(),
json_encode($message->get_properties())
);
// отправляем подтверждение получения сообщения
/** @var \PhpAmqpLib\Channel\AMQPChannel $channel */
$channel = $message->delivery_info['channel'];
$channel->basic_ack($message->delivery_info['delivery_tag']);
};
Пример вывода в консоль:
Body: '{"title":"Random title 5bfbc7c501cf0","timestamp":1543227333}'. Properties: '{"delivery_mode":2}'
Выполняем подписку на сообщения:
$channel->basic_consume(
'hobocta_demo',
// consumer_tag: тег получателя, валидный в рамках канала, необязательно для заполнения
'',
// no_local:
// если true, то сервер не будет отправлять сообщения соединениям, которые сам опубликовал
false,
// no_ack:
// true - без подтверждения;
// false - обработчик должен получить оповещение как только сообщение будет обработано
false,
// exclusive:
// true - сообщения могут будут получены только в рамках текущего соединения
false,
// nowait:
// если true, то сервер не будет отвечать клиенту,
// соответстенно клиент не должен ждать ответа
false,
// обработчик сообщения
$callback
);
И добавляем циклическую обработку сообщений.
// тут происходит магия бесконечной обработки опубликованных сообщений
// и бесконечного ожидания публикации новых сообщений
while (count($channel->callbacks)) {
$channel->wait();
}
Воркер готов.
Этот код вместе с обёрткой для удобного подключения RabbitMQ я положил на github: publish, consume