§Publish-Subscribe
Publish–subscribe is a well known messaging pattern. Senders of messages, called publishers, do not target the messages directly to specific receivers, but instead publish messages to topics without knowledge of which receivers, called subscribers, if any, there may be. Similarly, a subscriber express interest in a topic and receive messages published to that topic, without knowledge of which publishers, if any, there are.
§Dependency
To use this feature add the following in your project’s build:
libraryDependencies += lagomScaladslPubSub
§Usage from Service Implementation
Let’s look at an example of a service that publishes temperature measurements of hardware devices. A device can submit its current temperature and interested parties can get a stream of the temperature samples.
The service API is defined as:
trait SensorService extends Service {
def registerTemperature(id: String): ServiceCall[Temperature, NotUsed]
def temperatureStream(id: String): ServiceCall[NotUsed, Source[Temperature, NotUsed]]
def descriptor = {
import Service._
named("/sensorservice").withCalls(
pathCall("/device/:id/temperature", registerTemperature _),
pathCall("/device/:id/temperature/stream", temperatureStream _)
)
}
}
The implementation of this interface looks like:
import com.lightbend.lagom.scaladsl.pubsub.PubSubRegistry
import com.lightbend.lagom.scaladsl.pubsub.TopicId
class SensorServiceImpl(pubSub: PubSubRegistry) extends SensorService {
def registerTemperature(id: String) = ServiceCall { temperature =>
val topic = pubSub.refFor(TopicId[Temperature](id))
topic.publish(temperature)
Future.successful(NotUsed.getInstance())
}
def temperatureStream(id: String) = ServiceCall { _ =>
val topic = pubSub.refFor(TopicId[Temperature](id))
Future.successful(topic.subscriber)
}
}
When a device submits its current temperature it is published to a topic that is unique for that device. Note that the topic where the message is published to is defined by the message class, here Temperature
, and an optional classifier, here the device id. The messages of this topic will be instances of the message class or subclasses thereof. The qualifier can be used to distinguish topics that are using the same message class. The empty string can be used as qualifier if the message class is enough to define the topic identity.
Use the method publish
of the PubSubRef representing a given topic to publish a single message, see registerTemperature
in the above code.
Use the method subscriber
of the PubSubRef to acquire a stream Source
of messages published to a given topic, see temperatureStream
in the above code.
It is also possible to publish a stream of messages to a topic as is illustrated by this variant of the SensorService
:
import akka.stream.Materializer
class SensorServiceImpl(pubSub: PubSubRegistry)(implicit materializer: Materializer) extends SensorService {
def registerTemperature(id: String) = ServiceCall { temperatures =>
val topic = pubSub.refFor(TopicId[Temperature](id))
temperatures.runWith(topic.publisher)
Future.successful(NotUsed.getInstance())
}
def temperatureStream(id: String) = ServiceCall { _ =>
val topic = pubSub.refFor(TopicId[Temperature](id))
Future.successful(topic.subscriber)
}
}
Note how the incoming Source
in registerTemperature
is connected to the publisher
Sink
of the topic with the runWith
method. Also note that we now have an implicit Materializer
injected into the constructor, this is needed when running a stream. You can of course apply ordinary stream transformations of the incoming stream before connecting it to the publisher
.
§Usage from Persistent Entity
You can publish messages from a Persistent Entity. First you must inject the PubSubRegistry to get hold of a PubSubRef
for a given topic.
import com.lightbend.lagom.scaladsl.persistence.PersistentEntity
import com.lightbend.lagom.scaladsl.pubsub.PubSubRegistry
import com.lightbend.lagom.scaladsl.pubsub.TopicId
final class Post(pubSubRegistry: PubSubRegistry) extends PersistentEntity {
private val publishedTopic = pubSubRegistry.refFor(TopicId[PostPublished])
A command handler that publishes messages, in this case the PostPublished
event, may look like this:
.onCommand[Publish.type, Done] {
case (Publish, ctx, state) =>
ctx.thenPersist(PostPublished(entityId)) { evt =>
ctx.reply(Done)
publishedTopic.publish(evt)
}
}
To complete the picture, a service method that delivers these PostPublished
events as a stream:
class BlogServiceImpl(pubSubRegistry: PubSubRegistry) extends BlogService {
private val publishedTopic = pubSubRegistry.refFor(TopicId[PostPublished])
override def publishedStream = ServiceCall { _ =>
Future.successful(publishedTopic.subscriber)
}
}
§Limitations
This feature is specifically for providing publish and subscribe functionality within a single services cluster. To publish and subscribe between services, you should instead use Lagom’s message broker support.
Published messages may be lost. For example in case of networks problems messages might not be delivered to all subscribers. Future version of Lagom may include intra-service pub-sub with at-least-once delivery, in the meantime you can achieve at-least-once delivery by using Lagom’s message broker support.
Note that anytime you fallback to message broker support you will expose your messages via a public topic making them part of your public API.
The registry of subscribers is eventually consistent, i.e. new subscribers are not immediately visible at other nodes, but typically the information will be fully replicated to all other nodes after a few seconds.
§Serialization
The published messages must be serializable since they will be sent across the nodes in the cluster of the service. JSON is the recommended serialization format for these messages. The Serialization section describes how to register serializers for the messages.
§Underlying Implementation
It is implemented with Akka Distributed Publish Subscribe.