In Reih und Glied: Was ist eigentlich RabbitMQ?

Gerade als Webentwickler stehen wir hin und wieder vor der Herausforderung, dass wir trotz rechenintensiver Aufgaben im Hintergrund den User schnell bedienen müssen. Die gegebene Natur von HTTP-Requests, gekoppelt mit synchronen Sprachen wie PHP, können mit steigender Nutzerzahl schnell zum Problem werden.
An dieser Stelle rückt RabbitMQ ins Bild. Im Anzug. Mit Krawatte.

Um RabbitMQ besser zu verstehen, sollten wir am Besten ganz vorne anfangen.

Die Abkürzung MQ steht für “Messaging Queue” und deutet auf AMQP - das “Advanced Messaging Queue Protocol” - hin. Aber…

Was ist AMQP?

AMQP ist ein Netzwerk-Transportprotokoll und beschreibt den Austausch von Nachrichten zwischen zwei Applikations-Endpunkten.

Dem Protokoll ist es dabei völlig egal, auf welcher Plattform die Endpunkte laufen und in welcher Sprache sie geschrieben sind, sie müssen lediglich das Protokoll befolgen.
So kann z.B. eine Ruby-Anwendung die unter Linux läuft, eine Nachricht an eine Go-Anwendung auf einer Windows-Kiste schicken.

Im Grunde geht es darum, dass ein Endpunkt, welcher Publisher oder Producer genannt wird, eine Nachricht an einen anderen Endpunkt - den Consumer - schickt.

In der Regel erfolgt dies über einen so genannten AMQP Broker.

Was ist ein Broker?

Der Broker übernimmt verschiedene Aufgaben. In einer Warteschleife (Queue) merkt er sich alle eingehenden Nachrichten, denn diese sollen ja nicht verloren gehen, wenn der Consumer ausfällt oder bereits beschäftigt ist.
Der Broker entscheidet außerdem, wer eine Nachricht bekommt und ob es nicht vielleicht mehrere Empfänger gibt.

Man könnte sagen, der Broker ist eine Tochtergesellschaft der DHL, UPS oder FedEx…

Was soll das Ganze?

Mit MQ-Systemen strebt man Nebenläufigkeit bzw. Parallelisierung von Prozessen an.

In PHP geschriebene Applikationen sind hier ein perfektes Beispiel. Ein Request erzeugt einen PHP-Prozess, welcher alle Aufgaben der Reihe nach abarbeitet.

Wenn wir z.B. in einer Schleife über eine Liste von URLs iterieren und diese einzeln abrufen (Stichwort crawling), muss sich der Prozess der Reihenfolge nach um die Requests und danach um die Verarbeitung kümmern, bevor die nächste URL bedient werden kann.
Lässt eine Zielseite auf sich warten oder benötigt die Verarbeitung eine gewisse Zeit, kann der Prozess nicht mit der nächsten URL weitermachen.
Mit einer Message Queue können wir aber dafür sorgen, dass die URLs auf mehrere einzelne Prozesse verteilt werden, die zwar jeder für sich nach wie vor synchron arbeiten, in der Gesamtheit aber parallel URLs aus der Liste bedienen.

Ein anderes gutes Beispiel:
Es soll eine Rechenaufgabe gelöst werden, welche hochkomplex ist und selbst auf guter Hardware mehrere Stunden Rechenleistung benötigt.
Nun kann man aber manche Algorithmen auch aufteilen, so dass mehrere Maschinen parallel nur einen Bruchteil des Problems lösen müssen. Auch wenn durch das Aufteilen der Aufgabe eventuell die benötigte CPU-Zeit erhöht wird, kann die tatsächlich benötigte Echtzeit hierdurch verringert werden.

Und was ist jetzt RabbitMQ?

Kurz gesagt, ist RabbitMQ ein open source AMQP Broker, welcher auf allen gängigen Betriebssystemen läuft und Bibliotheken für verdammt viele Sprachen mitbringt, sogar abgefahrene Sachen wie Haskell.

Gleichzeitig ist die in Erlang geschriebene Software unglaublich stabil und schnell, kann persistent arbeiten, unterstützt Clustering und hat noch einiges mehr auf Lager.
Der Fokus von RabbitMQ liegt also in der Hochverfügbarkeit.

RabbitMQ installieren

Zur Installation des Servers an sich gibt es eigentlich wenig zu sagen, denn sie ist nicht schwieriger als das Installieren eines simplen Texteditors. Auf der Homepage kann man fertige Binaries (.exe/.deb/.rpm) runterladen, oder auf diverse Repositories wie apt oder Homebrew zurückgreifen.

Die Konfiguration der Standardinstallation reicht vollkommen aus, um direkt mit RabbitMQ los zu legen. Wenn du doch etwas anpassen möchtest, kannst du dich in der Dokumentation informieren.

In der Firewall sollte man folgende (bzw. die entsprechend selbst eingestellten) Ports freigeben:

4369 und 25672 für Erlang
5672 und 5671 für AMQP 0-9-1 mit und ohne TLS
15672 für das Management-Plugin

RabbitMQ legt einen Standardbenutzer fest, welcher aber aus Sicherheitsgründen nur über localhost Zugriff bekommt. Benutzername und Passwort lauten im Auslieferungszustand jeweils “guest”.

Da es unzählige Client-Bibliotheken gibt, kann ich hier nicht auf jede einzeln eingehen. In der Regel sind diese aber auch immer sehr einfach zu installieren und sind gut dokumentiert.

