§Domain Modelling with Akka Persistence Typed
This section presents all the steps to model an Aggregate, as defined in Domain-Driven Design, using Akka Persistence Typed and following the CQRS principles embraced by Lagom. While Akka Persistence Typed provides an API for building event-sourced actors, it does not necessarily do the same for CQRS Aggregates. To build CQRS applications, we need to apply a few rules to our design.
We use a simplified shopping cart example to guide you through the process. You can find a full-fledged shopping cart sample on our samples repository.
§Encoding the model
First, create a class (for example, ShoppingCartEntity) extending the abstract class EventSourcedBehaviorWithEnforcedReplies. It’s recommended to use EventSourcedBehaviorWithEnforcedReplies when modeling a CQRS Aggregate. Using enforced replies requires command handlers to return a ReplyEffect, forcing developers to be explicit about replies.
public class ShoppingCartEntity
extends EventSourcedBehaviorWithEnforcedReplies<
ShoppingCartEntity.Command, ShoppingCartEntity.Event, ShoppingCartEntity.ShoppingCart>
EventSourcedBehaviorWithEnforcedReplies
has three type parameters (Command, Event and State) and a few mandatory and optional methods that will be implemented as we progress through the guide.
All model classes (ShoppingCartEntity.Command
, ShoppingCartEntity.Event
, ShoppingCartEntity.ShoppingCart
) will be defined as static inner classes of ShoppingCartEntity
.
§Modelling the State
The state of the shopping cart is defined as follows:
@Value
@JsonDeserialize
static final class ShoppingCart implements CompressedJsonable {
public final PMap<String, Integer> items;
public final Optional<Instant> checkoutDate;
@JsonCreator
ShoppingCart(PMap<String, Integer> items, Instant checkoutDate) {
this.items = Preconditions.checkNotNull(items, "items");
this.checkoutDate = Optional.ofNullable(checkoutDate);
}
ShoppingCart removeItem(String itemId) {
PMap<String, Integer> newItems = items.minus(itemId);
return new ShoppingCart(newItems, null);
}
ShoppingCart updateItem(String itemId, int quantity) {
PMap<String, Integer> newItems = items.plus(itemId, quantity);
return new ShoppingCart(newItems, null);
}
boolean isEmpty() {
return items.isEmpty();
}
boolean hasItem(String itemId) {
return items.containsKey(itemId);
}
ShoppingCart checkout(Instant when) {
return new ShoppingCart(items, when);
}
boolean isOpen() {
return !this.isCheckedOut();
}
boolean isCheckedOut() {
return this.checkoutDate.isPresent();
}
public static final ShoppingCart EMPTY = new ShoppingCart(HashTreePMap.empty(), null);
}
The ShoppingCart has a checkoutDate that can be set when transitioning from one state (open shopping cart) to another (checked-out shopping cart). As we will see later, each state encodes the commands it can handle, which events it can persist, and to which other states it can transition.
Note: The sample shown above is a simplified case. Whenever your model goes through different state transitions, a better approach is to have an interface and implementations of it for each state. See examples in the style guide for Akka Persistence Typed.
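The interface-per-state approach mentioned in the note can be sketched in plain Java as follows. This is only an illustration, not code from the Lagom sample or the Akka style guide: the names CartState, OpenCart and CheckedOutCart are hypothetical, and a plain java.util.Map stands in for the PMap used above.

```java
import java.time.Instant;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical names: CartState, OpenCart and CheckedOutCart are illustrative only.
interface CartState {
  Map<String, Integer> items();
}

// An open cart can still change and can be checked out.
final class OpenCart implements CartState {
  private final Map<String, Integer> items;

  OpenCart(Map<String, Integer> items) {
    this.items = Collections.unmodifiableMap(new HashMap<>(items));
  }

  @Override
  public Map<String, Integer> items() {
    return items;
  }

  // Returns a new open cart; the current instance is left untouched.
  OpenCart updateItem(String itemId, int quantity) {
    Map<String, Integer> copy = new HashMap<>(items);
    copy.put(itemId, quantity);
    return new OpenCart(copy);
  }

  // The only transition out of the open state.
  CheckedOutCart checkout(Instant when) {
    return new CheckedOutCart(items, when);
  }
}

// A checked-out cart exposes no mutating operations at all.
final class CheckedOutCart implements CartState {
  private final Map<String, Integer> items;
  final Instant checkoutDate;

  CheckedOutCart(Map<String, Integer> items, Instant checkoutDate) {
    this.items = Collections.unmodifiableMap(new HashMap<>(items));
    this.checkoutDate = checkoutDate;
  }

  @Override
  public Map<String, Integer> items() {
    return items;
  }
}
```

With this shape, an invalid transition such as adding an item to a checked-out cart cannot even be expressed, because CheckedOutCart has no updateItem method.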
§Modelling Commands and Replies
Next, we define the commands that can be sent to the shopping cart.
Each command defines a reply through an ActorRef<R> replyTo field where R is the reply type that will be sent back to the caller. Replies are used to communicate back whether a command was accepted or rejected, or to read the aggregate data (i.e. read-only commands). It is also possible to have a mix of both. For example, if the command succeeds, it returns some updated data; if it fails, it returns a rejected message. You can also have commands without replies (i.e. fire-and-forget), although this is a less common pattern in CQRS Aggregate modeling and is not covered in this example.
interface Command<R> extends Jsonable {}
@Value
@JsonDeserialize
static final class AddItem implements Command<Confirmation>, CompressedJsonable {
public final String itemId;
public final int quantity;
public final ActorRef<Confirmation> replyTo;
@JsonCreator
AddItem(String itemId, int quantity, ActorRef<Confirmation> replyTo) {
this.itemId = Preconditions.checkNotNull(itemId, "itemId");
this.quantity = quantity;
this.replyTo = replyTo;
}
}
static final class Get implements Command<Summary> {
private final ActorRef<Summary> replyTo;
@JsonCreator
Get(ActorRef<Summary> replyTo) {
this.replyTo = replyTo;
}
}
static final class Checkout implements Command<Confirmation> {
private final ActorRef<Confirmation> replyTo;
@JsonCreator
Checkout(ActorRef<Confirmation> replyTo) {
this.replyTo = replyTo;
}
}
In Akka Typed, unlike Akka classic and Lagom Persistence, it’s not possible to return an exception to the caller. All communication between the actor and the caller must be done via the ActorRef<R> replyTo passed in the command. Therefore, if you want to signal a rejection, you must encode it in your reply protocol.
The replies used by the commands above are defined like this:
interface Reply extends Jsonable {}
interface Confirmation extends Reply {}
@Value
@JsonDeserialize
static final class Summary implements Reply {
public final Map<String, Integer> items;
public final boolean checkedOut;
public final Optional<Instant> checkoutDate;
@JsonCreator
Summary(Map<String, Integer> items, boolean checkedOut, Optional<Instant> checkoutDate) {
this.items = items;
this.checkedOut = checkedOut;
this.checkoutDate = checkoutDate;
}
}
@Value
@JsonDeserialize
static final class Accepted implements Confirmation {
public final Summary summary;
@JsonCreator
Accepted(Summary summary) {
this.summary = summary;
}
}
@Value
@JsonDeserialize
static final class Rejected implements Confirmation {
public final String reason;
@JsonCreator
Rejected(String reason) {
this.reason = reason;
}
}
Here there are two different kinds of replies: Confirmation
and Summary
. Confirmation
is used when we want to modify the state. A modification request can be Accepted
or Rejected
. Then, the Summary
is used when we want to read the state of the shopping cart.
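Because rejections travel as ordinary replies, the caller has to inspect the Confirmation it receives. The following is a minimal sketch of that, using simplified stand-in types without Lagom or Jackson. The helper summaryOrThrow is hypothetical, and Accepted carries a plain String here instead of a Summary.

```java
// Stand-in reply types mirroring the shape of the protocol above, without Lagom or Jackson.
interface Confirmation {}

final class Accepted implements Confirmation {
  final String summary; // simplified: the guide's Accepted carries a Summary object

  Accepted(String summary) {
    this.summary = summary;
  }
}

final class Rejected implements Confirmation {
  final String reason;

  Rejected(String reason) {
    this.reason = reason;
  }
}

final class ConfirmationHandling {
  // Hypothetical helper: turns a reply into a value or an exception on the caller's side.
  static String summaryOrThrow(Confirmation confirmation) {
    if (confirmation instanceof Accepted) {
      return ((Accepted) confirmation).summary;
    } else if (confirmation instanceof Rejected) {
      throw new IllegalStateException(((Rejected) confirmation).reason);
    }
    throw new IllegalArgumentException("Unknown reply: " + confirmation);
  }
}
```

Whether a rejection becomes an exception, an HTTP error, or a value in the service response is a decision for the calling code. The Aggregate only reports what happened.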
Note: Keep in mind that
Summary
is not the shopping cart itself, but the representation we want to expose to the external world. It’s a good practice to keep the internal state of the aggregate private because it allows the internal state, and the exposed API to evolve independently.
§Modelling Events
Next, we define the events that our model will persist. The events must extend Lagom’s AggregateEvent
. This is important for tagging events. We will cover it soon in the tagging events section.
public interface Event extends Jsonable, AggregateEvent<Event> {
/** The tag for shopping cart events used for consuming the journal event stream. */
AggregateEventShards<Event> TAG = AggregateEventTag.sharded(Event.class, 10);
@Override
default AggregateEventTagger<Event> aggregateTag() {
return TAG;
}
}
@Value
@JsonDeserialize
static final class ItemAdded implements Event {
public final String shoppingCartId;
public final String itemId;
public final int quantity;
public final Instant eventTime;
@JsonCreator
ItemAdded(String shoppingCartId, String itemId, int quantity, Instant eventTime) {
this.shoppingCartId = Preconditions.checkNotNull(shoppingCartId, "shoppingCartId");
this.itemId = Preconditions.checkNotNull(itemId, "itemId");
this.quantity = quantity;
this.eventTime = eventTime;
}
}
@Value
@JsonDeserialize
static final class CheckedOut implements Event {
public final String shoppingCartId;
public final Instant eventTime;
@JsonCreator
CheckedOut(String shoppingCartId, Instant eventTime) {
this.shoppingCartId = Preconditions.checkNotNull(shoppingCartId, "shoppingCartId");
this.eventTime = eventTime;
}
}
§Defining Command Handlers
Once you’ve defined your model in terms of Commands, Replies, Events, and State, you need to specify the business rules. The command handlers define how to handle each incoming command, which validations must be applied, and finally, which events will be persisted if any.
You can encode it in different ways. The recommended style is to add the command handlers in your state classes. For ShoppingCart
, we can define the command handlers based on the two possible states:
@Override
public CommandHandlerWithReply<Command, Event, ShoppingCart> commandHandler() {
CommandHandlerWithReplyBuilder<Command, Event, ShoppingCart> builder =
newCommandHandlerWithReplyBuilder();
builder
.forState(ShoppingCart::isOpen)
.onCommand(AddItem.class, this::onAddItem)
.onCommand(Checkout.class, this::onCheckout);
builder
.forState(ShoppingCart::isCheckedOut)
.onCommand(
AddItem.class,
cmd ->
Effect()
.reply(cmd.replyTo, new Rejected("Cannot add an item to a checked-out cart")))
.onCommand(
Checkout.class,
cmd -> Effect().reply(cmd.replyTo, new Rejected("Cannot checkout a checked-out cart")));
builder.forAnyState().onCommand(Get.class, this::onGet);
return builder.build();
}
private ReplyEffect<Event, ShoppingCart> onAddItem(ShoppingCart shoppingCart, AddItem cmd) {
if (shoppingCart.hasItem(cmd.getItemId())) {
return Effect()
.reply(cmd.replyTo, new Rejected("Item was already added to this shopping cart"));
} else if (cmd.getQuantity() <= 0) {
return Effect().reply(cmd.replyTo, new Rejected("Quantity must be greater than zero"));
} else {
return Effect()
.persist(new ItemAdded(cartId, cmd.getItemId(), cmd.getQuantity(), Instant.now()))
.thenReply(cmd.replyTo, s -> new Accepted(toSummary(s)));
}
}
private ReplyEffect<Event, ShoppingCart> onGet(ShoppingCart shoppingCart, Get cmd) {
return Effect().reply(cmd.replyTo, toSummary(shoppingCart));
}
private ReplyEffect<Event, ShoppingCart> onCheckout(ShoppingCart shoppingCart, Checkout cmd) {
if (shoppingCart.isEmpty()) {
return Effect().reply(cmd.replyTo, new Rejected("Cannot checkout empty shopping cart"));
} else {
return Effect()
.persist(new CheckedOut(cartId, Instant.now()))
.thenReply(cmd.replyTo, s -> new Accepted(toSummary(s)));
}
}
Note: of course, it is possible to organize the command handlers in a way that you consider more convenient for your use case, but we recommend the
onCommand
pattern since it can help to keep the logic for each command well isolated.
Command handlers are the meat of the model. They encode the business rules of your model and act as a guardian of the model consistency. The command handler must first validate that the incoming command can be applied to the current model state. In case of successful validation, one or more events expressing the mutations are persisted. Once the events are persisted, they are applied to the state producing a new valid state.
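The validate, persist, apply cycle described above can be illustrated without Akka. The sketch below is an in-memory stand-in: MiniCart and ItemAddedEvent are hypothetical names, the "journal" is just a list, and the reply is reduced to an Optional holding a rejection reason.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical event, standing in for ItemAdded.
final class ItemAddedEvent {
  final String itemId;
  final int quantity;

  ItemAddedEvent(String itemId, int quantity) {
    this.itemId = itemId;
    this.quantity = quantity;
  }
}

// In-memory stand-in for the validate -> persist -> apply cycle; no Akka involved.
final class MiniCart {
  // The "journal": events that passed validation, in order.
  final List<ItemAddedEvent> journal = new ArrayList<>();
  // Current state, only ever changed by applying a persisted event.
  private Map<String, Integer> items = Collections.emptyMap();

  // Command handler: returns a rejection reason, or empty when the command was accepted.
  Optional<String> addItem(String itemId, int quantity) {
    // 1. Validate using only the current state and the data carried by the command.
    if (items.containsKey(itemId)) {
      return Optional.of("Item was already added to this shopping cart");
    }
    if (quantity <= 0) {
      return Optional.of("Quantity must be greater than zero");
    }
    // 2. Persist the event describing the change.
    ItemAddedEvent event = new ItemAddedEvent(itemId, quantity);
    journal.add(event);
    // 3. Apply the persisted event to obtain the new state.
    items = applyEvent(items, event);
    return Optional.empty();
  }

  // Event handler: a pure function from (state, event) to the next state.
  static Map<String, Integer> applyEvent(Map<String, Integer> state, ItemAddedEvent event) {
    Map<String, Integer> next = new HashMap<>(state);
    next.put(event.itemId, event.quantity);
    return Collections.unmodifiableMap(next);
  }

  Map<String, Integer> items() {
    return items;
  }
}
```

Rejected commands leave both the journal and the state untouched, which is what makes the command handler the guardian of the model's consistency.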
Because an Aggregate is intended to model a consistency boundary, it’s not recommended to validate commands using data that is not available in scope. Any decision should be solely based on the data passed in the commands and the state of the Aggregate. Any external call should be considered a smell because it means that the Aggregate is not in full control of the invariants it’s supposed to be protecting.
There are two ways of sending back a reply: using Effect().reply
and Effect().persist(...).thenReply
. The first one is available directly on Effect
and should be used when you reply without persisting any event. In this case, you can use the available state in scope because it’s guaranteed not to have changed. The second variant should be used when you have persisted one or more events. The updated state is then available to you on the function used to define the reply.
You may run side effects inside the command handler. Please refer to Akka documentation for detailed information.
§Defining the Event Handlers
The event handlers are used to mutate the state of the Aggregate by applying the events to it. Event handlers must be pure functions as they will be used when instantiating the Aggregate and replaying the event journal. Similar to the command handlers, a recommended style is to add them in the state classes.
@Override
public EventHandler<ShoppingCart, Event> eventHandler() {
return newEventHandlerBuilder()
.forAnyState()
.onEvent(
ItemAdded.class,
(shoppingCart, evt) -> shoppingCart.updateItem(evt.getItemId(), evt.getQuantity()))
.onEvent(CheckedOut.class, (shoppingCart, evt) -> shoppingCart.checkout(evt.getEventTime()))
.build();
}
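To see why event handlers must be pure, it helps to picture recovery as a fold of the journal over the empty state. The following plain-Java sketch assumes a single hypothetical event type and uses a java.util.Map as the state. It does not show how Akka implements replay.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class ReplaySketch {
  // Hypothetical event: an item was added with a quantity.
  static final class ItemAdded {
    final String itemId;
    final int quantity;

    ItemAdded(String itemId, int quantity) {
      this.itemId = itemId;
      this.quantity = quantity;
    }
  }

  // Pure event handler: no I/O, no clock, no randomness; the input state is not mutated.
  static Map<String, Integer> onItemAdded(Map<String, Integer> state, ItemAdded evt) {
    Map<String, Integer> next = new HashMap<>(state);
    next.put(evt.itemId, evt.quantity);
    return Collections.unmodifiableMap(next);
  }

  // Recovery: start from the empty state and fold every journaled event over it.
  static Map<String, Integer> replay(List<ItemAdded> journal) {
    Map<String, Integer> state = Collections.emptyMap();
    for (ItemAdded evt : journal) {
      state = onItemAdded(state, evt);
    }
    return state;
  }
}
```

Replaying the same journal twice yields equal states. A handler that read the clock or called an external service would break that guarantee.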
§EventSourcedBehavior - gluing the bits together
With all the model encoded, the next step is to prepare ShoppingCartEntity
so we can let it run as an Actor.
EventSourcedBehaviorWithEnforcedReplies has a constructor receiving a PersistenceId that we need to call from ShoppingCartEntity’s own constructor. To build a PersistenceId instance we need an EntityContext<Command> instance, so we add it as a constructor argument to ShoppingCartEntity.
The persistenceId defines the id that will be used in the event journal. The id is composed of a name (for example, entityContext.getEntityTypeKey().name()) and a business id (for example, entityContext.getEntityId()). These two values will be concatenated using a "|" by default (for example, "ShoppingCart|123456"). See Akka’s documentation for more details.
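As a rough illustration of the concatenation described above, the plain-Java helper below builds the same kind of id. It is a hypothetical stand-in and not Akka's PersistenceId implementation. The check on the separator is an assumption, added to show why the type hint should not itself contain "|".

```java
final class PersistenceIdSketch {
  // Assumption: "|" is the default separator, as stated in the text above.
  static final String SEPARATOR = "|";

  // Hypothetical helper mirroring the described concatenation; not Akka's implementation.
  static String persistenceId(String typeHint, String entityId) {
    if (typeHint.contains(SEPARATOR)) {
      throw new IllegalArgumentException("type hint must not contain " + SEPARATOR);
    }
    return typeHint + SEPARATOR + entityId;
  }
}
```

The type hint keeps ids of different entity types apart in a shared journal: a shopping cart and, say, an order with the same business id still get distinct persistence ids.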
private ShoppingCartEntity(EntityContext<Command> entityContext) {
super(
// PersistenceId needs a typeHint (or namespace) and entityId,
// we take them from the EntityContext
PersistenceId.of(
entityContext.getEntityTypeKey().name(), // <- type hint
entityContext.getEntityId() // <- business id
));
// we keep a copy of cartId because it's used in the events
this.cartId = entityContext.getEntityId();
// tagger is constructed from adapter and needs EntityContext
this.tagger = AkkaTaggerAdapter.fromLagom(entityContext, Event.TAG);
}
static ShoppingCartEntity create(EntityContext<Command> entityContext) {
return new ShoppingCartEntity(entityContext);
}
Note: the constructor is private and there is a static method create() to create instances, as recommended by the Akka style guide. This and the need for EntityContext will be explained when covering ClusterSharding later in this guide.
Moreover, we also initialize a field called tagger using an AkkaTaggerAdapter. We will cover it soon in the tagging events section.
Next we need to implement the emptyState()
method. The emptyState
is the state used when the journal is empty. It’s the initial state:
@Override
public ShoppingCart emptyState() {
return ShoppingCart.EMPTY;
}
§Changing behavior – Finite State Machines
If you are familiar with general Akka Actors, you are probably aware that after processing a message, you should return the next behavior to be used. With Akka Persistence Typed this happens differently. Command handlers and event handlers are all dependent on the current state, therefore you can change behavior by returning a new state in the event handler. Consult the Akka documentation for more insight on this topic.
§Tagging the events – Akka Persistence Query considerations
Events are persisted in the event journal and are primarily used to replay the state of the Aggregate each time it needs to be instantiated. However, in CQRS, we also want to consume those same events and generate read-side views or publish them in a message broker (eg: Kafka) for external consumption.
To be able to consume the events on the read-side, the events must be tagged. This is done using the AggregateEventTag
utility. It’s recommended to shard the tags so they can be consumed in a distributed fashion by Lagom’s Read-Side Processor and Topic Producers. Although not recommended, it’s also possible to not shard the events as explained here.
This example splits the tags into 10 shards and defines the event tagger in the ShoppingCartEntity.Event
interface. Note that the tag name must be stable, as well as the number of shards. These two values can’t be changed later without migrating the journal.
/** The tag for shopping cart events used for consuming the journal event stream. */
AggregateEventShards<Event> TAG = AggregateEventTag.sharded(Event.class, 10);
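To make the stability requirement concrete, here is a sketch of one plausible way a sharded tag could be derived from an entity id. The formula (hash of the entity id modulo the number of shards, appended to a base tag name) is an assumption made for illustration and may differ from what AggregateEventTag actually does. ShardedTagSketch and tagFor are hypothetical names.

```java
final class ShardedTagSketch {
  // Assumed scheme: hash the entity id into [0, numShards) and append it to a base tag name.
  static String tagFor(String baseTagName, int numShards, String entityId) {
    int shard = Math.floorMod(entityId.hashCode(), numShards);
    return baseTagName + shard;
  }
}
```

Under any scheme of this kind, the same entity always gets the same tag as long as the base name and the shard count stay fixed. Changing either can assign new events of an existing entity to a different tag than its older events, which is why a journal migration would be needed.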
Note: if you’re using a JDBC database to store your journal, the number of sharded tags (NumShards) should not be greater than 10. This is due to an existing bug in the plugin. Failing to follow this directive will result in some events being delivered more than once on the read-side or topic producers.
The AggregateEventTag is a Lagom class used by Lagom’s Read-Side Processor and Topic Producers. Akka Persistence Typed, however, expects a method accepting an Event and returning a Set<String> to tag events before persisting them. Therefore, we need to use an adapter to transform Lagom’s AggregateEventTag to the required Akka tagger function. As shown in the constructor section, we instantiate a tagger field using AkkaTaggerAdapter. This field can then be used when implementing the tagging method.
private final Function<Event, Set<String>> tagger;
@Override
public Set<String> tagsFor(Event event) {
return tagger.apply(event);
}
§Configuring snapshots
Snapshotting is a common optimization to avoid replaying all the events since the beginning.
You can define snapshot rules in two ways: by predicate and by counter. Both can be combined. The example below uses a counter: it takes a snapshot every 100 events and keeps the two most recent snapshots. You can find more details in the Akka documentation.
@Override
public RetentionCriteria retentionCriteria() {
return RetentionCriteria.snapshotEvery(100, 2);
}
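The benefit of a counter-based rule can be estimated with simple arithmetic. The sketch below assumes, as a simplification, that snapshots land exactly on multiples of the configured interval. SnapshotMath and its methods are hypothetical helpers, not part of Akka.

```java
final class SnapshotMath {
  // Sequence number of the most recent snapshot, assuming one is taken every `every` events.
  // A result of 0 means no snapshot has been taken yet.
  static long lastSnapshotAt(long sequenceNr, int every) {
    return (sequenceNr / every) * every;
  }

  // Events that still have to be replayed on top of that snapshot during recovery.
  static long eventsToReplay(long sequenceNr, int every) {
    return sequenceNr - lastSnapshotAt(sequenceNr, every);
  }
}
```

With an interval of 100, an Aggregate that has persisted 250 events recovers from the snapshot taken at event 200 and replays only the remaining 50 events instead of all 250.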
§Akka Cluster Sharding
Lagom uses Akka Cluster Sharding to distribute the Aggregates across all the nodes and guarantee that, at any single time, there is only one instance of a given Aggregate loaded in memory over the whole cluster.
§Creating the Aggregate instance
The Aggregate needs to be initialized on the ClusterSharding before it’s used. That process won’t create any specific Aggregate instance; it only creates the Shard Regions and prepares them to be used (read more about Shard Regions in the Akka Cluster Sharding docs).
Note: In Akka Cluster, the term to refer to a sharded actor is entity, so a sharded Aggregate can also be referred to as an Aggregate Entity.
You must define an EntityTypeKey and a function of EntityContext<Command> -> Behavior<Command> to initialize the Shopping Cart Aggregate.
The EntityTypeKey has a name that uniquely identifies this model in the cluster. It should be typed on ShoppingCartEntity.Command, which is the type of the messages that the Shopping Cart can receive. The easiest way is to define it as a static field in ShoppingCartEntity.
static EntityTypeKey<Command> ENTITY_TYPE_KEY =
EntityTypeKey.create(Command.class, "ShoppingCart");
Finally, initialize the Aggregate on the ClusterSharding using the EntityTypeKey and the create() static method. Lagom provides an instance of the ClusterSharding extension through dependency injection for your convenience. Initializing an entity should be done only once and, in the case of Lagom Aggregates, it is typically done in the constructor of the service implementation. You should inject the ClusterSharding into your service for that purpose.
@Inject
public ShoppingCartServiceImpl(ClusterSharding clusterSharing) {
this.clusterSharing = clusterSharing;
// register entity on shard
this.clusterSharing.init(
Entity.of(
ShoppingCartEntity.ENTITY_TYPE_KEY, // <- type key
ShoppingCartEntity::create // <- create function
));
}
§Getting instances of the Aggregate Entity
To access instances of the Aggregate (which may be running locally or remotely on the cluster), you should use the injected ClusterSharding. You can instantiate an EntityRef using the method entityRefFor. In our case, the EntityRef is typed to only accept ShoppingCartEntity.Command messages.
private EntityRef<ShoppingCartEntity.Command> entityRef(String id) {
return clusterSharing.entityRefFor(ShoppingCartEntity.ENTITY_TYPE_KEY, id);
}
To locate the correct actor across the cluster, you need to specify the EntityTypeKey
we used to initialize the entity and the id
for the instance we need. Akka Cluster will create the required actor in one node on the cluster or reuse the existing instance if the actor has already been created and is still alive.
The EntityRef
is similar to an ActorRef
but denotes the actor is sharded. Interacting with an EntityRef
implies the messages exchanged with the actor may need to travel over the wire to another node.
§Considerations on using the ask pattern
Since we want to send commands to the Aggregate and these commands declare a reply we will need to use the ask pattern.
The code we introduced above creates an EntityRef from inside the ShoppingCartServiceImpl, meaning we are calling the actor (through the EntityRef) from outside the ActorSystem. EntityRef provides an ask() overload out of the box meant to be used from outside actors.
private final Duration askTimeout = Duration.ofSeconds(5);
@Override
public ServiceCall<NotUsed, ShoppingCartView> get(String id) {
return request ->
entityRef(id)
.<ShoppingCartEntity.Summary>ask(
replyTo -> new ShoppingCartEntity.Get(replyTo), askTimeout)
.thenApply(summary -> asShoppingCartView(id, summary));
}
So we declare an askTimeout and then invoke ask. The ask method accepts a function of ActorRef<Res> -> M in which Res is the expected response type and M is the message being sent to the actor. The ask method will create an instance of ActorRef<Res> that can be used to build the outgoing message (command). Once the response is sent to ActorRef<Res>, Akka will complete the returned CompletionStage<Res> with the response (in this case CompletionStage<ShoppingCartEntity.Summary>).
Finally, we operate over the summary
(in this case, we map it to a different type, ie: ShoppingCartView
, using the thenApply
method).
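The mechanics of ask can be mimicked in plain Java with a CompletableFuture. This is a conceptual sketch only: ReplyTo stands in for ActorRef<Res>, a Consumer stands in for the actor's mailbox, the timeout is omitted, and AskSketch and GetCount are hypothetical names. Akka's real implementation is different.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Consumer;
import java.util.function.Function;

// Stand-in for ActorRef<Res>: something a reply can be sent to.
interface ReplyTo<Res> {
  void tell(Res reply);
}

final class AskSketch {
  // Hypothetical, timeout-free version of ask: builds the message from a temporary
  // reply target and completes the returned stage when the reply arrives.
  static <M, Res> CompletionStage<Res> ask(
      Consumer<M> mailbox, Function<ReplyTo<Res>, M> messageFactory) {
    CompletableFuture<Res> promise = new CompletableFuture<>();
    ReplyTo<Res> replyTo = promise::complete; // replying completes the future
    M message = messageFactory.apply(replyTo); // e.g. replyTo -> new Get(replyTo)
    mailbox.accept(message); // deliver the command to the "actor"
    return promise;
  }
}

// A Get-like command carrying its reply target.
final class GetCount {
  final ReplyTo<Integer> replyTo;

  GetCount(ReplyTo<Integer> replyTo) {
    this.replyTo = replyTo;
  }
}
```

A caller would pass a message factory such as GetCount::new and then transform the result with thenApply, in the same way the service call above maps the Summary to a ShoppingCartView.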
The ShoppingCartView
and asShoppingCartView
are defined as:
@Value
@JsonDeserialize
public final class ShoppingCartView {
/** The ID of the shopping cart. */
public final String id;
/** The list of items in the cart. */
public final List<ShoppingCartItem> items;
/** Whether this cart has been checked out. */
public final boolean checkedOut;
/** When this cart was checked out. */
public final Optional<Instant> checkedOutTime;
@JsonCreator
public ShoppingCartView(
String id, List<ShoppingCartItem> items, Optional<Instant> checkedOutTime) {
this.id = Preconditions.checkNotNull(id, "id");
this.items = Preconditions.checkNotNull(items, "items");
this.checkedOutTime = checkedOutTime;
this.checkedOut = checkedOutTime.isPresent();
}
public boolean hasItem(String itemId) {
return items.stream().anyMatch(item -> item.getItemId().equals(itemId));
}
public Optional<ShoppingCartItem> get(String itemId) {
return items.stream().filter(item -> item.getItemId().equals(itemId)).findFirst();
}
}
private ShoppingCartView asShoppingCartView(String id, ShoppingCartEntity.Summary summary) {
List<ShoppingCartItem> items = new ArrayList<>();
for (Map.Entry<String, Integer> item : summary.getItems().entrySet()) {
items.add(new ShoppingCartItem(item.getKey(), item.getValue()));
}
return new ShoppingCartView(id, items, summary.getCheckoutDate());
}
Note: We are keeping the internal state of the Aggregate isolated from the exposed service API so they can evolve independently.
§Configuring number of shards
Akka recommends, as a rule of thumb, that the number of shards be a factor of ten greater than the planned maximum number of cluster nodes. It doesn’t have to be exact. With fewer shards than nodes, some nodes will not host any shards. Too many shards make shard management less efficient (for example, more rebalancing overhead) and increase latency, because the coordinator is involved in routing the first message for each shard.
See the Akka Cluster Sharding documentation for details on how to configure the number of shards.
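The rule of thumb and the "fewer shards than nodes" case reduce to simple arithmetic, sketched below. ShardCountSketch and its methods are hypothetical helpers for reasoning about the numbers. They do not configure anything in Akka.

```java
final class ShardCountSketch {
  // Rule of thumb from the text: about ten times the planned maximum number of nodes.
  static int suggestedNumberOfShards(int maxClusterNodes) {
    return maxClusterNodes * 10;
  }

  // A shard lives on exactly one node, so with fewer shards than nodes
  // at least this many nodes cannot host any shard.
  static int minIdleNodes(int numberOfShards, int clusterNodes) {
    return Math.max(0, clusterNodes - numberOfShards);
  }
}
```

For a cluster planned to grow to 10 nodes this suggests around 100 shards, whereas 5 shards on 10 nodes would leave at least 5 nodes without any Aggregates.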
§Configuring Entity passivation
Keeping all the Aggregates in memory all the time is inefficient. Instead, use the Entity passivation feature so that sharded entities (the Aggregates) are stopped and removed from memory when they have been unused for some time.
Akka supports both programmatic passivation and automatic passivation. The default values for automatic passivation are generally good enough.
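The idea behind automatic passivation can be pictured as an idle-timeout sweep over the entities currently in memory. The plain-Java sketch below is illustrative only and does not reflect how Akka implements it. IdlePassivationSketch is a hypothetical class and the two-minute timeout used in the example is an arbitrary value.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

final class IdlePassivationSketch {
  private final Duration idleTimeout;
  // Entity id -> time of the last message it received.
  private final Map<String, Instant> lastMessageAt = new HashMap<>();

  IdlePassivationSketch(Duration idleTimeout) {
    this.idleTimeout = idleTimeout;
  }

  // Record that an entity received a message (which also "loads" it into memory).
  void touch(String entityId, Instant now) {
    lastMessageAt.put(entityId, now);
  }

  // Drop every entity idle for longer than the timeout; returns how many were dropped.
  int passivateIdle(Instant now) {
    int removed = 0;
    Iterator<Map.Entry<String, Instant>> it = lastMessageAt.entrySet().iterator();
    while (it.hasNext()) {
      if (Duration.between(it.next().getValue(), now).compareTo(idleTimeout) > 0) {
        it.remove();
        removed++;
      }
    }
    return removed;
  }

  boolean isInMemory(String entityId) {
    return lastMessageAt.containsKey(entityId);
  }
}
```

For example, with new IdlePassivationSketch(Duration.ofMinutes(2)), an entity last touched 150 seconds ago is dropped by the sweep while one touched 50 seconds ago stays. A passivated Aggregate loses nothing: its state is rebuilt from the journal (and snapshots) the next time a command is sent to it.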
§Data Serialization
The messages (commands, replies) and the durable classes (events, state snapshots) need to be serializable to be sent over the wire across the cluster or be stored on the database. Akka recommends Jackson-based serializers –preferably JSON, but CBOR is also supported– as a good default in most cases.
Read more about the serialization setup and configuration in the serialization section.
§Testing
The Testing section covers all the steps and features you need to write unit tests for your Aggregates.