Persistent Read-Side

§Persistent Read-Side

Event Sourcing and CQRS is a recommended introduction to this section.

Persistent Entities are used for holding the state of individual entities, but they cannot be used for serving queries that span more than one entity. You need to know the identifier of the entity to be able to interact with it. Therefore you need to create another view of the data that is tailored to the queries that the service provides. Lagom has support for populating this read-side view of the data and also for building queries of the read-side.

This separation of the write-side and the read-side of the persistent data is often referred to as the CQRS (Command Query Responibility Segregation) pattern. The CQRS Journey is a great resource for learning more about CQRS.

§Dependency

In Maven:

<dependency>
    <groupId>com.lightbend.lagom</groupId>
    <artifactId>lagom-javadsl-cluster_2.11</artifactId>
    <version>${lagom.version}</version>
</dependency>

In sbt:

libraryDependencies += lagomJavadslPersistence

§Query the Read-Side Database

Lagom has support for Cassandra as data store, both for the write-side entities and the read-side queries. It is a very scalable distributed database, and also flexible enough to support most of the use cases that reactive services may have.

Let us first look at how a service implementation can retrieve data from Cassandra.

import akka.NotUsed;
import com.lightbend.lagom.javadsl.api.ServiceCall;
import com.lightbend.lagom.javadsl.persistence.cassandra.CassandraSession;
import java.util.concurrent.CompletableFuture;
import javax.inject.Inject;
import akka.stream.javadsl.Source;

public class BlogServiceImpl2 implements BlogService2 {

  private final CassandraSession cassandraSession;

  @Inject
  public BlogServiceImpl2(CassandraSession cassandraSession) {
    this.cassandraSession = cassandraSession;
  }

  @Override
  public ServiceCall<NotUsed, Source<PostSummary, ?>> getPostSummaries() {
    return request -> {
      Source<PostSummary, ?> summaries = cassandraSession.select(
          "SELECT id, title FROM postsummary;").map(row ->
            PostSummary.of(row.getString("id"), row.getString("title")));
      return CompletableFuture.completedFuture(summaries);
    };
  }
}

Note that the CassandraSession is injected in the constructor. CassandraSession provides several methods in different flavors for executing queries. The one used in the above example returns a Source, i.e. a streamed response. There are also methods for retrieving a list of rows, which can be useful when you know that the result set is small, e.g. when you have included a LIMIT clause.

All methods in CassandraSession are non-blocking and they return a CompletionStage or a Source. The statements are expressed in Cassandra Query Language (CQL) syntax. See Querying tables for information about CQL queries.

§Update the Read-Side

We need to transform the events generated by the Persistent Entities into database tables that can be queried as illustrated in the previous section. For that we will use the CassandraReadSideProcessor. It will Consume events produced by persistent entities and update one or more tables in Cassandra that are optimized for queries.

This is how a CassandraReadSideProcessor class looks like before filling in the implementation details:

import com.lightbend.lagom.javadsl.persistence.AggregateEventTag;

import com.lightbend.lagom.javadsl.persistence.cassandra.CassandraReadSideProcessor;
import com.lightbend.lagom.javadsl.persistence.cassandra.CassandraSession;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletionStage;

public class BlogEventProcessor1 extends CassandraReadSideProcessor<BlogEvent> {

  @Override
  public AggregateEventTag<BlogEvent> aggregateTag() {
    // TODO return the tag for the events
    return null;
  }

  @Override
  public CompletionStage<Optional<UUID>> prepare(CassandraSession session) {
    // TODO prepare statements, fetch offset
    return noOffset();
  }

  @Override
  public EventHandlers defineEventHandlers(EventHandlersBuilder builder) {
    // TODO define event handlers
    return builder.build();
  }

}

To make the events available for read-side processing the events must implement the aggregateTag method of the AggregateEvent interface to define which events belong together. Typically you define this aggregateTag on the top level event type of a PersistentEntity class.

The AggregateEventTag for the BlogEvent is defined as a constant like this:

public class BlogEventTag {

  public static final AggregateEventTag<BlogEvent> INSTANCE =
    AggregateEventTag.of(BlogEvent.class);

}

The BlogEvent classes:

