Wprowadzenie

Artykuł jest kontynuacją wpisu, który możesz znaleźć klikając tutaj. W tej częsci skupiamy się na niezywkle przydatnym pluginie jakim jest rabbitmq_management. Przeprowadzimy również testy pozwalające sprawdzić co dzieje się w momencie, gdy nasz odbiorca uległ "awarii". W ostatnim kroku dodamy więcej niż jednego odbiorcę i odpowiednio skonfigurujemy ruch.

Plugin do zarządzania jest zawarty w naszym pakiecie instalacyjnym RabbitMQ. Zanim jednak zaczniemy go używać musimy dokonać jego włączenia. Możemy tego dokonać za pomocą polecenia:

rabbitmq-plugins enable rabbitmq_management
Po aktywacji plugin’a restart całego serwera nie jest konieczny.

Powyższe, oraz kolejne polecenia, wykonujemy z poziomu linii poleceń uruchamianej dla RabbitMQ: RabbitMQ: wiersz poleceń

Najprostszy sposób na dostęp do interfejsu użytkownika to wpisanie adresu: http://localhost:15672/. Jest to domyślny adres dla naszej instalacji. Jeżeli proces instalacji przebiegl pomyślnie powinniście zobaczyć poniższy ekran: RabbitMQ: wiersz poleceń Tutaj zapewne pojawia się pytanie: jak zdefiniować naszego użytkownika? Musimy skorzystać z poniższego polecenia:

rabbitmqctl add_user nazwa_uzytkownika haslo
Powyższe polecenie jest jasne. W kolejnym kroku nadamy użytkownikowi odpowiednie uprawnienia:
rabbitmqctl set_user_tags pawel administrator
oraz przejdziemy do procesu logowania. Jeżeli wszystko przebiegło pomyślnie uzyskacie dostęp do panelu zarządzania (dashboard ): RabbitMQ: dashboard

"Awaria" odbiorcy

W poprzedniej części artykułu pisałem o funkcjonalnościach, które dostarcza nam RabbitMQ. Jedną z nich jest gwarancja dostarczenia wiadomości "mimo wszystko". Zrobimy prosty test. Zmodyfikujemy nieznaczenie naszą aplikację wysyłającą wiadomości (pozwoli nam na wielokrotność nadania wiadomości) ale w tym samym czasie nasz odbiorca będzie wyłączony. Sprawdzimy czy wiadomości zostaną dostarczone do kolejki a po "restracie" naszego odbiory dojdzie do otrzymania wszystkich wiadomości.

Poniżej prosta modyfikacja kodu pozwalająca na wielokrotne wysyłanie wiadomości:

using RabbitMQ.Client;
using System;
using System.Text;
namespace ConsoleSender
{
	class Program
	{
		static void Main(string[] args)
		{
			Console.WriteLine("Witajcie w aplikacji, która wysyła wiadomości!");
			Console.WriteLine("\n--------------------\n");
			Console.WriteLine("Wciśnij ESC, aby zakończyć działanie aplikacji");
			Console.WriteLine("\n--------------------\n");

			var factory = new ConnectionFactory() { HostName = "localhost" };
			// otwarcie połączenia
			using (var connection = factory.CreateConnection())
			{
				// utworzenie kanału komunikacji
				using (var channel = connection.CreateModel())
				{
					channel.QueueDeclare(queue: "msgKey",
					durable: false,
					exclusive: false,
					autoDelete: false,
					arguments: null);
					
					do
					{
						Console.WriteLine("Wprowadz wiadomość do wysłania: ");
						string msg = Console.ReadLine();
						var msgBody = Encoding.UTF8.GetBytes(msg);

						channel.BasicPublish(exchange: "",
						routingKey: "msgKey",
						basicProperties: null,
						body: msgBody);

						Console.WriteLine($" [x] wysłano {msgBody}");

					} while (Console.ReadKey(true).Key != ConsoleKey.Escape);
				}
			}
			Console.WriteLine("Wciśnij [Enter], aby wyłączyć aplikację");
			Console.ReadLine();
		}
	}
}
Uruchamiamy teraz naszą aplikację konsolową wysyłając kilka wiadomości: .NET Core: aplikacja konsolowa W między czasie uruchamiamy nasz dashboard celem sprawdzenia ruchu. RabbitMQ: dashboard Możemy zaobserwować, iż 4 wiadomości zostały nadane i dodane do kolejki: RabbitMQ: dashboard

