§Persistent Read-Side

Event Sourcing and CQRS is recommended reading as an introduction to this section.

Persistent Entities are used for holding the state of individual entities, but they cannot be used for serving queries that span more than one entity. You need to know the identifier of the entity to be able to interact with it. Therefore you need to create another view of the data that is tailored to the queries that the service provides. Lagom has support for populating this read-side view of the data and also for building queries of the read-side.

This separation of the write-side and the read-side of the persistent data is often referred to as the CQRS (Command Query Responsibility Segregation) pattern. The CQRS Journey is a great resource for learning more about CQRS.

§Read-side design

In Lagom, the read-side can be implemented using any datastore, with any JVM library used to populate and query it. Lagom provides some helpers for Cassandra and relational databases, but they are optional and don't need to be used.

If you're familiar with a more traditional approach to persistence that uses tables with rows and columns, then implementing the read-side may be a little more familiar than implementing the persistent entities. There is one primary rule, though: the read-side should only be updated in response to events received from persistent entities.

To handle these events, you need to provide a ReadSideProcessor. A read-side processor is responsible not just for handling the events produced by the persistent entity, but also for tracking which events it has handled. This is done using offsets.

Each event produced by a persistent entity has an offset. When a read-side processor first starts, it needs to load the offset of the last event that it processed. After processing an event, it should store the offset of the event it just processed. If the storing of offsets is done atomically with any updates produced by the events, then event processing will happen exactly once for each event, otherwise it will happen at least once.
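
For example, here is a minimal sketch, using plain JDBC and hypothetical blog_summary and read_side_offsets tables, of storing a read-side update and its offset in the same transaction. If the node crashes before the commit, neither the update nor the offset is stored and the event will simply be processed again; if it crashes after, the stored offset ensures the event is not reprocessed.

import java.sql.Connection

def updateAtomically(connection: Connection, postId: String, title: String, offset: Long): Unit = {
  connection.setAutoCommit(false)
  try {
    // update the read-side model
    val insertPost = connection.prepareStatement(
      "INSERT INTO blog_summary (post_id, title) VALUES (?, ?)")
    insertPost.setString(1, postId)
    insertPost.setString(2, title)
    insertPost.executeUpdate()

    // store the offset of the event that produced this update
    val saveOffset = connection.prepareStatement(
      "UPDATE read_side_offsets SET sequence_offset = ? WHERE processor_id = ?")
    saveOffset.setLong(1, offset)
    saveOffset.setString(2, "blog-summary")
    saveOffset.executeUpdate()

    // both writes become visible together, or not at all
    connection.commit()
  } catch {
    case e: Exception =>
      connection.rollback()
      throw e
  }
}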

That said, if you use Lagom's built-in Cassandra or relational database read-side support, then offset tracking is done automatically for you, and you only need to worry about handling the events themselves. The rest of this page is about implementing a read-side using Lagom's generic read-side processor support. If you're using Cassandra or a relational database for your read-side, you should read the remainder of this page to understand how read-sides work, but then also read the documentation on Lagom's specific support for those databases in order to take advantage of its built-in offset handling and other features.

§Query the Read-Side Database

How you query the read-side database depends on your database, but there are two things to be aware of:

  • Ensure that any connection pools are started once, and then shut down when Lagom shuts down. Lagom is built on Play, and uses Play’s lifecycle support to register callbacks to execute on shutdown. For information on how to hook into this, see the Play documentation.
  • Ensure that any blocking actions are done in an appropriate execution context. Lagom assumes that all actions are asynchronous, and has thread pools tuned for asynchronous tasks. The use of unmanaged blocking can cause your application to stop responding at very low loads. For details on how to correctly manage thread pools for blocking database calls, see Play’s documentation on thread pools.
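
For the second point, a common approach in a Lagom/Play application is to define a dedicated dispatcher for blocking calls and look it up from the actor system. This is a minimal sketch; the dispatcher name, pool size and repository class are illustrative assumptions, not part of Lagom's API.

import akka.actor.ActorSystem
import scala.concurrent.{ ExecutionContext, Future }

// application.conf (illustrative):
//   my-service.blocking-db-dispatcher {
//     type = Dispatcher
//     executor = "thread-pool-executor"
//     thread-pool-executor.fixed-pool-size = 10
//   }
class BlogRepository(system: ActorSystem) {

  // dedicated execution context so blocking JDBC calls never starve Lagom's default dispatcher
  private val blockingEc: ExecutionContext =
    system.dispatchers.lookup("my-service.blocking-db-dispatcher")

  def findPostTitle(postId: String): Future[Option[String]] =
    Future {
      // run the blocking database query here
      ???
    }(blockingEc)
}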

§Update the Read-Side

We need to transform the events generated by the Persistent Entities into database tables that can be queried as illustrated in the previous section. For that we will implement a ReadSideProcessor. It will consume events produced by persistent entities and update the database table.

§Event tags

In order to consume events on the read-side, the events need to be tagged. All events with a particular tag can be consumed as a sequential, ordered stream of events. Events can be tagged by making them implement the AggregateEvent interface. The tag is defined using the aggregateTag method.

The simplest way to tag events is to give all events for a particular entity the same tag. To do this, define a static tag, and return it from the aggregateTag method of your events:

object BlogEvent {
  val BlogEventTag = AggregateEventTag[BlogEvent]
}

sealed trait BlogEvent extends AggregateEvent[BlogEvent] {
  override def aggregateTag: AggregateEventTag[BlogEvent] =
    BlogEvent.BlogEventTag
}

While this is quite straightforward, it does mean that events can only be consumed one at a time, which may become a scaling bottleneck. If you expect events to occur only a few times per second, this might be fine, but if you expect hundreds or thousands of events per second, you may want to shard your read-side event processing load.

Sharding can be done in two ways: either manually, by returning different tags based on information in the event, or automatically, by returning an AggregateEventShards tag, which tells Lagom to shard the tag based on the entity's persistence ID. It's important to ensure that all the events for the same entity end up with the same tag (and hence in the same shard); otherwise, event processing for that entity may be out of order, since the read-side nodes consume the event streams for their tags at different paces.

When you shard events, you need to decide up front how many shards you want to use. The more shards, the more you can scale your service horizontally across many nodes. However, shards come at a cost: each additional shard increases the number of read-side processors that query your database for new events. It is very difficult to change the number of shards without compromising the ordering of events within an entity, so it's best to work out up front the peak rate of events you expect to handle over the lifetime of the system, determine how many nodes you'll need to handle that load, and use that as the number of shards.

Lagom provides some utilities to help create sharded tags. To create the sharded tags, define the number of shards in a static variable, create the shards tag using AggregateEventTag.sharded, and implement the aggregateTag method to return that shards tag:

object BlogEvent {
  // will produce tags with shard numbers from 0 to 9
  val NumShards = 10
  val Tag       = AggregateEventTag.sharded[BlogEvent](NumShards)
}

sealed trait BlogEvent extends AggregateEvent[BlogEvent] {
  override def aggregateTag: AggregateEventShards[BlogEvent] = BlogEvent.Tag
}

Lagom will now generate tag names by appending the hash code of the entity ID, modulo the number of shards, to the event class name.
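
For example, the concrete tag for a given entity can be obtained from the shards tag with forEntityId; the entity ID below is purely illustrative:

// All events for this entity ID will always map to the same tag, whose name is the
// class name plus the shard number, for example "BlogEvent3".
val tagForEntity: AggregateEventTag[BlogEvent] = BlogEvent.Tag.forEntityId("post-123")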

Note: if you're using a JDBC database to store your journal, the number of sharded tags (NumShards) should not be greater than 10. This is due to an existing bug in the plugin. Failing to follow this directive will result in some events being delivered more than once on the read-side or to topic producers.

§Defining a read side processor

This is what a ReadSideProcessor class looks like before filling in the implementation details:

class BlogEventProcessor extends ReadSideProcessor[BlogEvent] {

  override def buildHandler(): ReadSideProcessor.ReadSideHandler[BlogEvent] = {
    // TODO build read side handler
    ???
  }

  override def aggregateTags: Set[AggregateEventTag[BlogEvent]] = {
    // TODO return the tag for the events
    ???
  }
}

The first method we'll implement is the aggregateTags method. This method has to return a list of all the tags that our processor will handle - if you return more than one tag, Lagom will shard these tags across your service's cluster. To implement this method, simply return the set of all tags for your event class:

override def aggregateTags: Set[AggregateEventTag[BlogEvent]] =
  BlogEvent.Tag.allTags

Now we need to implement the buildHandler method. Let's assume that you have created a component to interact with your preferred database, which provides the following methods:

trait MyDatabase {

  /**
   * Create the tables needed for this read side if not already created.
   */
  def createTables(): Future[Done]

  /**
   * Load the offset of the last event processed.
   */
  def loadOffset(tag: AggregateEventTag[BlogEvent]): Future[Offset]

  /**
   * Handle an event, updating the read-side and storing the offset of the processed event.
   */
  def handleEvent(event: BlogEvent, offset: Offset): Future[Done]
}

The createTables method will create the tables used by the read side processor if they don’t already exist - this is completely optional, but may be useful in development and test environments as it alleviates the need for developers to manually set up their environments.

The loadOffset method reads the last Offset that was processed by this read-side processor for the particular tag. Typically this will be stored in a table that has the tag name and the eventProcessorId as a compound primary key. Offsets come in two varieties: an akka.persistence.query.Sequence offset represented using a long, and an akka.persistence.query.TimeBasedUUID offset represented using a UUID. Your database will need to be able to persist both of these types. If there is no offset stored for a particular tag, such as when the processor runs for the very first time, then you can return akka.persistence.query.NoOffset.
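
As a minimal sketch, assuming offsets are persisted in a single hypothetical string column, the conversion between these offset types and their stored representation could look like this:

import java.util.UUID
import akka.persistence.query.{ NoOffset, Offset, Sequence, TimeBasedUUID }

// convert an offset to an optional string for storage; None means "no offset stored yet"
def offsetToString(offset: Offset): Option[String] = offset match {
  case Sequence(value)     => Some(s"sequence:$value")
  case TimeBasedUUID(uuid) => Some(s"uuid:$uuid")
  case NoOffset            => None
  case other               => throw new IllegalArgumentException(s"Unsupported offset type: $other")
}

// convert a stored string back to an offset, falling back to NoOffset
def stringToOffset(stored: Option[String]): Offset = stored match {
  case Some(s) if s.startsWith("sequence:") => Offset.sequence(s.stripPrefix("sequence:").toLong)
  case Some(s) if s.startsWith("uuid:")     => Offset.timeBasedUUID(UUID.fromString(s.stripPrefix("uuid:")))
  case _                                    => NoOffset
}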

Finally, the handleEvent method is responsible for handling the actual events. It gets passed both the event and the offset, and should persist the offset once the event handling is successful.
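
As an illustration of this contract, here is a hypothetical in-memory implementation of MyDatabase. It is only suitable for tests and only tracks a single offset, so it assumes a single tag (NumShards = 1); a real implementation would persist its tables and one offset per tag in the database.

import akka.Done
import akka.persistence.query.{ NoOffset, Offset }
import com.lightbend.lagom.scaladsl.persistence.AggregateEventTag
import scala.concurrent.Future

class InMemoryMyDatabase extends MyDatabase {

  @volatile private var storedOffset: Offset = NoOffset

  // nothing to create for an in-memory store
  override def createTables(): Future[Done] = Future.successful(Done)

  // return NoOffset until the first event has been processed
  override def loadOffset(tag: AggregateEventTag[BlogEvent]): Future[Offset] =
    Future.successful(storedOffset)

  override def handleEvent(event: BlogEvent, offset: Offset): Future[Done] = {
    // update the in-memory read-side model here (omitted), then record the offset
    storedOffset = offset
    Future.successful(Done)
  }
}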

Given this interface, we can now implement the buildHandler method:

override def buildHandler(): ReadSideProcessor.ReadSideHandler[BlogEvent] = {
  new ReadSideHandler[BlogEvent] {

    override def globalPrepare(): Future[Done] =
      myDatabase.createTables()

    override def prepare(tag: AggregateEventTag[BlogEvent]): Future[Offset] =
      myDatabase.loadOffset(tag)

    override def handle(): Flow[EventStreamElement[BlogEvent], Done, NotUsed] = {
      Flow[EventStreamElement[BlogEvent]]
        .mapAsync(1) { eventElement =>
          myDatabase.handleEvent(eventElement.event, eventElement.offset)
        }
    }
  }
}

The globalPrepare method is used for tasks that should only be run once globally. Remember that Lagom will create many read-side processors, one for each shard. If each of these writes to the same table, you only want one of them to attempt to create that table; otherwise concurrent attempts could create race conditions in your database. Lagom will ensure that the globalPrepare method executes at least once before any read-side processing begins. It may be executed multiple times, particularly when your cluster restarts, but those executions will only ever happen one at a time. If globalPrepare fails, Lagom will retry, backing off exponentially on subsequent failures, until it succeeds.

The prepare method is used to load the last offset, and to do any other per-processor preparation, such as creating prepared statements, before processing begins. It will be executed once per read-side processor.

The handle method must return an Akka Streams Flow that handles the event stream. Typically it will use mapAsync with a parallelism of 1, so that events are handled in order, one at a time.

§Registering your read-side processor

Once you’ve created your read-side processor, you need to register it with Lagom. This is done using the ReadSide component:

class BlogServiceImpl(persistentEntityRegistry: PersistentEntityRegistry, readSide: ReadSide, myDatabase: MyDatabase)
    extends BlogService {

  readSide.register[BlogEvent](new BlogEventProcessor(myDatabase))

  // ... service call implementations ...
}

§Raw Stream of Events

There is another tool that can be used if you want more flexible event processing. You can get a stream of the persistent events directly from Lagom with the eventStream method of the PersistentEntityRegistry.

override def newPosts(): ServiceCall[NotUsed, Source[PostSummary, _]] =
  ServiceCall { request =>
    val response: Source[PostSummary, NotUsed] =
      persistentEntityRegistry
        .eventStream(BlogEvent.Tag.forEntityId(""), NoOffset)
        .collect {
          case EventStreamElement(entityId, event: PostAdded, offset) =>
            PostSummary(event.postId, event.content.title)
        }
    Future.successful(response)
  }

The eventStream method takes the aggregate event tag and an offset, which is the starting point of the stream. It returns a Source of EventStreamElement elements, each of which contains the event and its associated offset.

This stream will never complete, unless there is a failure retrieving the events from the database. It will continue to deliver new events as they are persisted.

Each such stream of events will continuously generate queries to the persistent entity implementation (e.g. Cassandra) to fetch new events, so this tool should be used carefully. Do not run too many such streams. It should typically not be used for service calls invoked by an unknown number of clients, but it can be useful for a limited number of background processing jobs.

§Refactoring Consideration

If you use the class name of an event type as the aggregate tag in AggregateEventTag, you have to retain the original tag if you later rename the event class, because this string is part of the stored event data. AggregateEventTag has a factory method (and constructor) with a String tag parameter for this purpose. Instead of using a class name as the tag identifier, you can consider using an explicit string tag up-front. The tag should be unique among the event types of the service.
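
For example, an explicit string tag can be defined up-front; the tag name "blog-event" below is an illustrative choice:

object BlogEvent {
  // an explicit tag name means the event class can later be renamed without
  // affecting the tag stored with the event data
  val Tag = AggregateEventTag[BlogEvent]("blog-event")

  // the sharded variant also accepts an explicit base tag name:
  // val ShardedTag = AggregateEventTag.sharded[BlogEvent]("blog-event", 10)
}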

§Configuration

The default configuration should be a good starting point, and the following settings may later be amended to customize the behavior if needed.

lagom.persistence.read-side {

  # how long should we wait when retrieving the last known offset
  offset-timeout = 5s

  # Exponential backoff for failures in ReadSideProcessor
  failure-exponential-backoff {
    # minimum (initial) duration until processor is started again
    # after failure
    min = 3s

    # the exponential back-off is capped to this duration
    max = 30s

    # additional random delay is based on this factor
    random-factor = 0.2
  }

  # The amount of time that a node should wait for the global prepare callback to execute
  global-prepare-timeout = 20s

  # Specifies that the read side processors should run on cluster nodes with a specific role.
  # If the role is not specified (or empty) all nodes in the cluster are used.
  run-on-role = ""

  # The Akka dispatcher to use for read-side actors and tasks.
  use-dispatcher = "lagom.persistence.dispatcher"
}
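
For example, to change a single setting you can override it in your service's application.conf; the value below is illustrative:

# application.conf
lagom.persistence.read-side.offset-timeout = 10s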

§Underlying Implementation

Read-side processors consume events using the eventsByTag Akka Persistence query. The eventStream method of the PersistentEntityRegistry is implemented using the same eventsByTag query.
