Persistent Entity

§Persistent Entity

The section Event Sourcing and CQRS is a recommended introduction to this section.

A PersistentEntity has a stable entity identifier, with which it can be accessed from the service implementation or other places. The state of an entity is made persistent (durable) using Event Sourcing. All state changes are represented as events, and those immutable facts are appended to an event log. To recreate the current state of an entity when it is started, these events are replayed.
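The replay idea can be sketched in plain Java. This is a simplified model, not Lagom's API: the `EventLog` and `TitleChanged` classes and the `replay` method are hypothetical names introduced only for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified, hypothetical model of event sourcing; not Lagom's API.
final class EventReplaySketch {

  // An event is an immutable fact; here it records that the title changed.
  static final class TitleChanged {
    final String title;

    TitleChanged(String title) {
      this.title = title;
    }
  }

  // Append-only log: events are added, never updated or removed.
  static final class EventLog {
    private final List<TitleChanged> events = new ArrayList<>();

    void append(TitleChanged event) {
      events.add(event);
    }

    List<TitleChanged> readAll() {
      return new ArrayList<>(events);
    }
  }

  // Recreate the current state by applying every stored event in order.
  static String replay(String initialTitle, List<TitleChanged> events) {
    String title = initialTitle;
    for (TitleChanged event : events) {
      title = event.title;
    }
    return title;
  }

  public static void main(String[] args) {
    EventLog log = new EventLog();
    log.append(new TitleChanged("Draft"));
    log.append(new TitleChanged("Final"));
    System.out.println(replay("", log.readAll()));
  }
}
```

The state itself is never written to the log in this model; it only exists as the result of applying the events.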

A persistent entity corresponds to an Aggregate Root in Domain-Driven Design terms. Each instance has a stable identifier, and for a given id there will only be one instance of the entity. Lagom takes care of distributing those instances across the cluster of the service. If you know the identifier you can send messages, so-called commands, to the entity.

The persistent entity is also a transaction boundary. Invariants can be maintained within one entity but not across several entities.

If you are familiar with JPA it is worth noting that a PersistentEntity can be used for similar things as a JPA @Entity, but several aspects are rather different. For example, a JPA @Entity is loaded from the database wherever it is needed, i.e. there may be many Java object instances with the same entity identifier. In contrast, there is only one instance of PersistentEntity with a given identifier. With JPA you typically only store the current state, and the history of how the state was reached is not captured.

You interact with a PersistentEntity by sending command messages to it. Commands are processed sequentially, one at a time, for a specific entity instance. A command may result in state changes that are persisted as events, representing the effect of the command. The current state is not stored for every change, since it can be derived from the events. These events are only ever appended to storage, nothing is ever mutated, which allows for very high transaction rates and efficient replication.
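To illustrate what "processed sequentially, one at a time" means, here is a minimal plain-Java sketch. It assumes a single-threaded executor as a stand-in for the entity's mailbox; the class and method names are hypothetical and this is not how Lagom is implemented internally.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical model of one-command-at-a-time processing; not Lagom's implementation.
final class SequentialEntitySketch {

  // A single-threaded executor plays the role of the entity's mailbox.
  private final ExecutorService mailbox = Executors.newSingleThreadExecutor();

  // Only read or written by the mailbox thread, so no locking is needed.
  private int counter = 0;

  // "Send a command": the increment is queued and applied in arrival order.
  void send(int amount) {
    mailbox.execute(() -> counter += amount);
  }

  // "Send a query": it is queued behind all commands sent earlier.
  int ask() {
    try {
      return mailbox.submit(() -> counter).get();
    } catch (InterruptedException | ExecutionException e) {
      throw new IllegalStateException(e);
    }
  }

  void stop() {
    mailbox.shutdown();
  }

  // Several sender threads send commands concurrently to one entity.
  static int runDemo(int senders, int commandsPerSender) {
    SequentialEntitySketch entity = new SequentialEntitySketch();
    Thread[] threads = new Thread[senders];
    for (int i = 0; i < senders; i++) {
      threads[i] = new Thread(() -> {
        for (int j = 0; j < commandsPerSender; j++) {
          entity.send(1);
        }
      });
      threads[i].start();
    }
    try {
      for (Thread thread : threads) {
        thread.join();
      }
      return entity.ask();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    } finally {
      entity.stop();
    }
  }

  public static void main(String[] args) {
    System.out.println(runDemo(4, 250));
  }
}
```

Because only the mailbox thread touches the counter, no update is lost even though the senders run concurrently.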

The entities are automatically distributed across the nodes in the cluster of the service. Each entity runs in only one place, and messages can be sent to the entity without requiring the sender to know the location of the entity. If a node is stopped, the entities running on that node will be started on another node the next time a message is sent to them. When new nodes are added to the cluster, some existing entities are rebalanced to the new nodes to spread the load.

An entity is kept alive, holding its current state in memory, as long as it is used. When it has not been used for a while it will automatically be passivated to free up resources.

When an entity is started it replays the stored events to restore the current state. This can be either the full history of changes or starting from a snapshot which will reduce recovery times.
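The two recovery paths can be compared in a small plain-Java sketch. It is an illustrative model only: the `Snapshot` class, the integer "events" and the `recover` methods are assumptions made for the example and are not part of Lagom.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical model of snapshot-based recovery; not Lagom's API.
final class SnapshotRecoverySketch {

  // A saved state plus the number of events that are already folded into it.
  static final class Snapshot {
    final int balance;
    final int eventCount;

    Snapshot(int balance, int eventCount) {
      this.balance = balance;
      this.eventCount = eventCount;
    }
  }

  // Full replay: apply every event from the beginning of the log.
  static int recover(List<Integer> events) {
    return recover(new Snapshot(0, 0), events);
  }

  // Snapshot replay: start from the snapshot and apply only the later events.
  static int recover(Snapshot snapshot, List<Integer> events) {
    int balance = snapshot.balance;
    for (Integer delta : events.subList(snapshot.eventCount, events.size())) {
      balance += delta;
    }
    return balance;
  }

  public static void main(String[] args) {
    List<Integer> events = Arrays.asList(10, -3, 5, 7);
    Snapshot afterTwoEvents = new Snapshot(7, 2);
    System.out.println(recover(events));
    System.out.println(recover(afterTwoEvents, events));
  }
}
```

Both paths reach the same state; the snapshot path simply has fewer events to apply.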

§Dependency

To use this feature add the following in your project’s build.

In Maven:

<dependency>
    <groupId>com.lightbend.lagom</groupId>
    <artifactId>lagom-javadsl-persistence_2.11</artifactId>
    <version>${lagom.version}</version>
</dependency>

In sbt:

libraryDependencies += lagomJavadslPersistence

§Cassandra

Lagom has support for Cassandra as data store, both for the write-side and read-side. Cassandra is a very scalable distributed database, and it is also flexible enough to support typical use cases of reactive services.

§What if I don’t want to use Cassandra?

The Persistence module in Lagom only supports Cassandra as backend data store. It is especially the read-side that is tightly integrated with Cassandra instead of providing an abstraction on top of it. We will not provide support for a wide range of other data stores, but if there is enough demand we might support some other, such as a SQL database.

If you can’t use Cassandra you can implement your Lagom services with whatever data store solution you like, but then you would not use the Persistence module in Lagom.

§PersistentEntity Stub

This is what a PersistentEntity class looks like before filling in the implementation details:

import com.lightbend.lagom.javadsl.persistence.PersistentEntity;
import java.util.Optional;

public class Post1
  extends PersistentEntity<BlogCommand, BlogEvent, BlogState> {

  @Override
  public Behavior initialBehavior(Optional<BlogState> snapshotState) {
    BehaviorBuilder b = newBehaviorBuilder(
        snapshotState.orElse(BlogState.EMPTY));

    // TODO define command and event handlers

    return b.build();
  }

}

The three type parameters of the extended PersistentEntity class define:

  • Command - the super class/interface of the commands
  • Event - the super class/interface of the events
  • State - the class of the state

initialBehavior is an abstract method that your concrete subclass must implement. It returns the Behavior of the entity. Use newBehaviorBuilder to create a mutable builder for defining the behavior. The behavior consists of the current state and functions to process incoming commands and persisted events, as described in the following sections.

§Command Handlers

The functions that process incoming commands are registered in the Behavior using setCommandHandler of the BehaviorBuilder.

// Command handlers are invoked for incoming messages (commands).
// A command handler must "return" the events to be persisted (if any).
b.setCommandHandler(AddPost.class, (AddPost cmd, CommandContext<AddPostDone> ctx) -> {
  final PostAdded postAdded =
      PostAdded.builder().content(cmd.getContent()).postId(entityId()).build();
  return ctx.thenPersist(postAdded, (PostAdded evt) ->
    // After persist is done additional side effects can be performed
    ctx.reply(AddPostDone.of(entityId())));
});

You should define one command handler for each command class that the entity can receive.

A command handler returns a Persist directive that defines what event or events, if any, to persist. Use the thenPersist, thenPersistAll or done methods of the context that is passed to the command handler function to create the Persist directive.

  • thenPersist will persist one single event
  • thenPersistAll will persist several events atomically, i.e. all events
    are stored or none of them are stored if there is an error
  • done indicates that no events are to be persisted
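The all-or-nothing guarantee of thenPersistAll can be pictured with a small plain-Java model. This is only a sketch under stated assumptions: the `AtomicAppendSketch` class, its string "events" and the artificial "empty event cannot be stored" failure are hypothetical and do not reflect how the Lagom journal works internally.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical model of single and atomic batch persists; not Lagom's API.
final class AtomicAppendSketch {

  private final List<String> stored = new ArrayList<>();

  // Models persisting one single event.
  void persist(String event) {
    persistAll(Collections.singletonList(event));
  }

  // Models persisting several events atomically: the whole batch is checked
  // first, so a failure leaves the stored events untouched.
  void persistAll(List<String> events) {
    for (String event : events) {
      if (event == null || event.isEmpty()) {
        // Artificial failure condition, used only to demonstrate the rollback.
        throw new IllegalArgumentException("event cannot be stored");
      }
    }
    stored.addAll(events);
  }

  List<String> storedEvents() {
    return Collections.unmodifiableList(stored);
  }

  public static void main(String[] args) {
    AtomicAppendSketch journal = new AtomicAppendSketch();
    journal.persist("PostAdded");
    try {
      journal.persistAll(Arrays.asList("BodyChanged", ""));
    } catch (IllegalArgumentException e) {
      System.out.println("batch rejected: " + e.getMessage());
    }
    System.out.println(journal.storedEvents());
  }
}
```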

External side effects can be performed after successful persist in the afterPersist function. In the above example a reply is sent with the ctx.reply method.

The command can be validated before persisting state changes. Use ctx.invalidCommand or ctx.commandFailed to reject an invalid command.

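b.setCommandHandler(AddPost.class, (AddPost cmd, CommandContext<AddPostDone> ctx) -> {
  if (cmd.getContent().getTitle() == null || cmd.getContent().getTitle().equals("")) {
    ctx.invalidCommand("Title must be defined");
    return ctx.done();
  }
  final PostAdded postAdded =
      PostAdded.builder().content(cmd.getContent()).postId(entityId()).build();
  return ctx.thenPersist(postAdded, (PostAdded evt) ->
      ctx.reply(AddPostDone.of(entityId())));
});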

A PersistentEntity may also process commands that do not change application state, such as query commands. Such command handlers are registered using setReadOnlyCommandHandler of the BehaviorBuilder. Replies are sent with the reply method of the context that is passed to the command handler function.

b.setReadOnlyCommandHandler(GetPost.class, (cmd, ctx) ->
  ctx.reply(state().getContent().get()));

The commands must be immutable to avoid concurrency issues that may occur from changing a command instance that has been sent.

The section Immutable Objects describes how to define immutable command classes.

§Event Handlers

When an event has been persisted successfully the current state is updated by applying the event to the current state. The functions for updating the state are registered with the setEventHandler method of the BehaviorBuilder.

// Event handlers are used both when persisting new events
// and when replaying events.
b.setEventHandler(PostAdded.class, evt ->
  state().withContent(Optional.of(evt.getContent())));

You should define one event handler for each event class that the entity can persist.

The event handler returns the new state. The state must be immutable, so you return a new instance of the state. Current state can be accessed from the event handler with the state method of the PersistentEntity. The same event handlers are also used when the entity is started up to recover its state from the stored events.
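A hand-written immutable state class could look roughly like the sketch below. The `PostState` class, its fields and the `onBodyChanged` function are hypothetical names chosen for the example; they are not the BlogState used elsewhere on this page.

```java
import java.util.Optional;

// Hypothetical hand-written immutable state; names are illustrative only.
final class ImmutableStateSketch {

  static final class PostState {
    static final PostState EMPTY = new PostState(Optional.empty(), false);

    private final Optional<String> body;
    private final boolean published;

    private PostState(Optional<String> body, boolean published) {
      this.body = body;
      this.published = published;
    }

    Optional<String> getBody() {
      return body;
    }

    boolean isPublished() {
      return published;
    }

    // "with" methods never modify this instance; they return a changed copy.
    PostState withBody(String newBody) {
      return new PostState(Optional.of(newBody), published);
    }

    PostState withPublished(boolean newPublished) {
      return new PostState(body, newPublished);
    }
  }

  // An event handler in the sense described above: (current state, event) -> new state.
  static PostState onBodyChanged(PostState current, String newBody) {
    return current.withBody(newBody);
  }

  public static void main(String[] args) {
    PostState before = PostState.EMPTY;
    PostState after = onBodyChanged(before, "Hello");
    System.out.println(before.getBody().isPresent());
    System.out.println(after.getBody().get());
  }
}
```

Since the previous instance is left untouched, it can safely be shared or written to storage while the new state is being computed.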

The events must be immutable to avoid concurrency issues that may occur from changing an event instance that is about to be persisted.

The section Immutable Objects describes how to define immutable event classes.

§Replies

Each command must define what type of message to use as reply to the command by implementing the PersistentEntity.ReplyType interface.

@Value.Immutable
@ImmutableStyle
@JsonDeserialize(as = AddPost.class)
interface AbstractAddPost
  extends BlogCommand, PersistentEntity.ReplyType<AddPostDone> {

  @Value.Parameter
  PostContent getContent();
}

You send the reply message using the reply method of the context that is passed to the command handler function.

Typically the reply will be an acknowledgment that the entity processed the command successfully, i.e. you send it after persist.

b.setCommandHandler(ChangeBody.class,
    (cmd, ctx) -> ctx.thenPersist(BodyChanged.of(cmd.getBody()), evt ->
      ctx.reply(Done.getInstance())));

For convenience you may use akka.Done as the acknowledgment message.

It can also be a reply to a read-only query command.

b.setReadOnlyCommandHandler(GetPost.class, (cmd, ctx) ->
  ctx.reply(state().getContent().get()));

You can use ctx.invalidCommand to reject an invalid command, which will fail the CompletionStage with PersistentEntity.InvalidCommandException on the sender side.

You can send a negative acknowledgment with ctx.commandFailed, which will fail the CompletionStage on the sender side with the given exception.

If persisting the events fails a negative acknowledgment is automatically sent, which will fail the CompletionStage on the sender side with PersistentEntity.PersistException.

If the PersistentEntity receives a command for which there is no registered command handler a negative acknowledgment is automatically sent, which will fail the CompletionStage on the sender side with PersistentEntity.UnhandledCommandException.

If you don’t reply to a command the CompletionStage on the sender side will be completed exceptionally with an akka.pattern.AskTimeoutException after a timeout.
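On the sender side, these outcomes all arrive through the same CompletionStage. The following sketch shows one possible way to tell them apart using only the JDK. It is an assumption-laden illustration: `InvalidCommand` is a hypothetical stand-in for PersistentEntity.InvalidCommandException, java.util.concurrent.TimeoutException stands in for the ask timeout, and the response strings are invented for the example.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeoutException;

// Hypothetical sender-side handling of replies; uses only JDK types.
final class ReplyHandlingSketch {

  // Stand-in for PersistentEntity.InvalidCommandException (assumption).
  static final class InvalidCommand extends RuntimeException {
    InvalidCommand(String message) {
      super(message);
    }
  }

  // Maps a successful or failed reply to a response for the caller.
  static CompletionStage<String> toResponse(CompletionStage<String> reply) {
    return reply.handle((ack, failure) -> {
      if (failure == null) {
        return "OK: " + ack;
      }
      // Failures from dependent stages may be wrapped in CompletionException.
      Throwable cause =
          failure instanceof CompletionException && failure.getCause() != null
              ? failure.getCause()
              : failure;
      if (cause instanceof InvalidCommand) {
        return "REJECTED: " + cause.getMessage();
      }
      if (cause instanceof TimeoutException) {
        return "UNKNOWN: no reply in time";
      }
      return "FAILED: " + cause.getMessage();
    });
  }

  public static void main(String[] args) {
    CompletableFuture<String> rejected = new CompletableFuture<>();
    rejected.completeExceptionally(new InvalidCommand("Title must be defined"));
    System.out.println(toResponse(rejected).toCompletableFuture().join());
  }
}
```

The timeout case is reported as "unknown" rather than "failed" in this sketch, because a missing reply does not tell the sender whether the command was processed.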

§Changing Behavior

The event handlers are typically only updating the state, but they may also change the behavior of the entity in the sense that new functions for processing commands and events may be defined. This is useful when implementing finite state machine (FSM) like entities. Event handlers that change the behavior are registered with the setEventHandlerChangingBehavior of the BehaviorBuilder. Such an event handler returns the new Behavior instead of just returning the new state.

b.setEventHandlerChangingBehavior(PostAdded.class, evt ->
  becomePostAdded(state().withContent(Optional.of(evt.getContent()))));

private Behavior becomePostAdded(BlogState newState) {
  BehaviorBuilder b = newBehaviorBuilder(newState);

  b.setReadOnlyCommandHandler(GetPost.class, (cmd, ctx) ->
    ctx.reply(state().getContent().get()));

  b.setCommandHandler(ChangeBody.class,
      (cmd, ctx) -> ctx.thenPersist(BodyChanged.of(cmd.getBody()), evt ->
        ctx.reply(Done.getInstance())));

  b.setEventHandler(BodyChanged.class, evt -> state().withBody(evt.getBody()));

  return b.build();
}

In the above example we are creating a completely new Behavior with newBehaviorBuilder. It is also possible to start with the current Behavior and modify it. You can access the current behavior with the behavior method of the PersistentEntity and then use the builder method of the Behavior.

§Snapshots

When the entity is started the state is recovered by replaying stored events. To reduce this recovery time the entity may start the recovery from a snapshot of the state and then replay only the events that were stored after the snapshot.

Such snapshots are automatically saved after a configured number of persisted events. The snapshot, if any, is passed as a parameter to the initialBehavior method, and you should use that state as the state of the returned Behavior.

One thing to keep in mind is that if you are using event handlers that change the behavior (setEventHandlerChangingBehavior) you must also restore corresponding Behavior from the snapshot state that is passed as a parameter to the initialBehavior method.

@Override
public Behavior initialBehavior(Optional<BlogState> snapshotState) {
  if (snapshotState.isPresent() && !snapshotState.get().isEmpty()) {
    // behavior after snapshot must be restored by initialBehavior
    // if we have a non-empty BlogState we know that the initial
    // AddPost has been performed
    return becomePostAdded(snapshotState.get());
  } else {
    // behavior when no snapshot is used
    BehaviorBuilder b = newBehaviorBuilder(BlogState.EMPTY);

    // TODO define command and event handlers

    return b.build();
  }
}

The state must be immutable to avoid concurrency issues that may occur from changing a state instance that is about to be saved as snapshot.

The section Immutable Objects describes how to define immutable state classes.

§Usage from Service Implementation

To access an entity from a service implementation you first need to inject the PersistentEntityRegistry and at startup (in the constructor) register the class that implements the PersistentEntity.

In the service method you retrieve a PersistentEntityRef for a given entity identifier from the registry. Then you can send the command to the entity using the ask method of the PersistentEntityRef. ask returns a CompletionStage with the reply message.

import com.lightbend.lagom.javadsl.persistence.PersistentEntityRef;

import javax.inject.Inject;
import com.lightbend.lagom.javadsl.persistence.PersistentEntityRegistry;
import com.lightbend.lagom.javadsl.api.*;

public class BlogServiceImpl implements BlogService {

  private final PersistentEntityRegistry persistentEntities;

  @Inject
  public BlogServiceImpl(PersistentEntityRegistry persistentEntities) {
    this.persistentEntities = persistentEntities;

    persistentEntities.register(Post.class);
  }

  @Override
  public ServiceCall<AddPost, String> addPost(String id) {
    return request -> {
      PersistentEntityRef<BlogCommand> ref =
        persistentEntities.refFor(Post.class, id);
      return ref.ask(request).thenApply(ack -> "OK");
    };
  }
}

In this example we are using the command AddPost also as the request parameter of the service method, but you can of course use another type for the external API of the service.

The commands are sent as messages to the entity that may be running on a different node. If that node is not available due to network issues, JVM crash or similar the messages may be lost until the problem has been detected and the entities have been migrated to another node. In such situations the ask will time out and the CompletionStage will be completed with akka.pattern.AskTimeoutException.

Note that the AskTimeoutException is not a guarantee that the command was not processed. For example, the command might have been processed but the reply message was lost.

§Serialization

JSON is the recommended format for the persisted events and state. The Serialization section describes how to add Jackson serialization support to such classes and also how to evolve the classes, which is especially important for the persistent state and events, since you must be able to deserialize old objects that were stored.

§Unit Testing

For unit testing of the entity you can use the PersistentEntityTestDriver, which will run the PersistentEntity without using a database. You can verify that it emits expected events and side-effects in response to incoming commands.

import static org.junit.Assert.assertEquals;

import java.util.Collections;
import java.util.Optional;
import com.lightbend.lagom.javadsl.persistence.PersistentEntity.InvalidCommandException;
import com.lightbend.lagom.javadsl.testkit.PersistentEntityTestDriver;
import com.lightbend.lagom.javadsl.testkit.PersistentEntityTestDriver.Outcome;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;

import akka.Done;
import akka.actor.ActorSystem;
import akka.testkit.JavaTestKit;

public class PostTest {

  static ActorSystem system;

  @BeforeClass
  public static void setup() {
    system = ActorSystem.create();
  }

  @AfterClass
  public static void teardown() {
    JavaTestKit.shutdownActorSystem(system);
    system = null;
  }

  @Test
  public void testAddPost() {
    PersistentEntityTestDriver<BlogCommand, BlogEvent, BlogState> driver =
        new PersistentEntityTestDriver<>(system, new Post(), "post-1");

    PostContent content = PostContent.of("Title", "Body");
    Outcome<BlogEvent, BlogState> outcome = driver.run(
        AddPost.of(content));
    assertEquals(PostAdded.builder().content(content).postId("post-1").build(),
        outcome.events().get(0));
    assertEquals(1, outcome.events().size());
    assertEquals(false, outcome.state().isPublished());
    assertEquals(Optional.of(content), outcome.state().getContent());
    assertEquals(AddPostDone.of("post-1"), outcome.getReplies().get(0));
    assertEquals(Collections.emptyList(), outcome.issues());
  }

  @Test
  public void testInvalidTitle() {
    PersistentEntityTestDriver<BlogCommand, BlogEvent, BlogState> driver =
        new PersistentEntityTestDriver<>(system, new Post(), "post-1");

    Outcome<BlogEvent, BlogState> outcome = driver.run(
        AddPost.of(PostContent.of("", "Body")));
    assertEquals(InvalidCommandException.class,
        outcome.getReplies().get(0).getClass());
    assertEquals(0, outcome.events().size());
    assertEquals(Collections.emptyList(), outcome.issues());
  }

  @Test
  public void testChangeBody() {
    PersistentEntityTestDriver<BlogCommand, BlogEvent, BlogState> driver =
        new PersistentEntityTestDriver<>(system, new Post(), "post-1");

    driver.run(AddPost.of(PostContent.of("Title", "Body")));

    Outcome<BlogEvent, BlogState> outcome = driver.run(
      ChangeBody.of("New body 1"),
      ChangeBody.of("New body 2"));

    assertEquals(BodyChanged.of("New body 1"), outcome.events().get(0));
    assertEquals(BodyChanged.of("New body 2"), outcome.events().get(1));
    assertEquals(2, outcome.events().size());
    assertEquals(false, outcome.state().isPublished());
    assertEquals("New body 2", outcome.state().getContent().get().getBody());
    assertEquals(Done.getInstance(), outcome.getReplies().get(0));
    assertEquals(Done.getInstance(), outcome.getReplies().get(1));
    assertEquals(2, outcome.getReplies().size());
    assertEquals(Collections.emptyList(), outcome.issues());
  }

}

run may be invoked multiple times to divide the sequence of commands into manageable steps. The Outcome contains the events and side-effects of the last run, but the state is not reset between different runs.

Note that it also verifies that all commands, events, replies and state are serializable, and reports any such problems in the issues of the Outcome.

To use this feature add the following in your project’s build.

In Maven:

<dependency>
    <groupId>com.lightbend.lagom</groupId>
    <artifactId>lagom-javadsl-testkit_2.11</artifactId>
    <version>${lagom.version}</version>
    <scope>test</scope>
</dependency>

In sbt:

libraryDependencies += lagomJavadslTestKit

§Full Example

import com.lightbend.lagom.javadsl.persistence.PersistentEntity;
import java.util.Optional;
import akka.Done;

public class Post extends PersistentEntity<BlogCommand, BlogEvent, BlogState> {

  @Override
  public Behavior initialBehavior(Optional<BlogState> snapshotState) {
    if (snapshotState.isPresent() && !snapshotState.get().isEmpty()) {
      // behavior after snapshot must be restored by initialBehavior
      return becomePostAdded(snapshotState.get());
    } else {
      // Behavior consists of a State and defined event handlers and command handlers.
      BehaviorBuilder b = newBehaviorBuilder(BlogState.EMPTY);

      // Command handlers are invoked for incoming messages (commands).
      // A command handler must "return" the events to be persisted (if any).
      b.setCommandHandler(AddPost.class, (AddPost cmd, CommandContext<AddPostDone> ctx) -> {
        if (cmd.getContent().getTitle() == null || cmd.getContent().getTitle().equals("")) {
          ctx.invalidCommand("Title must be defined");
          return ctx.done();
        }

        final PostAdded postAdded =
            PostAdded.builder().content(cmd.getContent()).postId(entityId()).build();
        return ctx.thenPersist(postAdded, (PostAdded evt) ->
        // After persist is done additional side effects can be performed
            ctx.reply(AddPostDone.of(entityId())));
      });

      // Event handlers are used both when persisting new events and when replaying
      // events.
      b.setEventHandlerChangingBehavior(PostAdded.class, evt ->
        becomePostAdded(state().withContent(Optional.of(evt.getContent()))));

      return b.build();
    }
  }

  // Behavior can be changed in the event handlers.
  private Behavior becomePostAdded(BlogState newState) {
    BehaviorBuilder b = newBehaviorBuilder(newState);

    b.setCommandHandler(ChangeBody.class,
        (cmd, ctx) -> ctx.thenPersist(BodyChanged.of(cmd.getBody()), evt ->
          ctx.reply(Done.getInstance())));

    b.setEventHandler(BodyChanged.class, evt -> state().withBody(evt.getBody()));

    return b.build();
  }

}

You can find the classes for the commands, events and state in the Immutable Objects section.

§Refactoring Consideration

If you change the class name of a PersistentEntity you have to override entityTypeName and retain the original name, because this name is part of the key of the stored data (it is part of the persistenceId of the underlying PersistentActor). By default the entityTypeName is the short class name of the concrete PersistentEntity class.
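The effect of a rename can be sketched in plain Java. The `Entity` base class below is a hypothetical stand-in for PersistentEntity, and the way `storageKey` combines the type name and the entity id (including the "|" separator) is an assumption made for illustration, not Lagom's actual key format.

```java
// Hypothetical model of how the entity type name affects the storage key.
final class EntityTypeNameSketch {

  // Stand-in for PersistentEntity: by default the type name is the short class name.
  abstract static class Entity {
    String entityTypeName() {
      return getClass().getSimpleName();
    }

    // Assumption: the key of the stored data combines type name and entity id.
    String storageKey(String entityId) {
      return entityTypeName() + "|" + entityId;
    }
  }

  // The original class, whose events are already stored under the name "Post".
  static final class Post extends Entity {
  }

  // The renamed class keeps the original type name so old events are still found.
  static final class BlogPost extends Entity {
    @Override
    String entityTypeName() {
      return "Post";
    }
  }

  // A renamed class without the override would look for data under a new key.
  static final class Article extends Entity {
  }

  public static void main(String[] args) {
    System.out.println(new Post().storageKey("post-1"));
    System.out.println(new BlogPost().storageKey("post-1"));
    System.out.println(new Article().storageKey("post-1"));
  }
}
```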

§Configuration

The default configuration should be a good starting point, and the following settings may later be amended to customize the behavior if needed.

lagom.persistence {

  # As a rule of thumb, the number of shards should be a factor ten greater
  # than the planned maximum number of cluster nodes. Fewer shards than
  # nodes means that some nodes will not host any shards. Too many
  # shards will result in less efficient management of the shards, e.g. 
  # rebalancing overhead, and increased latency because the coordinator is 
  # involved in the routing of the first message for each shard. The value 
  # must be the same on all nodes in a running cluster. It can be changed 
  # after stopping all nodes in the cluster.
  max-number-of-shards = 100

  # Persistent entities save snapshots after this number of persistent
  # events. Snapshots are used to reduce recovery times.
  # It may be configured to "off" to disable snapshots.
  snapshot-after = 100
  
  # A persistent entity is passivated automatically if it does not receive 
  # any messages during this timeout. Passivation is performed to reduce
  # memory consumption. Objects referenced by the entity can be garbage
  # collected after passivation. Next message will activate the entity
  # again, which will recover its state from persistent storage.  
  passivate-after-idle-timeout = 120s
  
  # Specifies that entities run on cluster nodes with a specific role.
  # If the role is not specified (or empty) all nodes in the cluster are used.
  # The entities can still be accessed from other nodes.
  run-entities-on-role = ""
  
  # Default timeout for PersistentEntityRef.ask replies.
  ask-timeout = 5s
  
  dispatcher {
    type = Dispatcher
    executor = "thread-pool-executor"
    thread-pool-executor {
      fixed-pool-size = 16
    }
    throughput = 1
  }
}
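To make the role of max-number-of-shards more concrete, the sketch below shows one plausible way an entity identifier could be mapped to a shard, together with the rule of thumb from the comment above. The hash-modulo mapping and both method names are assumptions for illustration; the actual function used by Lagom and Akka Cluster Sharding may differ.

```java
// Hypothetical model of shard selection; the real mapping may differ.
final class ShardingSketch {

  // Assumption: the shard is derived from the hash of the entity identifier.
  // floorMod keeps the result in [0, maxNumberOfShards) even for negative hashes.
  static int shardFor(String entityId, int maxNumberOfShards) {
    return Math.floorMod(entityId.hashCode(), maxNumberOfShards);
  }

  // Rule of thumb from the configuration comment: ten times the planned node count.
  static int suggestedShardCount(int plannedMaxNodes) {
    return plannedMaxNodes * 10;
  }

  public static void main(String[] args) {
    int shards = suggestedShardCount(10);
    System.out.println(shards);
    System.out.println(shardFor("post-1", shards));
  }
}
```

With such a mapping the same identifier always lands in the same shard, which is one reason the shard count has to be identical on all nodes of a running cluster.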

You may also need to adjust the configuration of akka-persistence-cassandra. See its reference.conf.

§Underlying Implementation

Each PersistentEntity instance is executed by a PersistentActor that is managed by Akka Cluster Sharding.

The Akka Persistence journal plugin is akka-persistence-cassandra.
