§Advanced Topic: Integrating with Akka
Lagom is built with Akka as one of the underlying technologies. Nonetheless, writing simple Lagom services generally won’t require interacting with Akka directly.
More advanced users may want direct access, as described in this section.
§Usage from Service Implementation
Most Akka functions are accessible through an ActorSystem
object. You can inject the current ActorSystem
into your service implementations or persistent entities with ordinary dependency injection.
Let’s look at an example of a WorkerService
that accepts job requests and delegates the work to actors running on other nodes in the service’s cluster.
import akka.actor.ActorSystem
import akka.cluster.Cluster
import akka.cluster.routing.{ClusterRouterGroup, ClusterRouterGroupSettings}
import akka.routing.ConsistentHashingGroup
import akka.pattern.ask
import akka.util.Timeout
import com.lightbend.lagom.scaladsl.api.ServiceCall
import scala.concurrent.duration._
class WorkerServiceImpl(system: ActorSystem) extends WorkerService {
if (Cluster.get(system).selfRoles("worker-node")) {
// start a worker actor on each node that has the "worker-node" role
system.actorOf(Worker.props, "worker")
}
// start a consistent hashing group router,
// which will delegate jobs to the workers. It is grouping
// the jobs by their task, i.e. jobs with same task will be
// delegated to same worker node
val workerRouter = {
val paths = List("/user/worker")
val groupConf = ConsistentHashingGroup(paths, hashMapping = {
case Job(_, task, _) => task
})
val routerProps = ClusterRouterGroup(groupConf,
ClusterRouterGroupSettings(
totalInstances = 1000,
routeesPaths = paths,
allowLocalRoutees = true,
useRole = Some("worker-node")
)
).props
system.actorOf(routerProps, "workerRouter")
}
def doWork = ServiceCall { job =>
implicit val timeout = Timeout(5.seconds)
(workerRouter ? job).mapTo[JobAccepted]
}
}
Notice how the ActorSystem
is injected through the constructor. We create worker actors on each node that has the “worker-node” role. We create a consistent hashing group router that delegates jobs to the workers. Details on these features are in the Akka documentation.
The worker actor looks like this:
import akka.actor.{AbstractActor, Props}
import akka.event.Logging
object Worker {
def props = Props[Worker]
}
class Worker extends AbstractActor {
private val log = Logging.getLogger(context.system, this)
override def receive = {
case job @ Job(id, task, payload) =>
log.info("Working on job: {}", job)
sender ! JobAccepted(id)
// perform the work...
context.stop(self)
}
}
The messages are ordinary case classes. Note that they extend Jsonable
since they need proper Serialization when they are sent across nodes in the cluster of the service, and the have formats created for them:
import play.api.libs.json.{Format, Json}
case class Job(jobId: String, task: String, payload: String)
object Job {
implicit val format: Format[Job] = Json.format
}
case class JobAccepted(jobId: String)
object JobAccepted {
implicit val format: Format[JobAccepted] = Json.format
}
These formats needed to be added to the serialization registry, as described in the cluster serialization documentation.