interface BlogEvent extends Jsonable, AggregateEvent<BlogEvent> {

  @Override
  default public AggregateEventTag<BlogEvent> aggregateTag() {
    return BlogEventTag.INSTANCE;
  }

  @Value.Immutable
  @ImmutableStyle
  @JsonDeserialize(as = PostAdded.class)
  interface AbstractPostAdded extends BlogEvent {
    String getPostId();

    PostContent getContent();
  }

  @Value.Immutable
  @ImmutableStyle
  @JsonDeserialize(as = BodyChanged.class)
  interface AbstractBodyChanged extends BlogEvent {
    @Value.Parameter
    String getBody();
  }

  @Value.Immutable
  @ImmutableStyle
  @JsonDeserialize(as = PostPublished.class)
  interface AbstractPostPublished extends BlogEvent {
    @Value.Parameter
    String getPostId();
  }

}

In the service implementation you need to inject the CassandraReadSide and at startup (in the constructor) register the class that implements the CassandraReadSideProcessor. This will make sure that one instance of the processor is always running on one of the nodes in the cluster of the service.

@Inject
public BlogServiceImpl3(
    PersistentEntityRegistry persistentEntityRegistry,
    CassandraSession cassandraSession,
    CassandraReadSide cassandraReadSide) {

  this.persistentEntityRegistry = persistentEntityRegistry;
  this.cassandraSession = cassandraSession;

  cassandraReadSide.register(BlogEventProcessor.class);
}

§aggregateTag

Define the AggregateEventTag in the method aggregateTag of the processor. The tag defines which events to process. You should return the same constant value as in the events.

@Override
public AggregateEventTag<BlogEvent> aggregateTag() {
  return BlogEventTag.INSTANCE;
}

§prepare

You must tell where in the event stream the processing should start. This is the primary purpose of the prepare method. Each event is associated with a unique offset, a time based UUID. The offset is a parameter to the event handler for each event and it should typically be stored so that it can be retrieved with a select statement in the prepare method. Use the CassandraSession to get the stored offset.

private CompletionStage<Optional<UUID>> selectOffset(CassandraSession session) {
  return session.selectOne("SELECT offset FROM blogevent_offset").thenApply(
      optionalRow -> optionalRow.map(r -> r.getUUID("offset")));
}

Return noOffset() if you want to processes all events, e.g. when starting the first time or if the number of events are known to be small enough to processes all events.

Typically prepare is also used to create prepared statements that are later used when processing the events. Use CassandraSession.prepare to create the prepared statements.

private PreparedStatement writeTitle = null; // initialized in prepare
private PreparedStatement writeOffset = null; // initialized in prepare

private void setWriteTitle(PreparedStatement writeTitle) {
  this.writeTitle = writeTitle;
}

private void setWriteOffset(PreparedStatement writeOffset) {
  this.writeOffset = writeOffset;
}

private CompletionStage<NotUsed> prepareWriteTitle(CassandraSession session) {
  return session.prepare("INSERT INTO blogsummary (partition, id, title) VALUES (1, ?, ?)")
      .thenApply(ps -> {
        setWriteTitle(ps);
        return NotUsed.getInstance();
      });
}

private CompletionStage<NotUsed> prepareWriteOffset(CassandraSession session) {
  return session.prepare("INSERT INTO blogevent_offset (partition, offset) VALUES (1, ?)")
      .thenApply(ps -> {
        setWriteOffset(ps);
        return NotUsed.getInstance();
      });
}

Composing those asynchronous CompletionStage tasks may look like this:

@Override
public CompletionStage<Optional<UUID>> prepare(CassandraSession session) {
  return
    prepareWriteTitle(session).thenCompose(a ->
    prepareWriteOffset(session).thenCompose(b ->
    selectOffset(session)));
}

§defineEventHandlers

The events are processed by event handlers that are defined in the method defineEventHandlers. One handler for each event class.

A handler is a BiFunction that takes the event and the offset as parameters and returns zero or more bound statements that will be executed before processing next event.

@Override
public EventHandlers defineEventHandlers(EventHandlersBuilder builder) {
  builder.setEventHandler(PostAdded.class, this::processPostAdded);
  return builder.build();
}

