§Advanced Topic: Integrating with Akka
Lagom is built with Akka as one of the underlying technologies. Nonetheless, writing simple Lagom services generally won’t require interacting with Akka directly.
More advanced users may want direct access, as described in this section.
§Usage from Service Implementation
Pretty much everything in Akka is accessible through an ActorSystem
object. You can inject the current ActorSystem
into your service implementations or persistent entities with ordinary dependency injection.
Let’s look at an example of a WorkerService
that accepts job requests and delegates the work to actors running on other nodes in the service’s cluster.
import akka.actor.ActorSystem
import akka.cluster.Cluster
import akka.cluster.routing.ClusterRouterGroup
import akka.cluster.routing.ClusterRouterGroupSettings
import akka.routing.ConsistentHashingGroup
import akka.pattern.ask
import akka.util.Timeout
import com.lightbend.lagom.scaladsl.api.ServiceCall
import scala.concurrent.duration._
class WorkerServiceImpl(system: ActorSystem) extends WorkerService {
if (Cluster.get(system).selfRoles("worker-node")) {
// start a worker actor on each node that has the "worker-node" role
system.actorOf(Worker.props, "worker")
}
// start a consistent hashing group router,
// which will delegate jobs to the workers. It is grouping
// the jobs by their task, i.e. jobs with same task will be
// delegated to same worker node
val workerRouter = {
val paths = List("/user/worker")
val groupConf = ConsistentHashingGroup(paths, hashMapping = {
case Job(_, task, _) => task
})
val routerProps = ClusterRouterGroup(
groupConf,
ClusterRouterGroupSettings(
totalInstances = 1000,
routeesPaths = paths,
allowLocalRoutees = true,
useRoles = Set("worker-node")
)
).props
system.actorOf(routerProps, "workerRouter")
}
def doWork = ServiceCall { job =>
implicit val timeout = Timeout(5.seconds)
(workerRouter ? job).mapTo[JobAccepted]
}
}
Notice how the ActorSystem
is injected through the constructor. We create worker actors on each node that has the “worker-node” role. We create a consistent hashing group router that delegates jobs to the workers. Details on these features are in the Akka documentation.
The worker actor looks like this:
import akka.actor.Actor
import akka.actor.Props
import akka.event.Logging
object Worker {
def props = Props[Worker]
}
class Worker extends Actor {
private val log = Logging.getLogger(context.system, this)
override def receive = {
case job @ Job(id, task, payload) =>
log.info("Working on job: {}", job)
sender ! JobAccepted(id)
// perform the work...
context.stop(self)
}
}
The messages are ordinary case classes. Note that they extend Jsonable
since they need proper Serialization when they are sent across nodes in the cluster of the service, and the have formats created for them:
import play.api.libs.json.Format
import play.api.libs.json.Json
case class Job(jobId: String, task: String, payload: String)
object Job {
implicit val format: Format[Job] = Json.format
}
case class JobAccepted(jobId: String)
object JobAccepted {
implicit val format: Format[JobAccepted] = Json.format
}
These formats needed to be added to the serialization registry, as described in the cluster serialization documentation.
§Updating Akka version
If you want to use a newer version of Akka, one that is not used by Lagom yet, you can add the following to your build.sbt
file:
// The newer Akka version you want to use.
val akkaVersion = "2.6.0-RC2"
// Akka dependencies used by Lagom
dependencyOverrides ++= Seq(
"com.typesafe.akka" %% "akka-actor" % akkaVersion,
"com.typesafe.akka" %% "akka-remote" % akkaVersion,
"com.typesafe.akka" %% "akka-cluster" % akkaVersion,
"com.typesafe.akka" %% "akka-cluster-sharding" % akkaVersion,
"com.typesafe.akka" %% "akka-cluster-sharding-typed" % akkaVersion,
"com.typesafe.akka" %% "akka-cluster-tools" % akkaVersion,
"com.typesafe.akka" %% "akka-cluster-typed" % akkaVersion,
"com.typesafe.akka" %% "akka-coordination" % akkaVersion,
"com.typesafe.akka" %% "akka-discovery" % akkaVersion,
"com.typesafe.akka" %% "akka-distributed-data" % akkaVersion,
"com.typesafe.akka" %% "akka-serialization-jackson" % akkaVersion,
"com.typesafe.akka" %% "akka-persistence" % akkaVersion,
"com.typesafe.akka" %% "akka-persistence-query" % akkaVersion,
"com.typesafe.akka" %% "akka-slf4j" % akkaVersion,
"com.typesafe.akka" %% "akka-stream" % akkaVersion,
"com.typesafe.akka" %% "akka-protobuf-v3" % akkaVersion,
"com.typesafe.akka" %% "akka-actor-typed" % akkaVersion,
"com.typesafe.akka" %% "akka-persistence-typed" % akkaVersion,
"com.typesafe.akka" %% "akka-multi-node-testkit" % akkaVersion % Test,
"com.typesafe.akka" %% "akka-testkit" % akkaVersion % Test,
"com.typesafe.akka" %% "akka-stream-testkit" % akkaVersion % Test,
"com.typesafe.akka" %% "akka-actor-testkit-typed" % akkaVersion % Test,
// Use "sbt-dependency-graph" or any other dependency report generator to
// make sure you add all the necessary dependencies on this list
)
Of course, other Akka artifacts can be added transitively. Use sbt-dependency-graph to better inspect your build and check which ones you need to add explicitly.
Note: When doing such updates, keep in mind that you need to follow Akka’s Binary Compatibility Rules. And if you are manually adding other Akka artifacts, remember to keep the version of all the Akka artifacts consistent since mixed versioning is not allowed.
§Adding other Akka dependencies
If you want to use Akka artifacts that are not added transtively by Lagom, you can use com.lightbend.lagom.core.LagomVersions.akka
to ensure all the artifacts will use a consistent version. For example:
import com.lightbend.lagom.core.LagomVersion
libraryDependencies ++= Seq(
"com.typesafe.akka" %% "akka-stream-typed" % LagomVersion.akka
)
Note: When resolving dependencies, sbt will get the newest one declared for this project or added transitively. It means that if Play depends on a newer Akka (or Akka HTTP) version than the one you are declaring, Play version wins. See more details about how sbt does evictions here.