Paweł Łukasiewicz
2022-05-20
Paweł Łukasiewicz
2022-05-20
Udostępnij Udostępnij Kontakt
Wprowadzenie

Amazon Kinesis ułatwia zbieranie, przetwarzanie i analizowanie danych strumieniowych w czasie rzeczywistym dzięki czemu można uzyskać aktualny wgląd w dane i szybko reagować na nowe informacje. Amazon Kinesis oferuje kluczowe możliwości efektywnego kosztowo przetwarzania danych strumieniowych w dowolnej skali, wraz z elastycznością związaną z wyborem narzędzia, które najlepiej pasuje do wymagań aplikacji.

Dzięki Amazon Kinesis można pobierać dane w czasie rzeczywistym takie jak audio, video, logi aplikacji, dane telemetryczne IoT do uczenia maszynowego, analizy i innych zastosowań. Usługa pozwala na przetwarzanie i analizowanie danych w miarę ich napływu i natychmiastową reakcję zamiast czekania, aż wszystkie dane zostaną zebrane przed rozpoczęciem przetwarzania.

Kolejne kroki, które wykonamy w ramach tego wpisu:

  1. utworzenie roli z wymaganymi uprawnieniami;
  2. utworzenie strumienia danych Kinesis;
  3. utworzenie funkcji Lambda
  4. dodanie danych do strumienia Kinesis;

Przykład

Zanim przejdziemy do implementacji spójrzmy jeszcze na poniższy diagram: AWS Kinesis: przykładowa architektura

Tym razem nasz przykład jest nieco prostszy. Dodanie danych do strumienia Kinesis spowoduje uruchomienie w tle funkcji Lambda, która dokona prostego przetwarzania danych a następnie korzystając z usługi SES wyślemy wiadomość email z danymi wyjściowymi.

Tworzenie roli

Jak się doskonale domyślacie potrzebujemy roli z uprawnieniami do Kinesis, usługi SES oraz Lambdy: AWS Kinesis: tworzenie roli

Tworzenie strumienia Kinesis

Wykorzystując wyszukiwarkę usług przechodzimy do Amazon Kinesis: AWS Kinesis: wyszukiwarka usług

Z poziomu ekranu wybieramy utworzenie strumienia danych, tj. Data Streams. W nowo otwartym oknie konfiguracyjnym wprowadzamy nazwę strumienia oraz wskazujemy liczbę shardów (jeżeli zapomnieliście czym jest pojedynczy shard odsyłam do odpowiedniego wpisu z poprzedniej serii: AWS - Kinesis): AWS Kinesis: tworzenie strumienia danych

Proces tworzenia strumienia powinien zająć kilka sekund a Wy, po chwili, zobaczycie poniższy ekran: AWS Kinesis: tworzenie strumienia danych

Tworzenie Lambdy i wyzwalacza

Myślę, że po tylu wpisach każdy poradzi sobie z dodaniem funkcji, wybraniem odpowiedniego środowiska uruchomieniowego, przypisaniem odpowiedniej roli oraz utworzeniem wyzwalacza na strumień Kinesis bez żadnych problemów. Dodam jedynie, że w ramach konfiguracji ustawiłem batch size na 50 - jest to maksymalna liczba rekordów pobrana ze strumienia w danym momencie. Tak powinien prezentować się ekran główny Waszej funkcji z ustawionym wyzwalaczem: AWS Kinesis: konfiguracja Lambdy

Implementacja

Podobnie jak w poprzednich przypadkach kod piszemy wykorzystując środowisko Visual Studio 2019 a następnie dokonujemy publikacji wykorzystując zainstalowaną wtyczkę. Samo wysyłanie wiadomości email jest banalnie proste o czym przekonaliście się w poprzednich wpisach. Jedyna zmiana w implementacji wynika z wykorzystania paczki, która potrafi przyjąć strumień danych (rekordy), które następnie "przetwarzamy":

using System.Collections.Generic;
using System.IO;
using System.Text;
using System.Threading.Tasks;
using Amazon;
using Amazon.Lambda.Core;
using Amazon.Lambda.KinesisEvents;
using Amazon.SimpleEmail;
using Amazon.SimpleEmail.Model;

