§Persistent Read-Side
Event Sourcing and CQRS is a recommended introduction to this section.
Persistent Entities are used for holding the state of individual entities, but they cannot be used for serving queries that span more than one entity. You need to know the identifier of the entity to be able to interact with it. Therefore you need to create another view of the data that is tailored to the queries that the service provides. Lagom has support for populating this read-side view of the data and also for building queries of the read-side.
This separation of the write-side and the read-side of the persistent data is often referred to as the CQRS (Command Query Responsibility Segregation) pattern. The CQRS Journey is a great resource for learning more about CQRS.
§Dependency
In Maven:
<dependency>
    <groupId>com.lightbend.lagom</groupId>
    <artifactId>lagom-javadsl-persistence_2.11</artifactId>
    <version>${lagom.version}</version>
</dependency>
In sbt:
libraryDependencies += lagomJavadslPersistence
§Query the Read-Side Database
Lagom has support for Cassandra as a data store, both for the write-side entities and the read-side queries. It is a very scalable distributed database, and also flexible enough to support most of the use cases that reactive services may have.
Let us first look at how a service implementation can retrieve data from Cassandra.
import akka.NotUsed;
import com.lightbend.lagom.javadsl.api.ServiceCall;
import com.lightbend.lagom.javadsl.persistence.cassandra.CassandraSession;
import java.util.concurrent.CompletableFuture;
import javax.inject.Inject;
import akka.stream.javadsl.Source;

public class BlogServiceImpl2 implements BlogService2 {

  private final CassandraSession cassandraSession;

  @Inject
  public BlogServiceImpl2(CassandraSession cassandraSession) {
    this.cassandraSession = cassandraSession;
  }

  @Override
  public ServiceCall<NotUsed, Source<PostSummary, ?>> getPostSummaries() {
    return request -> {
      Source<PostSummary, ?> summaries = cassandraSession.select(
          "SELECT id, title FROM postsummary;").map(row ->
          PostSummary.of(row.getString("id"), row.getString("title")));
      return CompletableFuture.completedFuture(summaries);
    };
  }
}
Note that the CassandraSession is injected in the constructor. CassandraSession provides several methods, in different flavors, for executing queries. The one used in the above example returns a Source, i.e. a streamed response. There are also methods for retrieving a list of rows, which can be useful when you know that the result set is small, e.g. when you have included a LIMIT clause.
All methods in CassandraSession are non-blocking and they return a CompletionStage or a Source. The statements are expressed in Cassandra Query Language (CQL) syntax. See Querying tables for information about CQL queries.
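For example, when the result set is known to be small, a sketch of the same query using selectAll could look like this (the method name and the LIMIT value are illustrative; the postsummary table and cassandraSession field are from the example above):
// Illustrative sketch: collect a bounded result set into a List instead of
// streaming it. Assumes java.util.List, java.util.stream.Collectors and
// java.util.concurrent.CompletionStage are imported.
private CompletionStage<List<PostSummary>> getPostSummariesList() {
  return cassandraSession
      .selectAll("SELECT id, title FROM postsummary LIMIT 10")
      .thenApply(rows -> rows.stream()
          .map(row -> PostSummary.of(row.getString("id"), row.getString("title")))
          .collect(Collectors.toList()));
}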
§Update the Read-Side
We need to transform the events generated by the Persistent Entities into database tables that can be queried as illustrated in the previous section. For that we will use the CassandraReadSideProcessor. It consumes events produced by persistent entities and updates one or more tables in Cassandra that are optimized for queries.
This is what a CassandraReadSideProcessor class looks like before filling in the implementation details:
import com.lightbend.lagom.javadsl.persistence.AggregateEventTag;
import com.lightbend.lagom.javadsl.persistence.cassandra.CassandraReadSideProcessor;
import com.lightbend.lagom.javadsl.persistence.cassandra.CassandraSession;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletionStage;
public class BlogEventProcessor1 extends CassandraReadSideProcessor<BlogEvent> {

  @Override
  public AggregateEventTag<BlogEvent> aggregateTag() {
    // TODO return the tag for the events
    return null;
  }

  @Override
  public CompletionStage<Optional<UUID>> prepare(CassandraSession session) {
    // TODO prepare statements, fetch offset
    return noOffset();
  }

  @Override
  public EventHandlers defineEventHandlers(EventHandlersBuilder builder) {
    // TODO define event handlers
    return builder.build();
  }
}
To make the events available for read-side processing, the events must implement the aggregateTag method of the AggregateEvent interface to define which events belong together. Typically you define this aggregateTag on the top level event type of a PersistentEntity class.
The AggregateEventTag for the BlogEvent is defined as a constant like this:
public class BlogEventTag {
  public static final AggregateEventTag<BlogEvent> INSTANCE =
      AggregateEventTag.of(BlogEvent.class);
}
The BlogEvent classes:
interface BlogEvent extends Jsonable, AggregateEvent<BlogEvent> {

  @Override
  default public AggregateEventTag<BlogEvent> aggregateTag() {
    return BlogEventTag.INSTANCE;
  }

  @Value.Immutable
  @ImmutableStyle
  @JsonDeserialize(as = PostAdded.class)
  interface AbstractPostAdded extends BlogEvent {
    String getPostId();

    PostContent getContent();
  }

  @Value.Immutable
  @ImmutableStyle
  @JsonDeserialize(as = BodyChanged.class)
  interface AbstractBodyChanged extends BlogEvent {
    @Value.Parameter
    String getBody();
  }

  @Value.Immutable
  @ImmutableStyle
  @JsonDeserialize(as = PostPublished.class)
  interface AbstractPostPublished extends BlogEvent {
    @Value.Parameter
    String getPostId();
  }
}
In the service implementation you need to inject the CassandraReadSide and at startup (in the constructor) register the class that implements the CassandraReadSideProcessor. This will make sure that one instance of the processor is always running on one of the nodes in the cluster of the service.
@Inject
public BlogServiceImpl3(
    PersistentEntityRegistry persistentEntityRegistry,
    CassandraSession cassandraSession,
    CassandraReadSide cassandraReadSide) {
  this.persistentEntityRegistry = persistentEntityRegistry;
  this.cassandraSession = cassandraSession;
  cassandraReadSide.register(BlogEventProcessor.class);
}
§aggregateTag
Define the AggregateEventTag in the aggregateTag method of the processor. The tag defines which events to process. You should return the same constant value as in the events.
@Override
public AggregateEventTag<BlogEvent> aggregateTag() {
  return BlogEventTag.INSTANCE;
}
§prepare
You must tell the processor where in the event stream the processing should start. This is the primary purpose of the prepare method. Each event is associated with a unique offset, a time based UUID. The offset is a parameter to the event handler for each event, and it should typically be stored so that it can be retrieved with a select statement in the prepare method. Use the CassandraSession to get the stored offset.
private CompletionStage<Optional<UUID>> selectOffset(CassandraSession session) {
  return session.selectOne("SELECT offset FROM blogevent_offset").thenApply(
      optionalRow -> optionalRow.map(r -> r.getUUID("offset")));
}
Return noOffset() if you want to process all events, e.g. when starting for the first time, or when the number of events is known to be small enough to process them all.
Typically prepare is also used to create prepared statements that are later used when processing the events. Use CassandraSession.prepare to create the prepared statements.
private PreparedStatement writeTitle = null; // initialized in prepare
private PreparedStatement writeOffset = null; // initialized in prepare

private void setWriteTitle(PreparedStatement writeTitle) {
  this.writeTitle = writeTitle;
}

private void setWriteOffset(PreparedStatement writeOffset) {
  this.writeOffset = writeOffset;
}

private CompletionStage<NotUsed> prepareWriteTitle(CassandraSession session) {
  return session.prepare("INSERT INTO blogsummary (partition, id, title) VALUES (1, ?, ?)")
      .thenApply(ps -> {
        setWriteTitle(ps);
        return NotUsed.getInstance();
      });
}

private CompletionStage<NotUsed> prepareWriteOffset(CassandraSession session) {
  return session.prepare("INSERT INTO blogevent_offset (partition, offset) VALUES (1, ?)")
      .thenApply(ps -> {
        setWriteOffset(ps);
        return NotUsed.getInstance();
      });
}
Composing those asynchronous CompletionStage tasks may look like this:
@Override
public CompletionStage<Optional<UUID>> prepare(CassandraSession session) {
  return
    prepareWriteTitle(session).thenCompose(a ->
    prepareWriteOffset(session).thenCompose(b ->
    selectOffset(session)));
}
§defineEventHandlers
The events are processed by event handlers that are defined in the defineEventHandlers method, one handler for each event class. A handler is a BiFunction that takes the event and the offset as parameters and returns zero or more bound statements that will be executed before processing the next event.
@Override
public EventHandlers defineEventHandlers(EventHandlersBuilder builder) {
  builder.setEventHandler(PostAdded.class, this::processPostAdded);
  return builder.build();
}

private CompletionStage<List<BoundStatement>> processPostAdded(PostAdded event, UUID offset) {
  BoundStatement bindWriteTitle = writeTitle.bind();
  bindWriteTitle.setString("id", event.getPostId());
  bindWriteTitle.setString("title", event.getContent().getTitle());
  BoundStatement bindWriteTitleOffset = writeOffset.bind(offset);
  return completedStatements(Arrays.asList(bindWriteTitle, bindWriteTitleOffset));
}
In this example we add one row to the blogsummary table and update the current offset in the blogevent_offset table for each PostAdded event. Other event types are ignored.
Note how the prepared statements that were initialized in the prepare method are used here.
You can keep state in variables of the enclosing class and update that state safely from the event handlers. The events are processed sequentially, one at a time. An example of such state could be values for calculating a moving average.
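As a hypothetical sketch of such state, a processor could maintain a counter in a field and update it from a handler (the counter and the handler are illustrative; PostPublished, completedStatements and the writeOffset statement are from the examples above):
// Illustrative only: mutable state in the enclosing processor class.
// Safe to update here because events are processed one at a time.
private long publishedCount = 0;

private CompletionStage<List<BoundStatement>> processPostPublished(
    PostPublished event, UUID offset) {
  publishedCount += 1; // sequential processing makes this update safe
  return completedStatements(Arrays.asList(writeOffset.bind(offset)));
}
Such a handler would be registered in defineEventHandlers alongside the others, e.g. builder.setEventHandler(PostPublished.class, this::processPostPublished).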
If there is a failure when executing the statements the processor will be restarted after a backoff delay. This delay is increased exponentially in case of repeated failures.
§Raw Stream of Events
If you want to do something other than updating tables in Cassandra, there is another tool: you can get a stream of the persistent events with the eventStream method of the PersistentEntityRegistry.
public ServiceCall<NotUsed, Source<PostSummary, ?>> newPosts() {
  final PartialFunction<BlogEvent, PostSummary> collectFunction =
      new PFBuilder<BlogEvent, PostSummary>()
          .match(PostAdded.class, evt ->
              PostSummary.of(evt.getPostId(), evt.getContent().getTitle()))
          .build();

  return request -> {
    Source<PostSummary, ?> stream = persistentEntityRegistry
        .eventStream(BlogEventTag.INSTANCE, Optional.empty())
        .map(pair -> pair.first()).collect(collectFunction);
    return CompletableFuture.completedFuture(stream);
  };
}
The eventStream method takes the event class that implements the AggregateEventType and an optional offset, which is the starting point of the stream. It returns a Source of Pair elements, where each pair contains the event and the associated offset.
This stream will never complete, unless there is failure from retrieving the events from the database. It will continue to deliver new events as they are persisted.
Each such stream of events will continuously generate queries to Cassandra to fetch new events, and therefore this tool should be used carefully. Do not run too many such streams. It should typically not be used for service calls invoked by an unknown number of clients, but it can be useful for a limited number of background processing jobs.
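For instance, a background job could be wired up at service startup roughly like this (a sketch: the injected Materializer and the SLF4J log field are assumptions, not part of the service shown above):
// Hypothetical background job consuming the raw event stream from the
// beginning. Runs until the service stops, logging each event.
persistentEntityRegistry
    .eventStream(BlogEventTag.INSTANCE, Optional.empty())
    .map(pair -> pair.first())
    .runForeach(event -> log.info("blog event: {}", event), materializer);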
§Refactoring Consideration
If you use the class name of an event type as the aggregate tag in AggregateEventTag, you must retain the original tag if you later rename the event class, because this string is part of the stored event data. AggregateEventTag has a factory method (and constructor) with a String tag parameter for this purpose. Instead of using a class name as the tag identifier, consider using an explicit string tag up-front. The tag should be unique among the event types of the service.
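For example, the tag constant shown earlier could be defined with an explicit string identifier (the value "BlogEvent" is illustrative):
public class BlogEventTag {
  // An explicit string tag keeps the stored value stable even if the
  // BlogEvent class is renamed later.
  public static final AggregateEventTag<BlogEvent> INSTANCE =
      AggregateEventTag.of(BlogEvent.class, "BlogEvent");
}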
§Configuration
The default configuration should be a good starting point; the following settings may later be amended to customize the behavior if needed.
lagom.persistence.read-side {
  cassandra {
    # Comma-separated list of contact points in the Cassandra cluster
    contact-points = ["127.0.0.1"]
    # Port of contact points in the Cassandra cluster
    port = ${lagom.defaults.persistence.read-side.cassandra.port}
    # The implementation of akka.persistence.cassandra.SessionProvider
    # is used for creating the Cassandra Session. By default the
    # ServiceLocatorSessionProvider is building the Cluster from configuration
    # and contact points are looked up with the ServiceLocator using the
    # configured cluster-id as the service name.
    # Use akka.persistence.cassandra.ConfigSessionProvider to read the contact-points
    # from configuration instead of using the ServiceLocator.
    # It is possible to replace the implementation of the SessionProvider
    # to reuse another session or override the Cluster builder with other
    # settings.
    # The implementation class may optionally have a constructor with an ActorSystem
    # and Config parameter. The config parameter is the enclosing config section.
    session-provider = com.lightbend.lagom.internal.persistence.cassandra.ServiceLocatorSessionProvider
    # The identifier that will be passed as parameter to the
    # ServiceLocatorSessionProvider.lookupContactPoints method.
    cluster-id = "cas_native"
    cluster-id = ${?CASSANDRA_SERVICE_NAME}
    # Write consistency level
    write-consistency = "QUORUM"
    # Read consistency level
    read-consistency = "QUORUM"
    # The name of the Cassandra keyspace
    keyspace = ${lagom.defaults.persistence.read-side.cassandra.keyspace}
    # Parameter indicating whether the journal keyspace should be auto created
    keyspace-autocreate = true
    # Replication strategy to use when creating the keyspace:
    # SimpleStrategy or NetworkTopologyStrategy
    replication-strategy = "SimpleStrategy"
    # Replication factor to use when creating the keyspace.
    # Only used when replication-strategy is SimpleStrategy.
    replication-factor = 1
    # Replication factor list for data centers, e.g. ["dc1:3", "dc2:2"].
    # Only used when replication-strategy is NetworkTopologyStrategy.
    data-center-replication-factors = []
    # To limit the Cassandra hosts that are connected to, to a specific datacenter
    # (DCAwareRoundRobinPolicy withLocalDc), set this to the id of the local
    # datacenter of the Cassandra hosts that should be connected to.
    # By default this property is not set, resulting in Datastax's standard
    # round robin policy being used.
    local-datacenter = ""
    # To connect to the Cassandra hosts with credentials.
    # Authentication is disabled if username is not configured.
    authentication.username = ""
    authentication.password = ""
    # SSL can be configured with the following properties.
    # SSL is disabled if the truststore is not configured.
    # For detailed instructions, please refer to the DataStax Cassandra chapter about
    # SSL Encryption: http://docs.datastax.com/en/cassandra/2.0/cassandra/security/secureSslEncryptionTOC.html
    # Path to the JKS Truststore file
    ssl.truststore.path = ""
    # Password to unlock the JKS Truststore
    ssl.truststore.password = ""
    # Path to the JKS Keystore file (optional config, only needed for client authentication)
    ssl.keystore.path = ""
    # Password to unlock the JKS Keystore and access the private key (both must use the same password)
    ssl.keystore.password = ""
    # Maximum size of result set
    max-result-size = 50001
    # Max delay of the ExponentialReconnectionPolicy that is used when reconnecting
    # to the Cassandra cluster
    reconnect-max-delay = 30s
    # Cassandra driver connection pool settings
    # Documented at https://datastax.github.io/java-driver/features/pooling/
    connection-pool {
      # Create new connection threshold local
      new-connection-threshold-local = 800
      # Create new connection threshold remote
      new-connection-threshold-remote = 200
      # Connections per host core local
      connections-per-host-core-local = 1
      # Connections per host max local
      connections-per-host-max-local = 4
      # Connections per host core remote
      connections-per-host-core-remote = 1
      # Connections per host max remote
      connections-per-host-max-remote = 4
      # Max requests per connection local
      max-requests-per-connection-local = 32768
      # Max requests per connection remote
      max-requests-per-connection-remote = 2000
      # Sets the timeout when trying to acquire a connection from a host's pool
      pool-timeout-millis = 0
    }
    # Set the protocol version explicitly, should only be used for compatibility testing.
    # Supported values: 3, 4
    protocol-version = ""
  }
  # Exponential backoff for failures in CassandraReadSideProcessor
  failure-exponential-backoff {
    # minimum (initial) duration until the processor is started again
    # after failure
    min = 3s
    # the exponential back-off is capped to this duration
    max = 30s
    # additional random delay is based on this factor
    random-factor = 0.2
  }
  # The Akka dispatcher to use for read-side actors and tasks.
  use-dispatcher = "lagom.persistence.dispatcher"
}

lagom.defaults.persistence.read-side.cassandra {
  # Port of contact points in the Cassandra cluster
  port = 9042
  keyspace = "lagom_read"
}
§Underlying Implementation
The CassandraSession uses the Datastax Java Driver for Apache Cassandra.
Each CassandraReadSideProcessor instance is executed by an Actor that is managed by Akka Cluster Singleton. The processor consumes a stream of persistent events delivered by the eventsByTag Persistence Query implemented by akka-persistence-cassandra. The tag corresponds to the tag defined by the AggregateEventTag.
The eventStream of the PersistentEntityRegistry is also implemented by the eventsByTag query.