Czym jest RabbitMQ?

RabbitMQ jest najbardziej rozpowszechnionym brokerem wiadomości o otwartym kodzie źródłowym. Oparty jest na protokole AMQP (Advanced Message Queuing Protocol). Protokół ten jest używany do komunikacji z brokerem komunikatów takim jak RabbitMQ. "AMQP określa zachowanie usługi oraz klienta komunikacji w stopniu, który powoduje, że implementacje różnych dostawców są interoperacyjne, w taki sam sposób jak SMTP, HTTP, FTP i tym podobne stworzyły interoperacyjne systemy. Poprzednie standaryzacje oprogramowania pośredniczącego działy się na poziomie API (np. Java Message Service) i skupiały się na standaryzowaniu interakcji programisty z różnymi implementacjami, zamiast dostarczaniu interoperacyjności między wieloma implementacjami. W przeciwieństwie do JMS, które określa API i zestaw zachowań, które implementacja komunikacji musi dostarczyć, AMQP jest protokołem "poziomu kabla". Protokół "poziomu kabla" to określenie na format danych, który jest wysyłany w sieci jako strumień bajtów. W konsekwencji każde narzędzie, które może tworzyć i interpretować komunikaty, które spełniają ten format danych, może współpracować z każdym zgodnym narzędziem niezależnie od języka implementacji." (źródło: wikipedia).

Wracając jednak do naszego głównego tematu, RabbitMQ. Oprogramowanie bogate jest w różne funkcjonalności do których możemy zaliczyć m.in.:

  • możliwość wysyłania wiadomości w sposób ciągły z gwarantowanym dostarczeniem nawet jeżeli aplikacja ulegnie awarii bądź broker RabbitMQ zostanie zresetowany;
  • równoważne obciążenie (load balancing) pomiędzy użytkownikami wiadomości jeżeli wielu klientów czeka w tej samej kolejce.

RabbitMQ to jednak coś więcej niż zwykłe przesyłanie wiadomości. Jest to platforma, która pozwala na łączenie aplikacji. Za pomocą RabbitMQ aplikacja napisania w języku Java może komunikować się z serwerem Linux i/lub aplikacją .NET czy Ruby on Rails. Do tej listy możemy zaliczyć prawie wszystko co znajduje swoje zastosowanie w zespołowym rozwoju aplikacji webowych.

Dlaczego to rozwiązanie jest nam potrzebne?

W pierwszej kolejności musimy wrócić do poprzedniej części artykułu. RabbitMQ pozwala nam na nawiązanie komunikacji pomiędzy projektami opartymi na różnych technologiach.

Dzięki powyższej definicji idealnie pasuje do architektury mikroserwisu. Pozwala nam na komunikację pomiędzy różnymi mikroserwisami a dzięki jej funkcjom będziemy zawsze pewni, że wiadomość zostanie dostarczna – bez względu na okoliczności, które możemy napotkać, np. wspominane wcześniej awarie.

Komunikacja jest również znacznie szybsza niż w przypadku żądań HTTP, ponieważ protokół AMQP jest protokołem specyficznym, zorientowanym na taki typ operacji, podczas gdy protkół HTTP jest protokołem ogólnego przeznaczenia.

Aplikacja wysyłająca wiadomości

Aby używać RabbitMQ należy zainstalować kilka rzeczy na swoim komputerze bądź serwerze. Wszystkie informacje można znaleźć na stronie http://www.rabbitmq.com/. Nie ma jednak powodów do zmartwień, przejdziemy przez wszystkie niezbędne kroki w tym artykule.

W pierwszym kroku musimy zainstalować język programowania Erlang - do pobrania z adresu: http://www.erlang.org/downloads - jest to język w którym został napisany RabbitMQ. Pamiętajcie, żeby pobrać wersję odpowiednią dla Waszego systemu operacyjnego.

Po poprawnej instalacji pora na pobranie serwera RabbitMQ - plik instalacyjny znajdziecie pod adresem http://www.rabbitmq.com/download.html.

Jesteśmy już gotowi do kolejnych krok. Przykład będzie wykonany na aplikacji konsolowej wykonanej w technologii .NET Core: .NET Core: aplikacja konsolowa

Kolejną aplikacją, którą musimy utworzyć jest odbiorca wiadomości. W naszym przypadku będzie to również projekt konsolowy pod nazwą ConsoleReceiver. Do każdego z utworzonych projektów musimy teraz dodać paczkę RabbitMq.Client: Manage Nuget Package

