Paweł Łukasiewicz
2024-03-20
Paweł Łukasiewicz
2024-03-20
Udostępnij Udostępnij Kontakt
Skanowanie tabel i indeksów: .NET

Operacja Scan odczytuje wszystkie elementy w tabeli lub indeksie w DynamoDB.

Kroki, które musimy wykonać w celu wykonania operacji skanowania to utworzenie instancji klasy AmazonDynamoDBClient, przygotowanie żądania i podanie parametrów operacji skanowania przy wykorzystaniu instancji klasy ScanRequest oraz uruchomienie metody Scan i podanie powyższego obiektu jako parametru tej metody.

Nasz pierwszy przykład będzie się opierał na tabeli, którą przygotowaliśmy w jednym z poprzednich wpisów, tj. Reply. Jak doskonale pamiętacie tabela ta gromadzi wszystkie odpowiedzi dla różnych wątków na forum. Spójrzcie na poniższy (najprostszy możliwy) przykład:

public async Task<ActionResult<string>> Scan()
{
    // Setup
    var creatTable = await CreateSampleTable();
    var loadResponse = LoadReplyData();
    StringBuilder sb = new StringBuilder();

    var request = new ScanRequest
    {
        TableName = TableName3
    };

    var response = await _amazonDynamoDB.ScanAsync(request);
    var result = response.Items;

    foreach (var scanItem in result)
    {
        // Zerknicie tutaj: https://www.plukasiewicz.net/AwsLambda/DynamoDbLambda
        // jeżeli checie pobrać wartości danych atrybutów w dokładniejszy sposób
        string json = JsonConvert.SerializeObject(scanItem, Formatting.Indented);
        sb.AppendLine(json);
    }

    return sb.ToString();
}

Parametry opcjonalne

Metoda Scan pozwala na podanie kilku parametrów opcjonalnych. Jednym z przykładów jest filtr skanowania, który możemy zastosować do filtrowania wyniku skanowania. W filtrze skanowania możemy określić warunek oraz nazwę atrybutu, na którym ma zostać zastosowany filtr skanowania. Za chwilę przejdziemy do przykładu w którym na tabeli Reply zastosujemy jeden z takich filtrów. Przykład zawiera poniższe opcjonalne parametry:

  • FilterExpression - w celu pobrania odpowiedzi napisanych przez konkretnego użytkownika;
  • ProjectionExpression - w celu określenia atrybutów, które mają być pobierane dla elementów w wynikach zapytania.

Spójrzcie na poniższy przykład:

public async Task<ActionResult<string>> ScanWithParameters()
{
    // Setup
    StringBuilder sb = new StringBuilder();

    var request = new ScanRequest
    {
        TableName = TableName3,
        // Parametry opcjonalne
        ExpressionAttributeValues = new Dictionary<string, AttributeValue>
        {
            {":user", new AttributeValue { S = "User A"} }
        },
        FilterExpression = "PostedBy = :user",
        ProjectionExpression = "PostedBy"
    };

    var response = await _amazonDynamoDB.ScanAsync(request);
    var result = response.Items;

    foreach (var scanItem in result)
    {
        // Zerknicie tutaj: https://www.plukasiewicz.net/AwsLambda/DynamoDbLambda
        // jeżeli checie pobrać wartości danych atrybutów w dokładniejszy sposób
        string json = JsonConvert.SerializeObject(scanItem, Formatting.Indented);
        sb.AppendLine(json);
    }

    return sb.ToString();
}

W odpowiedzi dostaniemy poniższy zestaw danych:

{
  "PostedBy": {
    "B": null,
    "BOOL": false,
    "IsBOOLSet": false,
    "BS": [],
    "L": [],
    "IsLSet": false,
    "M": {},
    "IsMSet": false,
    "N": null,
    "NS": [],
    "NULL": false,
    "S": "User A",
    "SS": []
  }
}
{
  "PostedBy": {
    "B": null,
    "BOOL": false,
    "IsBOOLSet": false,
    "BS": [],
    "L": [],
    "IsLSet": false,
    "M": {},
    "IsMSet": false,
    "N": null,
    "NS": [],
    "NULL": false,
    "S": "User A",
    "SS": []
  }
}
{
  "PostedBy": {
    "B": null,
    "BOOL": false,
    "IsBOOLSet": false,
    "BS": [],
    "L": [],
    "IsLSet": false,
    "M": {},
    "IsMSet": false,
    "N": null,
    "NS": [],
    "NULL": false,
    "S": "User A",
    "SS": []
  }
}

Zwróćcie uwagę, że dostaliśmy jedynie odpowiedzi od użytkownika wskazanego w warunku filtrowania dodanego do naszej operacji skanowania.

Podobnie jak w przypadku operacji Query, tak i tutaj możemy (opcjonalnie) ograniczyć rozmiar strony lub ilość elementów na stronie dodając opcjonalny parametr Limit. Za każdym razem, kiedy uruchomimy operację Scan otrzymamy jedną stronę wyników, która ma określoną ilość elementów. W celu pobrania następnej strony musimy ponownie uruchomić metodę Scan podając wartość klucza głównego ostatniego elementu na poprzedniej stronie, aby operacja mogła zwrócić następny zestaw elementów. Informacje tę przekazujemy w żądaniu ustawiając właściwość ExclusiveStartKey. Początkowo wartość ta może mieć wartość null. Pobranie kolejnych stron jest możliwe po zaktualizowaniu wartości tego klucza na wartość klucza głównego ostatniego elementu na poprzedniej stronie.

Poniższy przykład bazuje na tym z poprzedniej części wpisu definiując jednak opcjonalne parametry Limit oraz ExclusiveStartKey. Pętla do/while kontynuuje skanowanie po jednej stronie w danym momencie aż LastEvaluatedKey zwróci wartość null:

public async Task<ActionResult<string>> ScanWithParametersLimit()
{
    // Setup
    Dictionary<string, AttributeValue> lastKeyEvaluated = null;
    StringBuilder sb = new StringBuilder();

    do
    {

        var request = new ScanRequest
        {
            TableName = TableName3,
            Limit = 10,
            ExclusiveStartKey = lastKeyEvaluated,
        };

        var response = await _amazonDynamoDB.ScanAsync(request);
        var result = response.Items;

        foreach (var scanItem in result)
        {
            // Zerknicie tutaj: https://www.plukasiewicz.net/AwsLambda/DynamoDbLambda
            // jeżeli checie pobrać wartości danych atrybutów w dokładniejszy sposób
            string json = JsonConvert.SerializeObject(scanItem, Formatting.Indented);
            sb.AppendLine(json);
        }

        lastKeyEvaluated = response.LastEvaluatedKey;

    } while (lastKeyEvaluated != null && lastKeyEvaluated.Count != 0);

    return sb.ToString();
}

Skanowanie równoległe

Zanim przejdziemy dalej spójmy jeszcze na skanowanie równoległe. W tym przypadku rozszerzmy nieco istniejącą implementację, dodamy nową tabelę pod nazwą CarPartsCatalog do której następnie dodamy dane. Po pomyślnym załadowaniu danych utworzymy wiele wątków i wydamy równoległe żądania skanowania. Na koniec zbierzemy i przedstawimy podsumowanie statystyk czasu pracy.

public async Task<ActionResult<string>> ParallelScan()
{
    // Tworznie nowej tabeli
    CreateCarPartsCatalogTable();

    // Wypełnienie danymi
    FillDataIntoCarPartsCatalog();

    // Skanowanie równoległe
    var summary = RunParallelScan();

    return summary;
}

#region Useful methods