Przychodzi teraz czas na sprawdzenie czy odbiorca otrzyma każdą z tych wiadomości. Włączamy naszą drugą aplikację konsolową celem sprawdzenia: .NET Core: aplikacja konsolowa Wiadomości zostały dotarczone. Dodatkowo możemy zauważyć, że wiadomości zostały zdjęte z kolejki co świadczy o poprawnym działaniu systemu: RabbitMQ: dashboard

Wielu odbiorców

Pod tym pojęciem kryje się zastosowanie w realnym świecie. Tworzymy kilku odbiorców, przychodzące wiadomości dodajemy do odpowiednich kolejek i odpowiednio je przetwarzamy. Najprostyszym sposobem jest utworzenie drugiej aplikacji konsolowej, która będzie odbierała wiadomości. Możemy używać tego samego kodu. Po uruchomieniu obu aplikacji odbiorców i sprawdzeniu naszego dashboard’u możemy zaobserwować, że mamy aktywnych dwóch konsumentów naszych wiadomości: RabbitMQ: dashboard Load balancer automatycznie zarządza przekazywaniem wiadomości do różnych odbiorców: .NET Core: aplikacja konsolowa To jednak nie jest to co chcieliśmy osiągnać. Wyobraźcie sobie typ operacji w której chcemy zrobić różne rzeczy z tą samą wiadomością. Otrzymana wiadomość musi zostać niezależnie przetworzona przez dwóch różnych odbiorców. W naszym przypadku powinniśmy powiązać osobną kolejkę dla każdego z obiorców. Dzięki temu mogą oni odbierać wiadomości całkowicie niezależnie.

To jest idealny moment do powrotu do wspomnianego w poprzednim artykule exchange. Tzw. wymiana nie jest skomplikowana. Odbiera ona wiadomości od dostawców oraz przekazuje je do kolejek. Exchange musi dokładnie wiedzieć co zrobić z wiadomością, którą otrzymuje, tzn. czy powinna być dodana do konkretnej kolejki a może dołączona do wielu kolejek? Z drugiej strony może zostać również odrzucona. Te zasady określane są przez rodzaj wymiany.

Dostępnych jest kilka typów wymiany:

  • direct
  • topic
  • headers
  • fanout
My skupimy się na ostatnim typie, tj. funout. Polskie tłumaczenie będzie wymowne (rozchodzić się) – to dokładnie to czego potrzebujemy. Przekazanie wiadomości do wszystkich znanych kolejek.

Nasz kod wysyłający wiadomości nie będzie znacznie się różnił od poprzedniego artykułu. Musimy mieć jednak na uwadze punkty wspomniane powyżej:

