§Publish-Subscribe
Publish–subscribe is a well known messaging pattern. Senders of messages, called publishers, do not target the messages directly to specific receivers, but instead publish messages to topics without knowledge of which receivers, called subscribers, if any, there may be. Similarly, a subscriber express interest in a topic and receive messages published to that topic, without knowledge of which publishers, if any, there are.
§Dependency
To use this feature add the following in your project’s build:
In Maven:
<dependency>
<groupId>com.lightbend.lagom</groupId>
<artifactId>lagom-javadsl-pubsub_2.11</artifactId>
<version>${lagom.version}</version>
</dependency>
In sbt:
libraryDependencies += lagomJavadslPubSub
§Usage from Service Implementation
Let’s look at an example of a service that publishes temperature measurements of hardware devices. A device can submit its current temperature and interested parties can get a stream of the temperature samples.
The service API is defined as:
public interface SensorService extends Service {
ServiceCall<Temperature, NotUsed> registerTemperature(String id);
ServiceCall<NotUsed, Source<Temperature, ?>> temperatureStream(String id);
@Override
default Descriptor descriptor() {
return named("/sensorservice").withCalls(
pathCall("/device/:id/temperature", this::registerTemperature),
pathCall("/device/:id/temperature/stream", this::temperatureStream)
);
}
}
The implementation of this interface looks like:
import akka.NotUsed;
import com.lightbend.lagom.javadsl.api.ServiceCall;
import com.lightbend.lagom.javadsl.pubsub.PubSubRef;
import com.lightbend.lagom.javadsl.pubsub.PubSubRegistry;
import com.lightbend.lagom.javadsl.pubsub.TopicId;
import java.util.concurrent.CompletableFuture;
import javax.inject.Inject;
import akka.stream.javadsl.Source;
public class SensorServiceImpl implements SensorService {
private final PubSubRegistry pubSub;
@Inject
public SensorServiceImpl(PubSubRegistry pubSub) {
this.pubSub = pubSub;
}
@Override
public ServiceCall<Temperature, NotUsed> registerTemperature(String id) {
return temperature -> {
final PubSubRef<Temperature> topic =
pubSub.refFor(TopicId.of(Temperature.class, id));
topic.publish(temperature);
return CompletableFuture.completedFuture(NotUsed.getInstance());
};
}
@Override
public ServiceCall<NotUsed, Source<Temperature, ?>> temperatureStream(String id) {
return request -> {
final PubSubRef<Temperature> topic =
pubSub.refFor(TopicId.of(Temperature.class, id));
return CompletableFuture.completedFuture(topic.subscriber());
};
}
}
When a device submit its current temperature it is published to a topic that is unique for that device. Note that the topic where the message is published to is defined by the message class, here Temperature
, and an optional classifier, here the device id. The messages of this topic will be instances of the message class or subclasses thereof. The qualifier can be used to distinguish topics that are using the same message class. The empty string can be used as qualifier if the message class is enough to define the topic identity.
Use the method publish
of the PubSubRef representing a given topic to publish a single message, see registerTemperature
in the above code.
Use the method subscriber
of the PubSubRef to acquire a stream Source
of messages published to a given topic, see temperatureStream
in the above code.
It is also possible to publish a stream of messages to a topic as is illustrated by this variant of the SensorService
:
import akka.NotUsed;
import com.lightbend.lagom.javadsl.api.ServiceCall;
import com.lightbend.lagom.javadsl.pubsub.PubSubRef;
import com.lightbend.lagom.javadsl.pubsub.PubSubRegistry;
import com.lightbend.lagom.javadsl.pubsub.TopicId;
import java.util.concurrent.CompletableFuture;
import javax.inject.Inject;
import akka.stream.Materializer;
import akka.stream.javadsl.Source;
public class SensorServiceImpl2 implements SensorService2 {
private final PubSubRegistry pubSub;
private final Materializer materializer;
@Inject
public SensorServiceImpl2(PubSubRegistry pubSub, Materializer mat) {
this.pubSub = pubSub;
this.materializer = mat;
}
@Override
public ServiceCall<Source<Temperature, ?>, NotUsed> registerTemperatures(String id) {
return request -> {
final PubSubRef<Temperature> topic =
pubSub.refFor(TopicId.of(Temperature.class, id));
request.runWith(topic.publisher(), materializer);
return CompletableFuture.completedFuture(NotUsed.getInstance());
};
}
@Override
public ServiceCall<NotUsed, Source<Temperature, ?>> temperatureStream(String id) {
return request -> {
final PubSubRef<Temperature> topic =
pubSub.refFor(TopicId.of(Temperature.class, id));
return CompletableFuture.completedFuture(topic.subscriber());
};
}
}
Note how the incoming Source
in registerTemperature
is connected to the publisher
Sink
of the topic with the runWith
method using the Materializer
that is injected in the constructor. You can of course apply ordinary stream transformations of the incoming stream before connecting it to the publisher
.
§Usage from Persistent Entity
You can publish messages from a Persistent Entity. First you must inject the PubSubRegistry to get hold of a PubSubRef
for a given topic.
private final PubSubRef<PostPublished> publishedTopic;
@Inject
public Post4(PubSubRegistry pubSub) {
publishedTopic = pubSub.refFor(TopicId.of(PostPublished.class, ""));
}
A command handler that publishes messages, in this case the PostPublished
event, may look like this:
b.setCommandHandler(Publish.class,
(cmd, ctx) -> ctx.thenPersist(PostPublished.of(entityId()), evt -> {
ctx.reply(Done.getInstance());
publishedTopic.publish(evt);
}));
To complete the picture, a service method that delivers these PostPublished
events as a stream:
import akka.NotUsed;
import com.lightbend.lagom.javadsl.api.ServiceCall;
import com.lightbend.lagom.javadsl.pubsub.PubSubRef;
import com.lightbend.lagom.javadsl.pubsub.PubSubRegistry;
import com.lightbend.lagom.javadsl.pubsub.TopicId;
import java.util.concurrent.CompletableFuture;
import javax.inject.Inject;
import akka.stream.javadsl.Source;
public class BlogServiceImpl4 implements BlogService4 {
private final PubSubRef<PostPublished> publishedTopic;
@Inject
public BlogServiceImpl4(PubSubRegistry pubSub) {
publishedTopic = pubSub.refFor(TopicId.of(PostPublished.class, ""));
}
@Override
public ServiceCall<NotUsed, Source<PostPublished, ?>> getNewPosts() {
return request ->
CompletableFuture.completedFuture(publishedTopic.subscriber());
}
}
§Limitations
It is only possible to publish and subscribe within a single service. It is not possible to publish messages in one service and subscribe to them in another service. We will probably provide publish-subscribe across services in a future release of Lagom.
Published messages may be lost. For example in case of networks problems messages might not be delivered to all subscribers. We might provide publish-subscribe with at-least-once delivery in a future release of Lagom.
The registry of subscribers is eventually consistent, i.e. new subscribers are not immediately visible at other nodes, but typically the information will be fully replicated to all other nodes after a few seconds.
§Serialization
The published messages must be serializable since they will be sent across the nodes in the cluster of the service. JSON is the recommended serialization format for these messages. The Serialization section describes how to add Jackson serialization support to such message classes.
§Underlying Implementation
It is implemented with Akka Distributed Publish Subscribe.