Wszystkie komponenty zostały dodane, utworzyliśmy dwa projekty i dodaliśmy odpowiednie paczki. Pora otworzyć projekt ConsoleReceiver i dodać poniższy kod:

using RabbitMQ.Client;
using System;
using System.Text;
namespace ConsoleSender
{
	class Program
	{
		static void Main(string[] args)
		{
			Console.WriteLine("Witajcie w aplikacji, która wysyła wiadomości!");
			var factory = new ConnectionFactory() { HostName = "localhost" };
			// otwarcie połączenia
			using (var connection = factory.CreateConnection())
			{
				// utworzenie kanału komunikacji
				using (var channel = connection.CreateModel())
				{
					channel.QueueDeclare(queue: "msgKey",
									durable: false,
									exclusive: false,
									autoDelete: false,
									arguments: null);

					Console.WriteLine("Wprowadz wiadomość do wysłania: ");
					string msg = Console.ReadLine();
					var msgBody = Encoding.UTF8.GetBytes(msg);
					channel.BasicPublish(exchange: "",
						routingKey: "msgKey",
						basicProperties: null,
						body: msgBody);
					Console.WriteLine($" [x] wysłano {msgBody}");
				}
			}
			Console.WriteLine("Wciśnij [Enter], aby wyłączyć aplikację");
			Console.ReadLine();
		}
	}
}

Przejdźmy teraz do szczegółowego omówienia tak przygotowanego kodu.

var factory = new ConnectionFactory() { HostName = "localhost" };
Powyższy kod pozwala na utworzenie instancji klasy ConnectionFactor, która zostanie użyta do nawiązania połączenia z zainstalowanym serwerem RabbitMQ. Nazwa hosta localhost związana jest z naszą lokalną instalacją serwera. W tym miejscu możemy podać również identyfikator użytkownika, hasło oraz wiele innych parametrów zgodnych z ustawieniami i wymaganiami. W tym miejscu definiujemy tak naprawdę nazwę serwera i niezbędne parametry połączenia na którym znajduje się zainstalowany serwer RabbitMQ. W kolejnym kroku otwieramy połączenie oraz tworzymy kanał komunikacji.

Wszystkie nasze wiadomości przesyłamy za pomocą utworzonego kanału komunikacji:

channel.QueueDeclare(queue: "msgKey",
durable: false,
exclusive: false,
autoDelete: false,
arguments: null);
Powyższy kod jest niezywkle ważny. Zdeklarowaliśmy kolejkę i przekazaliśmy jej kilka parametrów. Pierwszym z nich jest kolejka, która posiada zdefiniowany klucz. Klucz ten zostanie wykorzystany do wysłania wiadomości oraz użyty przez odbiorcę do odebrania wiadomości i przekształcenia jej w tablicę bajtów.

W następnych liniach pozwalamy użytkownikowi na wprowadzenie wiadomości, która zostania wysłana.

Nasze pierwsze wysłanie wiadomości jest tak proste jak to tylko możliwe. Parametery takie jak exchange oraz basicProperties nie zostają przekazane. Parametr exchange określa typ przekazywanej wiadomości. Jego definicja decyduje o odpowiednim routing’u naszej wiadomości – algorytm przekierowania decyduje do której kolejki należy kierować dany komunikat. Listę wszystkich typów wiadomości możesz wyświetlić za pomocą komendy uruchomionej w linii poleceń: RabbitMQ: wiersz poleceń Możecie zapytać dlaczego w takim wypadku (brak definicji typu) byliśmy w stanie wysłać wiadomość? Poniższy zapis:

channel.BasicPublish(exchange: "", ... );
pozwala na wysłanie domyślnego typu wiadomości.

