§Advanced Topic: Integrating with Akka
Lagom is built with Akka as one of the underlying technologies. Nonetheless, writing simple Lagom services doesn’t require interacting with Akka directly.
More advanced users may want direct access, as described in this section.
§Usage from Service Implementation
Pretty much everything in Akka is accessible through an ActorSystem
object. You can inject the current ActorSystem
into your service implementations or persistent entities with ordinary dependency injection.
Let’s look at an example of a WorkerService
that accepts job requests and delegates the work to actors running on other nodes in the service’s cluster.
import static akka.pattern.Patterns.ask;
import com.lightbend.lagom.javadsl.api.ServiceCall;
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import javax.inject.Inject;
import akka.NotUsed;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.cluster.Cluster;
import akka.cluster.routing.ClusterRouterGroup;
import akka.cluster.routing.ClusterRouterGroupSettings;
import akka.routing.ConsistentHashingGroup;
import akka.util.Timeout;
public class WorkerServiceImpl implements WorkerService {
private final ActorRef workerRouter;
@Inject
public WorkerServiceImpl(ActorSystem system) {
if (Cluster.get(system).getSelfRoles().contains("worker-node")) {
// start a worker actor on each node that has the "worker-node" role
system.actorOf(Worker.props(), "worker");
}
// start a consistent hashing group router,
// which will delegate jobs to the workers. It is grouping
// the jobs by their task, i.e. jobs with same task will be
// delegated to same worker node
List<String> paths = Arrays.asList("/user/worker");
ConsistentHashingGroup groupConf =
new ConsistentHashingGroup(paths)
.withHashMapper(
msg -> {
if (msg instanceof Job) {
return ((Job) msg).getTask();
} else {
return null;
}
});
Set<String> useRoles = new TreeSet<>();
useRoles.add("worker-node");
Props routerProps =
new ClusterRouterGroup(
groupConf, new ClusterRouterGroupSettings(1000, paths, true, useRoles))
.props();
this.workerRouter = system.actorOf(routerProps, "workerRouter");
}
@Override
public ServiceCall<Job, JobAccepted> doWork() {
return job -> {
// send the job to a worker, via the consistent hashing router
CompletionStage<JobAccepted> reply =
ask(workerRouter, job, Duration.ofSeconds(5))
.thenApply(
ack -> {
return (JobAccepted) ack;
});
return reply;
};
}
}
Notice how the ActorSystem
is injected through the constructor. We create worker actors on each node that has the “worker-node” role. We create a consistent hashing group router that delegates jobs to the workers. Details on these features are in the Akka documentation.
The worker actor looks like this:
import akka.actor.AbstractActor;
import akka.actor.Props;
import akka.event.Logging;
import akka.event.LoggingAdapter;
public class Worker extends AbstractActor {
public static Props props() {
return Props.create(Worker.class);
}
private final LoggingAdapter log = Logging.getLogger(context().system(), this);
@Override
public Receive createReceive() {
return receiveBuilder().match(Job.class, this::perform).build();
}
private void perform(Job job) {
log.info("Working on job: {}", job);
sender().tell(JobAccepted.of(job.getJobId()), self());
// perform the work...
context().stop(self());
}
}
The messages are ordinary Immutable Objects. Note that they extend Jsonable
since they need proper Serialization when they are sent across nodes in the cluster of the service:
@Value.Immutable
@ImmutableStyle
@JsonDeserialize(as = Job.class)
public interface AbstractJob extends Jsonable {
@Value.Parameter
public String getJobId();
@Value.Parameter
public String getTask();
@Value.Parameter
public String getPayload();
}
@Value.Immutable
@ImmutableStyle
@JsonDeserialize(as = JobAccepted.class)
public interface AbstractJobAccepted extends Jsonable {
@Value.Parameter
public String getJobId();
}
@Value.Immutable
@ImmutableStyle
@JsonDeserialize(as = JobStatus.class)
public interface AbstractJobStatus extends Jsonable {
@Value.Parameter
public String getJobId();
@Value.Parameter
public String getStatus();
}
§Usage of Lagom APIs in Actors
If you need to have access to some Lagom API from an actor, you have two options:
- Pass the Lagom object as an ordinary constructor parameter when creating the actor.
- Use the
AkkaGuiceSupport
from the Play Framework.
The first alternative is probably sufficient in many cases, but we will take a closer look at the more advanced second alternative.
In your Guice module you add AkkaGuiceSupport
and use the bindActor
method, such as:
import play.libs.akka.AkkaGuiceSupport;
import com.google.inject.AbstractModule;
import com.lightbend.lagom.javadsl.server.ServiceGuiceSupport;
public class Worker2Module extends AbstractModule implements ServiceGuiceSupport, AkkaGuiceSupport {
@Override
protected void configure() {
bindService(WorkerService2.class, WorkerService2Impl.class);
bindActor(Worker2.class, "worker");
}
}
That allows the actor itself to receive injected objects. It also allows the actor ref for the actor to be injected into other components. This actor is named worker
and is also qualified with the worker
name for injection.
You can read more about this and how to use dependency injection for child actors in the Play documentation.
Adjusting the Worker
actor from the previous section to allow injection of the PubSubRegistry
:
public class Worker2 extends AbstractActor {
private final PubSubRef<JobStatus> topic;
@Inject
public Worker2(PubSubRegistry pubSub) {
topic = pubSub.refFor(TopicId.of(JobStatus.class, "jobs-status"));
}
@Override
public Receive createReceive() {
return receiveBuilder().match(Job.class, this::perform).build();
}
private void perform(Job job) {
sender().tell(JobAccepted.of(job.getJobId()), self());
topic.publish(JobStatus.of(job.getJobId(), "started"));
// perform the work...
topic.publish(JobStatus.of(job.getJobId(), "done"));
context().stop(self());
}
}
With the PubSubRegistry
we can publish updates of the progress of the jobs to all nodes in the cluster, as described in Publish-Subscribe.
To make the example complete, an adjusted service implementation follows. Worker actors are created not by the service implementation, but by the WorkerModule
. We have also added a status
method that provides a stream of JobStatus
values that clients can listen to.
public class WorkerService2Impl implements WorkerService2 {
private final ActorRef workerRouter;
private final PubSubRef<JobStatus> topic;
@Inject
public WorkerService2Impl(ActorSystem system, PubSubRegistry pubSub) {
// start a consistent hashing group router,
// which will delegate jobs to the workers. It is grouping
// the jobs by their task, i.e. jobs with same task will be
// delegated to same worker node
List<String> paths = Arrays.asList("/user/worker");
ConsistentHashingGroup groupConf =
new ConsistentHashingGroup(paths)
.withHashMapper(
msg -> {
if (msg instanceof Job) {
return ((Job) msg).getTask();
} else {
return null;
}
});
Set<String> useRoles = new TreeSet<String>();
useRoles.add("worker-node");
Props routerProps =
new ClusterRouterGroup(
groupConf, new ClusterRouterGroupSettings(1000, paths, true, useRoles))
.props();
this.workerRouter = system.actorOf(routerProps, "workerRouter");
this.topic = pubSub.refFor(TopicId.of(JobStatus.class, "jobs-status"));
}
@Override
public ServiceCall<NotUsed, Source<JobStatus, ?>> status() {
return req -> {
return CompletableFuture.completedFuture(topic.subscriber());
};
}
@Override
public ServiceCall<Job, JobAccepted> doWork() {
return job -> {
// send the job to a worker, via the consistent hashing router
CompletionStage<JobAccepted> reply =
ask(workerRouter, job, Duration.ofSeconds(5))
.thenApply(
ack -> {
return (JobAccepted) ack;
});
return reply;
};
}
}