private async Task CreateCarPartsCatalogTable()
{
    var request = new CreateTableRequest
    {
        TableName = TableName4,
        AttributeDefinitions = new List<AttributeDefinition>()
        {
            new AttributeDefinition
            {
                AttributeName = "Id",
                AttributeType = "N"
            }
        },
        KeySchema = new List<KeySchemaElement>
        {
            new KeySchemaElement
            {
                AttributeName = "Id",
                KeyType = "HASH" // partition key
            }
        },
        ProvisionedThroughput = new ProvisionedThroughput
        {
            ReadCapacityUnits = 10,
            WriteCapacityUnits = 20
        }
    };

    var response = await _amazonDynamoDB.CreateTableAsync(request);
    var tableDescription = response.TableDescription;

    WaitUntilTableReady(TableName4);
}

private void FillDataIntoCarPartsCatalog()
{
    int exampleItemCount = 500;

    for (int i = 0; i < exampleItemCount; i++)
    {
        AddItem(i.ToString());
    }
}

private void AddItem(string itemIndex)
{
    var request = new PutItemRequest
    {
        TableName = TableName4,
        Item = new Dictionary<string, AttributeValue>()
        {
            { "Id", new AttributeValue { N = itemIndex } },
            { "Name", new AttributeValue { S = $"Car Part: {itemIndex}" } },
            { "SerialNumber", new AttributeValue { S = $"Part_{itemIndex}_{itemIndex}_{itemIndex}" } },
            { "PartBrand", new AttributeValue { SS = new List<string> { "Audi", "Porsche", "Lamborghini" } } },
            { "Price", new AttributeValue { N = "88"} },
            { "IsAvaiable", new AttributeValue {BOOL = true } }
        }
    };

    _amazonDynamoDB.PutItemAsync(request);
}

private string RunParallelScan()
{
    // Setup
    int totalSegments = 5;

    // Tylko na potrzeby tego przykładu - moglibyśmy zrobić to znacznie lepiej ;)
    StringBuilder internalLogger = new StringBuilder();

    internalLogger.AppendLine($"Tworzenie {totalSegments} Równoległych Zadań Skanowania dla tabeli {TableName4}.");
    Task[] tasks = new Task[totalSegments];

    for (int segment = 0; segment < totalSegments; segment++)
    {
        int tpmSegment = segment;
        Task task = Task.Factory.StartNew(() =>
        {
            ScanSegment(totalSegments, tpmSegment, internalLogger);
        });

        tasks[segment] = task;
    }

    internalLogger.AppendLine("Wszystkie zadania skanowania utworzone - oczekiwanie na ich zakończenie.");
    Task.WaitAll(tasks);

    internalLogger.AppendLine("Wszystkie zadania skanowania zostały ukończone.");

    return internalLogger.ToString();
}

private void ScanSegment(int totalSegments, int segment, StringBuilder internalLogger)
{
    Dictionary<string, AttributeValue> lastEvaluatedKey = null;
    int scanItemLimit = 10;
    int totalScannedItemCount = 0;
    int totalScanRequestCount = 0;

    internalLogger.AppendLine($"Rozpoczynanie skanowania segmentu {segment} z {totalSegments} dla tabeli {TableName4}");

    do
    {
        var request = new ScanRequest
        {
            TableName = TableName4,
            Limit = scanItemLimit,
            ExclusiveStartKey = lastEvaluatedKey,
            Segment = segment,
            TotalSegments = totalSegments,
        };

        var response = _amazonDynamoDB.ScanAsync(request);
        lastEvaluatedKey = response.Result.LastEvaluatedKey;
        totalScanRequestCount++;
        totalScannedItemCount += response.Result.ScannedCount;

        foreach (var item in response.Result.Items)
        {
            internalLogger.AppendLine($"Segment: {segment}, Zeskanowane pozycje z nazwą części: {item["Name"].S}");
        }

        internalLogger.AppendLine($"Ukończono skanowanie segmentu {segment} z {totalSegments}.");
        internalLogger.AppendLine($"Całkowita liczba żądań skanowania: {totalScanRequestCount}.");
        internalLogger.AppendLine($"Całkowita liczba zeskanowanych elementów: {totalScannedItemCount}.");

    } while (lastEvaluatedKey.Count != 0);
}

#endregion