Publish-Subscribe

§Publish-Subscribe

Publish–subscribe is a well-known messaging pattern. Senders of messages, called publishers, do not target messages directly at specific receivers. Instead, they publish messages to topics without knowing which receivers, called subscribers, there may be, if any. Similarly, a subscriber expresses interest in a topic and receives messages published to that topic, without knowing which publishers there are, if any.
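The decoupling described above can be sketched with a small in-memory bus in plain Java. This is only an illustration of the pattern: the class InMemoryBus, its methods, and the topic names are hypothetical and are not part of Lagom's API.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Hypothetical in-memory bus for illustration only; this is not Lagom's API.
class InMemoryBus {
  // Topic name -> callbacks of the current subscribers.
  private final Map<String, List<Consumer<String>>> subscribers = new ConcurrentHashMap<>();

  // A subscriber registers interest in a topic without naming any publisher.
  void subscribe(String topic, Consumer<String> handler) {
    subscribers.computeIfAbsent(topic, t -> new CopyOnWriteArrayList<>()).add(handler);
  }

  // A publisher sends to a topic without knowing who is subscribed, if anyone.
  // Returns the number of subscribers the message was handed to.
  int publish(String topic, String message) {
    List<Consumer<String>> handlers =
        subscribers.getOrDefault(topic, Collections.emptyList());
    handlers.forEach(h -> h.accept(message));
    return handlers.size();
  }

  public static void main(String[] args) {
    InMemoryBus bus = new InMemoryBus();
    List<String> received = new ArrayList<>();
    bus.subscribe("temperature", received::add);
    bus.publish("temperature", "21.5"); // handed to one subscriber
    bus.publish("humidity", "40");      // no subscribers, so nobody receives it
    System.out.println(received);       // prints [21.5]
  }
}
```

Neither side holds a reference to the other; the topic name is the only thing they share.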

§Dependency

To use this feature, add the following to your project’s build:

In Maven:

<dependency>
    <groupId>com.lightbend.lagom</groupId>
    <artifactId>lagom-javadsl-pubsub_${scala.binary.version}</artifactId>
    <version>${lagom.version}</version>
</dependency>

In sbt:

libraryDependencies += lagomJavadslPubSub

§Usage from Service Implementation

Let’s look at an example of a service that publishes temperature measurements of hardware devices. A device can submit its current temperature and interested parties can get a stream of the temperature samples.

The service API is defined as:

public interface SensorService extends Service {

  ServiceCall<Temperature, NotUsed> registerTemperature(String id);

  ServiceCall<NotUsed, Source<Temperature, ?>> temperatureStream(String id);

  @Override
  default Descriptor descriptor() {
    return named("/sensorservice")
        .withCalls(
            pathCall("/device/:id/temperature", this::registerTemperature),
            pathCall("/device/:id/temperature/stream", this::temperatureStream));
  }
}

The implementation of this interface looks like:

import akka.NotUsed;
import com.lightbend.lagom.javadsl.api.ServiceCall;
import com.lightbend.lagom.javadsl.pubsub.PubSubRef;
import com.lightbend.lagom.javadsl.pubsub.PubSubRegistry;
import com.lightbend.lagom.javadsl.pubsub.TopicId;
import java.util.concurrent.CompletableFuture;
import javax.inject.Inject;
import akka.stream.javadsl.Source;

public class SensorServiceImpl implements SensorService {

  private final PubSubRegistry pubSub;

  @Inject
  public SensorServiceImpl(PubSubRegistry pubSub) {
    this.pubSub = pubSub;
  }

  @Override
  public ServiceCall<Temperature, NotUsed> registerTemperature(String id) {
    return temperature -> {
      final PubSubRef<Temperature> topic = pubSub.refFor(TopicId.of(Temperature.class, id));
      topic.publish(temperature);
      return CompletableFuture.completedFuture(NotUsed.getInstance());
    };
  }

  @Override
  public ServiceCall<NotUsed, Source<Temperature, ?>> temperatureStream(String id) {
    return request -> {
      final PubSubRef<Temperature> topic = pubSub.refFor(TopicId.of(Temperature.class, id));
      return CompletableFuture.completedFuture(topic.subscriber());
    };
  }
}

When a device submits its current temperature, it is published to a topic that is unique to that device. Note that the topic a message is published to is identified by the message class, here Temperature, and an optional qualifier, here the device id. The messages of this topic will be instances of the message class or subclasses thereof. The qualifier can be used to distinguish topics that use the same message class. Use the empty string as the qualifier if the message class alone is enough to define the topic identity.
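To make the topic identity rule concrete, here is a minimal sketch of a key made of a message class and a qualifier. TopicKey is a hypothetical stand-in written for this example; it is not Lagom's TopicId and makes no claim about how TopicId is implemented.

```java
import java.util.Objects;

// Hypothetical stand-in for a topic identifier; not Lagom's TopicId class.
final class TopicKey {
  private final Class<?> messageClass;
  private final String qualifier;

  TopicKey(Class<?> messageClass, String qualifier) {
    this.messageClass = Objects.requireNonNull(messageClass);
    this.qualifier = Objects.requireNonNull(qualifier);
  }

  // Two keys name the same topic only if both the class and the qualifier match.
  @Override
  public boolean equals(Object o) {
    if (!(o instanceof TopicKey)) return false;
    TopicKey other = (TopicKey) o;
    return messageClass.equals(other.messageClass) && qualifier.equals(other.qualifier);
  }

  @Override
  public int hashCode() {
    return Objects.hash(messageClass, qualifier);
  }

  public static void main(String[] args) {
    // String stands in for a message class such as Temperature.
    TopicKey deviceA = new TopicKey(String.class, "device-a");
    TopicKey deviceB = new TopicKey(String.class, "device-b");
    TopicKey unqualified = new TopicKey(String.class, "");

    System.out.println(deviceA.equals(new TopicKey(String.class, "device-a"))); // true
    System.out.println(deviceA.equals(deviceB));                                // false
    System.out.println(deviceA.equals(unqualified));                            // false
  }
}
```

Under this model, one topic per device falls out of using the device id as the qualifier, while the empty qualifier gives a single topic per message class.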

Use the publish method of the PubSubRef representing a given topic to publish a single message, as in registerTemperature in the code above.

Use the subscriber method of the PubSubRef to acquire a stream Source of messages published to a given topic, as in temperatureStream in the code above.

It is also possible to publish a stream of messages to a topic, as illustrated by this variant of the SensorService:

import akka.NotUsed;
import com.lightbend.lagom.javadsl.api.ServiceCall;
import com.lightbend.lagom.javadsl.pubsub.PubSubRef;
import com.lightbend.lagom.javadsl.pubsub.PubSubRegistry;
import com.lightbend.lagom.javadsl.pubsub.TopicId;
import java.util.concurrent.CompletableFuture;
import javax.inject.Inject;
import akka.stream.Materializer;
import akka.stream.javadsl.Source;

public class SensorServiceImpl2 implements SensorService2 {

  private final PubSubRegistry pubSub;
  private final Materializer materializer;

  @Inject
  public SensorServiceImpl2(PubSubRegistry pubSub, Materializer mat) {
    this.pubSub = pubSub;
    this.materializer = mat;
  }

  @Override
  public ServiceCall<Source<Temperature, ?>, NotUsed> registerTemperatures(String id) {
    return request -> {
      final PubSubRef<Temperature> topic = pubSub.refFor(TopicId.of(Temperature.class, id));
      request.runWith(topic.publisher(), materializer);
      return CompletableFuture.completedFuture(NotUsed.getInstance());
    };
  }

  @Override
  public ServiceCall<NotUsed, Source<Temperature, ?>> temperatureStream(String id) {
    return request -> {
      final PubSubRef<Temperature> topic = pubSub.refFor(TopicId.of(Temperature.class, id));
      return CompletableFuture.completedFuture(topic.subscriber());
    };
  }
}

Note how the incoming Source in registerTemperatures is connected to the publisher Sink of the topic with the runWith method, using the Materializer that is injected in the constructor. You can of course apply ordinary stream transformations to the incoming stream before connecting it to the publisher.

§Usage from Persistent Entity

You can publish messages from a Persistent Entity. First you must inject the PubSubRegistry to get hold of a PubSubRef for a given topic.

private final PubSubRef<PostPublished> publishedTopic;

@Inject
public Post4(PubSubRegistry pubSub) {
  publishedTopic = pubSub.refFor(TopicId.of(PostPublished.class, ""));
}

A command handler that publishes messages, in this case the PostPublished event, may look like this:

b.setCommandHandler(
    Publish.class,
    (cmd, ctx) ->
        ctx.thenPersist(
            new PostPublished(entityId()),
            evt -> {
              ctx.reply(Done.getInstance());
              publishedTopic.publish(evt);
            }));

To complete the picture, here is a service method that delivers these PostPublished events as a stream:

import akka.NotUsed;
import com.lightbend.lagom.javadsl.api.ServiceCall;
import com.lightbend.lagom.javadsl.pubsub.PubSubRef;
import com.lightbend.lagom.javadsl.pubsub.PubSubRegistry;
import com.lightbend.lagom.javadsl.pubsub.TopicId;
import java.util.concurrent.CompletableFuture;
import javax.inject.Inject;

import akka.stream.javadsl.Source;

public class BlogServiceImpl4 implements BlogService4 {

  private final PubSubRef<BlogEvent.PostPublished> publishedTopic;

  @Inject
  public BlogServiceImpl4(PubSubRegistry pubSub) {
    publishedTopic = pubSub.refFor(TopicId.of(BlogEvent.PostPublished.class, ""));
  }

  @Override
  public ServiceCall<NotUsed, Source<BlogEvent.PostPublished, ?>> getNewPosts() {
    return request -> CompletableFuture.completedFuture(publishedTopic.subscriber());
  }
}

§Limitations

This feature is specifically for providing publish and subscribe functionality within a single service’s cluster. To publish and subscribe between services, you should instead use Lagom’s message broker support.

Published messages may be lost. For example, in case of network problems, messages might not be delivered to all subscribers. A future version of Lagom may include intra-service pub-sub with at-least-once delivery. In the meantime, you can achieve at-least-once delivery by using Lagom’s message broker support.

Note that any time you fall back to message broker support, you expose your messages via a public topic, making them part of your public API.

The registry of subscribers is eventually consistent, i.e. new subscribers are not immediately visible at other nodes, but typically the information will be fully replicated to all other nodes after a few seconds.
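One practical consequence can be sketched with a toy model: a message published from another node shortly after a subscription may not reach the new subscriber. The ToyNode class below and its explicit syncFrom step are assumptions made purely for illustration; they do not reflect how the underlying registry is actually replicated.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

// Toy model for illustration only; it does not reflect the real replication mechanism.
class ToyNode {
  // This node's local view of the subscriber registry.
  private final Set<String> knownSubscribers = new TreeSet<>();

  // A new subscriber is first known only on the node where it subscribed.
  void subscribeLocally(String subscriber) {
    knownSubscribers.add(subscriber);
  }

  // Stand-in for replication: merge another node's view into this one.
  void syncFrom(ToyNode other) {
    knownSubscribers.addAll(other.knownSubscribers);
  }

  // Publishing reaches only the subscribers this node knows about right now.
  List<String> publish() {
    return new ArrayList<>(knownSubscribers);
  }

  public static void main(String[] args) {
    ToyNode nodeA = new ToyNode();
    ToyNode nodeB = new ToyNode();

    nodeA.subscribeLocally("dashboard");
    System.out.println(nodeB.publish()); // prints [] because nodeB has not heard of it yet

    nodeB.syncFrom(nodeA);
    System.out.println(nodeB.publish()); // prints [dashboard]
  }
}
```

If a subscriber must not miss the first messages after subscribing, this pattern alone is not enough; the limitation above still applies.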

§Serialization

The published messages must be serializable since they will be sent across the nodes in the cluster of the service. JSON is the recommended serialization format for these messages. The Serialization section describes how to add Jackson serialization support to such message classes.

§Underlying Implementation

This feature is implemented with Akka Distributed Publish Subscribe.
