
Use DynamoDB Streams to aggregate data in near real-time

August 20, 2020

Updated 14/12 2020: Added reference to Dynobase DynamoDB Streams guide and use cases.

DynamoDB Streams lets us track data modifications in our tables and have them trigger Lambda functions. Ever since starting my AWS serverless journey I’ve wanted to try and put this feature into use.

Like queues and messaging systems, we can use streams to create event-driven, loosely coupled architectures where producers and consumers can be developed and deployed independently. Such architectures allow us to decompose our systems into modules with separate concerns, which can give our software many desirable attributes, such as

  • Scalability
  • Maintainability
  • Resilience
  • Developer velocity

So I put together a small example that aggregates game scores to be readily usable in leaderboards. The following diagram - an excerpt from the AWS Database Blog article DynamoDB Streams Use Cases and Design Patterns - depicts the architecture:

[Architecture diagram: source table, DynamoDB stream, streamProcessor Lambda function, aggregation table]

When new game events are inserted into the source table, DynamoDB Streams captures them and invokes the streamProcessor function. Since the stream uses the NEW_IMAGE view type, the new record(s) are included in the passed-in event, and it couldn’t be easier to get our work done. In this example that means aggregating the score for the player(s) in question.

Source table which triggers the streamProcessor function

[Image: source table items]

Aggregation table, storing the always up-to-date total score for each player

[Image: aggregation table items]

The streamProcessor function

The main logic of this example sits in this function. It iterates over the newly created records and optimistically tries an update, on the condition that an item with the given playerId already exists. If it does not, the resulting ConditionalCheckFailedException - only encountered once per player - is caught and the new player is inserted instead.
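
Just to illustrate the shape of things, a stripped-down handler might look something like this (a sketch only - the aggregateScore helper name is made up, and the real code in the linked example may differ):

const AWS = require('aws-sdk')
const { unmarshall } = AWS.DynamoDB.Converter

exports.handler = async (event) => {
  for (const record of event.Records) {
    // Only INSERT events carry new game scores
    if (record.eventName !== 'INSERT') continue

    // The NEW_IMAGE arrives in DynamoDB's attribute-value format,
    // so convert it to a plain object first
    const { playerId, score, timestamp } = unmarshall(record.dynamodb.NewImage)

    await aggregateScore(playerId, score, timestamp) // hypothetical helper wrapping the update below
  }
}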

Invoking the update looks like this, where score and date values are provided in the stream:

const AWS = require('aws-sdk')
const dynamodb = new AWS.DynamoDB.DocumentClient()

// Atomically bump the player's totals. The ConditionExpression makes the call
// throw ConditionalCheckFailedException if the player item does not exist yet.
await dynamodb
  .update({
    TableName: process.env.LEADERBOARDS_TABLE,
    Key: { playerId },
    ConditionExpression: 'attribute_exists(playerId)',
    UpdateExpression:
      'SET gamesPlayed = gamesPlayed + :increment, totalScore = totalScore + :score, lastPlayedDate = :date',
    ExpressionAttributeValues: {
      ':increment': 1,
      ':score': score,
      ':date': timestamp.substr(0, 10), // keep only the YYYY-MM-DD part
    },
    ReturnValues: 'UPDATED_NEW',
  })
  .promise()
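
The fallback path for a first-time player could then be handled along these lines (again just a sketch - updatePlayerTotals is a made-up wrapper around the update call above):

try {
  await updatePlayerTotals(playerId, score, timestamp)
} catch (error) {
  // Anything other than a failed condition check is a real error
  if (error.code !== 'ConditionalCheckFailedException') throw error

  // First game for this player: create the item with initial totals
  await dynamodb
    .put({
      TableName: process.env.LEADERBOARDS_TABLE,
      Item: {
        playerId,
        gamesPlayed: 1,
        totalScore: score,
        lastPlayedDate: timestamp.substr(0, 10),
      },
    })
    .promise()
}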

I’ve put together a runnable example here. Consult its README for examples of how to easily produce test data with curl.

Limitations and gotchas

  • A maximum of two simultaneous consumers per shard is recommended for DynamoDB Streams, or throttling may occur.
  • Records in a DynamoDB stream are retained for 24 hours, after which they expire and are removed.
  • By default, if the Lambda function fails, the entire batch of records is retried until it succeeds or the records expire, meaning that repeated errors will block further processing.
  • Production systems must implement proper failure handling and alerting to ensure that processing errors or poison messages 1) do not block further processing, and 2) do not eventually cause data loss when records expire from the stream. One way of configuring this is sketched below.
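
The Lambda event source mapping has knobs for exactly this: capping retries and record age, bisecting failing batches, and sending discarded records to an on-failure destination. A rough sketch using the AWS SDK - the mapping UUID and queue ARN below are placeholders:

const AWS = require('aws-sdk')
const lambda = new AWS.Lambda()

await lambda
  .updateEventSourceMapping({
    UUID: '<event-source-mapping-uuid>', // placeholder
    BisectBatchOnFunctionError: true,    // split a failing batch to isolate the bad record
    MaximumRetryAttempts: 5,             // stop retrying well before the 24-hour expiry
    MaximumRecordAgeInSeconds: 3600,     // give up on records older than an hour
    DestinationConfig: {
      OnFailure: {
        Destination: 'arn:aws:sqs:eu-west-1:123456789012:leaderboard-dlq', // placeholder queue
      },
    },
  })
  .promise()

In practice these properties would normally be set in the deployment template (Serverless Framework, SAM or CDK) rather than via an API call; the SDK call just shows which settings exist.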
