Amqplib-plus: práce s RabbitMQ v Node.js

by Tomáš Sedláček | 7. 2. 2018

RabbitMQ je celosvětově oblíbený message broker, který umožňuje různorodým aplikacím spolu navzájem asynchronně komunikovat prostřednictvím zpráv. Při jejím používání nám ale některé věci v knihovně stále chyběli, a tak jsme se rozhodli publikovat vlastní open-source rozšíření – amqlib-plus.

Motivace k vytvoření amqplib-plus

Samotný RabbtMQ má velmi pěkně sepsanou dokumentaci, ve které naleznete jak základy pro začátečníky, tak i zajímavé tipy a možnosti nastavení pro pokročilé. Na samotných oficiálních stránkách RabbitMQ také naleznete tutoriály a ukázky práce s brokerem hned v několika programovacích jazycích. Jedním z uvedených jazyků je i Javascript (Node.js). Abyste nemuseli programovat a obstarávat komunikaci s brokerem na té nejnižší úrovni, vybízí vás tutoriál na webu k použití již existující open-source knihovny amqplib, která komunikaci s brokerem obstará za vás a zpřístupní vám vše potřebné přes definované rozhraní. I nám se knihovna amqplib líbila a rozhodli jsme se ji používat i pro produkční použití. Neocenitelným pomocníkem při práci s amqplib je její dokumentace, kterou naleznete na adrese https://www.squaremobius.net/amqp.node/. V ní jsou obsaženy popisy jednotlivých objektů knihovny i toho, jak s těmito objekty pracovat.

I přesto, že je amqplib velmi propracovanou a dobře použitelnou knihovnou stále nám v ní něco málo chybělo a proto jsme se rozhodli vytvořit si nad ní naši vlastní, obohacenou knihovnu. Nazvali jsme ji amqplib-plus a je taktéž publikovaná jako open-source a dostupná přes npm.

Amqplib-plus doplňuje do původní amqplib následující funkcionality:

  • OOP přístup
  • Automatický reconnect amqp spojení s brokerem
  • Automatická příprava kanálu po auto-reconnectu
  • Bezztrátové publikování zpráv

OOP přístup

Abychom usnadnili našim programátorům, kteří primárně používají a znají jazyk PHP, přechod do Node.js a zrychlili jejich dobu učení s touto technologií, snažili jsme se napsat kód podobně jako ten, na který jsou zvyklí z objektového PHP. Amqplib-plus je proto vytvořena z objektově orientovaných tříd, které zapouzdřují logiku knihovny amqplib. Zdrojové kódy naší knihovny jsou také napsány v Typescriptu, aby programátorům typovost pomohla s orientací ve zdrojovém kódu.

Auto-reconnect

Při provozu jakékoliv aplikace v produkčním prostředí musíte počítat s nejrůznějšími výpadky. Komunikujete-li s RabbitMQ, může se stát , že server na kterém instance brokeru běží nebude odpovídat, případně že zhavaruje rabbitmq proces. Při používáním knihovny amqplib-plus si nemusíte zabývat implementací reconnectu k RabbitMQ, pokud přestane odpovídat. Třída Connection, kterou amqplib-plus obsahuje reaguje na výpadky za vás a snaží se periodicky připojit znovu k RabbitMQ serveru. Jakmile se jí to podaří, zpracuje veškeré úlohy, které nemohly být kvůli nefunkčnímu spojení dokončeny a pokračuje v běžné práci. Vy se tak z pohledu uživatele programátora nemusíte starat o to, zda je spojení stále navázáno nebo ne a můžete se věnovat vašim vlastním business funkcím a starost o reconnect přenechat na amqplib-plus.

Příprava kanálu

Všem klientským třídám knihovny amqplib při jejich inicializaci předáváte funkci, která se zavolá při každém navázání spojení s RabbitMQ. V rámci této funkce si můžete připravit fronty, Exchange, bindy apod. Předejdete pak tomu, že si pracně ručně vytvoříte tyto objekty v RabbitMQ brokeru, ve kterém se může dojít při pádu brokera k jejich smazání a následnému pádu vaší aplikace po auto-reconnectu například kvůli tomu, že se snaží publikovat zprávu do neexistující fronty.

Bezztrátová publikace zpráv

