Integrating with Akka

§Advanced Topic: Integrating with Akka

Lagom is built with Akka as one of the underlying technologies. Nonetheless, writing simple Lagom services doesn’t require interacting with Akka directly.

More advanced users may want direct access, as described in this section.

§Usage from Service Implementation

Almost everything in Akka is accessible through an ActorSystem object. You can inject the current ActorSystem into your service implementations or persistent entities with ordinary dependency injection.

Let’s look at an example of a WorkerService that accepts job requests and delegates the work to actors running on other nodes in the service’s cluster.

import static akka.pattern.PatternsCS.ask;

import com.lightbend.lagom.javadsl.api.ServiceCall;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import javax.inject.Inject;

import akka.NotUsed;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.cluster.Cluster;
import akka.cluster.routing.ClusterRouterGroup;
import akka.cluster.routing.ClusterRouterGroupSettings;
import akka.routing.ConsistentHashingGroup;
import akka.util.Timeout;

public class WorkerServiceImpl implements WorkerService {

  private final ActorRef workerRouter;

  @Inject
  public WorkerServiceImpl(ActorSystem system) {
    if (Cluster.get(system).getSelfRoles().contains("worker-node")) {
      // start a worker actor on each node that has the "worker-node" role
      system.actorOf(Worker.props(), "worker");
    }

    // start a consistent hashing group router, which delegates
    // jobs to the workers. It groups the jobs by their task,
    // i.e. jobs with the same task are delegated to the same
    // worker node
    List<String> paths = Arrays.asList("/user/worker");
    ConsistentHashingGroup groupConf = new ConsistentHashingGroup(paths)
      .withHashMapper(msg -> {
        if (msg instanceof Job) {
          return ((Job) msg).getTask();
        } else {
          return null;
        }
      });
    // arguments: totalInstances, routeesPaths, allowLocalRoutees, useRole
    Props routerProps = new ClusterRouterGroup(groupConf,
      new ClusterRouterGroupSettings(1000, paths,
        true, "worker-node")).props();
    this.workerRouter = system.actorOf(routerProps, "workerRouter");
  }

  @Override
  public ServiceCall<Job, JobAccepted> doWork() {
    return job -> {
      // send the job to a worker, via the consistent hashing router
      CompletionStage<JobAccepted> reply =
        ask(workerRouter, job, Timeout.apply(5, TimeUnit.SECONDS))
          .thenApply(ack -> (JobAccepted) ack);
      return reply;
    };
  }
}

Notice how the ActorSystem is injected through the constructor. The constructor starts a worker actor on each node that has the “worker-node” role, and then creates a consistent hashing group router that delegates jobs to the workers. Details on these features are in the Akka documentation.
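To illustrate why jobs with the same task reach the same worker, here is a minimal plain-Java sketch of consistent hashing. It is not Akka's implementation: the class TaskHashRing, its methods, and the node names are hypothetical and exist only for this illustration. Each node is placed on a ring several times, and a task is routed to the first node found at or after the task's hash.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration only; not part of Akka or Lagom.
class TaskHashRing {

  // ring position -> node name, kept sorted by position
  private final TreeMap<Integer, String> ring = new TreeMap<>();

  TaskHashRing(List<String> nodes, int virtualNodesPerNode) {
    for (String node : nodes) {
      // several positions per node spread the tasks more evenly
      for (int i = 0; i < virtualNodesPerNode; i++) {
        ring.put((node + "#" + i).hashCode(), node);
      }
    }
  }

  // returns the node responsible for the given task (the hash key)
  String nodeFor(String task) {
    if (ring.isEmpty()) {
      throw new IllegalStateException("no nodes in the ring");
    }
    Map.Entry<Integer, String> entry = ring.ceilingEntry(task.hashCode());
    // wrap around to the first position when the hash is past the last one
    return (entry != null ? entry : ring.firstEntry()).getValue();
  }

  public static void main(String[] args) {
    TaskHashRing ring =
        new TaskHashRing(Arrays.asList("node-a", "node-b", "node-c"), 10);
    // the same task always maps to the same node
    System.out.println(
        ring.nodeFor("resize-image").equals(ring.nodeFor("resize-image")));
  }
}
```

In the service above, the hash key is the value returned by the hash mapper, that is, the task of the Job.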

The worker actor looks like this:

import akka.actor.AbstractActor;
import akka.actor.Props;
import akka.event.Logging;
import akka.event.LoggingAdapter;
import akka.japi.pf.ReceiveBuilder;

public class Worker extends AbstractActor {

  public static Props props() {
    return Props.create(Worker.class);
  }

  private final LoggingAdapter log = Logging.getLogger(context().system(), this);

  public Worker() {
    receive(ReceiveBuilder
      .match(Job.class, this::perform)
      .build()
    );
  }

  private void perform(Job job) {
    log.info("Working on job: {}", job);
    sender().tell(JobAccepted.of(job.getJobId()), self());
    // perform the work...
    context().stop(self());
  }

}

The messages are ordinary immutable objects. Note that they extend Jsonable, since they must be serializable when they are sent across nodes in the service’s cluster:

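import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.lightbend.lagom.javadsl.immutable.ImmutableStyle;
import com.lightbend.lagom.serialization.Jsonable;
import org.immutables.value.Value;

@Value.Immutable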
@ImmutableStyle
@JsonDeserialize(as = Job.class)
public interface AbstractJob extends Jsonable {
  @Value.Parameter
  public String getJobId();

  @Value.Parameter
  public String getTask();

  @Value.Parameter
  public String getPayload();
}

@Value.Immutable
@ImmutableStyle
@JsonDeserialize(as = JobAccepted.class)
public interface AbstractJobAccepted extends Jsonable {
  @Value.Parameter
  public String getJobId();
}

@Value.Immutable
@ImmutableStyle
@JsonDeserialize(as = JobStatus.class)
public interface AbstractJobStatus extends Jsonable {
  @Value.Parameter
  public String getJobId();

  @Value.Parameter
  public String getStatus();
}
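
The worker code calls factory methods such as JobAccepted.of(...), which come from the classes generated for these interfaces. As a rough idea of what such an immutable message looks like, here is a hand-written plain-Java sketch. The class name JobAcceptedSketch is hypothetical, and the real generated class typically contains more (for example a builder and the JSON support) that is omitted here.

```java
import java.util.Objects;

// Hypothetical hand-written stand-in; not the generated code.
final class JobAcceptedSketch {

  // the only state, assigned once in the constructor
  private final String jobId;

  private JobAcceptedSketch(String jobId) {
    this.jobId = Objects.requireNonNull(jobId, "jobId");
  }

  // static factory, mirroring the JobAccepted.of(...) call used by the worker
  static JobAcceptedSketch of(String jobId) {
    return new JobAcceptedSketch(jobId);
  }

  String getJobId() {
    return jobId;
  }

  // value semantics: two messages with the same jobId are equal
  @Override
  public boolean equals(Object other) {
    return other instanceof JobAcceptedSketch
        && jobId.equals(((JobAcceptedSketch) other).jobId);
  }

  @Override
  public int hashCode() {
    return jobId.hashCode();
  }

  @Override
  public String toString() {
    return "JobAccepted{jobId=" + jobId + "}";
  }

  public static void main(String[] args) {
    JobAcceptedSketch a = of("job-1");
    JobAcceptedSketch b = of("job-1");
    System.out.println(a.equals(b)); // prints true
  }
}
```

Because there are no setters and the field is final, an instance can be shared between actors and threads without copying.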

§Usage of Lagom APIs in Actors

If you need to have access to some Lagom API from an actor, you have two options:

  1. Pass the Lagom object as an ordinary constructor parameter when creating the actor.
  2. Use the AkkaGuiceSupport from the Play Framework.

The first alternative is probably sufficient in many cases, but we will take a closer look at the more advanced second alternative.

In your Guice module, implement AkkaGuiceSupport and call the bindActor method, for example:

import play.libs.akka.AkkaGuiceSupport;
import com.google.inject.AbstractModule;
import com.lightbend.lagom.javadsl.server.ServiceGuiceSupport;

public class Worker2Module extends AbstractModule
    implements ServiceGuiceSupport, AkkaGuiceSupport {

  @Override
  protected void configure() {
    bindService(WorkerService2.class, WorkerService2Impl.class);

    bindActor(Worker2.class, "worker");
  }
}

That allows the actor itself to receive injected objects. It also allows the ActorRef for the actor to be injected into other components. The actor is created with the name worker, and its ActorRef binding is qualified with the same name, so other components can request it with @Named("worker").

You can read more about this and how to use dependency injection for child actors in the Play documentation.

The Worker actor from the previous section can be adjusted to allow injection of the PubSubRegistry:

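import javax.inject.Inject;

import akka.actor.AbstractActor;
import akka.japi.pf.ReceiveBuilder;
import com.lightbend.lagom.javadsl.pubsub.PubSubRef;
import com.lightbend.lagom.javadsl.pubsub.PubSubRegistry;
import com.lightbend.lagom.javadsl.pubsub.TopicId;

public class Worker2 extends AbstractActor {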

  private final PubSubRef<JobStatus> topic;

  @Inject
  public Worker2(PubSubRegistry pubSub) {
    topic = pubSub.refFor(TopicId.of(JobStatus.class, "jobs-status"));

    receive(ReceiveBuilder
      .match(Job.class, this::perform)
      .build()
    );
  }

  private void perform(Job job) {
    sender().tell(JobAccepted.of(job.getJobId()), self());
    topic.publish(JobStatus.of(job.getJobId(), "started"));
    // perform the work...
    topic.publish(JobStatus.of(job.getJobId(), "done"));
    context().stop(self());
  }

}

With the PubSubRegistry we can publish updates of the progress of the jobs to all nodes in the cluster, as described in Publish-Subscribe.

To make the example complete, an adjusted service implementation follows. Worker actors are created not by the service implementation, but by the Worker2Module. We have also added a status method that provides a stream of JobStatus values that clients can listen to.

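// imports as in WorkerServiceImpl above, plus:
import java.util.concurrent.CompletableFuture;

import akka.stream.javadsl.Source;
import com.lightbend.lagom.javadsl.pubsub.PubSubRef;
import com.lightbend.lagom.javadsl.pubsub.PubSubRegistry;
import com.lightbend.lagom.javadsl.pubsub.TopicId;

public class WorkerService2Impl implements WorkerService2 {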

  private final ActorRef workerRouter;
  private final PubSubRef<JobStatus> topic;

  @Inject
  public WorkerService2Impl(ActorSystem system, PubSubRegistry pubSub) {
    // start a consistent hashing group router, which delegates
    // jobs to the workers. It groups the jobs by their task,
    // i.e. jobs with the same task are delegated to the same
    // worker node
    List<String> paths = Arrays.asList("/user/worker");
    ConsistentHashingGroup groupConf = new ConsistentHashingGroup(paths)
      .withHashMapper(msg -> {
        if (msg instanceof Job) {
          return ((Job) msg).getTask();
        } else {
          return null;
        }
      });
    // arguments: totalInstances, routeesPaths, allowLocalRoutees, useRole
    Props routerProps = new ClusterRouterGroup(groupConf,
      new ClusterRouterGroupSettings(1000, paths,
        true, "worker-node")).props();
    this.workerRouter = system.actorOf(routerProps, "workerRouter");

    this.topic = pubSub.refFor(TopicId.of(JobStatus.class, "jobs-status"));
  }

  @Override
  public ServiceCall<NotUsed, Source<JobStatus, ?>> status() {
    return req -> {
      return CompletableFuture.completedFuture(topic.subscriber());
    };
  }

  @Override
  public ServiceCall<Job, JobAccepted> doWork() {
    return job -> {
      // send the job to a worker, via the consistent hashing router
      CompletionStage<JobAccepted> reply =
        ask(workerRouter, job, Timeout.apply(5, TimeUnit.SECONDS))
          .thenApply(ack -> (JobAccepted) ack);
      return reply;
    };
  }
}
