§Message Broker Testing
When decoupling communication via a Broker you can test from both ends of the Topic. When your Service is publishing events into a Topic (as described in Declaring a Topic) your tests should verify that the proper data is being pushed into the Topic. At the same time, when your Service is subscribed to an upstream Topic you may want to test how your Service behaves when there are incoming events.
A broker is not started when writing either publish or consumption tests. Instead, Lagom provides in-memory implementations of the Broker API in order to make tests faster. Integration tests against a complete broker should be implemented later, but they are out of the scope of this documentation. The provided in-memory implementation of the Broker API runs locally and provides exactly-once delivery. If you want to test your code under scenarios where there’s message loss (at-most-once) or message duplicates (at-least-once) you will be responsible for writing such behaviour by injecting duplicates or skipping messages.
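One way to inject duplicates or skip messages is to rewrite the list of messages before the test sends them. The sketch below is plain Java and does not use any Lagom API; the class `DeliveryFaults` and its methods `duplicateEvery` and `dropEvery` are hypothetical names introduced here for illustration only.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical test helper (not part of Lagom): rewrites a message sequence
// to mimic delivery faults that the in-memory broker never produces.
final class DeliveryFaults {
    private DeliveryFaults() {}

    // Mimics at-least-once delivery: every n-th message (1-based) appears twice.
    static <T> List<T> duplicateEvery(List<T> messages, int n) {
        if (n <= 0) throw new IllegalArgumentException("n must be positive");
        List<T> out = new ArrayList<>();
        for (int i = 0; i < messages.size(); i++) {
            T message = messages.get(i);
            out.add(message);
            if ((i + 1) % n == 0) {
                out.add(message); // redelivery of the same message
            }
        }
        return out;
    }

    // Mimics at-most-once delivery: every n-th message (1-based) is lost.
    static <T> List<T> dropEvery(List<T> messages, int n) {
        if (n <= 0) throw new IllegalArgumentException("n must be positive");
        List<T> out = new ArrayList<>();
        for (int i = 0; i < messages.size(); i++) {
            if ((i + 1) % n != 0) {
                out.add(messages.get(i));
            }
        }
        return out;
    }
}
```

A test could then send each element of the rewritten list into the topic, one at a time, and assert that the consuming Service tolerates the duplicate or the gap.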
The Lagom in-memory broker implementation will also help you test your message serialisation and deserialisation. That is only available in the tools for testing publishing, though, since the publishing end is the one responsible for describing the messages being sent over the wire. When you test the consuming end of a topic, no de/serialisation will be run under the covers.
The following code samples use the HelloService and AnotherService already presented in previous sections. HelloService publishes GreetingMessages on the "greetings" topic and AnotherService subscribes to those messages using atLeastOnce semantics.
§Testing publish
When a Service publishes data into a Topic the descriptor lists a TopicCall on the public API. Testing the event publishing is very similar to testing ServiceCalls in your Service API (see Service testing).
@Test
public void shouldEmitGreetingsMessageWhenHelloEntityEmitsAnEvent() {
  withServer(setup, server -> {
    // create a client for the service under test
    PublishService client = server.client(PublishService.class);

    // subscribe to the published topic as a stream source
    Source<PublishEvent, ?> source =
        client.messageTopic().subscribe().atMostOnceSource();

    // use akka stream testkit
    TestSubscriber.Probe<PublishEvent> probe =
        source.runWith(
            TestSink.probe(server.system()), server.materializer()
        );

    // request one element and assert it is the expected event
    PublishEvent actual = probe.request(1).expectNext();
    assertEquals(new PublishEvent(23), actual);
  });
}
Using a ServiceTest you create a client to your Service. Using that client you can subscribe to the published topics. Finally, after interacting with the Service to cause the emission of some events, you can assert that the events were published on the Topic.
The producer end is responsible for describing the public API and providing the serialisable mappings for all messages exchanged (both in ServiceCalls and TopicCalls). The tests that verify the proper behaviour of the publishing operations should also test the serialisability and deserialisability of the messages.
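If you also want an explicit check outside the broker, a round-trip assertion is one option: serialise a message, deserialise the bytes, and compare the result with the original. The sketch below is plain Java and makes no claim about how Lagom serialises messages; `RoundTrip.check` is a hypothetical helper, and the two functions passed to it stand in for whatever serialiser your Service declares.

```java
import java.util.Objects;
import java.util.function.Function;

// Hypothetical helper (not part of Lagom) for round-trip serialisation checks.
final class RoundTrip {
    private RoundTrip() {}

    // Serialises and then deserialises the value; fails if the copy is not
    // equal to the original. Returns the copy for further assertions.
    static <T> T check(T value,
                       Function<T, byte[]> serialize,
                       Function<byte[], T> deserialize) {
        T copy = deserialize.apply(serialize.apply(value));
        if (!Objects.equals(value, copy)) {
            throw new AssertionError(
                "round trip changed " + value + " into " + copy);
        }
        return copy;
    }
}
```

The comparison relies on `equals`, so the message class must implement value equality for the check to be meaningful.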
§Testing subscription
Testing the consumption of messages requires starting the Service under test with a stub of the upstream Service producing data into the topic. The following snippet demonstrates how to achieve it.
- A ServiceTest instance is started with a modified Setup where the upstream HelloService is replaced with a HelloServiceStub.
- An instance of a ProducerStub is declared. This instance will be bound when the Server is started and the HelloServiceStub is created.
- The Stub for the upstream Service must request a ProducerStubFactory from the Injector and use that to obtain a ProducerStub for the appropriate Topic. See how this snippet uses the GREETINGS_TOPIC constant declared in the super interface HelloService. On the stubbed method that implements the TopicCall the stub must return the Topic bound to the ProducerStub created in the constructor.
- Use the ProducerStub in the tests to send messages into the topic and interact normally with the service under test to verify the Service code.
import akka.Done;
import akka.NotUsed;
import com.lightbend.lagom.javadsl.api.ServiceCall;
import com.lightbend.lagom.javadsl.api.broker.Topic;
import com.lightbend.lagom.javadsl.testkit.ProducerStub;
import com.lightbend.lagom.javadsl.testkit.ProducerStubFactory;
import com.lightbend.lagom.javadsl.testkit.ServiceTest.Setup;
import org.junit.Test;

import javax.inject.Inject;

import static com.lightbend.lagom.javadsl.testkit.ServiceTest.*;
import static java.util.concurrent.TimeUnit.SECONDS;
import static play.inject.Bindings.bind;

public class AnotherServiceTest {

  // (1) creates a server using the Module for this service and
  //     overrides the bindings to use the HelloServiceStub
  //     implemented below.
  private Setup setup = defaultSetup().configureBuilder(b ->
      b.overrides(
          bind(HelloService.class).to(HelloServiceStub.class),
          bind(AnotherService.class).to(AnotherServiceImpl.class))
  );

  // (2) an instance of ProducerStub allows test code to inject
  //     messages on the topic.
  private static ProducerStub<GreetingMessage> helloProducer;

  @Test
  public void shouldReceiveMessagesFromUpstream() {
    // (1)
    withServer(setup, server -> {
      GreetingMessage message = new GreetingMessage("someId", "Hi there!");

      AnotherService client = server.client(AnotherService.class);
      client.audit().invoke().toCompletableFuture().get(3, SECONDS);

      // (4) send a message in the topic
      helloProducer.send(message);

      // use a service client instance to interact with the service
      // and assert the message was processed as expected.
      // ...
      // You will probably need to wrap your assertion in an
      // `eventually()` clause so you can retry your assertion
      // since your invocation via the service client may arrive
      // before the message was consumed.
    });
  }

  // (1) Stub for the upstream Service
  static class HelloServiceStub implements HelloService {

    // (2) Receives a ProducerStubFactory that creates ProducerStubs
    @Inject
    HelloServiceStub(ProducerStubFactory producerFactory) {
      // (3) Create a stub to request a producer for a specific topic
      helloProducer = producerFactory.producer(GREETINGS_TOPIC);
    }

    @Override
    public Topic<GreetingMessage> greetingsTopic() {
      // (3) the upstream stub must return the topic bound to the producer stub
      return helloProducer.topic();
    }

    // The remaining calls are not needed by this test.
    @Override
    public ServiceCall<NotUsed, String> hello(String id) {
      throw new UnsupportedOperationException();
    }

    @Override
    public ServiceCall<GreetingMessage, Done> useGreeting(String id) {
      throw new UnsupportedOperationException();
    }
  }
}
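The comment in the test above recommends retrying the assertion because the message may not have been consumed yet when the client call arrives. The sketch below shows the idea behind such a retry in plain Java. It is a hand-written stand-in, not Lagom's own helper; the class `Eventually` and its parameters are hypothetical names chosen for illustration.

```java
import java.time.Duration;

// Hypothetical retry helper (not Lagom's implementation): re-runs an
// assertion until it passes or the timeout expires.
final class Eventually {
    private Eventually() {}

    static void eventually(Duration timeout, Duration interval, Runnable assertion) {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (true) {
            try {
                assertion.run();
                return; // the assertion passed
            } catch (AssertionError failure) {
                // give up and report the last failure once the deadline has passed
                if (System.nanoTime() - deadline >= 0) {
                    throw failure;
                }
            }
            try {
                Thread.sleep(interval.toMillis());
            } catch (InterruptedException interrupted) {
                Thread.currentThread().interrupt();
                throw new AssertionError("interrupted while waiting", interrupted);
            }
        }
    }
}
```

Only `AssertionError` triggers a retry here, so any other exception thrown by the assertion fails the test immediately. The timeout bounds how long a broken consumer can stall the test run.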