private CompletionStage<List<BoundStatement>> processPostAdded(PostAdded event, UUID offset) {
  BoundStatement bindWriteTitle = writeTitle.bind();
  bindWriteTitle.setString("id", event.getPostId());
  bindWriteTitle.setString("title", event.getContent().getTitle());
  BoundStatement bindWriteTitleOffset = writeOffset.bind(offset);
  return completedStatements(Arrays.asList(bindWriteTitle, bindWriteTitleOffset));
}

In this example we add one row to the blogsummary table and update the current offset in the blogevent_offset table for each PostAdded event. Other event types are ignored.

Note how the prepared statements that were initialized in the prepare method are used here.

You can keep state in variables of the enclosing class and update that state safely from the event handlers. The events are processed sequentially, one at a time. An example of such state could be values for calculating a moving average.

If there is a failure when executing the statements the processor will be restarted after a backoff delay. This delay is increased exponentially in case of repeated failures.

§Raw Stream of Events

There is another tool that can be used if you want to do something else with the events than updating tables in Cassandra. You can get a stream of the persistent events with the eventStream method of the PersistentEntityRegistry.

public ServiceCall<NotUsed, Source<PostSummary, ?>> newPosts() {
  final PartialFunction<BlogEvent, PostSummary> collectFunction =
      new PFBuilder<BlogEvent, PostSummary>()
      .match(PostAdded.class, evt ->
         PostSummary.of(evt.getPostId(), evt.getContent().getTitle()))
      .build();

  return request -> {
    Source<PostSummary, ?> stream = persistentEntityRegistry
      .eventStream(BlogEventTag.INSTANCE, Optional.empty())
        .map(pair -> pair.first()).collect(collectFunction);
    return CompletableFuture.completedFuture(stream);
  };
}

The eventStream method takes the event class that implements the AggregateEventType and an optional offset, which is the starting point of the stream. It returns a Source of Pair elements, which contains the event and the associated offset.

This stream will never complete, unless there is failure from retrieving the events from the database. It will continue to deliver new events as they are persisted.

Each such stream of events will continuously generate queries to Cassandra to fetch new events and therefore this tool should be used carefully. Do not run too many such streams. It should typically not be used for service calls invoked by unknown number of clients, but it can be useful for a limited number of background processing jobs.

§Refactoring Consideration

If you use a class name of a event type as the aggregate tag in AggregateEventTag you have to retain the original tag if you change the event class name because this string is part of the stored event data. AggregateEventTag has a factory method (and constructor) with a String tag parameter for this purpose. Instead of using a class name as tag identifier you can consider to use a string tag up-front. The tag should be unique among the event types of the service.

§Configuration

The default configuration should be good starting point, and the following settings may later be amended to customize the behavior if needed.

