§Message Broker API
The Lagom Message Broker API provides a distributed publish-subscribe model that services can use to share data via topics. A topic is simply a channel that allows services to push and pull data. In Lagom, topics are strongly typed, hence both the subscriber and producer can know in advance what the expected data flowing through will be.
§Declaring a topic
To publish data to a topic a service needs to declare the topic in its service descriptor.
import com.lightbend.lagom.javadsl.api.*;
import com.lightbend.lagom.javadsl.api.broker.Topic;
import static com.lightbend.lagom.javadsl.api.Service.*;
public interface HelloService extends Service {
String GREETINGS_TOPIC = "greetings";
@Override
default Descriptor descriptor() {
return named("helloservice")
.withCalls(
pathCall("/api/hello/:id", this::hello), pathCall("/api/hello/:id", this::useGreeting))
// here we declare the topic(s) this service will publish to
.withTopics(topic(GREETINGS_TOPIC, this::greetingsTopic))
.withAutoAcl(true);
}
// The topic handle
Topic<GreetingMessage> greetingsTopic();
ServiceCall<NotUsed, String> hello(String id);
ServiceCall<GreetingMessage, Done> useGreeting(String id);
}
The syntax for declaring a topic is similar to the one used already to define services’ endpoints. The Descriptor.withTopics
method accepts a sequence of topic calls, each topic call can be defined via the Service.topic
static method. The latter takes a topic name (i.e., the topic identifier), and a reference to a method that returns a Topic
instance.
Data flowing through a topic is serialized to JSON by default. Of course, it is possible to use a different serialization format, and you can do so by passing a different message serializer for each topic defined in a service descriptor. For instance, using the above service definition, here is how you could have passed a custom serializer: topic("greetings", this::greetingsTopic).withMessageSerializer(<your-custom-serializer>)
.
§Partitioning topics
Kafka will distribute messages for a particular topic across many partitions, so that the topic can scale. Messages sent to different partitions may be processed out of order, so if the ordering of the messages you are publishing matters, you need to ensure that the messages are partitioned in such a way that order is preserved. Typically, this means ensuring each message for a particular entity goes to the same partition.
Lagom allows this by allowing you to configure a partition key strategy, which extracts the partition key out of a message. Kafka will then use this key to help decide what partition to send each message to. The partition can be selected using the partitionKeyStrategy
property, by passing a PartitionKeyStrategy
to it:
return named("blogpostservice")
.withTopics(
topic("blogposts", this::blogPostEvents)
.withProperty(KafkaProperties.partitionKeyStrategy(), BlogPostEvent::getPostId));
§Implementing a topic
The primary source of messages that Lagom is designed to produce is persistent entity events. Rather than publishing events in an ad-hoc fashion in response to particular things happening, it is better to take the stream of events from your persistent entities, and adapt that to a stream of messages sent to the message broker. In this way, you can ensure at least once processing of events by both publishers and consumers, which allows you to guarantee a very strong level of consistency throughout your system.
Lagom’s TopicProducer
helper provides two methods for publishing a persistent entities event stream, singleStreamWithOffset
for use with non sharded read side event streams, and taggedStreamWithOffset
for use with sharded read side event streams. Both of these methods take a callback which takes the last offset that the topic producer published, and allows resumption of the event stream from that offset via the PersistentEntityRegistry.eventStream
method for obtaining a read-side stream. For more details on read-side streams, see Persistent Read-Side's.
Lagom will, in the case of the singleStreamWithOffset
method, ensure that your topic producer only runs on one node of your cluster, or with the taggedStreamWithOffset
method will distribute the tags evenly across the cluster to distribute the publishing load.
Here’s an example of publishing a single, non sharded event stream:
public Topic<GreetingMessage> greetingsTopic() {
return TopicProducer.singleStreamWithOffset(
offset -> {
return persistentEntityRegistry
.eventStream(HelloEventTag.INSTANCE, offset)
.map(this::convertEvent);
});
}
Note that the read-side event stream you passed to the topic producer is “activated” as soon as the service is started. That means all events persisted by your services will eventually be published to the connected topic. Also, you will generally want to map your domain events into some other type, so that other service won’t be tightly coupled to another service’s domain model events. In other words, domain model events are an implementation detail of the service, and should not be leaked.
§Filtering events
You may not want all events persisted by your services to be published. If that is the case then you can filter the event stream:
public Topic<GreetingMessage> greetingsTopic() {
return TopicProducer.singleStreamWithOffset(
offset -> {
return persistentEntityRegistry
.eventStream(HelloEventTag.INSTANCE, offset)
.mapConcat(this::filterHelloGreetings);
});
}
private List<Pair<GreetingMessage, Offset>> filterHelloGreetings(Pair<HelloEvent, Offset> pair) {
if (pair.first() instanceof AbstractGreetingMessageChanged) {
AbstractGreetingMessageChanged msg = (AbstractGreetingMessageChanged) pair.first();
// Only publish greetings where the message is "Hello".
if (msg.getMessage().equals("Hello")) {
return Arrays.asList(convertEvent(pair));
}
}
return Collections.emptyList();
}
When an event is filtered, the TopicProducer
does not publish the event. It also does not advance the offset. If the TopicProducer
restarts then it will resume from the last offset. If a large number of events are filtered then the last offset could be quite far behind, and so all those events will be reprocessed and filtered out again. You need to be aware that this may occur and keep the number of consecutively filtered elements relatively low and also minimize the time and resources required to perform the filtering.
§Offset storage
Lagom will use your configured persistence API provider to store the offsets for your event streams. To read more about offset storage, see the Cassandra offset documentation, JDBC database offset documentation and JPA database offset documentation.
§Subscribe to a topic
To subscribe to a topic, a service just needs to call Topic.subscribe()
on the topic of interest. For instance, imagine that a service wants to collect all greetings messages published by the HelloService
(refer to the code snippet above). The first thing you should do is inject a HelloService
.
private final HelloService helloService;
@Inject
public AnotherServiceImpl(HelloService helloService) {
this.helloService = helloService;
}
Then, subscribe to the greetings topic, and hook your logic to apply to each messages that published to the topic.
helloService
.greetingsTopic()
.subscribe() // <-- you get back a Subscriber instance
.atLeastOnce(
Flow.fromFunction(
(GreetingMessage message) -> {
return doSomethingWithTheMessage(message);
}));
When calling Topic.subscribe()
you will get back a Subscriber
instance. In the above code snippet we have subscribed to the greetings
topic using at-least-once delivery semantics. That means each message published to the greetings
topic is received at least once, but possibly more. The subscriber also offers a atMostOnceSource
that gives you at-most-once delivery semantics. If in doubt, prefer using at-least-once delivery semantics.
Finally, subscribers are grouped together via Subscriber.withGroupId
. A subscriber group allows many nodes in your cluster to consume a message stream while ensuring that each message is only handled once by each node in your cluster. Without subscriber groups, all of your nodes for a particular service would get every message in the stream, leading to their processing being duplicated. By default, Lagom will use a group id that has the same name as the service consuming the topic.
§Consuming message metadata
Your broker implementation may provide additional metadata with messages which you can consume. This can be accessed by invoking the Subscriber.withMetadata()
method, which returns a subscriber that wraps the messages in a Message
.
helloService
.greetingsTopic()
.subscribe()
.withMetadata()
.atLeastOnce(
Flow.fromFunction(
(Message<GreetingMessage> msg) -> {
GreetingMessage payload = msg.getPayload();
String messageKey = msg.messageKeyAsString();
Optional<Headers> kafkaHeaders = msg.get(KafkaMetadataKeys.HEADERS);
System.out.println(
"Message: " + payload + " Key: " + messageKey + " Headers: " + kafkaHeaders);
return Done.getInstance();
}));
The messageKeyAsString
method is provided as a convenience for accessing the message key. Other properties can be accessed using the get
method. A full list of the metadata keys available for Kafka can be found here.
§Skipping messages
You may only want to apply your logic to a subset of the messages that the topic publishes and skip the others. The Flow
that is passed to Subscriber.atLeastOnce
must emit exactly one Done
element for each element that it receives. It must also emit them in the same order that the elements were received. This means that you must not use methods such as filter
or collect
on the Flow
which would drop elements.
The easiest way to achieve this is to use a total function which returns Done
for the elements that should be skipped. For example:
helloService
.greetingsTopic()
.subscribe()
.atLeastOnce(
Flow.fromFunction(
(GreetingMessage message) -> {
if (message.getMessage().equals("Kia ora")) {
return doSomethingWithTheMessage(message);
} else {
// Skip all messages where the message is not "Kia ora".
return Done.getInstance();
}
}));
§Polymorphic event streams
Typically you will want to publish more than one type of event to a particular topic. This can be done by creating an interface that each event implements. In order to successfully serialize these events to and from JSON, a few extra annotations are needed to instruct Jackson to describe and consume the type of the event in the produced JSON.
For example, consider a situation where you have a blog post created event and a blog post published event. Here’s what your event structure might look like:
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.fasterxml.jackson.annotation.JsonTypeName;
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = Void.class)
@JsonSubTypes({
@JsonSubTypes.Type(BlogPostEvent.BlogPostCreated.class),
@JsonSubTypes.Type(BlogPostEvent.BlogPostPublished.class)
})
public interface BlogPostEvent {
String getPostId();
@JsonTypeName("created")
final class BlogPostCreated implements BlogPostEvent {
private final String postId;
private final String title;
@JsonCreator
public BlogPostCreated(String postId, String title) {
this.postId = postId;
this.title = title;
}
public String getPostId() {
return postId;
}
public String getTitle() {
return title;
}
}
@JsonTypeName("published")
final class BlogPostPublished implements BlogPostEvent {
private final String postId;
@JsonCreator
public BlogPostPublished(String postId) {
this.postId = postId;
}
public String getPostId() {
return postId;
}
}
}
The @JsonTypeInfo
annotation describes how the type of the event will be serialized. In this case, it’s saying each event type will be identified by its name, and that name will go into a property called type
. The @JsonTypeName
on each event subclass says what the name of that event should be. And the @JsonSubTypes
annotation is used to tell Jackson what the possible sub types of the event are, so that it knows where to look when deserializing.
The resulting JSON for the BlogPostCreated
event will look like this:
{
"type": "created",
"postId": "1234",
"title": "Some title"
}
While the JSON for the BlogPostPublished
event will look like this:
{
"type": "published",
"postId": "1234",
}
Finally, note the defaultImpl = Void.class
in the @JsonSubTypes
annotation. This tells Jackson that if it comes across an event type that it doesn’t recognize the name for, to deserialize it as null
. This is optional, but can be important for ensuring forwards compatibility in your services, if a service adds a new event subclass that it publishes, often you want your existing services that consume that event stream to just ignore it. Setting this will allow them to do that, otherwise, you’ll have to upgrade all the services that consume that event stream to explicitly ignore it before you upgrade the producer that produces the events.