using RabbitMQ.Client;
using System;
using System.Text;
namespace ConsoleSenderExchange
{
	class Program
	{
		static void Main(string[] args)
		{
			Console.WriteLine("Witajcie w aplikacji, która wysyła wiadomości!");
			Console.WriteLine("\n--------------------\n");
			Console.WriteLine("Wciśnij ESC, aby zakończyć działanie aplikacji");
			Console.WriteLine("\n--------------------\n");
			var factory = new ConnectionFactory() { HostName = "localhost" };
			// otwarcie połączenia
			using (var connection = factory.CreateConnection())
			{
				// utworzenie kanału komunikacji
				using (var channel = connection.CreateModel())
				{
					// tworzymy 'exchange', ktorego nazwę defniujemy jako cars
					channel.ExchangeDeclare(exchange: "cars", type: "fanout");
					do
					{
						Console.WriteLine("Wprowadz wiadomość do wysłania: ");
						string msg = Console.ReadLine();
						var msgBody = Encoding.UTF8.GetBytes(msg);
						// publikacja do 'exchange' - nie do kolejki jak w poprzednim przykładzie
						channel.BasicPublish(exchange: "cars",
							routingKey: "",
							basicProperties: null,
							body: msgBody);
						Console.WriteLine($" [x] wysłano {msgBody}");
					} while (Console.ReadKey(true).Key != ConsoleKey.Escape);
				}
			}
			Console.WriteLine("Wciśnij [Enter], aby wyłączyć aplikację");
			Console.ReadLine();
		}
	}
}
W powyższym kodzie możecie zobaczyć, że po nawiązaniu połączenia doszło do deklaracji exchange. Ten krok jest niezbędny ponieważ publikacja do nieistniejącego exchange jest niedozwolona.

Musimy jeszcze nieco zmodyfikować stronę odbiorcy:

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;
namespace ConsoleReceiverExchange
{
	class Program
	{
		static void Main(string[] args)
		{
			Console.WriteLine("Witajcie w aplikacji, która odbiera wiadomości!");
			var factory = new ConnectionFactory() { HostName = "localhost" };
			// otwarcie połączenia
			using (var connection = factory.CreateConnection())
			// utworzenie kanału komunikacji
			using (var channel = connection.CreateModel())
			{
				channel.ExchangeDeclare(exchange: "cars", type: "fanout");
				// tworzenie nietrwałej, wyłącznej, automatycznej kolejki z wygenerowaną nazwą
				var queueName = channel.QueueDeclare().QueueName;
				// exchange i queue zostały utwrzone. Musimy teraz powiadomić 'exchange', aby wysyłała wiadomości do naszej kolejki
				// jest to proste wiązanie pomiędzy 'exchange' a 'queue'
				channel.QueueBind(queue: queueName,
								  exchange: "cars",
								  routingKey: "");
				Console.WriteLine(" [x] Oczekiwanie na wiadomości!");
				var consumer = new EventingBasicConsumer(channel);
				consumer.Received += (model, ea) =>
				{
					var body = ea.Body;
					var message = Encoding.UTF8.GetString(body);
					Console.WriteLine($" [x] otrzymano {message}");
				};
				channel.BasicConsume(queue: queueName,
					autoAck: true,
					consumer: consumer);
				Console.WriteLine("Wciśnij [Enter], aby wyłączyć aplikację");
				Console.ReadLine();
			}
		}
	}
}
Pamiętajcie, żeby zdublować projekt odbiorcy ale zmodyfikować nieco wiadomość wyświtlaną na konsoli tak, aby mieć pewność, że jedna wiadomość została "przetworzona" na dwa różne sposoby – na tym na zależało, aby odwzorować zachowanie mikroserwisu w realnym świecie.

Po uruchomieniu naszych odbiorców warto również sprawdzić definicję kolejek. RabbitMQ: kolejki Zależy nam, żeby po zamknięciu naszych aplikacji zostały automatycznie usunięte z listy.

Na poniższym przykładzie możecie zobaczyć, że wiadomości zostały "przetworzone" przez dwóch niezależnych odbiorców w odmienny sposób: .NET Core: aplikacja konsolowa Co równie istotne, po wyłączeniu naszych odbiorców, definicje kolejek (nietrwałe) nie są już dłużej używane: RabbitMQ: kolejki

Podsumowanie

W tej cześci artykułu skupiliśmy się na nieco bardziej złożonych przykładach. Wykonaliśmy kilka testów i modyfikacji w porównaniu do pierwszego wpisu wykorzystując dodatkowo niezwykle przydatny Dashboard. Temat RabbitMQ jest niezwykle szeroki a sposób jego możliwego zastosowania jeszcze większy. Poznałeś podstawowe możliwości konfiguracyjne więc będzie Ci zdecydowanie łatwiej dopasować to narzędzie do swoich potrzeb.