// Assembly attribute to enable the Lambda function's JSON input to be converted into a .NET class.
[assembly: LambdaSerializer(typeof(Amazon.Lambda.Serialization.SystemTextJson.DefaultLambdaJsonSerializer))]

namespace AWSLambda4
{
    public class Function
	{
    	public async Task FunctionHandler(KinesisEvent kinesisEvent, ILambdaContext context)
    	{
        	context.Logger.LogLine($"Beginning to process {kinesisEvent.Records.Count} records...");

        	foreach (var record in kinesisEvent.Records)
        	{
            	context.Logger.LogLine($"Event ID: {record.EventId}");
            	context.Logger.LogLine($"Event Name: {record.EventName}");

            	string recordData = GetRecordContents(record.Kinesis);
            	context.Logger.LogLine($"Record Data:");
            	context.Logger.LogLine(recordData);

   			 // Ustawcie adres email zweryfikowany w obrębie konkretnego regionu
   			 string senderAddress = "zweryfikowany_adres_email@gmail.com";
   			 // Pamiętajcie o ustawieniu regionu w którym dokonaliście weryfikacji adresu email
   			 // W przeciwnym wypadku zobaczycie poniższy błąd:
   			 // Email address is not verified.
   			 // The following identities failed the check in region EU-WEST-2: zweryfikowany_adres_email@gmail.com
   			 using (var client = new AmazonSimpleEmailServiceClient(RegionEndpoint.USEast1))
   			 {
   				 var sendRequest = new SendEmailRequest
   				 {
   					 Source = senderAddress,
   					 Destination = new Destination
   					 {
   						 ToAddresses =
   						 new List<string> { "zweryfikowany_adres_email@gmail.com" }
   					 },
   					 Message = new Message
   					 {
   						 Subject = new Content("Wpisz temat wysyłanej wiadomości"),
   						 Body = new Body
   						 {
   							 Html = new Content
   							 {
   								 Charset = "UTF-8",
   								 Data = recordData
   							 },
   							 Text = new Content
   							 {
   								 Charset = "UTF-8",
   								 Data = "body"
   							 }
   						 }
   					 }
   				 };
   				 var sendEmail = await client.SendEmailAsync(sendRequest);
   			 }
   		 }
   		 context.Logger.LogLine("Stream processing complete.");
    	}

    	private string GetRecordContents(KinesisEvent.Record streamRecord)
    	{
        	using (var reader = new StreamReader(streamRecord.Data, Encoding.ASCII))
        	{
            	return reader.ReadToEnd();
        	}
    	}
	}
}

Testowanie

Zastanawiacie się pewnie jak dodać dane do strumienia? W przypadku SNS mogliśmy w prosty sposób dodać nową wiadomość wykorzystując konsole usług AWS. Tym razem możemy posłużyć się gotowymi przykładami lub wykorzystać AWS CLI: AWS Kinesis: dodawanie danych do strumienia

Jeżeli jeszcze nie zainstalowaliście AWS CLI zerknijcie w ten wpis: AWS Lambda - AWS CLI w którym omówiłem proces instalacji oraz niezbędnej konfiguracji.

My przechodzimy do konkretów. Posługując się poniższym poleceniem konsoli jesteśmy w stanie dodać dane do naszego strumienia:

aws kinesis put-record --stream-name kinesislambda --data "Witaj Swiecie - przetwarzamy rekord z Kinesis" --partition-key "789675"
W momencie wykonania polecenia doszło do uruchomienia funkcji Lambda - sprawdźcie czy dostaliście wiadomość na skrzynkę mailową: AWS Kinesis: potwierdzenie dodania sturmienia

Powyższe polecenie raczej nie wzbudza żadnych wątpliwości. Poświęcimy jedynie chwilę na parametr partition-key o którym możecie również przeczytać w oficjalnej dokumentacji https://docs.aws.amazon.com/cli/latest/reference/kinesis/put-record.html

Klucz ten jest używany do grupowania danych w obrębie strumienia. AWS Kinesis segreguje rekordy należące do strumienia na wiele bloków. Partition key jest powiązany z każdym rekordem danych w celu określenia do którego bloku należy dany rekord danych. Spróbujcie dodać (w ramach własnych testów) jeszcze parę rekordów z innym kluczem i spójrzcie na shardId.