Czym zatem są basicParameters? Właściwości protokołu AMQP są podobne do nagłówków stosowanych w protokole HTTP - reprezentują metadane dotyczące wiadomości. W przypadku brokera RabbitMQ możemy zdefiniować właściwość jaką jest np. expiration (data wygaśnięcia), która pozwala na dodawnie wartości specyficznej dla danego dostawcy wiadomości (w tym przypadku będzie to wymuszenie TTL - https://www.rabbitmq.com/ttl.html#per-message-ttl).

Jeżeli interesują Was bardziej szczegółowe informacje dotyczące właściwości zapraszam do dokumentacji: https://www.rabbitmq.com/releases/rabbitmq-dotnet-client/v3.1.4/rabbitmq-dotnet-client-3.1.4-client-htmldoc/html/type-RabbitMQ.Client.IBasicProperties.html. Niestety w ramach jednego artykułu nie da się wyjaśnić wszystkiego.

Mam jednak nadzieję, że wszystko co napisałem powyżej jest dla Was jasne a sam kod wysyłania wiadomości czytelny.

Aplikacja odbiorcy

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;
namespace ConsoleReceiver
{
	class Program
	{
		static void Main(string[] args)
		{
			Console.WriteLine("Witajcie w aplikacji, która odbiera wiadomości!");
			var factory = new ConnectionFactory() { HostName = "localhost" };
			// otwarcie połączenia
			using (var connection = factory.CreateConnection())
			// utworzenie kanału komunikacji
			using (var channel = connection.CreateModel())
			{
				channel.QueueDeclare(queue: "msgKey",
								durable: false,
								exclusive: false,
								autoDelete: false,
								arguments: null);
				var consumer = new EventingBasicConsumer(channel);
				consumer.Received += (model, ea) =>
				  {
					  var body = ea.Body;
					  var message = Encoding.UTF8.GetString(body);
					  Console.WriteLine($" [x] otrzymano {message}");
				  };
				channel.BasicConsume(queue: "msgKey",
					autoAck: true,
					consumer: consumer);
				Console.WriteLine("Wciśnij [Enter], aby wyłączyć aplikację");
				Console.ReadLine();
			}
		}
	}
}
W tym projekcie również możemy znaleźć prostą deklarację kolejki:
channel.QueueDeclare(queue: "msgKey",
durable: false,
exclusive: false,
autoDelete: false,
arguments: null);

Podobnie jak w przypadku aplikacji wysyłającej wiadomości ta deklaracja jest tak prosta jak to tylko możliwe. Przechodzimy przez najprostszy przykład, którego celem jest jedynie nadanie i otrzymanie wiadomości. Również w tym przypadku nasza kolejka jest tajnym kluczem używanym przez nadawcę do wysłania wiadomości.

W poniższym kodzie możecie zobaczyć zdarzenie (event), którego celem jest automatycznie odebranie wiadomości, gdy ta zostanie wysłana przez nadawcę i wyświetlenie jej w konsoli.

Działanie aplikacji

Pozostało najprzyjemniejsze. Uruchomienie obu aplikacji w tym samym czasie: RabbitMQ: odbiorca i nadawca W pierwszej kolejności wpisujemy wiadomość, którą chcemy dostarczyć do odbiorcy. Następnie dostajemy komunikat o poprawnie wysłanej wiadomości czego efektem jest jej odbiór po drugiej stronie – odbiorca przetworzył wiadomość i był w stanie wyświetlić ją w aplikacji konsolowej. RabbitMQ: pomyślny test przesyłania wiadomości Tak jak wspomniałem wcześniej, jest to najprostszy możliwy przykład. Nie chciałem, aby ten artykuł był za długi i zbyt skomplikowany.

Jaki zatem może być przykład użycia RabbitMQ? Wyobraźcie sobie, że przygotowaliście usługę sieciową, która w każdej sekundzie otrzymuje wiele żądań i żadne z nich nie może zostać zgubione. Każde żądanie musi zostać przetworzone a sam proces jest złożony i nie zajmuje milisekund.

Wasza usługa musi być zawsze wysoko dostępna i gotowa na przyjęcie nowego żądania. Sytuacje w których dochodzi do zatrzymania przetwarzania z uwagi na poprzednie żądania są niedopuszczalne. W takim przypadku powyższe rozwiązanie i umieszczanie żądań na kolejce jest porządane. W naszym przypadku zdefiniowaliśmy tylko jednego odbiorcę (w realnym świecie mikroserwis do przetwarzania żądań), podczas gdy, takich odbiorców może być wielu... wówczas przy bardzo dużej ilości zgłoszeń żądania zostaną rozdzielone pomiędzy różnych odbiorców a my nie doświadczymy blokowania przetwarzania naszych żądań.

Podsumowanie

Mimo, iż głównym zamysłem było szybkie i bezbolesne wprowadzenie do tematu, artykuł nieco się rozrósł. Mam jednak nadzieję, że to nie stanowi dla Was problemu a samo pojęcie RabbitMQ stało się nieco jaśniejsze. W kolejnej cześci artykułu opiszę, co nieco, bardzo przydatny plugin dołączony do naszej instalacji, który pozwala na zarządzanie serwerem. Dodamy również nowego użytkownika, nadamy odpowiednie uprawienienia oraz przeprowadzimy kilka testów pozwalających na sprawdzenie co stanie się, gdy nasz odbiorca będzie miał "awarię".