§Persistent Entity
Event Sourcing and CQRS is a recommended introduction to this section.
A PersistentEntity has a stable entity identifier, with which it can be accessed from the service implementation or other places. The state of an entity is made persistent (durable) using Event Sourcing. We represent all state changes as events, and those immutable facts are appended to an event log. To recreate the current state of an entity when it is started, we replay these events.
A persistent entity corresponds to an Aggregate Root in Domain-Driven Design terms. Each instance has a stable identifier and for a given id there will only be one instance of the entity. Lagom takes care of distributing those instances across the cluster of the service. If you know the identifier you can send messages, so-called commands, to the entity.
The persistent entity is also a transaction boundary. Invariants can be maintained within one entity but not across several entities.
If you are familiar with JPA it is worth noting that a PersistentEntity can be used for similar things as a JPA @Entity, but several aspects are rather different. For example, a JPA @Entity is loaded from the database wherever it is needed, i.e. there may be many Java object instances with the same entity identifier. In contrast, there is only one instance of PersistentEntity with a given identifier. With JPA you typically only store the current state, and the history of how the state was reached is not captured.
You interact with a PersistentEntity by sending command messages to it. Commands are processed sequentially, one at a time, for a specific entity instance. A command may result in state changes that are persisted as events, representing the effect of the command. The current state is not stored for every change, since it can be derived from the events. These events are only ever appended to storage, nothing is ever mutated, which allows for very high transaction rates and efficient replication.
The entities are automatically distributed across the nodes in the cluster of the service. Each entity runs only at one place, and messages can be sent to the entity without requiring the sender to know the location of the entity. If a node is stopped, the entities running on that node will be started on another node the next time a message is sent to them. When new nodes are added to the cluster some existing entities are rebalanced to the new nodes to spread the load.
An entity is kept alive, holding its current state in memory, as long as it is used. When it has not been used for a while it will automatically be passivated to free up resources.
When an entity is started it replays the stored events to restore the current state. This can be either the full history of changes or starting from a snapshot which will reduce recovery times.
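As a rough illustration of the recovery described above, replay can be pictured as a left fold of the event handler over the stored events, optionally starting from a snapshot. This is a plain Scala sketch and not Lagom's implementation; CounterEvent, Incremented, CounterState, Snapshot and ReplaySketch are hypothetical names introduced only for this example.

```scala
// Hypothetical event and state types, used only to illustrate replay.
sealed trait CounterEvent
final case class Incremented(by: Int) extends CounterEvent

final case class CounterState(total: Int)

// A snapshot is a saved state plus the number of events it already includes.
final case class Snapshot(state: CounterState, eventCount: Int)

object ReplaySketch {
  val initialState: CounterState = CounterState(0)

  // The event handler: a pure function from (state, event) to the next state.
  def applyEvent(state: CounterState, event: CounterEvent): CounterState =
    event match {
      case Incremented(by) => state.copy(total = state.total + by)
    }

  // Recovery starts from the snapshot if there is one, otherwise from the
  // initial state, and replays only the events not covered by the snapshot.
  def recover(log: Vector[CounterEvent], snapshot: Option[Snapshot]): CounterState = {
    val start   = snapshot.map(_.state).getOrElse(initialState)
    val skipped = snapshot.map(_.eventCount).getOrElse(0)
    log.drop(skipped).foldLeft(start)(applyEvent)
  }
}
```

Both paths yield the same state; the snapshot only shortens the number of events that have to be folded.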
§Choosing a database
Lagom supports the following databases:
We recommend using Cassandra. Cassandra is a very scalable distributed database, and it is also flexible enough to support typical use cases of reactive services. In contrast to most relational databases, it natively supports sharding and replication, and is emerging as an industry standard open source NoSQL database.
Lagom also provides out-of-the-box support for running Cassandra in a development environment. Developers do not need to install, configure or manage Cassandra themselves when using Lagom, so they can start being productive on a project without first spending days setting up a development environment.
For instructions on configuring your project to use Cassandra, see Using Cassandra for Persistent Entities. If instead you want to use one of the relational databases listed above, see Using a Relational Database for Persistent Entities on how to configure your project.
§PersistentEntity Stub
This is what a PersistentEntity class looks like before filling in the implementation details:
import com.lightbend.lagom.scaladsl.persistence.PersistentEntity

final class Post1 extends PersistentEntity {
  override type Command = BlogCommand
  override type Event = BlogEvent
  override type State = BlogState

  override def initialState: BlogState = BlogState.empty
  override def behavior: Behavior = Actions()
}
The concrete PersistentEntity subclass must define three abstract type members:
- Command: the super class/interface of the commands
- Event: the super class/interface of the events
- State: the class of the state
initialState is an abstract method that your concrete subclass must implement to define the State when the entity is first created.
behavior is an abstract method that your concrete subclass must implement. It returns the Behavior of the entity. Behavior is a function from current State to Actions, which defines command and event handlers.
Use Actions() to create an immutable builder for defining the behavior. The behavior functions process incoming commands and persisted events as described in the following sections.
§Command Handlers
The functions that process incoming commands are registered using onCommand of the Actions.
// Command handlers are invoked for incoming messages (commands).
// A command handler must "return" the events to be persisted (if any).
Actions()
  .onCommand[AddPost, AddPostDone] {
    case (AddPost(content), ctx, state) =>
      ctx.thenPersist(PostAdded(entityId, content)) { evt =>
        // After persist is done additional side effects can be performed
        ctx.reply(AddPostDone(entityId))
      }
  }
You should define one command handler for each command class that the entity can receive.
A command handler is a partial function with 3 parameters (Tuple3) for the Command, the CommandContext and current State.
A command handler returns a Persist directive that defines what event or events, if any, to persist. Use the thenPersist, thenPersistAll or done methods of the context that is passed to the command handler function to create the Persist directive.
- thenPersist: persists one single event
- thenPersistAll: persists several events atomically, i.e. all events are stored or none of them are stored if there is an error
- done: no events are to be persisted
External side effects can be performed after successful persist in the afterPersist function. In the above example a reply is sent with the ctx.reply method.
The command can be validated before persisting state changes. Note that current state is passed as parameter to the command handler partial function. Use ctx.invalidCommand or ctx.commandFailed to reject an invalid command.
// Command handlers are invoked for incoming messages (commands).
// A command handler must "return" the events to be persisted (if any).
.onCommand[AddPost, AddPostDone] {
  case (AddPost(content), ctx, state) =>
    if (content.title == null || content.title.equals("")) {
      // Reject the command; no events are persisted.
      ctx.invalidCommand("Title must be defined")
      ctx.done
    } else {
      ctx.thenPersist(PostAdded(entityId, content)) { _ =>
        ctx.reply(AddPostDone(entityId))
      }
    }
}
A PersistentEntity may also process commands that do not change application state, such as query commands or commands that are not valid in the entity’s current state (such as a bid placed after the auction closed). Such command handlers are registered using onReadOnlyCommand of the Actions. Replies are sent with the reply method of the context that is passed to the command handler function.
onReadOnlyCommand is simply a convenience function that saves you from having to return no events followed by a side effect.
.onReadOnlyCommand[GetPost.type, PostContent] {
  case (GetPost, ctx, state) =>
    ctx.reply(state.content.get)
}
The commands must be immutable to avoid concurrency issues that may occur from changing a command instance that has been sent.
The section Immutable Objects describes how to define immutable command classes.
§Event Handlers
When an event has been persisted successfully the current state is updated by applying the event to the current state. The functions for updating the state are registered with the onEvent method of the Actions.
// Event handlers are used both when persisting new events and when replaying
// events.
.onEvent {
  case (PostAdded(postId, content), state) =>
    BlogState(Some(content), published = false)
}
You should define one event handler for each event class that the entity can persist.
The event handler returns the new state. The state must be immutable, so you return a new instance of the state. Current state is passed as a parameter to the event handler function. The same event handlers are also used when the entity is started up to recover its state from the stored events.
The events must be immutable to avoid concurrency issues that may occur from changing an event instance that is about to be persisted.
The section Immutable Objects describes how to define immutable event classes.
§Replies
Each command must define what type of message to use as reply to the command by implementing the PersistentEntity.ReplyType interface.
final case class AddPost(content: PostContent) extends BlogCommand with ReplyType[AddPostDone]
You send the reply message using the reply method of the context that is passed to the command handler function. Note that the reply message type must match the ReplyType defined by the command, and by the second type parameter of onCommand.
Typically the reply will be an acknowledgment that the entity processed the command successfully, i.e. you send it after persist.
.onCommand[ChangeBody, Done] {
  case (ChangeBody(body), ctx, state) =>
    ctx.thenPersist(BodyChanged(entityId, body))(_ => ctx.reply(Done))
}
For convenience you may use akka.Done as the acknowledgment message.
It can also be a reply to a read-only query command.
.onReadOnlyCommand[GetPost.type, PostContent] {
  case (GetPost, ctx, state) =>
    ctx.reply(state.content.get)
}
You can use ctx.invalidCommand to reject an invalid command, which will fail the Future with PersistentEntity.InvalidCommandException on the sender side.
You can send a negative acknowledgment with ctx.commandFailed, which will fail the Future on the sender side with the given exception.
If persisting the events fails a negative acknowledgment is automatically sent, which will fail the Future on the sender side with PersistentEntity.PersistException.
If the PersistentEntity receives a command for which there is no registered command handler a negative acknowledgment is automatically sent, which will fail the Future on the sender side with PersistentEntity.UnhandledCommandException.
If you don’t reply to a command the Future on the sender side will be completed with an akka.pattern.AskTimeoutException after a timeout.
§Changing Behavior
For simple entities you can use the same set of command and event handlers independent of what state the entity is in. The actions can then be defined like this:
override def behavior: Behavior =
  Actions()
    .onCommand[AddPost, AddPostDone] {
      case (AddPost(content), ctx, state) if state.isEmpty =>
        ctx.thenPersist(PostAdded(entityId, content)) { evt =>
          ctx.reply(AddPostDone(entityId))
        }
    }
    .onEvent {
      case (PostAdded(postId, content), state) =>
        BlogState(Some(content), published = false)
    }
    .onReadOnlyCommand[GetPost.type, PostContent] {
      case (GetPost, ctx, state) if !state.isEmpty =>
        ctx.reply(state.content.get)
    }
When the state changes it can also change the behavior of the entity in the sense that new functions for processing commands and events may be defined. This is useful when implementing finite state machine (FSM) like entities. The Actions, the set of event handlers and command handlers, can be selected based on current state. The return type of the behavior method is a function from current State to Actions. The reason Actions can be used as in the above example is that Actions itself is such a function, returning itself for any State.
This is how to define different behavior for different State:
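override def behavior: Behavior = {
  case state if state.isEmpty => initial
  case state if !state.isEmpty => postAdded
}
// Handlers for a post that has not been added yet (bodies are in the Full Example).
private val initial: Actions = {
  Actions()
}
// Handlers for a post that already exists (bodies are in the Full Example).
private val postAdded: Actions = {
  Actions()
}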
Actions is an immutable builder and therefore you have great flexibility when it comes to how to structure the various command and event handlers and combine them to the final behavior. Note that Actions has an orElse method that is useful for composing actions.
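A minimal sketch of composing handlers with orElse, assuming the BlogCommand, BlogEvent, BlogState and PostContent types from the Full Example section. ComposedPost is a hypothetical class name, and the sketch assumes that orElse accepts another Actions and that the two sets of handlers do not overlap. Handling of AddPost is left out for brevity.

```scala
import akka.Done
import com.lightbend.lagom.scaladsl.persistence.PersistentEntity

// Hypothetical entity that builds its behavior from two smaller Actions.
final class ComposedPost extends PersistentEntity {
  override type Command = BlogCommand
  override type Event = BlogEvent
  override type State = BlogState

  override def initialState: BlogState = BlogState.empty

  // Read-only query handler, defined once so it can be reused.
  private val getPost: Actions =
    Actions().onReadOnlyCommand[GetPost.type, PostContent] {
      case (GetPost, ctx, state) => ctx.reply(state.content.get)
    }

  // Handlers that modify the body of an existing post.
  private val changeBody: Actions =
    Actions()
      .onCommand[ChangeBody, Done] {
        case (ChangeBody(body), ctx, state) =>
          ctx.thenPersist(BodyChanged(entityId, body))(_ => ctx.reply(Done))
      }
      .onEvent {
        case (BodyChanged(_, body), state) => state.withBody(body)
      }

  // The combined Actions contains the handlers of both parts.
  override def behavior: Behavior = changeBody.orElse(getPost)
}
```

Splitting handlers this way keeps each group small and lets several states share the same read-only handlers.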
§Snapshots
When the entity is started the state is recovered by replaying stored events. To reduce this recovery time the entity may start the recovery from a snapshot of the state and then replay only the events that were stored after the snapshot for that entity.
Such snapshots are automatically saved after a configured number of persisted events. The snapshot, if any, is automatically used as the initial state before replaying the events.
The state must be immutable to avoid concurrency issues that may occur from changing a state instance that is about to be saved as snapshot.
The section Immutable Objects describes how to define immutable state classes.
§Usage from Service Implementation
To access an entity from a service implementation you first need to inject the PersistentEntityRegistry and at startup (in the constructor) register the class that implements the PersistentEntity.
In the service method you retrieve a PersistentEntityRef for a given entity identifier from the registry. Then you can send the command to the entity using the ask method of the PersistentEntityRef. ask returns a Future with the reply message.
import scala.concurrent.ExecutionContext
import scala.concurrent.Future
import com.lightbend.lagom.scaladsl.api.Service
import com.lightbend.lagom.scaladsl.api.ServiceCall
import com.lightbend.lagom.scaladsl.persistence.PersistentEntityRef
import com.lightbend.lagom.scaladsl.persistence.PersistentEntityRegistry

class BlogServiceImpl(persistentEntities: PersistentEntityRegistry)(implicit ec: ExecutionContext)
    extends BlogService {

  persistentEntities.register(new Post)

  override def addPost(id: String): ServiceCall[AddPost, String] =
    ServiceCall { request: AddPost =>
      val ref: PersistentEntityRef[BlogCommand] =
        persistentEntities.refFor[Post](id)
      val reply: Future[AddPostDone] = ref.ask(request)
      reply.map(ack => "OK")
    }
}
The explicit type annotations in the above example are included for illustrative purposes. It can be written in a more compact way that still has the same type safety:
override def addPost(id: String) = ServiceCall { request =>
  val ref = persistentEntities.refFor[Post](id)
  ref.ask(request).map(ack => "OK")
}
In this example we are using the command AddPost also as the request parameter of the service method, but you can of course use another type for the external API of the service.
The commands are sent as messages to the entity that may be running on a different node. If that node is not available due to network issues, JVM crash or similar the messages may be lost until the problem has been detected and the entities have been migrated to another node. In such situations the ask will time out and the Future will be completed with akka.pattern.AskTimeoutException.
Note that the AskTimeoutException is not a guarantee that the command was not processed. For example, the command might have been processed but the reply message was lost.
§Serialization
JSON is the recommended format for the persisted events and state. The Serialization section describes how to add Play-json serialization support to such classes and also how to evolve the classes, which is especially important for the persistent state and events, since you must be able to deserialize old objects that were stored.
§Unit Testing
For unit testing of the entity you can use the PersistentEntityTestDriver, which will run the PersistentEntity without using a database. You can verify that it emits expected events and side-effects in response to incoming commands.
import scala.concurrent.Await
import scala.concurrent.duration._
import akka.Done
import akka.actor.ActorSystem
import com.lightbend.lagom.scaladsl.persistence.PersistentEntity.InvalidCommandException
import com.lightbend.lagom.scaladsl.playjson.JsonSerializerRegistry
import com.lightbend.lagom.scaladsl.testkit.PersistentEntityTestDriver
import com.typesafe.config.ConfigFactory
import org.scalactic.TypeCheckedTripleEquals
import org.scalatest.BeforeAndAfterAll
import org.scalatest.Matchers
import org.scalatest.WordSpecLike

class PostSpec extends WordSpecLike with Matchers with BeforeAndAfterAll with TypeCheckedTripleEquals {
  val system = ActorSystem("PostSpec", JsonSerializerRegistry.actorSystemSetupFor(BlogPostSerializerRegistry))

  override def afterAll(): Unit = {
    Await.ready(system.terminate, 10.seconds)
  }

  "Blog Post entity" must {
    "handle AddPost" in {
      val driver = new PersistentEntityTestDriver(system, new Post, "post-1")
      val content = PostContent("Title", "Body")
      val outcome = driver.run(AddPost(content))
      outcome.events should ===(List(PostAdded("post-1", content)))
      outcome.state.published should ===(false)
      outcome.state.content should ===(Some(content))
      outcome.replies should ===(List(AddPostDone("post-1")))
      outcome.issues should be(Nil)
    }

    "validate title" in {
      val driver = new PersistentEntityTestDriver(system, new Post, "post-1")
      val outcome = driver.run(AddPost(PostContent("", "Body")))
      outcome.replies.head.getClass should be(classOf[InvalidCommandException])
      outcome.events.size should ===(0)
      outcome.issues should be(Nil)
    }

    "handle ChangeBody" in {
      val driver = new PersistentEntityTestDriver(system, new Post, "post-1")
      driver.run(AddPost(PostContent("Title", "Body")))
      val outcome = driver.run(ChangeBody("New body 1"), ChangeBody("New body 2"))
      outcome.events should ===(List(BodyChanged("post-1", "New body 1"), BodyChanged("post-1", "New body 2")))
      outcome.state.published should ===(false)
      outcome.state.content.get.body should ===("New body 2")
      outcome.replies should ===(List(Done, Done))
      outcome.issues should be(Nil)
    }
  }
}
run may be invoked multiple times to divide the sequence of commands into manageable steps. The Outcome contains the events and side-effects of the last run, but the state is not reset between different runs.
Note that it also verifies that all commands, events, replies and state are serializable, and reports any such problems in the issues of the Outcome.
To use this feature add the following in your project’s build:
libraryDependencies += lagomScaladslTestKit
§Full Example
import akka.Done
import com.lightbend.lagom.scaladsl.persistence.PersistentEntity

final class Post extends PersistentEntity {
  override type Command = BlogCommand
  override type Event = BlogEvent
  override type State = BlogState

  override def initialState: BlogState = BlogState.empty

  override def behavior: Behavior = {
    case state if state.isEmpty => initial
    case state if !state.isEmpty => postAdded
  }

  private val initial: Actions = {
    Actions()
      // Command handlers are invoked for incoming messages (commands).
      // A command handler must "return" the events to be persisted (if any).
      .onCommand[AddPost, AddPostDone] {
        case (AddPost(content), ctx, state) =>
          if (content.title == null || content.title.equals("")) {
            ctx.invalidCommand("Title must be defined")
            ctx.done
          } else {
            ctx.thenPersist(PostAdded(entityId, content)) { _ =>
              // After persist is done additional side effects can be performed
              ctx.reply(AddPostDone(entityId))
            }
          }
      }
      // Event handlers are used both when persisting new events and when replaying
      // events.
      .onEvent {
        case (PostAdded(postId, content), state) =>
          BlogState(Some(content), published = false)
      }
  }

  private val postAdded: Actions = {
    Actions()
      .onCommand[ChangeBody, Done] {
        case (ChangeBody(body), ctx, state) =>
          ctx.thenPersist(BodyChanged(entityId, body))(_ => ctx.reply(Done))
      }
      .onEvent {
        case (BodyChanged(_, body), state) =>
          state.withBody(body)
      }
      .onReadOnlyCommand[GetPost.type, PostContent] {
        case (GetPost, ctx, state) =>
          ctx.reply(state.content.get)
      }
  }
}
import com.lightbend.lagom.scaladsl.persistence.PersistentEntity.ReplyType
import akka.Done
import com.lightbend.lagom.scaladsl.playjson.JsonSerializer

sealed trait BlogCommand

object BlogCommand {
  import play.api.libs.json._
  import JsonSerializer.emptySingletonFormat

  implicit val postContentFormat = Json.format[PostContent]

  val serializers = Vector(
    JsonSerializer(Json.format[AddPost]),
    JsonSerializer(Json.format[AddPostDone]),
    JsonSerializer(emptySingletonFormat(GetPost)),
    JsonSerializer(Json.format[ChangeBody]),
    JsonSerializer(emptySingletonFormat(Publish))
  )
}

final case class AddPost(content: PostContent) extends BlogCommand with ReplyType[AddPostDone]
final case class AddPostDone(postId: String)
case object GetPost extends BlogCommand with ReplyType[PostContent]
final case class ChangeBody(body: String) extends BlogCommand with ReplyType[Done]
case object Publish extends BlogCommand with ReplyType[Done]
import com.lightbend.lagom.scaladsl.persistence.AggregateEvent
import com.lightbend.lagom.scaladsl.persistence.AggregateEventShards
import com.lightbend.lagom.scaladsl.persistence.AggregateEventTag
import com.lightbend.lagom.scaladsl.playjson.JsonSerializer

object BlogEvent {
  val NumShards = 10
  // second param is optional, defaults to the class name
  val Tag = AggregateEventTag.sharded[BlogEvent](NumShards)

  import play.api.libs.json._
  private implicit val postContentFormat = Json.format[PostContent]

  val serializers = Vector(
    JsonSerializer(Json.format[PostAdded]),
    JsonSerializer(Json.format[BodyChanged]),
    JsonSerializer(Json.format[PostPublished])
  )
}

sealed trait BlogEvent extends AggregateEvent[BlogEvent] {
  override def aggregateTag: AggregateEventShards[BlogEvent] = BlogEvent.Tag
}

final case class PostAdded(postId: String, content: PostContent) extends BlogEvent
final case class BodyChanged(postId: String, body: String) extends BlogEvent
final case class PostPublished(postId: String) extends BlogEvent
import play.api.libs.json._

object BlogState {
  val empty = BlogState(None, published = false)

  implicit val postContentFormat = Json.format[PostContent]
  implicit val format: Format[BlogState] = Json.format[BlogState]
}

final case class BlogState(content: Option[PostContent], published: Boolean) {
  def withBody(body: String): BlogState = {
    content match {
      case Some(c) =>
        copy(content = Some(c.copy(body = body)))
      case None =>
        throw new IllegalStateException("Can't set body without content")
    }
  }

  def isEmpty: Boolean = content.isEmpty
}

final case class PostContent(title: String, body: String)
§Refactoring Consideration
If you change the class name of a PersistentEntity you have to override entityTypeName and retain the original name because this name is part of the key of the stored data (it is part of the persistenceId of the underlying PersistentActor). By default the entityTypeName is the short class name of the concrete PersistentEntity class.
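A minimal sketch of such a rename, assuming the BlogCommand, BlogEvent and BlogState types from the Full Example. BlogPost is a hypothetical new class name for an entity that was previously called Post; the handlers are omitted.

```scala
import com.lightbend.lagom.scaladsl.persistence.PersistentEntity

// Hypothetical rename: this class used to be called Post.
final class BlogPost extends PersistentEntity {
  override type Command = BlogCommand
  override type Event = BlogEvent
  override type State = BlogState

  // Keep the original short class name so that events already stored
  // under the old name are still found after the rename.
  override def entityTypeName: String = "Post"

  override def initialState: BlogState = BlogState.empty

  // Handlers omitted; see the Full Example.
  override def behavior: Behavior = Actions()
}
```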
§Configuration
The default configuration should be a good starting point, and the following settings may later be amended to customize the behavior if needed. The following is a listing of the non-database-specific settings for Lagom persistence:
lagom.persistence {

  # As a rule of thumb, the number of shards should be a factor ten greater
  # than the planned maximum number of cluster nodes. Less shards than number
  # of nodes will result in that some nodes will not host any shards. Too many
  # shards will result in less efficient management of the shards, e.g.
  # rebalancing overhead, and increased latency because the coordinator is
  # involved in the routing of the first message for each shard. The value
  # must be the same on all nodes in a running cluster. It can be changed
  # after stopping all nodes in the cluster.
  max-number-of-shards = 100

  # Persistent entities saves snapshots after this number of persistent
  # events. Snapshots are used to reduce recovery times.
  # It may be configured to "off" to disable snapshots.
  snapshot-after = 100

  # A persistent entity is passivated automatically if it does not receive
  # any messages during this timeout. Passivation is performed to reduce
  # memory consumption. Objects referenced by the entity can be garbage
  # collected after passivation. Next message will activate the entity
  # again, which will recover its state from persistent storage. Set to 0
  # to disable passivation - this should only be done when the number of
  # entities is bounded and their state, sharded across the cluster, will
  # fit in memory.
  passivate-after-idle-timeout = 120s

  # Specifies that entities run on cluster nodes with a specific role.
  # If the role is not specified (or empty) all nodes in the cluster are used.
  # The entities can still be accessed from other nodes.
  run-entities-on-role = ""

  # Default timeout for PersistentEntityRef.ask replies.
  ask-timeout = 5s

  dispatcher {
    type = Dispatcher
    executor = "thread-pool-executor"
    thread-pool-executor {
      fixed-pool-size = 16
    }
    throughput = 1
  }
}
§Underlying Implementation
Each PersistentEntity instance is executed by a PersistentActor that is managed by Akka Cluster Sharding.
§Execution details (advanced)
If you’ve read all the sections above you are familiar with all the pieces that make up a Persistent Entity, but there are a few details worth explaining more extensively. As stated above:
Commands are processed sequentially, one at a time, for a specific entity instance.
This needs a deeper explanation to understand the guarantees provided by Lagom. When a command is received, the following occurs:
- a command handler is selected; if none is found an UnhandledCommandException is thrown
- the command handler is invoked for the command, one or more events may be emitted (to process a command that emits no events, use onReadOnlyCommand or return ctx.done from the handler)
- events are applied to the appropriate event handler (this can cause Behavior changes so defining the command handler on a behavior doesn’t require all event handlers to be supported on that behavior)
- if applying the events didn’t cause any exception, events are persisted atomically and in the same order they were emitted on the command handler
- if there’s an afterPersist, then it is invoked (only once)
- if the snapshotting threshold is exceeded, a snapshot is generated and stored
- finally, the command processing completes and a new command may be processed.
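The steps above can be modelled in a few lines of plain Scala. This is only a sketch of the ordering, not Lagom's implementation: CommandLoopSketch, Deposit, Deposited, Account and the in-memory journal are hypothetical stand-ins, and snapshotting is left out.

```scala
object CommandLoopSketch {
  sealed trait Command
  final case class Deposit(amount: Int) extends Command

  sealed trait Event
  final case class Deposited(amount: Int) extends Event

  final case class Account(balance: Int)

  // What a command handler returns: the events to persist and a callback
  // to run once after they have been stored.
  final case class Persist(events: List[Event], afterPersist: () => Unit)

  final class Entity {
    private var state: Account = Account(0)
    private var journal: Vector[Event] = Vector.empty // stands in for the event log
    private var replies: Vector[String] = Vector.empty

    def currentState: Account = state
    def storedEvents: Vector[Event] = journal
    def sentReplies: Vector[String] = replies

    private def commandHandler(command: Command): Persist = command match {
      case Deposit(amount) if amount > 0 =>
        Persist(List(Deposited(amount)), () => replies :+= "accepted")
      case Deposit(_) =>
        Persist(Nil, () => replies :+= "rejected")
    }

    private def eventHandler(current: Account, event: Event): Account = event match {
      case Deposited(amount) => current.copy(balance = current.balance + amount)
    }

    // One command is handled to completion before the next one is accepted.
    def process(command: Command): Unit = {
      val persist = commandHandler(command)
      // Events are applied first; if this throws, nothing has been stored yet.
      val newState = persist.events.foldLeft(state)(eventHandler)
      // All events of the command are appended together, in emission order.
      journal = journal ++ persist.events
      state = newState
      // The side effect runs once, after the events have been stored.
      persist.afterPersist()
    }
  }
}
```

Because process is an ordinary sequential method, the model also shows why invariants hold within a single entity: no second command can observe a half-applied state.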
If you are familiar with Akka Persistence this process is slightly different in a few places:
- new commands are not processed until events are stored, the Effect completed and the snapshot updated (if necessary). Akka provides the same behavior and also async alternatives that cause new commands to be processed even before all event handlers have completed.
- saving snapshots is an operation run under the covers at least every lagom.persistence.snapshot-after events (see Configuration above) but “storing events atomically” takes precedence. Imagine we want a snapshot every 100 events and we already have 99 events: if the next command emits 3 events, the snapshot will only be stored after event number 102, because events [100, 101, 102] will be stored atomically and only after that will it be possible to create a snapshot.
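The arithmetic of that example can be checked with a small plain Scala model. It only encodes the rule as described in the text (a snapshot is taken after the whole atomic batch once a multiple of the threshold has been crossed); SnapshotTimingSketch and snapshotAt are hypothetical names, and Lagom's internal bookkeeping may differ.

```scala
object SnapshotTimingSketch {
  // Returns the sequence number after which a snapshot is saved for this
  // batch, or None if the batch does not cross a multiple of snapshotAfter.
  def snapshotAt(lastSeqNr: Long, batchSize: Int, snapshotAfter: Int): Option[Long] = {
    val newSeqNr = lastSeqNr + batchSize
    val crossed = newSeqNr / snapshotAfter > lastSeqNr / snapshotAfter
    // The snapshot can only be taken once the whole batch has been stored.
    if (batchSize > 0 && crossed) Some(newSeqNr) else None
  }
}
```

With 99 stored events, a threshold of 100 and a command that emits 3 events, snapshotAt(99, 3, 100) gives Some(102), matching the example in the text.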