Publikujete-li mnoho zpráv v krátkém časovém úseku, může se stát, že se write buffer zaplní a nestíhá zapisovat odchozí zprávy. V tomto momentě nelze z klienta odesílat zprávy a musí se počkat na tzv. drain event, pomocí kterého nám broker oznámí, že už je zase možné zapisovat do bufferu a tím pádem i odesílat zprávy. Při použití Publisher objektu z amqplib-plus odchytávání drain eventu vůbec ve svém kódu řešit nemusíte. Můžete se spolehnout, že pokud zprávu z tohoto důvodu nelze odeslat, Publisher si ji uloží do cache a při nejbližším drain eventu se pokusí zprávu publikovat znovu.

Používání amqplib-plus

Třída Connection

K dobrému porozumění a práci s amqlib-plus je nutné znát knihovnu amqplib a její objekty. Základním prvkem pro komunikaci s RabbitMQ je třída Connection, která za nás obstará fyzické spojení s brokerem. Vytvoříme-li si instanci třídy Connection, můžeme na ní zavolat metodu connect, která nám vrátí promise s amqplib spojením. Zpravidla nám v našich aplikacích stačí jedno spojení pro každou instanci aplikace, protože spojení lze multiplexovat přes tzv. kanály, které na instanci třídy connection získáme přes zavolání metody createChannel. Tato metoda vrací promise s amplib kanálem. Kanál nam umožní veškerou práci s rabbitmq. Můžeme přes něj publikovat zprávy, konzumovat frontu, vytvářet fronty a exchange apod. Kanál je objekt knihovny amqplib a můžeme ho využít na vše co je amqplib umožňuje (https://www.squaremobius.net/amqp.node/channel_api.html#channel).

Metoda createChannel vyžaduje 1 parametr, kterým je funkce, která zavolá ihned poté, co je kanál vytvořen, a pomocí níž můžeme připravit prvky v brokerovi pro další práci v aplikaci. Tato funkce se také zavolá při každém reconnectu dojde-li k přerušení spojení s brokerem tak, aby jsme měli zaručeno, že prvky v brokerovi, které jsme vytvořili po spuštění aplikace a které mohly být případným pádem brokera ztraceny, se znovy vytvoří. To, že se ve skutečnosti reálné fyzické spojení během výkonu naší aplikace přerušilo nás však nemusí zajímat, protože nás od něj odstiňuje instance třídy Connection. I když tedy spojení zrovna reálně neexistuje, můžeme klidně zavolat metodu createChannel, a promise s kanálem bude vyresolvena jakmile se spojení opět obnoví.

Ukázka vytvoření instance třídy Connection
// Načteme si třídu z knihovny
const Connection = require("amqplib-plus/dist/lib/Connection");

// Nastavíme údaje k připojení k RabbitMQ brokerovi
const options = {
host: "localhost",
port: 5672,
user: "guest",
pass: "guest",
vhost: "/",
heartbeat: 60,
};

// Vytvoříme instanci třídy Connection
const connection = new Connection(options);

// Abychom mohli využít async/await musíme zabalit náš kód do bloku async function
async function run() {
// Vytvoříme reálné spojení s brokerem
await connection.connect();

// Vytvoříme channel, přes který můžeme publikovat, konzumovat, vytvářet fronty apod.
const channel = await connection.createChannel(async (ch) => {
await ch.assertQueue("queue-name", { durable: false });
await ch.sendToQueue("queue-name", new Buffer("First message"));
});

await channel.sendToQueue("queue-name", new Buffer("Second Message"));
}

run();

Spustíme-li předešlý kód, do konzole se vypíše řádek „Connected to RabbitMQ“. Následně se vytvoří fronta „queue-name“ a odešle se do ní zpráva s obahem „First message“. Poté se odešle zprává s obsahem „Second message“. Při pádu rabbita (restartoval jsem instanci) se do konzole vypíše informace o tom, že spojení bylo uzavřeno a případná chyba proč k tomu došlo. Následně se instance třídy Connection snaží znovu vytvořit spojení o čemž nás infomruje i v logu. Jakmile se jí to podaří, opět se do logu vypíše „Connected to RabbitMQ“. Pokud bychom v době kdy spojení bylo přerušené vytvářeli kanál a následně nad ním pracovali, provedly by se všechny tyto operace až po znovupřipojení k brokerovi.

Console
Connected to RabbitMQ.
AMQP Connection error Unexpected close
AMQP Connection closed Unexpected close
RabbitMQ connection failure. Retry after 2000 ms. Reason: Socket closed abruptly during opening handshake
RabbitMQ connection failure. Retry after 4000 ms. Reason: Socket closed abruptly during opening handshake
RabbitMQ connection failure. Retry after 6000 ms. Reason: Socket closed abruptly during opening handshake
Connected to RabbitMQ.

Třída Publisher

Publisher slouží k odesílání zpráv do brokera, který je poté dle předem daných pravidel nasměruje do cílové fronty. Publisher dědí od třídy Client, jež se stará o to, že vytvoří nad dodaným spojením kanál o který se stará a znovuvytváří ho, dojde-li k jeho zániku. Při vytváření instance třídy Publisher předáme do konstruktoru instanci třídy Connection a funkci, která se vykoná vždy při vytváření nového kanálu, tedy při vytvoření instance Publisher i při každém převytvoření kanálu z důvodu pádu spojení apod.

Vše si ukážeme na následujícím jednoduchém příkladu:

Publisher použití
const conn = require("./../../dist/lib/Connection");
const pub = require("./../../dist/lib/Publisher");
 
const options = {
 host: "localhost",
 port: 5672,
 user: "guest",
 pass: "guest",
 vhost: "/",
 heartbeat: 60,
};
 
const connection = new conn.Connection(options);
 
async function run() {
 await connection.connect();
 
 // Tato funcke se zavolá po každém připojení k brokerovi
 const preparePublisher = async (ch) => {
 await ch.assertQueue("target-queue", { durable: false });
 await ch.assertExchange("target-exchange", "direct");
 await ch.bindQueue("target-queue", "target-exchange", "routKey");
 console.log("Publisher ready");
 };
 
 // Vytvoříme instanci Publishera
 const publisher = new pub.Publisher(connection, preparePublisher);
 
 // Odešleme zprávy
 await publisher.sendToQueue("target-queue", new Buffer("message content"), {});
 await publisher.publish("target-exchange", "routKey", new Buffer("another content"), {});
 console.log("Two messages sent.");
}
 
run();

Spuštění následujícího kódu nám do konzole vypíše tyto řádky:

Console log
Connected to RabbitMQ.
Publisher ready
Two messages sent.

Pokud nyní zrestartujeme instanci RabbitMQ brokera, přibudou do logu další řádky.

Console log
AMQP Connection error Unexpected close
Channel closed, Reason: undefined
AMQP Connection closed Unexpected close
Channel creation failed. Retry after 2000 ms. Reason: IllegalOperationError: Connection closed (Error: Unexpected close)
RabbitMQ connection failure. Retry after 2000 ms. Reason: Socket closed abruptly during opening handshake
RabbitMQ connection failure. Retry after 4000 ms. Reason: Socket closed abruptly during opening handshake
Connected to RabbitMQ.
Publisher ready

Spojení bylo i přes jeho předchozí pád znovu obnoveno a stejně tak i kanál, který publisher používá pro odesílání zpráv. Navíc byla zavolána i námi definovaná funkce, kterou vytváříme frontu, exchange a bind, takže si můžeme být i nadále jisti, že tyto prvky v brokeru existují a můžeme do nich posílat zprávy i nadále.

Třída Consumer

Consumer slouží ke zpracování zpráv z fronty, na kterou je consumer navěšený. Třída consumer, stejně jako Publisher, dědí z třídy Client, díky čemuž máme zajištěnu reakci na pád brokera a zavolání námi definované funcke kdykoliv je spojení vytvořena, tak abychom si mohli připravit potřebné fronty, exchange apod.

Consumer disponuje metodou consume, jejíž zavoláním říkáme, že má Consumer začít konzumovat. Funkce consume je za nás také automaticky zavolána vždy po znovuobnovení spojení se stejnými parametry. Consumer tak naváže na svou práci před pádem spojení a my toto vůbec nemusíme v našem kódu řešit.

Consumerovi musíme říci jak zpracovat zprávy, které přijdou do fronty již odebírá. Toto uděláme jednoduše tak, že mu do konstruktoru předáme funkci processMessage, která se o zpracování zprávy postará. Tato funkce bude mít vždy k dispozici samotnou zprávu, ale také i kanál, přes který byla zpráva přijata tak, abychom tuto zprávu mohli potvrdit (ack) nebo zamítnout (nack, reject). To co se zprávou uděláme je zcela na nás. Můžeme zavolat nějakou naší službu, kterou zprávu předáme, můžeme zprávu přeposlat pomocí Publisher instance apod.

Předpis funcke processMessage je: processMessage(msg: Message, channel: Channel): void;

Pojďmě se podívat jak vytvoříme jednoduchého Consumera (SimpleConsumer), který obsah každé přijaté zprávu zapíše do logu a následně zprávu potvrdí (ack). Nejpve si vutvoříme třídu pro našeho SimleConsumera:

SimpleConsumer
const Consumer = require("amqplib-plus/dist/lib/Consumer");
 
class CustomConsumer extends Consumer.Consumer {
 
 constructor(conn, prepareFn) {
 super(conn, prepareFn);
 }
 
 processMessage(msg, channel) {
 // Zde zpracujee přijatou zprávu
 console.log("Message received:", msg.content.toString());
 
 // Potvrdíme brokerovi, že jsme zprávu úspěšně přijali a zpracovali
 channel.ack(msg);
 }
 
}
 
module.exports = CustomConsumer;

V kódu naší aplikace použijeme nově vytvořenou třídu:

Consumer aplikace
const conn = require("./../../dist/lib/Connection");
const SimpleConsumer = require("./SimpleConsumer");
 
const options = {
 host: "localhost",
 port: 5672,
 user: "guest",
 pass: "guest",
 vhost: "/",
 heartbeat: 60,
};
 
const connection = new conn.Connection(options);
 
async function startConsumer() {
 await connection.connect();
 
 // Tato metoda se zavolá vždy při vytvoření kanálu
 const prepareConsumer = async (ch) => {
 // Vytvoříme frontu
 await ch.assertQueue("input-queue", { durable: false });
 // Paralelně můžeme mít rozpracovaných až 5 zpráv
 await ch.prefetch(5);
 };
 
 const simpleConsumer = new SimpleConsumer(connection, prepareConsumer);
 simpleConsumer.consume("input-queue", {});
}
 
startConsumer();

Po spuštení aplikace začne instance SimpleConsumera zpracovávat zprávy, které pošleme do fronty „input-queue“. Ukázka výstupu do logu může vypadat například takto:

Console
Connected to RabbitMQ.
Started consuming queue "source-custom-queue". Consumption tag: amq.ctag-_mKA8kYOiZHmpMFHiikeJg

Do fronty „input-queue“ pošleme např např z rabbitmq management console zprávu s obsahem „First“. Do konzole přibude řádek:

Console
Message received: First

Nyní můžeme zkusit restarotvat instanci brokera a následně poslat do stejné fronty zprávu s obsahem „Second“. Ta by měla být také zkonzumována po opětovném navázaní spojení. V konzoli bychom tedy viděli:

Console
Connected to RabbitMQ.
Started consuming queue "source-custom-queue". Consumption tag: amq.ctag-_mKA8kYOiZHmpMFHiikeJg
Message received: First
// Zde došlo k restartu
AMQP Connection error Unexpected close
Channel closed, Reason: undefined
AMQP Connection closed Unexpected close
Channel creation failed. Retry after 2000 ms. Reason: IllegalOperationError: Connection closed (Error: Unexpected close)
RabbitMQ connection failure. Retry after 2000 ms. Reason: Socket closed abruptly during opening handshake
RabbitMQ connection failure. Retry after 4000 ms. Reason: Socket closed abruptly during opening handshake
Connected to RabbitMQ.
Started consuming queue "source-custom-queue". Consumption tag: amq.ctag-T5JyJWiDXe9Ulg2FuxR0UQ
Message received: Second

Další rozvoj knihovny amqplib-plus

Knihovnu bychom chtěli dále udržovat, rozvíjet a vylepšovat. Neklademe si za cíl nahradit původní amqplib, ze které vycházíme, protože podle nás funguje dobře a pracuje se nám s ní dobře, nicméně některé věci nám v ní chybí a proto jsme vytvořili amqplib-plus. budeme velmi rádi pokud nás oslovíte se svými nápady na rozšíření amqplib-plus svými případnými pull requesty (httpss://github.com/hanaboso/amqplib-plus). Oceníme jistě i jakékoliv Vaše další postřehy a tipy. Pokud se rozhodnete knihovnu amqplib-plus používat a doporučovat svým známým, budeme velmi rádi.

Tým Hanaboso s.r.o.

Máte zájem o naše služby?

Napište nám a my se Vám ozveme…

4 + 12 =