Introduction à RabbitMQ
Contexte
Nous sommes sur un site type communautaire et un bouton “recommander” permet d’envoyer à chacune de mes amis une recommandation sur un objet.
L’implémentation technique de cette fonctionnalité est :
- Récupérer l’id du user (moi)
- Récupérer l’id de l’objet
- Récupérer l’id de chacun de mes amis
- Pour chaque id ami, insérer dans une table RECO : mon id, l’id de mon ami et l’id de l’objet.
- Envoyer la réponse HTTP
Si j’ai 2500 amis, j’aurais donc à créer 2500 lignes dans la table… Je ne peux donc pas faire ca de façon synchrone, sinon je ne saurais être en mesure d’envoyer ma réponse HTTP avant le timeout standard de 30 secondes.
Nous allons donc déléguer cette tâche d’insertion en base à un composant spécialisé de notre application. Nous n’attendrons aucun acquittement de sa part et continuerons notre processus. Ainsi, même pour 2500 lignes à insérer, nous serons en mesure de rendre la main (envoyer la réponse HTTP) quasi instantanément.
Pour cette délégation, nous allons utiliser RabbitMQ…
RabbitMQ ? Qu’est-ce que c’est ?
RabbitMQ est un Message Broker (MQ = Message Queue). Littéralement, un Broker signifie un Courtier. Dans notre cas, ou plus généralement dans l’informatique, la notion de Broker permet d’indiquer un intermédiaire.
RabbitMQ est donc un intermédiaire entre deux composants souhaitant communiquer :
- Le Producer : qui émet un message
- Le Consumer : qui reçoit un message
RabbitMQ peut donc être vu comme un facteur. Plus encore, RabbitMQ c’est l’équivalent de La Poste (dans une version fiable ;p) avec les boîtes aux lettres, les plateformes de distribution, les facteurs, etc…
Voyons ensemble comment nous pouvons exploiter cette excellente solution !!!
Pour les prochains exemples de codes…
Tous les exemples de codes ci-après, Producer et Consumer, supposent qu’on ait en début de fichier :
require_once(__DIR__ . '/lib/php-amqplib/amqp.inc');
$connection = new AMQPConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
Et en fin de fichier :
$channel->close();
$connection->close();
Pile FILO : 1/1/1
Nous avons donc compris qu’il était possible à un composant d’envoyer un message à un autre par l’intermédiaire de Rabbitmq.
Dans le cas de notre exemple, au clic sur le bouton “recommander”, nous avons donc un nouveau workflow :
- Récupérer l’id du user (moi)
- Récupérer l’id de l’objet
- Récupérer l’id de chacun de mes amis
- Envoyer un message (un json par exemple) à RabbitMQ
- Envoyer la réponse HTTP
Le processus se contente d’envoyer un message à RabbitMQ : c’est le Producer. La réponse HTTP nous est donc renvoyée très rapidement et nous retrouvons donc la main sans impression de lenteur.
Pendant ce temps, dans RabbitMQ
Le message est réceptionné dans une file d’attente (la queue) puis envoyé au processus connecté en écoute sur cette queue. Ce processus est appelé Consumer et il va suivre le workflow suivant :
- Charger le message reçu
- Boucler sur le tableau des id ami
- Pour chaque id ami : insérer une ligne dans la table RECO [id ami,id moi, id objet]
Ce Consumer peut prendre le temps qu’il veut pour traiter le message. RabbitMQ s’occupe de stocker les nouveaux messages dans sa queue qui est virtuellement de taille infinie !
A la fin du traitement, le Consumer informe RabbitMQ qu’il a terminé. Le prochain message lui est donc envoyé (cette section sera revue plus en détails).
En synthèse
Nous avons donc mis en place un systeme de recommandation asynchrone : 1 Producer (envoi un message dans) 1 queue (qui le passe à) 1 Consumer
Exemple de code
Le producer est écrit en PHP et exécuté via une requête HTTP i.e. dans un contexte serveur Web (Apache, …).
Procuder :
$channel->queue_declare('hello', false, false, false, false);
$msg = new AMQPMessage('Hello World!');
$channel->basic_publish($msg, '', 'hello');
Le consumer est écrit en PHP et exécuté en ligne de commande dans un contexte CLI (Command Line Interface).
Consumer :
$channel->queue_declare('hello', false, false, false, false);
$callback = function($msg) {
echo " [x] Received ", $msg->body, "\n";
};
$channel->basic_consume('hello', '', false, true, false, false, $callback);
while(count($channel->callbacks)) {
$channel->wait();
}
Notez 2 choses :
- le Producer ET le Consumer déclarent la queue (très recommandé, on verra plus tard pourquoi)
- il y a une boucle “while” permettant au Consumer de rester à l’écoute des messages suivants. Sans cette boucle, le script s’arrêterait au premier message traité.
Monitoring :
Il est nécessaire de s’assurer qu’un Consumer soit toujours à l’écoute d’une queue. Sans quoi, en cas de plantage ou de deconnexion d’un Consumer, votre queue se remplirait indéfiniment jusqu’à ce que vous reconnectiez un nouveau Consumer. Aussi, je vous conseille aussi de monitorer ce script (via ps) ou le nombre de consumer connectés à la queue (via rabbitmqctl) avec un outil de surveillance ad-hoc afin de pouvoir reconnecté un Consumer dès qu’une alerte est levée par votre système de monitoring.
Dans le cadre d’un projet professionnel, nous avons décidé de ne pas faire ce monitoring avec reconnexion manuelle d’un Consumer. Nous avons opté pour l’utilisation de l’outil supervisord relançant automatiquement le script en cas de terminaison. Une autre solution aurait pu être (non testé, à confirmer) d’utiliser un script shell qui se relance lui-meme en cas de terminaison à l’aide de la commande trap.
Limitation :
S’il y a plus de messages produits que de messages consommés, la file d’attente va grandir progressivement. Ce qui n’est absolument pas un problème pour RabbitMQ au contraire : cela nous permet de gérer efficacement et sans modificaiton d’infrastructure les pics de charge.
Mais si ce cas est constant et qu’il ne sagissent pas d’un simple pics de charge, la file d’attente va croitre indéfiniment ce qui posera un problème à un moment donnée (limitation physique de stockage des messages dans la file d’attente).
Dans ce cas, il faut simplement augmenter le nombre de Consumer connecté à la queue. C’est ce que nous allons voir dans la section suivante.
Dispatch : 1/1/N
Round-Robbin
Tour à tour (indépendament de la charge)
Prefetch
Selon le plus disponible
Broadcast : 1/N
Un Producer envoi à N Consumers via N file d’attente (1 file par Consumer) Le message est envoyé à tous les consumers
Route : 1/N
Un Producer envoi à N Consumers via N file d’attente (1 file par Consumer) Le message est envoyé à tous les consumers abonnés à la route
Routes fixes
Le nom de la route est fixe
Routes dynamiques
Le nom de la route est pattern
Analogie
RabbitMQ est composé de 2 éléments fondamentaux : - Exchange : C’est la plateforme de tri du courrier - Queue : C’est l’espace de stockage avant envoi
Un message suit donc le flux P > E > Q > C suivant : 1. Envoi par le Producer 2. Tri par le Exchange 3. Stockage dans la Queue 4. Réception par le Consumer