Amqplib-plus: práce s RabbitMQ v Node.js
RabbitMQ je celosvětově oblíbený message broker, který umožňuje různorodým aplikacím spolu navzájem asynchronně komunikovat prostřednictvím zpráv. Při jejím používání nám ale některé věci v knihovně stále chyběli, a tak jsme se rozhodli publikovat vlastní open-source rozšíření – amqlib-plus.
Motivace k vytvoření amqplib-plus
Samotný RabbtMQ má velmi pěkně sepsanou dokumentaci, ve které naleznete jak základy pro začátečníky, tak i zajímavé tipy a možnosti nastavení pro pokročilé. Na samotných oficiálních stránkách RabbitMQ také naleznete tutoriály a ukázky práce s brokerem hned v několika programovacích jazycích. Jedním z uvedených jazyků je i Javascript (Node.js). Abyste nemuseli programovat a obstarávat komunikaci s brokerem na té nejnižší úrovni, vybízí vás tutoriál na webu k použití již existující open-source knihovny amqplib, která komunikaci s brokerem obstará za vás a zpřístupní vám vše potřebné přes definované rozhraní. I nám se knihovna amqplib líbila a rozhodli jsme se ji používat i pro produkční použití. Neocenitelným pomocníkem při práci s amqplib je její dokumentace, kterou naleznete na adrese https://www.squaremobius.net/amqp.node/. V ní jsou obsaženy popisy jednotlivých objektů knihovny i toho, jak s těmito objekty pracovat.
I přesto, že je amqplib velmi propracovanou a dobře použitelnou knihovnou stále nám v ní něco málo chybělo a proto jsme se rozhodli vytvořit si nad ní naši vlastní, obohacenou knihovnu. Nazvali jsme ji amqplib-plus a je taktéž publikovaná jako open-source a dostupná přes npm.
Amqplib-plus doplňuje do původní amqplib následující funkcionality:
- OOP přístup
- Automatický reconnect amqp spojení s brokerem
- Automatická příprava kanálu po auto-reconnectu
- Bezztrátové publikování zpráv
OOP přístup
Abychom usnadnili našim programátorům, kteří primárně používají a znají jazyk PHP, přechod do Node.js a zrychlili jejich dobu učení s touto technologií, snažili jsme se napsat kód podobně jako ten, na který jsou zvyklí z objektového PHP. Amqplib-plus je proto vytvořena z objektově orientovaných tříd, které zapouzdřují logiku knihovny amqplib. Zdrojové kódy naší knihovny jsou také napsány v Typescriptu, aby programátorům typovost pomohla s orientací ve zdrojovém kódu.
Auto-reconnect
Při provozu jakékoliv aplikace v produkčním prostředí musíte počítat s nejrůznějšími výpadky. Komunikujete-li s RabbitMQ, může se stát , že server na kterém instance brokeru běží nebude odpovídat, případně že zhavaruje rabbitmq proces. Při používáním knihovny amqplib-plus si nemusíte zabývat implementací reconnectu k RabbitMQ, pokud přestane odpovídat. Třída Connection, kterou amqplib-plus obsahuje reaguje na výpadky za vás a snaží se periodicky připojit znovu k RabbitMQ serveru. Jakmile se jí to podaří, zpracuje veškeré úlohy, které nemohly být kvůli nefunkčnímu spojení dokončeny a pokračuje v běžné práci. Vy se tak z pohledu uživatele programátora nemusíte starat o to, zda je spojení stále navázáno nebo ne a můžete se věnovat vašim vlastním business funkcím a starost o reconnect přenechat na amqplib-plus.
Příprava kanálu
Všem klientským třídám knihovny amqplib při jejich inicializaci předáváte funkci, která se zavolá při každém navázání spojení s RabbitMQ. V rámci této funkce si můžete připravit fronty, Exchange, bindy apod. Předejdete pak tomu, že si pracně ručně vytvoříte tyto objekty v RabbitMQ brokeru, ve kterém se může dojít při pádu brokera k jejich smazání a následnému pádu vaší aplikace po auto-reconnectu například kvůli tomu, že se snaží publikovat zprávu do neexistující fronty.
Bezztrátová publikace zpráv
Publikujete-li mnoho zpráv v krátkém časovém úseku, může se stát, že se write buffer zaplní a nestíhá zapisovat odchozí zprávy. V tomto momentě nelze z klienta odesílat zprávy a musí se počkat na tzv. drain event, pomocí kterého nám broker oznámí, že už je zase možné zapisovat do bufferu a tím pádem i odesílat zprávy. Při použití Publisher objektu z amqplib-plus odchytávání drain eventu vůbec ve svém kódu řešit nemusíte. Můžete se spolehnout, že pokud zprávu z tohoto důvodu nelze odeslat, Publisher si ji uloží do cache a při nejbližším drain eventu se pokusí zprávu publikovat znovu.
Používání amqplib-plus
Třída Connection
K dobrému porozumění a práci s amqlib-plus je nutné znát knihovnu amqplib a její objekty. Základním prvkem pro komunikaci s RabbitMQ je třída Connection, která za nás obstará fyzické spojení s brokerem. Vytvoříme-li si instanci třídy Connection, můžeme na ní zavolat metodu connect, která nám vrátí promise s amqplib spojením. Zpravidla nám v našich aplikacích stačí jedno spojení pro každou instanci aplikace, protože spojení lze multiplexovat přes tzv. kanály, které na instanci třídy connection získáme přes zavolání metody createChannel. Tato metoda vrací promise s amplib kanálem. Kanál nam umožní veškerou práci s rabbitmq. Můžeme přes něj publikovat zprávy, konzumovat frontu, vytvářet fronty a exchange apod. Kanál je objekt knihovny amqplib a můžeme ho využít na vše co je amqplib umožňuje (https://www.squaremobius.net/amqp.node/channel_api.html#channel).
Metoda createChannel vyžaduje 1 parametr, kterým je funkce, která zavolá ihned poté, co je kanál vytvořen, a pomocí níž můžeme připravit prvky v brokerovi pro další práci v aplikaci. Tato funkce se také zavolá při každém reconnectu dojde-li k přerušení spojení s brokerem tak, aby jsme měli zaručeno, že prvky v brokerovi, které jsme vytvořili po spuštění aplikace a které mohly být případným pádem brokera ztraceny, se znovy vytvoří. To, že se ve skutečnosti reálné fyzické spojení během výkonu naší aplikace přerušilo nás však nemusí zajímat, protože nás od něj odstiňuje instance třídy Connection. I když tedy spojení zrovna reálně neexistuje, můžeme klidně zavolat metodu createChannel, a promise s kanálem bude vyresolvena jakmile se spojení opět obnoví.
Spustíme-li předešlý kód, do konzole se vypíše řádek „Connected to RabbitMQ“. Následně se vytvoří fronta „queue-name“ a odešle se do ní zpráva s obahem „First message“. Poté se odešle zprává s obsahem „Second message“. Při pádu rabbita (restartoval jsem instanci) se do konzole vypíše informace o tom, že spojení bylo uzavřeno a případná chyba proč k tomu došlo. Následně se instance třídy Connection snaží znovu vytvořit spojení o čemž nás infomruje i v logu. Jakmile se jí to podaří, opět se do logu vypíše „Connected to RabbitMQ“. Pokud bychom v době kdy spojení bylo přerušené vytvářeli kanál a následně nad ním pracovali, provedly by se všechny tyto operace až po znovupřipojení k brokerovi.
Třída Publisher
Publisher slouží k odesílání zpráv do brokera, který je poté dle předem daných pravidel nasměruje do cílové fronty. Publisher dědí od třídy Client, jež se stará o to, že vytvoří nad dodaným spojením kanál o který se stará a znovuvytváří ho, dojde-li k jeho zániku. Při vytváření instance třídy Publisher předáme do konstruktoru instanci třídy Connection a funkci, která se vykoná vždy při vytváření nového kanálu, tedy při vytvoření instance Publisher i při každém převytvoření kanálu z důvodu pádu spojení apod.
Vše si ukážeme na následujícím jednoduchém příkladu:
Spuštění následujícího kódu nám do konzole vypíše tyto řádky:
Pokud nyní zrestartujeme instanci RabbitMQ brokera, přibudou do logu další řádky.
Spojení bylo i přes jeho předchozí pád znovu obnoveno a stejně tak i kanál, který publisher používá pro odesílání zpráv. Navíc byla zavolána i námi definovaná funkce, kterou vytváříme frontu, exchange a bind, takže si můžeme být i nadále jisti, že tyto prvky v brokeru existují a můžeme do nich posílat zprávy i nadále.
Třída Consumer
Consumer slouží ke zpracování zpráv z fronty, na kterou je consumer navěšený. Třída consumer, stejně jako Publisher, dědí z třídy Client, díky čemuž máme zajištěnu reakci na pád brokera a zavolání námi definované funcke kdykoliv je spojení vytvořena, tak abychom si mohli připravit potřebné fronty, exchange apod.
Consumer disponuje metodou consume, jejíž zavoláním říkáme, že má Consumer začít konzumovat. Funkce consume je za nás také automaticky zavolána vždy po znovuobnovení spojení se stejnými parametry. Consumer tak naváže na svou práci před pádem spojení a my toto vůbec nemusíme v našem kódu řešit.
Consumerovi musíme říci jak zpracovat zprávy, které přijdou do fronty již odebírá. Toto uděláme jednoduše tak, že mu do konstruktoru předáme funkci processMessage, která se o zpracování zprávy postará. Tato funkce bude mít vždy k dispozici samotnou zprávu, ale také i kanál, přes který byla zpráva přijata tak, abychom tuto zprávu mohli potvrdit (ack) nebo zamítnout (nack, reject). To co se zprávou uděláme je zcela na nás. Můžeme zavolat nějakou naší službu, kterou zprávu předáme, můžeme zprávu přeposlat pomocí Publisher instance apod.
Předpis funcke processMessage je: processMessage(msg: Message, channel: Channel): void;
Pojďmě se podívat jak vytvoříme jednoduchého Consumera (SimpleConsumer), který obsah každé přijaté zprávu zapíše do logu a následně zprávu potvrdí (ack). Nejpve si vutvoříme třídu pro našeho SimleConsumera:
V kódu naší aplikace použijeme nově vytvořenou třídu:
Po spuštení aplikace začne instance SimpleConsumera zpracovávat zprávy, které pošleme do fronty „input-queue“. Ukázka výstupu do logu může vypadat například takto:
Do fronty „input-queue“ pošleme např např z rabbitmq management console zprávu s obsahem „First“. Do konzole přibude řádek:
Nyní můžeme zkusit restarotvat instanci brokera a následně poslat do stejné fronty zprávu s obsahem „Second“. Ta by měla být také zkonzumována po opětovném navázaní spojení. V konzoli bychom tedy viděli:
Další rozvoj knihovny amqplib-plus
Knihovnu bychom chtěli dále udržovat, rozvíjet a vylepšovat. Neklademe si za cíl nahradit původní amqplib, ze které vycházíme, protože podle nás funguje dobře a pracuje se nám s ní dobře, nicméně některé věci nám v ní chybí a proto jsme vytvořili amqplib-plus. budeme velmi rádi pokud nás oslovíte se svými nápady na rozšíření amqplib-plus svými případnými pull requesty (httpss://github.com/hanaboso/amqplib-plus). Oceníme jistě i jakékoliv Vaše další postřehy a tipy. Pokud se rozhodnete knihovnu amqplib-plus používat a doporučovat svým známým, budeme velmi rádi.
Tým Hanaboso s.r.o.
Máte zájem o naše služby?
Napište nám a my se Vám ozveme…