§Domain Modelling with Akka Persistence Typed

This section presents all the steps to model an Aggregate, as defined in Domain-Driven Design, using Akka Persistence Typed and following the CQRS principles embraced by Lagom. While Akka Persistence Typed provides an API for building event-sourced actors, an event-sourced actor is not necessarily a CQRS Aggregate. To build CQRS applications, we need to apply a few rules to our design.

We use a simplified shopping cart example to guide you through the process. You can find a full-fledged shopping cart sample in our samples repository.

§Encoding the model

First, create a class (for example, ShoppingCartEntity) that extends the abstract class EventSourcedBehaviorWithEnforcedReplies, which is the recommended base class when modeling a CQRS Aggregate. Enforced replies require command handlers to return a ReplyEffect, which forces developers to be explicit about replies.

public class ShoppingCartEntity
    extends EventSourcedBehaviorWithEnforcedReplies<
        ShoppingCartEntity.Command, ShoppingCartEntity.Event, ShoppingCartEntity.ShoppingCart>

EventSourcedBehaviorWithEnforcedReplies has three type parameters (Command, Event and State) and a few mandatory and optional methods that will be implemented as we progress through the guide.

All model classes (ShoppingCartEntity.Command, ShoppingCartEntity.Event, ShoppingCartEntity.ShoppingCart) will be defined as static inner classes of ShoppingCartEntity.

§Modelling the State

The state of the shopping cart is defined as follows:

@Value
@JsonDeserialize
static final class ShoppingCart implements CompressedJsonable {

  public final PMap<String, Integer> items;
  public final Optional<Instant> checkoutDate;

  @JsonCreator
  ShoppingCart(PMap<String, Integer> items, Instant checkoutDate) {
    this.items = Preconditions.checkNotNull(items, "items");
    this.checkoutDate = Optional.ofNullable(checkoutDate);
  }

  ShoppingCart removeItem(String itemId) {
    PMap<String, Integer> newItems = items.minus(itemId);
    return new ShoppingCart(newItems, null);
  }

  ShoppingCart updateItem(String itemId, int quantity) {
    PMap<String, Integer> newItems = items.plus(itemId, quantity);
    return new ShoppingCart(newItems, null);
  }

  boolean isEmpty() {
    return items.isEmpty();
  }

  boolean hasItem(String itemId) {
    return items.containsKey(itemId);
  }

  ShoppingCart checkout(Instant when) {
    return new ShoppingCart(items, when);
  }

  boolean isOpen() {
    return !this.isCheckedOut();
  }

  boolean isCheckedOut() {
    return this.checkoutDate.isPresent();
  }

  public static final ShoppingCart EMPTY = new ShoppingCart(HashTreePMap.empty(), null);
}

The ShoppingCart has a checkoutDate that can be set when transitioning from one state (open shopping cart) to another (checked-out shopping cart). As we will see later, each state determines which commands it can handle, which events it can persist, and to which other states it can transition.

Note: The sample shown above is a simplified case. Whenever your model goes through different state transitions, a better approach is to have an interface and implementations of it for each state. See examples in the style guide for Akka Persistence Typed.
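As a rough illustration of the approach mentioned in the note above, the sketch below models the open and checked-out carts as separate implementations of one interface. It uses only the JDK, and the names CartState, OpenCart and CheckedOutCart are hypothetical; they are not taken from the Lagom sample or the Akka style guide.

```java
import java.time.Instant;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical names, JDK only: one interface, one implementation per state.
interface CartState {
  Map<String, Integer> items();
}

final class OpenCart implements CartState {
  private final Map<String, Integer> items;

  OpenCart(Map<String, Integer> items) {
    this.items = Collections.unmodifiableMap(new HashMap<>(items));
  }

  @Override
  public Map<String, Integer> items() {
    return items;
  }

  // Only an open cart can gain items; the result is a new immutable instance.
  OpenCart withItem(String itemId, int quantity) {
    Map<String, Integer> copy = new HashMap<>(items);
    copy.put(itemId, quantity);
    return new OpenCart(copy);
  }

  // Checking out moves the model to the other state type.
  CheckedOutCart checkout(Instant when) {
    return new CheckedOutCart(items, when);
  }
}

final class CheckedOutCart implements CartState {
  private final Map<String, Integer> items;
  final Instant checkoutDate;

  CheckedOutCart(Map<String, Integer> items, Instant checkoutDate) {
    this.items = Collections.unmodifiableMap(new HashMap<>(items));
    this.checkoutDate = checkoutDate;
  }

  @Override
  public Map<String, Integer> items() {
    return items;
  }
  // No withItem or checkout here: those transitions do not exist for this state.
}
```

With this shape, an invalid transition such as adding an item to a checked-out cart cannot even be expressed, because the method is simply absent from that state type.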

§Modelling Commands and Replies

Next, we define the commands that we can send to the Aggregate.

Each command defines a reply through an ActorRef<R> replyTo field, where R is the reply type that will be sent back to the caller. Replies are used to communicate whether a command was accepted or rejected, or to read the aggregate data (that is, read-only commands). It is also possible to mix both: for example, a command returns some updated data if it succeeds and a rejection message if it fails. You can also have commands without replies (that is, fire-and-forget), but this is a less common pattern in CQRS Aggregate modeling and is not covered in this example.

interface Command<R> extends Jsonable {}

@Value
@JsonDeserialize
static final class AddItem implements Command<Confirmation>, CompressedJsonable {
  public final String itemId;
  public final int quantity;
  public final ActorRef<Confirmation> replyTo;

  @JsonCreator
  AddItem(String itemId, int quantity, ActorRef<Confirmation> replyTo) {
    this.itemId = Preconditions.checkNotNull(itemId, "itemId");
    this.quantity = quantity;
    this.replyTo = replyTo;
  }
}

static final class Get implements Command<Summary> {
  private final ActorRef<Summary> replyTo;

  @JsonCreator
  Get(ActorRef<Summary> replyTo) {
    this.replyTo = replyTo;
  }
}

static final class Checkout implements Command<Confirmation> {
  private final ActorRef<Confirmation> replyTo;

  @JsonCreator
  Checkout(ActorRef<Confirmation> replyTo) {
    this.replyTo = replyTo;
  }
}

In Akka Typed, unlike Akka classic and Lagom Persistence, it’s not possible to return an exception to the caller. All communication between the actor and the caller must be done via the ActorRef<R> replyTo passed in the command. Therefore, if you want to signal a rejection, you must encode it in your reply protocol.
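To make the idea of a rejection encoded in the reply protocol more concrete, here is a minimal JDK-only sketch of how a caller might branch on the reply type instead of catching an exception. The types ConfirmationSketch, AcceptedSketch and RejectedSketch are simplified stand-ins invented for this illustration, and ReplyHandling.describe is a hypothetical helper.

```java
// Stand-in reply protocol using only the JDK; simplified for illustration.
interface ConfirmationSketch {}

final class AcceptedSketch implements ConfirmationSketch {
  final int itemCount;

  AcceptedSketch(int itemCount) {
    this.itemCount = itemCount;
  }
}

final class RejectedSketch implements ConfirmationSketch {
  final String reason;

  RejectedSketch(String reason) {
    this.reason = reason;
  }
}

final class ReplyHandling {
  // The caller inspects the reply type instead of catching an exception.
  static String describe(ConfirmationSketch reply) {
    if (reply instanceof AcceptedSketch) {
      return "accepted, items in cart: " + ((AcceptedSketch) reply).itemCount;
    } else if (reply instanceof RejectedSketch) {
      return "rejected: " + ((RejectedSketch) reply).reason;
    }
    throw new IllegalArgumentException("Unknown reply type: " + reply.getClass());
  }
}
```

In a Lagom service, the rejected branch would typically be the place to translate the reason into whatever error the service API exposes.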

The replies used by the commands above are defined like this:

interface Reply extends Jsonable {}

interface Confirmation extends Reply {}

@Value
@JsonDeserialize
static final class Summary implements Reply {

  public final Map<String, Integer> items;
  public final boolean checkedOut;
  public final Optional<Instant> checkoutDate;

  @JsonCreator
  Summary(Map<String, Integer> items, boolean checkedOut, Optional<Instant> checkoutDate) {
    this.items = items;
    this.checkedOut = checkedOut;
    this.checkoutDate = checkoutDate;
  }
}

@Value
@JsonDeserialize
static final class Accepted implements Confirmation {
  public final Summary summary;

  @JsonCreator
  Accepted(Summary summary) {
    this.summary = summary;
  }
}

@Value
@JsonDeserialize
static final class Rejected implements Confirmation {
  public final String reason;

  @JsonCreator
  Rejected(String reason) {
    this.reason = reason;
  }
}

Here there are two different kinds of replies: Confirmation and Summary. Confirmation is used when we want to modify the state. A modification request can be Accepted or Rejected. Then, the Summary is used when we want to read the state of the shopping cart.

Note: Keep in mind that Summary is not the shopping cart itself, but the representation we want to expose to the external world. It’s a good practice to keep the internal state of the aggregate private because it allows the internal state and the exposed API to evolve independently.
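The command handlers later in this guide call a toSummary helper that is not shown on this page. The sketch below suggests what such a mapping could look like, assuming simplified stand-in types (InternalCart and CartSummary are hypothetical and use java.util.Map rather than PMap). It is not the sample's actual implementation.

```java
import java.time.Instant;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Simplified stand-in for the internal state (the guide's version uses PMap).
final class InternalCart {
  final Map<String, Integer> items;
  final Optional<Instant> checkoutDate;

  InternalCart(Map<String, Integer> items, Optional<Instant> checkoutDate) {
    this.items = items;
    this.checkoutDate = checkoutDate;
  }
}

// Simplified stand-in for the Summary reply exposed to callers.
final class CartSummary {
  final Map<String, Integer> items;
  final boolean checkedOut;
  final Optional<Instant> checkoutDate;

  CartSummary(Map<String, Integer> items, boolean checkedOut, Optional<Instant> checkoutDate) {
    this.items = items;
    this.checkedOut = checkedOut;
    this.checkoutDate = checkoutDate;
  }
}

final class SummaryMapping {
  // Copies the data so callers never hold a reference to the internal state.
  static CartSummary toSummary(InternalCart cart) {
    Map<String, Integer> copy = Collections.unmodifiableMap(new HashMap<>(cart.items));
    return new CartSummary(copy, cart.checkoutDate.isPresent(), cart.checkoutDate);
  }
}
```

Because the summary holds its own copy of the items, later changes to the internal representation do not leak into replies that were already sent.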

§Modelling Events

Next, we define the events that our model will persist. The events must extend Lagom’s AggregateEvent. This is important for tagging events. We will cover it soon in the tagging events section.

public interface Event extends Jsonable, AggregateEvent<Event> {
  /** The tag for shopping cart events used for consuming the journal event stream. */
  AggregateEventShards<Event> TAG = AggregateEventTag.sharded(Event.class, 10);

  @Override
  default AggregateEventTagger<Event> aggregateTag() {
    return TAG;
  }
}

@Value
@JsonDeserialize
static final class ItemAdded implements Event {
  public final String shoppingCartId;
  public final String itemId;
  public final int quantity;
  public final Instant eventTime;

  @JsonCreator
  ItemAdded(String shoppingCartId, String itemId, int quantity, Instant eventTime) {
    this.shoppingCartId = Preconditions.checkNotNull(shoppingCartId, "shoppingCartId");
    this.itemId = Preconditions.checkNotNull(itemId, "itemId");
    this.quantity = quantity;
    this.eventTime = eventTime;
  }
}

@Value
@JsonDeserialize
static final class CheckedOut implements Event {

  public final String shoppingCartId;
  public final Instant eventTime;

  @JsonCreator
  CheckedOut(String shoppingCartId, Instant eventTime) {
    this.shoppingCartId = Preconditions.checkNotNull(shoppingCartId, "shoppingCartId");
    this.eventTime = eventTime;
  }
}

§Defining Command Handlers

Once you’ve defined your model in terms of Commands, Replies, Events, and State, you need to specify the business rules. The command handlers define how to handle each incoming command, which validations must be applied, and finally, which events will be persisted if any.

You can encode it in different ways. The recommended style is to add the command handlers in your state classes. For ShoppingCart, we can define the command handlers based on the two possible states:

@Override
public CommandHandlerWithReply<Command, Event, ShoppingCart> commandHandler() {
  CommandHandlerWithReplyBuilder<Command, Event, ShoppingCart> builder =
      newCommandHandlerWithReplyBuilder();
  builder
      .forState(ShoppingCart::isOpen)
      .onCommand(AddItem.class, this::onAddItem)
      .onCommand(Checkout.class, this::onCheckout);

  builder
      .forState(ShoppingCart::isCheckedOut)
      .onCommand(
          AddItem.class,
          cmd ->
              Effect()
                  .reply(cmd.replyTo, new Rejected("Cannot add an item to a checked-out cart")))
      .onCommand(
          Checkout.class,
          cmd -> Effect().reply(cmd.replyTo, new Rejected("Cannot checkout a checked-out cart")));

  builder.forAnyState().onCommand(Get.class, this::onGet);

  return builder.build();
}

private ReplyEffect<Event, ShoppingCart> onAddItem(ShoppingCart shoppingCart, AddItem cmd) {
  if (shoppingCart.hasItem(cmd.getItemId())) {
    return Effect()
        .reply(cmd.replyTo, new Rejected("Item was already added to this shopping cart"));
  } else if (cmd.getQuantity() <= 0) {
    return Effect().reply(cmd.replyTo, new Rejected("Quantity must be greater than zero"));
  } else {
    return Effect()
        .persist(new ItemAdded(cartId, cmd.getItemId(), cmd.getQuantity(), Instant.now()))
        .thenReply(cmd.replyTo, s -> new Accepted(toSummary(s)));
  }
}

private ReplyEffect<Event, ShoppingCart> onGet(ShoppingCart shoppingCart, Get cmd) {
  return Effect().reply(cmd.replyTo, toSummary(shoppingCart));
}

private ReplyEffect<Event, ShoppingCart> onCheckout(ShoppingCart shoppingCart, Checkout cmd) {
  if (shoppingCart.isEmpty()) {
    return Effect().reply(cmd.replyTo, new Rejected("Cannot checkout empty shopping cart"));
  } else {
    return Effect()
        .persist(new CheckedOut(cartId, Instant.now()))
        .thenReply(cmd.replyTo, s -> new Accepted(toSummary(s)));
  }
}

Note: of course, it is possible to organize the command handlers in a way that you consider more convenient for your use case, but we recommend the onCommand pattern since it can help to keep the logic for each command well isolated.

Command handlers are the meat of the model. They encode the business rules of your model and act as a guardian of the model consistency. The command handler must first validate that the incoming command can be applied to the current model state. In case of successful validation, one or more events expressing the mutations are persisted. Once the events are persisted, they are applied to the state producing a new valid state.

Because an Aggregate is intended to model a consistency boundary, it’s not recommended to validate commands using data that is not available in scope. Any decision should be based solely on the data passed in the commands and the state of the Aggregate. Any external call should be considered a smell because it means that the Aggregate is not in full control of the invariants it’s supposed to be protecting.

There are two ways of sending back a reply: using Effect().reply and Effect().persist(...).thenReply. The first one is available directly on Effect and should be used when you reply without persisting any event. In this case, you can use the available state in scope because it’s guaranteed not to have changed. The second variant should be used when you have persisted one or more events. The updated state is then available to you on the function used to define the reply.
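The difference between the two reply styles can be pictured with a small JDK-only model of the "validate, persist, apply, then reply" cycle. This is a sketch only: CommandCycleSketch is a hypothetical class, the journal is a plain list, and none of it is Akka's API.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// JDK-only model of "validate, persist, apply, then reply"; not Akka's API.
final class CommandCycleSketch {
  private final List<String> journal = new ArrayList<>(); // stands in for the event journal
  private Map<String, Integer> items = new HashMap<>(); // current state

  // Returns the reply for an "add item" command.
  String addItem(String itemId, int quantity) {
    // Rejection path: nothing is persisted, so the state in scope is still current.
    if (items.containsKey(itemId)) {
      return "Rejected: item already in cart";
    }
    if (quantity <= 0) {
      return "Rejected: quantity must be greater than zero";
    }
    // Acceptance path: record the event first, then apply it to get the new state.
    journal.add("ItemAdded(" + itemId + "," + quantity + ")");
    Map<String, Integer> updated = new HashMap<>(items);
    updated.put(itemId, quantity);
    items = updated;
    // The reply is built from the updated state, mirroring the thenReply variant.
    return "Accepted: cart size " + items.size();
  }

  int journalSize() {
    return journal.size();
  }
}
```

In this model, a rejected command leaves both the journal and the state untouched, which is why replying from the state already in scope is safe on that path.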

You may run side effects inside the command handler. Please refer to the Akka documentation for detailed information.

§Defining the Event Handlers

The event handlers are used to mutate the state of the Aggregate by applying the events to it. Event handlers must be pure functions as they will be used when instantiating the Aggregate and replaying the event journal. Similar to the command handlers, a recommended style is to add them in the state classes.

@Override
public EventHandler<ShoppingCart, Event> eventHandler() {
  return newEventHandlerBuilder()
      .forAnyState()
      .onEvent(
          ItemAdded.class,
          (shoppingCart, evt) -> shoppingCart.updateItem(evt.getItemId(), evt.getQuantity()))
      .onEvent(CheckedOut.class, (shoppingCart, evt) -> shoppingCart.checkout(evt.getEventTime()))
      .build();
}
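The requirement that event handlers be pure follows from how state is rebuilt: conceptually, recovery is a fold of the journal over the empty state. The JDK-only sketch below illustrates that idea with a hypothetical ReplaySketch class, where each event is modelled as a simple (itemId, quantity) entry. It is an illustration of the principle, not Akka's recovery code.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

final class ReplaySketch {
  // Folds the journal over the empty state using the event handler.
  static <S, E> S replay(S emptyState, List<E> journal, BiFunction<S, E, S> eventHandler) {
    S state = emptyState;
    for (E event : journal) {
      state = eventHandler.apply(state, event);
    }
    return state;
  }

  // A pure handler: it returns a new map and never modifies its inputs.
  static Map<String, Integer> applyItemAdded(
      Map<String, Integer> items, Map.Entry<String, Integer> event) {
    Map<String, Integer> next = new HashMap<>(items);
    next.put(event.getKey(), event.getValue());
    return next;
  }
}
```

Since the handler has no side effects and depends only on its arguments, replaying the same journal any number of times yields the same state.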

§EventSourcedBehavior - gluing the bits together

With all the model encoded, the next step is to prepare ShoppingCartEntity so we can let it run as an Actor.

The EventSourcedBehaviorWithEnforcedReplies has a constructor receiving a PersistenceId that we need to call from ShoppingCartEntity’s own constructor. To build a PersistenceId instance, we need an EntityContext<Command> instance, so we add it as a constructor argument to ShoppingCartEntity.

The persistenceId defines the id that will be used in the event journal. The id is composed of a name (for example, entityContext.getEntityTypeKey().name()) and a business id (for example, entityContext.getEntityId()). These two values will be concatenated using a "|" by default (for example, "ShoppingCart|123456"). See Akka’s documentation for more details.

private ShoppingCartEntity(EntityContext<Command> entityContext) {
  super(
      // PersistenceId needs a typeHint (or namespace) and entityId,
      // we take them from the EntityContext
      PersistenceId.of(
          entityContext.getEntityTypeKey().name(), // <- type hint
          entityContext.getEntityId() // <- business id
          ));
  // we keep a copy of cartId because it's used in the events
  this.cartId = entityContext.getEntityId();
  // tagger is constructed from adapter and needs EntityContext
  this.tagger = AkkaTaggerAdapter.fromLagom(entityContext, Event.TAG);
}

static ShoppingCartEntity create(EntityContext<Command> entityContext) {
  return new ShoppingCartEntity(entityContext);
}

Note: the constructor is private and there is a static method create() to create instances, as recommended by the Akka style guide. This and the need for EntityContext will be explained when covering ClusterSharding later in this guide.

Moreover, we also initialize a field called tagger using an AkkaTaggerAdapter. We will cover it soon in the tagging events section.
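The way the persistence id is put together from the type hint and the business id can be sketched in plain Java. PersistenceIdSketch.compose is a hypothetical helper written for this illustration; the check that rejects ids containing the separator is an assumption of the sketch, added so that the composed string can always be split back unambiguously.

```java
final class PersistenceIdSketch {
  static final String DEFAULT_SEPARATOR = "|";

  // Joins the type hint and the business id with the separator.
  static String compose(String typeHint, String entityId) {
    // Assumption for this sketch: reject values that contain the separator, so the
    // composed string can always be split back unambiguously.
    if (typeHint.contains(DEFAULT_SEPARATOR) || entityId.contains(DEFAULT_SEPARATOR)) {
      throw new IllegalArgumentException(
          "type hint and entity id must not contain '" + DEFAULT_SEPARATOR + "'");
    }
    return typeHint + DEFAULT_SEPARATOR + entityId;
  }
}
```

For the values used in this guide, compose("ShoppingCart", "123456") yields "ShoppingCart|123456".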

Next we need to implement the emptyState() method. The emptyState is the state used when the journal is empty. It’s the initial state:

@Override
public ShoppingCart emptyState() {
  return ShoppingCart.EMPTY;
}

§Changing behavior – Finite State Machines

If you are familiar with general Akka Actors, you are probably aware that after processing a message, you should return the next behavior to be used. With Akka Persistence Typed this happens differently. Command handlers and event handlers are all dependent on the current state, therefore you can change behavior by returning a new state in the event handler. Consult the Akka documentation for more insight on this topic.

§Tagging the events – Akka Persistence Query considerations

Events are persisted in the event journal and are primarily used to replay the state of the Aggregate each time it needs to be instantiated. However, in CQRS, we also want to consume those same events and generate read-side views or publish them in a message broker (for example, Kafka) for external consumption.

To be able to consume the events on the read-side, the events must be tagged. This is done using the AggregateEventTag utility. It’s recommended to shard the tags so they can be consumed in a distributed fashion by Lagom’s Read-Side Processor and Topic Producers. Although not recommended, it’s also possible to not shard the events as explained here.

This example splits the tags into 10 shards and defines the event tagger in the ShoppingCartEntity.Event interface. Note that the tag name must be stable, as well as the number of shards. These two values can’t be changed later without migrating the journal.

/** The tag for shopping cart events used for consuming the journal event stream. */
AggregateEventShards<Event> TAG = AggregateEventTag.sharded(Event.class, 10);
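To see why the number of shards has to stay stable, it can help to picture how a sharded tag might be chosen for an entity. The sketch below is an assumption-laden illustration in plain Java: ShardedTagSketch, its hash-based selection and the tag naming are hypothetical and are not claimed to match Lagom's exact algorithm.

```java
final class ShardedTagSketch {
  // Picks a shard number in [0, numShards) from the entity id.
  static int selectShard(int numShards, String entityId) {
    return Math.floorMod(entityId.hashCode(), numShards);
  }

  // Builds the tag name for an entity by appending the shard number to a base name.
  static String tagFor(String baseTagName, int numShards, String entityId) {
    return baseTagName + selectShard(numShards, entityId);
  }
}
```

Under this scheme, all events of one entity always receive the same tag, so a read-side consumer of that tag sees them in order. If the shard count were changed from 10 to 20, the entity id "a" would move from tag suffix 7 to 17, and events written before and after the change would end up under different tags.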

Note: if you’re using a JDBC database to store your journal, the number of sharded tags (NumShards) should not be greater than 10. This is due to an existing bug in the plugin. Failing to follow this directive will result in some events being delivered more than once on the read-side or topic producers.

The AggregateEventTag is a Lagom class used by Lagom’s Read-Side Processor and Topic Producers. However, Akka Persistence Typed expects a method that accepts an Event and returns a Set<String> to tag events before persisting them. Therefore, we need to use an adapter to transform Lagom’s AggregateEventTag to the required Akka tagger function. As shown in the constructor section, we instantiate a tagger field using AkkaTaggerAdapter. This field can then be used when implementing the tagging method.

private final Function<Event, Set<String>> tagger;

@Override
public Set<String> tagsFor(Event event) {
  return tagger.apply(event);
}

§Configuring snapshots

Snapshotting is a common optimization to avoid replaying all the events since the beginning.

You can define snapshot rules in two ways: by predicate and by counter. Both can be combined. The example below uses a counter to illustrate the APIs. You can find more details in the Akka documentation.

@Override
public RetentionCriteria retentionCriteria() {
  return RetentionCriteria.snapshotEvery(100, 2);
}
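The effect of snapshotEvery(100, 2) can be approximated with a little arithmetic: a snapshot roughly every 100 events, with only the 2 most recent snapshots kept. The plain-Java sketch below models that under a simplifying assumption (snapshots land exactly on multiples of the event count); SnapshotRetentionSketch is a hypothetical class, not part of Akka.

```java
import java.util.ArrayList;
import java.util.List;

final class SnapshotRetentionSketch {
  // Sequence numbers at which a snapshot would be taken, up to lastSequenceNr.
  static List<Long> snapshotPoints(long lastSequenceNr, int numberOfEvents) {
    List<Long> points = new ArrayList<>();
    for (long seqNr = numberOfEvents; seqNr <= lastSequenceNr; seqNr += numberOfEvents) {
      points.add(seqNr);
    }
    return points;
  }

  // The most recent keepNSnapshots snapshot points; older ones would be deleted.
  static List<Long> retained(long lastSequenceNr, int numberOfEvents, int keepNSnapshots) {
    List<Long> points = snapshotPoints(lastSequenceNr, numberOfEvents);
    int from = Math.max(0, points.size() - keepNSnapshots);
    return new ArrayList<>(points.subList(from, points.size()));
  }
}
```

In this model, an Aggregate that has persisted 350 events has taken snapshots at sequence numbers 100, 200 and 300 and still retains the ones at 200 and 300, so recovery only needs to replay the last 50 events.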

§Akka Cluster Sharding

Lagom uses Akka Cluster Sharding to distribute the Aggregates across all the nodes and guarantee that, at any single time, there is only one instance of a given Aggregate loaded in memory over the whole cluster.

§Creating the Aggregate instance

The Aggregate needs to be initialized on the ClusterSharding before it’s used. That process won’t create any specific Aggregate instance; it only creates the Shard Regions and prepares them for use (read more about Shard Regions in the Akka Cluster Sharding docs).

Note: In Akka Cluster, the term to refer to a sharded actor is entity, so a sharded Aggregate can also be referred to as an Aggregate Entity.

You must define an EntityTypeKey and a function of EntityContext<Command> -> Behavior<Command> to initialize the Shopping Cart Aggregate.

The EntityTypeKey has a name that uniquely identifies this model in the cluster. It should be typed on ShoppingCartEntity.Command, which is the type of the messages that the Shopping Cart can receive. The easiest approach is to define it as a static field in ShoppingCartEntity.

static EntityTypeKey<Command> ENTITY_TYPE_KEY =
    EntityTypeKey.create(Command.class, "ShoppingCart");

Finally, initialize the Aggregate on the ClusterSharding using the EntityTypeKey and the create() static method. Lagom provides an instance of the ClusterSharding extension through dependency injection for your convenience. Initializing an entity should be done only once and, in the case of Lagom Aggregates, it is typically done in the constructor of the service implementation. To do so, inject the ClusterSharding into your service.

@Inject
public ShoppingCartServiceImpl(ClusterSharding clusterSharing) {
  this.clusterSharing = clusterSharing;

  // register entity on shard
  this.clusterSharing.init(
      Entity.of(
          ShoppingCartEntity.ENTITY_TYPE_KEY, // <- type key
          ShoppingCartEntity::create // <- create function
          ));
}

§Getting instances of the Aggregate Entity

To access instances of the Aggregate (which may be running locally or remotely on the cluster), you should use the injected ClusterSharding. You can instantiate an EntityRef using the method entityRefFor. In our case, the EntityRef is typed to only accept ShoppingCartEntity.Command messages.

private EntityRef<ShoppingCartEntity.Command> entityRef(String id) {
  return clusterSharing.entityRefFor(ShoppingCartEntity.ENTITY_TYPE_KEY, id);
}

To locate the correct actor across the cluster, you need to specify the EntityTypeKey we used to initialize the entity and the id for the instance we need. Akka Cluster will create the required actor in one node on the cluster or reuse the existing instance if the actor has already been created and is still alive.

The EntityRef is similar to an ActorRef but denotes the actor is sharded. Interacting with an EntityRef implies the messages exchanged with the actor may need to travel over the wire to another node.

§Considerations on using the ask pattern

Since we want to send commands to the Aggregate and these commands declare a reply, we need to use the ask pattern.

The code shown below creates an EntityRef from inside the ShoppingCartServiceImpl, meaning we are calling the actor (through the EntityRef) from outside the ActorSystem. EntityRef provides an ask() overload out of the box that is meant to be used from outside actors.

private final Duration askTimeout = Duration.ofSeconds(5);

@Override
public ServiceCall<NotUsed, ShoppingCartView> get(String id) {
  return request ->
      entityRef(id)
          .<ShoppingCartEntity.Summary>ask(
              replyTo -> new ShoppingCartEntity.Get(replyTo), askTimeout)
          .thenApply(summary -> asShoppingCartView(id, summary));
}

We declare an askTimeout and then invoke ask. The ask method accepts a function of ActorRef<Res> -> M in which Res is the expected response type and M is the message being sent to the actor. The ask method will create an instance of ActorRef<Res> that can be used to build the outgoing message (command). Once the response is sent to ActorRef<Res>, Akka will complete the returned CompletionStage<Res> with the response (in this case CompletionStage<ShoppingCartEntity.Summary>).

Finally, we operate over the summary (in this case, we map it to a different type, ShoppingCartView, using the thenApply method).
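The mechanics of ask described above can be imitated with a CompletableFuture in plain Java. The following is a conceptual sketch only, not Akka's implementation: AskSketch and GetSummary are hypothetical names, a Consumer<Res> stands in for ActorRef<Res>, and a Consumer<M> stands in for the entity. It requires Java 9 or later because of orTimeout.

```java
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical command carrying a reply handle, similar in spirit to Get(replyTo).
final class GetSummary {
  final Consumer<String> replyTo;

  GetSummary(Consumer<String> replyTo) {
    this.replyTo = replyTo;
  }
}

final class AskSketch {
  // Builds the message with a one-shot reply handle and returns a stage that
  // completes with the reply, or fails if no reply arrives within the timeout.
  static <M, Res> CompletionStage<Res> ask(
      Consumer<M> target, Function<Consumer<Res>, M> messageFactory, Duration timeout) {
    CompletableFuture<Res> reply = new CompletableFuture<>();
    // reply::complete plays the role of the temporary ActorRef<Res>.
    M message = messageFactory.apply(reply::complete);
    target.accept(message);
    return reply.orTimeout(timeout.toMillis(), TimeUnit.MILLISECONDS);
  }
}
```

A caller would pass a constructor reference such as GetSummary::new as the message factory, just as the service code passes replyTo -> new ShoppingCartEntity.Get(replyTo), and then chain thenApply on the returned stage.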

The ShoppingCartView and asShoppingCartView are defined as:

@Value
@JsonDeserialize
public final class ShoppingCartView {
  /** The ID of the shopping cart. */
  public final String id;

  /** The list of items in the cart. */
  public final List<ShoppingCartItem> items;

  /** Whether this cart has been checked out. */
  public final boolean checkedOut;

  /** When this cart was checked out. */
  public final Optional<Instant> checkedOutTime;

  @JsonCreator
  public ShoppingCartView(
      String id, List<ShoppingCartItem> items, Optional<Instant> checkedOutTime) {
    this.id = Preconditions.checkNotNull(id, "id");
    this.items = Preconditions.checkNotNull(items, "items");
    this.checkedOutTime = checkedOutTime;
    this.checkedOut = checkedOutTime.isPresent();
  }

  public boolean hasItem(String itemId) {
    return items.stream().anyMatch(item -> item.getItemId().equals(itemId));
  }

  public Optional<ShoppingCartItem> get(String itemId) {
    return items.stream().filter(item -> item.getItemId().equals(itemId)).findFirst();
  }
}

private ShoppingCartView asShoppingCartView(String id, ShoppingCartEntity.Summary summary) {
  List<ShoppingCartItem> items = new ArrayList<>();
  for (Map.Entry<String, Integer> item : summary.getItems().entrySet()) {
    items.add(new ShoppingCartItem(item.getKey(), item.getValue()));
  }
  return new ShoppingCartView(id, items, summary.getCheckoutDate());
}

Note: We are keeping the internal state of the Aggregate isolated from the exposed service API so they can evolve independently.

§Configuring number of shards

Akka recommends, as a rule of thumb, that the number of shards should be ten times greater than the planned maximum number of cluster nodes. It doesn’t have to be exact. Having fewer shards than nodes means that some nodes will not host any shards. Too many shards will result in less efficient management of the shards, e.g. rebalancing overhead, and increased latency because the coordinator is involved in the routing of the first message for each shard.
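The rule of thumb and the "fewer shards than nodes" case can be checked with a few lines of arithmetic. The sketch below assumes a simple round-robin placement of shards on nodes purely for illustration; Akka's real allocation strategy is more elaborate, and ShardCountSketch is a hypothetical class.

```java
import java.util.HashSet;
import java.util.Set;

final class ShardCountSketch {
  // Rule of thumb from the text: ten times the planned maximum number of nodes.
  static int suggestedShards(int maxNodes) {
    return maxNodes * 10;
  }

  // Counts nodes that host at least one shard under a simple round-robin
  // placement (an assumption of this sketch, not Akka's allocation strategy).
  static int nodesHostingShards(int numberOfShards, int numberOfNodes) {
    Set<Integer> busyNodes = new HashSet<>();
    for (int shard = 0; shard < numberOfShards; shard++) {
      busyNodes.add(shard % numberOfNodes);
    }
    return busyNodes.size();
  }
}
```

For a planned maximum of 5 nodes, the rule suggests 50 shards, which keeps every node busy. With only 3 shards, at most 3 of the 5 nodes can host anything, whatever the placement strategy.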

See the Akka Cluster Sharding documentation for details on how to configure the number of shards.

§Configuring Entity passivation

Keeping all the Aggregates in memory all the time is inefficient. Instead, use the Entity passivation feature so that sharded entities (the Aggregates) are removed from the cluster when they have been unused for some time.

Akka supports both programmatic passivation and automatic passivation. The default values for automatic passivation are generally good enough.
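The idea behind automatic passivation can be pictured as an idle-timeout sweep over the entities currently held in memory. The sketch below is a plain-Java illustration under that assumption; PassivationSketch and its methods are hypothetical and do not reflect how Akka implements or configures passivation.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

final class PassivationSketch {
  private final Duration idleTimeout;
  private final Map<String, Instant> lastMessageAt = new HashMap<>();

  PassivationSketch(Duration idleTimeout) {
    this.idleTimeout = idleTimeout;
  }

  // Called whenever an entity receives a message; this also marks it as loaded.
  void touch(String entityId, Instant now) {
    lastMessageAt.put(entityId, now);
  }

  // Drops entities idle for longer than the timeout; returns how many were removed.
  int passivateIdle(Instant now) {
    int removed = 0;
    Iterator<Map.Entry<String, Instant>> it = lastMessageAt.entrySet().iterator();
    while (it.hasNext()) {
      if (Duration.between(it.next().getValue(), now).compareTo(idleTimeout) > 0) {
        it.remove();
        removed++;
      }
    }
    return removed;
  }

  boolean isInMemory(String entityId) {
    return lastMessageAt.containsKey(entityId);
  }
}
```

A passivated Aggregate loses nothing: because its events are in the journal, the next command addressed to it causes the entity to be started again and its state to be recovered.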

§Data Serialization

The messages (commands, replies) and the durable classes (events, state snapshots) need to be serializable to be sent over the wire across the cluster or be stored on the database. Akka recommends Jackson-based serializers (preferably JSON, but CBOR is also supported) as a good default in most cases.

Read more about the serialization setup and configuration in the serialization section.

§Testing

The Testing section covers all the steps and features you need to write unit tests for your Aggregates.