lagom.persistence.read-side {

  cassandra {
  
    # Comma-separated list of contact points in the Cassandra cluster
    contact-points = ["127.0.0.1"]
  
    # Port of contact points in the Cassandra cluster
    port = ${lagom.defaults.persistence.read-side.cassandra.port}
    
    # The implementation of akka.persistence.cassandra.SessionProvider
    # is used for creating the Cassandra Session. By default the 
    # the ServiceLocatorSessionProvider is building the Cluster from configuration 
    # and contact points are looked up with ServiceLocator using the configured
    # cluster-id as the service name.
    # Use akka.persistence.cassandra.ConfigSessionProvider to read the contact-points
    # from configuration instead of using the ServiceLocator.
    # It is possible to replace the implementation of the SessionProvider
    # to reuse another session or override the Cluster builder with other
    # settings.
    # The implementation class may optionally have a constructor with an ActorSystem
    # and Config parameter. The config parameter is the enclosing config section.
    session-provider = com.lightbend.lagom.internal.persistence.cassandra.ServiceLocatorSessionProvider
    
    # The identifier that will be passed as parameter to the
    # ServiceLocatorSessionProvider.lookupContactPoints method. 
    cluster-id = "cas_native"
    cluster-id = ${?CASSANDRA_SERVICE_NAME}
    
    # Write consistency level
    write-consistency = "QUORUM"

    # Read consistency level
    read-consistency = "QUORUM"
    
    # The name of the Cassandra keyspace 
    keyspace = ${lagom.defaults.persistence.read-side.cassandra.keyspace}
  
    # Parameter indicating whether the journal keyspace should be auto created
    keyspace-autocreate = true
    
    # replication strategy to use when creating keyspace. 
    # SimpleStrategy or NetworkTopologyStrategy
    replication-strategy = "SimpleStrategy"
    
    # Replication factor to use when creating keyspace. 
    # Is only used when replication-strategy is SimpleStrategy.
    replication-factor = 1
    
    # Replication factor list for data centers, e.g. ["dc1:3", "dc2:2"]. 
    # Is only used when replication-strategy is NetworkTopologyStrategy.
    data-center-replication-factors = []
    
    # To limit the Cassandra hosts that it connects to a specific datacenter.
    # (DCAwareRoundRobinPolicy withLocalDc)
    # The id for the local datacenter of the Cassandra hosts it should connect to. 
    # By default, this property is not set resulting in Datastax's standard round robin policy being used.
    local-datacenter = ""
  
    # To connect to the Cassandra hosts with credentials.
    # Authentication is disabled if username is not configured.
    authentication.username = ""
    authentication.password = ""
  
    # SSL can be configured with the following properties.
    # SSL is disabled if the truststore is not configured.
    # For detailed instructions, please refer to the DataStax Cassandra chapter about 
    # SSL Encryption: http://docs.datastax.com/en/cassandra/2.0/cassandra/security/secureSslEncryptionTOC.html
    # Path to the JKS Truststore file 
    ssl.truststore.path = ""
    # Password to unlock the JKS Truststore
    ssl.truststore.password = ""
    # Path to the JKS Keystore file (optional config, only needed for client authentication)
    ssl.keystore.path = ""
    # Password to unlock JKS Truststore and access the private key (both must use the same password)
    ssl.keystore.password = "" 
    
    # Maximum size of result set
    max-result-size = 50001

    # Max delay of the ExponentialReconnectionPolicy that is used when reconnecting
    # to the Cassandra cluster
    reconnect-max-delay = 30s

    # Cassandra driver connection pool settings
    # Documented at https://datastax.github.io/java-driver/features/pooling/
    connection-pool {

      # Create new connection threshold local
      new-connection-threshold-local = 800

      # Create new connection threshold remote
      new-connection-threshold-remote = 200

      # Connections per host core local
      connections-per-host-core-local = 1

      # Connections per host max local
      connections-per-host-max-local = 4

      # Connections per host core remote
      connections-per-host-core-remote = 1

      # Connections per host max remote
      connections-per-host-max-remote = 4

      # Max requests per connection local
      max-requests-per-connection-local = 32768

      # Max requests per connection remote
      max-requests-per-connection-remote = 2000

      # Sets the timeout when trying to acquire a connection from a host's pool
      pool-timeout-millis = 0
    }
    
    # Set the protocol version explicitly, should only be used for compatibility testing.
    # Supported values: 3, 4
    protocol-version = ""
    
  }

  # Exponential backoff for failures in CassandraReadSideProcessor    
  failure-exponential-backoff {
    # minimum (initial) duration until processor is started again
    # after failure
    min = 3s
    
    # the exponential back-off is capped to this duration
    max = 30s
    
    # additional random delay is based on this factor
    random-factor = 0.2
  }
  
  # The Akka dispatcher to use for read-side actors and tasks.
  use-dispatcher = "lagom.persistence.dispatcher"
}

lagom.defaults.persistence.read-side.cassandra {
	# Port of contact points in the Cassandra cluster
	port = 9042
	keyspace = "lagom_read"
}

§Underlying Implementation

The CassandraSession is using the Datastax Java Driver for Apache Cassandra.

Each CassandraReadSideProcessor instance is executed by an Actor that is managed by Akka Cluster Singleton. The processor consumes a stream of persistent events delivered by the eventsByTag Persistence Query implemented by akka-persistence-cassandra. The tag corresponds to the tag defined by the AggregateEventTag.

The eventStream of the PersistentEntityRegistry is also implemented by the eventsByTag query.

Found an error in this documentation? The source code for this page can be found here. Please feel free to edit and contribute a pull request.