Für meine PHP-Beispiele nutze ich php-amqplib. Diesen Client installiert man am besten mit composer.

In unserem Projekt legen wir hierfür eine composer.json an:

{
  "require": {
      "videlalvaro/php-amqplib": "2.5.*"
  }
}

und installieren via composer:

$ composer.phar install

Um den Client zu verwenden müssen wir in unserem Code den autoloader einbinden und per use die benötigten Klassen laden, zum Beispiel so:

<?php
require_once __DIR__.'/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPConnection;
use PhpAmqpLib\Message\AMQPMessage;

RabbitMQ verwenden

Für ein einfaches Beispiel, habe ich zunächst Wrapper-Klassen für Producer und Consumer angelegt. Da der Verbindungsaufbau für beide Komponenten identisch ist, erben beide von der Klasse Node:

<?php
namespace Orkork\RabbitTest;

use PhpAmqpLib\Connection\AMQPConnection;
use Orkork\Tools\Config;

abstract class Node
{
    protected $_config;
    protected $_connection;
    protected $_channel;

    /**
     * Sets config and initializes connection to RabbitMQ
     *
     * @param Config $config
     */
    public function __construct(Config $config)
    {
        $this->_config = $config;
        $this->_initConnection();
    }

    public function closeConnection()
    {
        if($this->_channel) {
            $this->_channel->close();
        }
        
        if($this->_connection) {
            $this->_connection->close();
        }
    }

    protected function _initConnection()
    {
        // Create connection
        $this->_connection = new AMQPConnection(
            $this->_config->rabbitMqHost,
            $this->_config->rabbitMqPort,
            $this->_config->rabbitMqUser,
            $this->_config->rabbitMqPassword
        );

        // Create channel
        $this->_channel = $this->_connection->channel();

        $this->_channel->queue_declare(
            $this->_config->queueName,
            $this->_config->queueIsPassive,
            $this->_config->queueIsDurable,
            $this->_config->queueIsExclusive,
            $this->_config->queueIsAutodelete
        );
    }
}

Die producer-Klasse beschränkt sich somit auf das reine Senden der Nachricht:

<?php
/**
 * Sends given message via configured exchange and routingKey
 *
 * @param string $message
 */
public function publishMessage($message)
{
    $this->_channel->basic_publish(
        new AMQPMessage($message),
        $this->_config->exchange,
        $this->_config->routingKey
    );
}

Analog benötigt die Cosumer-Klasse ausschließlich eine Methode um auf eingehende Nachrichten zu hören:

<?php
public function listen($callback)
{
    $this->_channel->basic_consume(
        $this->_config->queueName,
        '',
        false,
        true,
        false,
        false,
        $callback
    );

    while(count($this->_channel->callbacks)) {
        $this->_channel->wait();
    }
}

Um die Wrapper zu nutzen, legen wir zwei Scripte, publisher.php und listener.php an:

<?php
require_once __DIR__.'/vendor/autoload.php';

use Orkork\RabbitTest\Producer;
use Orkork\Tools\Config;

$config = new Config(require 'config.php');

$producer = new Producer($config);

for($i = 1; $i <= $config->messageAmount; $i++) {
    $producer->publishMessage(uniqid('', true));
    usleep(ceil($config->publishSleepSeconds * 1000000)); 
}
<?php
require_once __DIR__.'/vendor/autoload.php';

use Orkork\RabbitTest\Consumer;
use Orkork\Tools\Config;
use PhpAmqpLib\Message\AMQPMessage;

$config = new Config(require 'config.php');

$consumer = new Consumer($config);
$consumer->listen(
    function (AMQPMessage $message) {
        echo "-> Received message '", $message->body, "'
";
        usleep(0.15 * 1000000); // simulate 150ms processing time
});

Nun brauchen wir nur noch die Scripte zu starten und können im Manager den Vorgang verfolgen, dazu kommen wir gleich.

Den kompletten Quellcode findest du auf GitHub.

RabbitMQ beobachten

Mit Plugins lässt sich RabbitMQ wunderbar erweitern. Ein must-have ist das Management-Plugin, welches RabbitMQ um eine HTTP-API erweitert, das CLI-Tool rabbitmqadmin mitbringt und eine Web-basierte GUI zur Verfügung stellt.

Mit diesen Werkzeugen können wir den Server sehr angenehm konfigurieren und beobachten. Die Web-Oberfläche zeigt uns zum Beispiel in Echtzeit, wie viele Verbindungen aktuell bestehen, wie schnell einzelne Queues gefüllt und abgearbeitet werden und wieviel Arbeitsspeicher die einzelnen Prozesse verbraten.

Wir können das Plugin mit einem einfachen Befehl aktivieren:

$ rabbitmq-plugins enable rabbitmq_management

Danach können wir die Weboberfläche unter http://server-name:15672/ aufrufen.
Mehr zu diesem Thema findest du auf der Webseite.

Interessant, nicht wahr? Das Ganze ist ja leicht abstrakt und für gewöhnlich fragt man sich, “Was soll ich jetzt damit anfangen? Wer benutzt das überhaupt und warum?”.
Diese Fragen möchte ich in einem weiteren Artikel mit einem konkreten Beispiel beantworten. Real-world examples sind halt doch die Besten.
Was interessiert dich ganz besonders an RabbitMQ?

Auch interessant