§Migrating to Akka Persistence Typed
With the support for Akka Persistence Typed in Lagom it is possible to migrate existing code from Lagom Persistence (classic) to Akka Persistence Typed. There’s a few steps to consider in order to be able to read an existing journal.
Note: the only limitation when migrating from from Lagom Persistence (classic) to Akka Persistence Typed is that a full cluster shutdown is required. Even though all durable data is compatible, Lagom Persistence (classic) and Akka Persistence Typed can’t coexist.
Before you start, make sure you have read the page Domain Modelling with Akka Persistence Typed and you understand how to model a domain using Akka Persistence Typed.
§Migrating the model
Similarly to Lagom’s Persistent Entity, to create an Akka Persistence Typed EventSourcedBehavior
you need:
- a
persistenceId: PersistenceId
- an
emptyState
which represents theState
before any event was ever persisted - a function
(State, Command) => ReplyEffect
to handle the commands, persist events and return a reply - a function
(State, Event) => State
to handle events and mutate theState
EventSourcedBehavior
.withEnforcedReplies[ShoppingCartCommand, ShoppingCartEvent, ShoppingCart](
persistenceId = PersistenceId(entityContext.entityTypeKey.name, entityContext.entityId),
emptyState = ShoppingCart.empty,
commandHandler = (cart, cmd) => cart.applyCommand(cmd),
eventHandler = (cart, evt) => cart.applyEvent(evt)
)
Instead of using a builder and adding multiple command and event handlers, in Akka Persistence Typed, you must define a command handler and an event handler function in which you can use Scala’s pattern matching to select the specific logic.
def applyCommand(cmd: ShoppingCartCommand): ReplyEffect[ShoppingCartEvent, ShoppingCart] =
cmd match {
case x: UpdateItem => onUpdateItem(x)
case x: Checkout => onCheckout(x)
case x: Get => onReadState(x)
}
def applyEvent(evt: ShoppingCartEvent): ShoppingCart =
evt match {
case ItemUpdated(productId, quantity) => updateItem(productId, quantity)
case CheckedOut => checkout
}
This migration guide will not go into more details related to writing command and event handlers. Refer to the Akka Persistence Typed docs or the section on domain modeling with Akka Persistence Typed for more information.
§Commands
Command
classes are the other set of classes most impacted by the migration. First, a Command
will no longer need to extend the ReplyType[R]
of the Lagom API. That type was used to specify a type R
for the reply produced by the Command
. To specify the type R
of the reply add a replyTo: ActorRef[R]
field in the command.
Before:
sealed trait ShoppingCartCommand[R] extends ReplyType[R]
case class UpdateItem(productId: String, quantity: Int) extends ShoppingCartCommand[Summary]
object UpdateItem {
implicit val format: Format[UpdateItem] = Json.format
}
After:
sealed trait ShoppingCartCommand extends ShoppingCartCommandSerializable
case class UpdateItem(productId: String, quantity: Int, replyTo: ActorRef[Confirmation])
extends ShoppingCartCommand
The replyTo: ActorRef[R]
is necessary to know where to send the response to. It must be added to all command classes and adding it has implication on the serialization of those classes. Make sure to review the Serialization section below and the Serialization pages later in this reference documentation.
§Replies
In Akka Typed, it’s not possible to return an exception to the caller. All communication between the actor and the caller must be done via the replyTo:ActorRef[R]
passed in the command. Therefore, if you want to signal a rejection, you most have it encoded in your reply protocol.
See for example the Confirmation
ADT below:
sealed trait Confirmation
case class Accepted(summary: Summary) extends Confirmation
case class Rejected(reason: String) extends Confirmation
Then, all the command handlers must produce a ReplyEffect
. For operations that don’t mutate the model, use Effect.reply
directly and for operations that persist events use Effect.persist(...).thenReply
to create a ReplyEffect
instance:
private def onCheckout(cmd: Checkout): ReplyEffect[ShoppingCartEvent, ShoppingCart] =
if (items.isEmpty)
Effect.reply(cmd.replyTo)(Rejected("Cannot checkout empty cart"))
else
Effect
.persist(CheckedOut)
.thenReply(cmd.replyTo)(updatedCart => Accepted(toSummary(updatedCart)))
See Modelling Commands and Replies for more details.
§Registration
In order to shard and distribute the EventSourcedBehavior
instances across the cluster you will no longer use Lagom’s persistentEntityRegistry
. Instead, Lagom now provides direct access to clusterSharding
, an instance of Akka’s ClusterSharding
extension you can use to initialize the sharding of EventSourcedBehavior
.
Before, in the ShoppingCartLoader
class we’d use the Lagom provided persistentEntityRegistry
instance to register a macwire
provided instance:
// Register the ShoppingCart persistent entity
persistentEntityRegistry.register(wire[ShoppingCartEntity])
That registration can be removed.
After, we use the Lagom provided clusterSharding
instance to initialize the sharding of the event source Behavior
under the ShoppingCart.typeKey
identifier:
// in Akka Typed, this is the equivalent of Lagom's PersistentEntityRegistry.register
clusterSharding.init(
Entity(ShoppingCart.typeKey) {
ctx => ShoppingCart.behavior(ctx)
}
)
To avoid entityId
collisions across the cluster, initializing the sharding of a Behavior
requires specifying an EntityTypeKey
which acts as a namespacing. The EntityTypeKey
is defined by a name and a type. The type indicates the kind of commands that can be sent to that sharded Behavior
. In our example, we defined typeKey
in object ShoppingCart
:
object ShoppingCart {
val typeKey = EntityTypeKey[ShoppingCartCommand]("ShoppingCartEntity")
def empty: ShoppingCart = ShoppingCart(items = Map.empty)
//#akka-persistence-typed-lagom-tagger-adapter
def behavior(entityContext: EntityContext[ShoppingCartCommand]): Behavior[ShoppingCartCommand] = {
//#akka-persistence-behavior-definition
EventSourcedBehavior
.withEnforcedReplies[ShoppingCartCommand, ShoppingCartEvent, ShoppingCart](
persistenceId = PersistenceId(entityContext.entityTypeKey.name, entityContext.entityId),
emptyState = ShoppingCart.empty,
commandHandler = (cart, cmd) => cart.applyCommand(cmd),
eventHandler = (cart, evt) => cart.applyEvent(evt)
)
//#akka-persistence-behavior-definition
.withTagger(AkkaTaggerAdapter.fromLagom(entityContext, ShoppingCartEvent.Tag))
}
//#akka-persistence-typed-lagom-tagger-adapter
implicit val format: Format[ShoppingCart] = Json.format
}
§Sending a command
In order to send commands to your Behavior
instance you will have to obtain a reference to the actor where the Behavior
run and send commands to it.
Before:
/**
* Looks up the shopping cart entity for the given ID.
*/
private def entityRef(id: String) =
persistentEntityRegistry.refFor[ShoppingCartEntity](id)
override def get(id: String): ServiceCall[NotUsed, String] = ServiceCall { _ =>
entityRef(id)
.ask(Get)
.map(cart => asShoppingCartView(id, cart))
}
After:
/**
* Looks up the shopping cart entity for the given ID.
*/
private def entityRef(id: String): EntityRef[ShoppingCartCommand] =
clusterSharding.entityRefFor(ShoppingCart.typeKey, id)
implicit val timeout = Timeout(5.seconds)
override def get(id: String): ServiceCall[NotUsed, String] = ServiceCall { _ =>
entityRef(id)
.ask { reply: ActorRef[Summary] => Get(reply) }
.map { cart => asShoppingCartView(id, cart) }
}
That is, instead of injecting a persistentEntityRegistry
, use a clusterSharding
instance. Instead of getting a PersistentEntityRef[T]
you will obtain an EntityRef[T]
. Both PersistentEntityRef[T]
and EntityRef[T]
provide a method called ask
but their signatures are different. EntityRef[T]
is part of the API of Akka Cluster Sharding and it expects a ActorRef[R] => C
factory method which given a reference to a replyTo
actor of type ActorRef[R]
will produce a command C
(see reply => Get(reply)
in the code above). Then the ask
method also expects an implicit timeout. The result is a Future[R]
with the reply instance produced in the EventSourceBehavior
.
§ Registration: caveats
Even though there is no longer a PersistentEntity
instance to register, the persistentEntityRegistry
is still necessary to build TopicProducer
’s. When writing a Service
implementation that includes a Topic Implementation the TopicProducer
API requires an eventStream
that is provided by the persistentEntityRegistry
. This means that in some cases you will have to inject both the persistentEntityRegistry
and the clusterSharding
.
That is, even if you don’t register a PersistentEntity
, the events produced by Akka Persistence Typed are still compatible with PersistentEntityRegistry.eventStream
as long as they are properly tagged so the projections (Read Sides and Topic Producers) don’t change.
§Maintaining compatibility
Migrating to Akka Persistence Typed requires maintaining compatibility with data previously produced and persisted in the database journal. This requires focusing on three areas: De/Serialization of events (detailed later), PersistenceId
and tagging.
In order to be able to read existing events using Akka Persistence Typed you must use a PersistenceId
that produces an identical persistenceId
string as internally done by Lagom’s PersistenceEntity’s API.
EventSourcedBehavior
.withEnforcedReplies[ShoppingCartCommand, ShoppingCartEvent, ShoppingCart](
persistenceId = PersistenceId(entityContext.entityTypeKey.name, entityContext.entityId),
emptyState = ShoppingCart.empty,
commandHandler = (cart, cmd) => cart.applyCommand(cmd),
eventHandler = (cart, evt) => cart.applyEvent(evt)
)
The code above uses PersistenceId(entityContext.entityTypeKey.name, entityContext.entityId)
. There are three important pieces on that statement that we must review:
- The first argument of
PersistenceId.apply()
must be the same value you used in Lagom Persistence (classic). This first argument is known as thetypeHint
and is used by the journal as a mechanism to avoid ID collision between different types. In Lagom Persistence (classic) the type hint defaults to the classname of yourPersistentEntity
but it can be overwritten (review your code or the persisted data on your database). In our case, we are usingentityContext.entityTypeKey.name
because we defined the type key asEntityTypeKey[ShoppingCartCommand]("ShoppingCartEntity")
where"ShoppingCartEntity"
is the classname of the code we had in the implementation based on Lagom Persistence (classic). - The second argument must be the business id of your Aggregate. In this case, we can use
entityContext.entityId
because we’re using that same business id for the sharded actor. - An (optional) third argument specifying a
separator
. Lagom uses the"|"
as a separator and, sincePersistenceId
also uses"|"
as a default we’re not specifying a separator.
Even if you use the appropriate PersistenceId
, you need to use a compatible serializer for your events. Read more about De/Serialization in the section below.
Finally, only tagged events are readable by Lagom projections (either Read Sides and Topic Producers), and Lagom projections expect event tags to honour certain semantics. Finally, for events to be consumed in the correct order you must keep tagging the events in the same way as in your previous Lagom application.
Lagom provides an AkkaTaggerAdapter
utility class that can be used to convert an existing Lagom AggregateEventTag
to the appropriated tagging function expected by Akka Persistence Typed. When defining the EventSourcedBehavior
specify a tagger using withTagger
with the AkkaTaggerAdapter.fromLagom
:
def behavior(entityContext: EntityContext[ShoppingCartCommand]): Behavior[ShoppingCartCommand] = {
//#akka-persistence-behavior-definition
EventSourcedBehavior
.withEnforcedReplies[ShoppingCartCommand, ShoppingCartEvent, ShoppingCart](
persistenceId = PersistenceId(entityContext.entityTypeKey.name, entityContext.entityId),
emptyState = ShoppingCart.empty,
commandHandler = (cart, cmd) => cart.applyCommand(cmd),
eventHandler = (cart, evt) => cart.applyEvent(evt)
)
//#akka-persistence-behavior-definition
.withTagger(AkkaTaggerAdapter.fromLagom(entityContext, ShoppingCartEvent.Tag))
}
§Serialization
All the classes sent over the wire or stored on the database will still need to be serializable. Persisted events need to be read.
Existing code creating and registering serializers is 100% valid except for Command
classes. In Akka Typed, it is required to add a replyTo: ActorRef[Reply]
field on messages that need a reference to reply back. In order to serialize a class that includes an ActorRef[T]
field the class must use the Akka Jackson serializer. Read more on the serialization section of the docs.
To convert your Command
classes to use Akka Jackson serialization instead of Lagom Play-JSON you need to follow these steps:
First,create a marker trait. For example:
/**
* This is a marker trait for shopping cart commands.
* We will serialize them using the Akka Jackson serializer that is able to
* deal with the replyTo field. See application.conf
*/
trait ShoppingCartCommandSerializable
Then, use the regular Akka serialization binding mechanism so all classes extending that trait use the Akka Jackson JSON serializer:
akka.actor {
serialization-bindings {
# commands won't use Lagom Play-Json serializers but Akka Jackson serializers
"com.example.shoppingcart.impl.ShoppingCartCommandSerializable" = jackson-json
}
}
Then, remove all code that’s play-json
from your Command classes and companion objects.
Before:
sealed trait ShoppingCartCommand[R] extends ReplyType[R]
case class UpdateItem(productId: String, quantity: Int) extends ShoppingCartCommand[Summary]
object UpdateItem {
implicit val format: Format[UpdateItem] = Json.format
}
After:
sealed trait ShoppingCartCommand extends ShoppingCartCommandSerializable
case class UpdateItem(productId: String, quantity: Int, replyTo: ActorRef[Confirmation])
extends ShoppingCartCommand
Note how the type of the reply is no longer specified via ReplyType[T]
but as the type of the protocol the replyTo: ActorRef[T]
actor.
And finally, remove all commands from JsonSerialiserRegistry
override def serializers: Seq[JsonSerializer[_]] = Seq(
JsonSerializer[ItemUpdated],
JsonSerializer[CheckedOut.type],
JsonSerializer[UpdateItem],
JsonSerializer[Checkout.type],
JsonSerializer[Get.type],
JsonSerializer[ShoppingCart],
JsonSerializer[ShoppingCartException]
)
override def serializers: Seq[JsonSerializer[_]] = Seq(
// state and events can use play-json, but commands should use jackson because of ActorRef[T] (see application.conf)
JsonSerializer[ShoppingCart],
JsonSerializer[ItemUpdated],
JsonSerializer[CheckedOut.type],
// the replies use play-json as well
JsonSerializer[Summary],
JsonSerializer[Confirmation],
JsonSerializer[Accepted],
JsonSerializer[Rejected]
)