RabbitMQ

Теги: php

Как установить RabbitMQ?

Для windows очень просто - скачать с официального сайта инсталлятор и установить. :)

Плюс нужно прописать в PATH путь к консольным скриптам (c:\Program Files\RabbitMQ Server\rabbitmq_server-{VERSION}\sbin\).

И добавить в .bashrc: alias rabbitmqctl='rabbitmqctl.bat', чтобы удобнее было запускать консольные команды.

Для установки на Linux и macOS инструкции есть на официальном сайте.

Как проверить запущен ли сервис?

Выполнить в консоли:

rabbitmqctl status

Как посмотреть список сообщений и количество сообщений в них?

Выполнить в консоли:

rabbitmqctl list_queues

Как добавить сообщение в очередь из php?

Устанавливаем библиотеку php-amqplib:

composer install php-amqplib/php-amqplib

Создаём объект соединения:

$connection = new \PhpAmqpLib\Connection\AMQPStreamConnection('localhost', 5672, 'guest', 'guest');

Получаем канал:

$channel = $connection->channel();

Декларируем точку доступа (будет создана при первом запуске, при последующих запусках проверит соответствие флагов):

$channel->exchange_declare(
    // любое название точки доступа
    'hobocta_demo_exchange',

    // тип точки доступа,
    // "fanout" отправляет сообщения во все очереди, которые доступны этой точке доступа
    'fanout',

    // passive:
    //
    // в пассивном режиме точка доступа не будет создаваться;
    // если такой точки доступа не было создано ранее,
    // то будет выведена ошибка;
    // если она была создана ранее,
    // то всё ok - она будет использована;
    //
    // а если поставить false, то это активный режим,
    // точка доступа будет создана при её отсутствии
    false,

    // durable: true - долговечный, точка доступа не пропадёт при перезагрузке сервера 
    true,

    // auto_delete:
    // false - не удалить точку доступа даже когда все очереди завершат её использование
    false,

    // internal:
    // false - чтобы точка доступа была видна publisher'ам;
    // если true, то точка доступа будем видна только другим точкам доступа
    false,
);

Декларируем очередь (будет создана при первом запуске, при последующих запусках проверит соответствие флагов):

$this->channel->queue_declare(
    // любое название очереди
    'hobocta_demo',

    // passive: означает тоже самое, что и для точки доступа, только для очереди
    false,

    // durable: долговечная, очередь не пропадёт при перезагрузке сервера
    true,

    // exclusive:
    // если true, то это эксклюзивная очередь,
    // и она будет доступна только текущему сообщению,
    // и она будет удалена, когда соединение закроется;
    // нам это не нужно, поэтому ставим false
    false,

    // auto_delete:
    // false - не удалять даже после отключения всех подписчиков
    false
);

Привязываем очередь к точке доступа:

$this->channel->queue_bind('hobocta_demo', 'hobocta_demo_exchange');

Создаём тестовое сообщение со случайным содержимым:

// любое содержимое сообщения в виде строки, например json
$messageString = json_encode(array(
    'title' => 'Random title ' . uniqid(),
    'timestamp' => time(),
));

$message = new AMQPMessage(
    $messageString,
    array('delivery_mode' => 2)
);

Устанавливаем delivery_mode=2, чтобы пометить сообщение как устойчивое, снижая тем самым вероятность потери сообщения.

Добавляем сообщение в очередь:

$channel->basic_publish($message, 'hobocta_demo_exchange');

Закрываем соединение:

$channel->close();
$connection->close();

Отправка сообщений готова.

Как прочитать очередь из php?

Чтобы прочитать сообщения (в воркере) нужно для начала сделать ровно тоже самое, что и перед публикацией сообщения: создать \PhpAmqpLib\Connection\AMQPStreamConnection, получить канал, декларировать точку доступа, декларировать очередь, привязать очередь к точке доступа.

После этого нужно повесить закрытие очереди на shutdown функцию, на случай прекращения работы воркера в результате ошибки:

/**
 * @param \PhpAmqpLib\Channel\AMQPChannel $channel
 * @param \PhpAmqpLib\Connection\AbstractConnection $connection
 */
function shutdown($channel, $connection)
{
    $channel->close();
    $connection->close();
}

register_shutdown_function('shutdown', $channel, $connection);

Настраиваем QoS (quality of service - качество обслуживания) - порядок передачи сообщений подписчикам.

$channel->basic_qos(
    // prefetch_size - размер предварительной выборки, т. е.количество возможных обработчиков:
    // ставим null - без ограничения, т. к. "RabbitMQ doesn't implement prefetch_size"
    null,

    // prefetch_count - количество предварительных выборок,
    // определяет сколько сообщений мы можем отдать одном подписчику за раз;
    // ставим единицу, чтобы следующее сообщение было отправлено подписчику только тогда,
    // когда подписчик подтвердит прочение предыдущего сообщения с помощью вызва basic_ack в callback
    1,

    // a_global - глобальный:
    // null - настройки QoS должны применяться для получателей
    // true - настройки QoS должны применяться к каналу
    null
);

Создаём функцию, которая обработает каждое сообщение. Например, просто выведет содержимое сообщения в консоль:

$callback = function (AMQPMessage $message) {
    echo sprintf(
        "Body: '%s'. Properties: '%s'" . PHP_EOL,
        $message->getBody(),
        json_encode($message->get_properties())
    );

    // отправляем подтверждение получения сообщения
    /** @var \PhpAmqpLib\Channel\AMQPChannel $channel */
    $channel = $message->delivery_info['channel'];
    $channel->basic_ack($message->delivery_info['delivery_tag']);
};

Пример вывода в консоль:

Body: '{"title":"Random title 5bfbc7c501cf0","timestamp":1543227333}'. Properties: '{"delivery_mode":2}'

Выполняем подписку на сообщения:

$channel->basic_consume(
    'hobocta_demo',

    // consumer_tag: тег получателя, валидный в рамках канала, необязательно для заполнения
    '',

    // no_local:
    // если true, то сервер не будет отправлять сообщения соединениям, которые сам опубликовал
    false,

    // no_ack:
    // true - без подтверждения;
    // false - обработчик должен получить оповещение как только сообщение будет обработано
    false,

    // exclusive:
    // true - сообщения могут будут получены только в рамках текущего соединения
    false,

    // nowait:
    // если true, то сервер не будет отвечать клиенту,
    // соответстенно клиент не должен ждать ответа
    false,

    // обработчик сообщения
    $callback
);

И добавляем циклическую обработку сообщений.

// тут происходит магия бесконечной обработки опубликованных сообщений
// и бесконечного ожидания публикации новых сообщений
while (count($this->channel->callbacks)) {
    $this->channel->wait();
}

Воркер готов.

Этот код вместе с обёрткой для удобного подключения RabbitMQ я положил на github: publish, consume