§Migrating to Akka Persistence Typed
With the support for Akka Persistence Typed in Lagom it is possible to migrate existing code from Lagom Persistence (classic) to Akka Persistence Typed. There’s a few steps to consider in order to be able to read an existing journal.
Note: the only limitation when migrating from from Lagom Persistence (classic) to Akka Persistence Typed is that a full cluster shutdown is required. Even though all durable data is compatible, Lagom Persistence (classic) and Akka Persistence Typed can’t coexist.
Before you start, make sure you have read the page Domain Modelling with Akka Persistence Typed and you understand how to model a domain using Akka Persistence Typed.
§Migrating the model
Similarly to Lagom’s Persistent Entity, in Akka Persistence Typed you must create a class extending EventSourcedBehaviorWithEnforcedReplies
.
public class ShoppingCartEntity
extends EventSourcedBehaviorWithEnforcedReplies<ShoppingCartCommand, ShoppingCartEvent, ShoppingCart>
The EventSourcedBehaviorWithEnforcedReplies
abstract class requires you to define the following:
- a
PersistenceId persistenceId
, to be passed to thesuper
in its constructor - an
emptyState()
method returning theState
before any event was ever persisted - a
commandHandler()
method to handle the commands, persist events and return a reply - a
eventHandler()
method to handle events and mutate theState
This migration guide will not go into more details related to writing command and event handlers. Refer to the Akka Persistence Typed docs or the section on Domain Modelling with Akka Persistence Typed for more information.
§Commands
Command
classes are the other set of classes most impacted by the migration. First, a Command
will no longer need to extend the PersistentEntity.ReplyType<R>
of the Lagom API. That type was used to specify a type R
for the reply produced by the Command
. To specify the type R
of the reply add a ActorRef<R> replyTo
field in the command.
Before:
final class UpdateItem implements ShoppingCartCommand, CompressedJsonable,
PersistentEntity.ReplyType<Summary> {
public final String productId;
public final int quantity;
@JsonCreator
UpdateItem(String productId, int quantity) {
this.productId = Preconditions.checkNotNull(productId, "productId");
this.quantity = quantity;
}
}
After:
final class UpdateItem implements ShoppingCartCommand, CompressedJsonable {
public final String productId;
public final int quantity;
public final ActorRef<Confirmation> replyTo;
@JsonCreator
UpdateItem(String productId, int quantity, ActorRef<Confirmation> replyTo) {
this.productId = Preconditions.checkNotNull(productId, "productId");
this.quantity = quantity;
this.replyTo = replyTo;
}
}
The ActorRef<R> replyTo
is necessary to know where to send the response to. It must be added to all command classes and adding it has implications on the serialization of those classes. Make sure to review the Serialization pages later in this reference documentation.
§Replies
In Akka Typed, it’s not possible to return an exception to the caller. All communication between the actor and the caller must be done via the ActorRef<R> replyTo
passed in the command. Therefore, if you want to signal a rejection, you most have it encoded in your reply protocol.
See for example the Confirmation
ADT below:
interface Confirmation {}
class Accepted implements Confirmation {
public final Summary summary;
@JsonCreator
public Accepted(Summary summary) {
this.summary = summary;
}
}
class Rejected implements Confirmation {
public final String reason;
@JsonCreator
public Rejected(String reason) {
this.reason = reason;
}
}
Then, all the command handlers must produce a ReplyEffect
. For operations that don’t mutate the model, use Effect().reply
directly and for operations that persist events use Effect().persist(...).thenReply
to create a ReplyEffect
instance:
builder.forState(ShoppingCart::open)
.onCommand(UpdateItem.class, (state, cmd) -> {
if (cmd.quantity < 0) {
return Effect().reply(cmd.replyTo, new ShoppingCartCommand.Rejected("Quantity must be greater than zero"));
} else if (cmd.quantity == 0 && !state.items.containsKey(cmd.productId)) {
return Effect().reply(cmd.replyTo, new ShoppingCartCommand.Rejected("Cannot delete item that is not already in cart"));
} else {
return Effect()
.persist(new ItemUpdated(entityId, cmd.productId, cmd.quantity))
.thenReply(cmd.replyTo, updatedCart -> new ShoppingCartCommand.Accepted(toSummary(updatedCart)));
}
})
See Modelling Commands and Replies for more details.
§Registration
In order to shard and distribute the EventSourcedBehavior
instances across the cluster you will no longer use Lagom’s persistentEntityRegistry
. Instead, Lagom now provides direct access to clusterSharding
, an instance of Akka’s ClusterSharding
extension you can use to initialize the sharding of EventSourcedBehavior
.
Before, in the ShoppingCartServiceImpl
class we’d use the Lagom provided persistentEntityRegistry
instance to register a Guice provided instance:
@Inject
public ShoppingCartServiceImpl(PersistentEntityRegistry persistentEntityRegistry,
ReportRepository reportRepository) {
this.persistentEntityRegistry = persistentEntityRegistry;
this.reportRepository = reportRepository;
persistentEntityRegistry.register(ShoppingCartEntity.class);
}
That registration can be removed.
After, we use the Lagom provided clusterSharding
instance to initialize the sharding of the event source Behavior
under the ShoppingCartEntity.typeKey
identifier:
@Inject
public ShoppingCartServiceImpl(ClusterSharding clusterSharding,
ReportRepository reportRepository) {
this.clusterSharding = clusterSharding;
this.reportRepository = reportRepository;
// register entity on shard
this.clusterSharding.init(
Entity.of(
ShoppingCartEntity.ENTITY_TYPE_KEY,
ShoppingCartEntity::behavior
)
);
}
To avoid entityId
collisions across the cluster, initializing the sharding of a Behavior
requires specifying an EntityTypeKey
which acts as a namespacing. The EntityTypeKey
is defined by a name and a type. The type indicates the kind of commands that can be sent to that sharded Behavior
. In our example, we defined typeKey
as a static field in the ShoppingCartEntity
class:
public static EntityTypeKey<ShoppingCartCommand> ENTITY_TYPE_KEY =
EntityTypeKey
.create(ShoppingCartCommand.class, "ShoppingCartEntity");
§Sending a command
In order to send commands to your Behavior
instance you will have to obtain a reference to the actor where the Behavior
is running and send commands to it.
Before:
private PersistentEntityRef<ShoppingCartCommand> entityRef(String id) {
return persistentEntityRegistry.refFor(ShoppingCartEntity.class, id);
}
@Override
public ServiceCall<NotUsed, String> get(String id) {
return request ->
entityRef(id)
.ask(ShoppingCartCommand.Get.INSTANCE)
.thenApply(cart -> asShoppingCartView(id, cart));
}
After:
private EntityRef<ShoppingCartCommand> entityRef(String id) {
return clusterSharding.entityRefFor(ShoppingCartEntity.ENTITY_TYPE_KEY, id);
}
@Override
public ServiceCall<NotUsed, String> get(String id) {
return request ->
entityRef(id)
.<Summary>ask(replyTo -> new ShoppingCartCommand.Get(replyTo), askTimeout)
.thenApply(cart -> asShoppingCartView(id, cart));
}
That is, instead of injecting a persistentEntityRegistry
, use a clusterSharding
instance. Instead of getting a PersistentEntityRef<T>
you will obtain an EntityRef<T>
. Both PersistentEntityRef<T>
and EntityRef<T>
provide a method called ask
but their signatures are different. EntityRef<T>
is part of the API of Akka Cluster Sharding and it expects a ActorRef<R> -> C
factory method which given a reference to a replyTo
actor of type ActorRef<R>
will produce a command C
(see reply -> Get(reply)
in the code above). Then the ask
method also expects an implicit timeout. The result is a CompletionStage<R>
with the reply instance produced in the EventSourceBehavior
.
§ Registration: caveats
Even though there is no longer a PersistentEntity
instance to register, the persistentEntityRegistry
is still necessary to build TopicProducer
’s. When writing a Service
implementation that includes a Topic Implementation the TopicProducer
API requires an eventStream
that is provided by the persistentEntityRegistry
. This means that in some cases you will have to inject both the persistentEntityRegistry
and the clusterSharding
.
That is, even if you don’t register a PersistentEntity
, the events produced by Akka Persistence Typed are still compatible with PersistentEntityRegistry.eventStream
as long as they are properly tagged so the projections (Read Sides and Topic Producers) don’t change.
§Maintaining compatibility
Migrating to Akka Persistence Typed requires maintaining compatibility with data previously produced and persisted in the database journal. This requires focusing on three areas: Serialization of events, PersistenceId
and tagging.
In order to be able to read existing events using Akka Persistence Typed you must use a PersistenceId
that produces an identical persistenceId
string as internally done by Lagom’s PersistenceEntity’s API.
ShoppingCartEntity(EntityContext<ShoppingCartCommand> entityContext) {
super(
PersistenceId.of(
entityContext.getEntityTypeKey().name(),
entityContext.getEntityId(),
"" // separator must be an empty String - Lagom Java doesn't have a separator
)
);
this.entityContext = entityContext;
this.entityId = entityContext.getEntityId();
}
The code above uses PersistenceId(entityContext.entityTypeKey.name, entityContext.entityId, "")
. There are three important pieces on that statement that we must review:
- The first constructor argument must be the same value you used in Lagom Persistence (classic). This first argument is known as the
typeHint
and is used by the journal as a mechanism to avoid ID collision between different types. In Lagom Persistence (classic) the type hint defaults to the classname of yourPersistentEntity
but it can be overwritten (review your code or the persisted data on your database). In our case, we are usingentityContext.entityTypeKey.name
because we defined the type key asEntityTypeKey.create(ShoppingCartCommand.class, "ShoppingCartEntity")
where"ShoppingCartEntity"
is the classname of the code we had in the implementation based on Lagom Persistence Classic. - The second argument must be the business id of your Aggregate. In this case, we can use
entityContext.entityId
because we’re using that same business id for the sharded actor. - The third argument specifying a
separator
. Lagom Persistence Classic uses the an empty string""
as a separator. When using Akka Persistence Typed we must explicitly set it to""
for the Java API. The Akka default is"|"
.
Even if you use the appropriate PersistenceId
, you need to use a compatible serializer for your events. Read more about Serialization in this reference documentation..
Finally, only tagged events are readable by Lagom projections (either Read Sides and Topic Producers), and Lagom projections expect event tags to honour certain semantics. Finally, for events to be consumed in the correct order you must keep tagging the events in the same way as in your previous Lagom application.
Lagom provides an AkkaTaggerAdapter
utility class that can be used to convert an existing Lagom AggregateEventTag
to the appropriated tagging function expected by Akka Persistence Typed. When defining the EventSourcedBehavior
specify a tagger by overriding the tagsFor()
method and use the utility AkkaTaggerAdapter.fromLagom
to convert your existing event tag:
@Override
public Set<String> tagsFor(ShoppingCartEvent shoppingCartEvent) {
return AkkaTaggerAdapter.fromLagom(entityContext, ShoppingCartEvent.TAG).apply(shoppingCartEvent);
}