§Message Broker Testing
When you decouple communication via a Broker, you can test from both ends of the Topic. When your Service publishes events into a Topic (as described in Declaring a Topic), your tests should verify that the proper data is pushed into the Topic. At the same time, when your Service is subscribed to an upstream Topic, you may want to test how your Service behaves when events arrive.
No broker is started when you write publish or consumption tests. Instead, Lagom provides in-memory implementations of the Broker API to make tests faster. Integration tests against a complete broker should be implemented later, but they are out of scope of this documentation. The provided in-memory implementation of the Broker API runs locally and provides exactly-once delivery. If you want to test your code under scenarios with message loss (at-most-once) or message duplicates (at-least-once), you are responsible for producing that behaviour yourself by injecting duplicates or skipping messages.
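As a minimal sketch of what injecting duplicates or skipping messages could look like, the helpers below transform the list of messages a test is about to send. The object DeliveryFaults and its methods are hypothetical names introduced for illustration and are not part of Lagom; the send function passed to sendAll is assumed to be whatever the test uses to push a message into the in-memory topic.

```scala
// Hypothetical test helpers (not part of Lagom) that reshape the messages a test sends.
object DeliveryFaults {

  /** Repeats every message `copies` times, mimicking at-least-once redelivery. */
  def withDuplicates[A](messages: Seq[A], copies: Int = 2): Seq[A] =
    messages.flatMap(message => Seq.fill(copies)(message))

  /** Drops every message whose position satisfies `lost`, mimicking at-most-once loss. */
  def withLoss[A](messages: Seq[A])(lost: Int => Boolean): Seq[A] =
    messages.zipWithIndex.collect { case (message, index) if !lost(index) => message }

  /** Feeds the reshaped sequence to any sender function supplied by the test. */
  def sendAll[A](messages: Seq[A])(send: A => Unit): Unit =
    messages.foreach(send)
}
```

A test would then send DeliveryFaults.withDuplicates(messages) or DeliveryFaults.withLoss(messages)(_ % 2 == 1) instead of the original sequence and assert that the consumer copes with the result.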
The Lagom in-memory broker implementation also helps you test your message serialization and deserialization. This is only available in the tools for testing publishing, though, since the publishing end is responsible for describing the messages sent over the wire. When you test the consuming end of a topic, no serialization or deserialization is run under the covers.
The following code samples use the HelloService and AnotherService already presented in previous sections. HelloService publishes GreetingMessages on the "greetings" topic and AnotherService subscribes to those messages using atLeastOnce semantics.
§Testing publish
When a Service publishes data into a Topic, the descriptor lists a TopicCall on the public API. Testing event publishing is very similar to testing the ServiceCalls in your Service API (see Service testing).
"The PublishService" should {
  "publish events on the topic" in ServiceTest.withServer(ServiceTest.defaultSetup) { ctx =>
    // start the application with the in-memory broker instead of a real one
    new PublishApplication(ctx) with LocalServiceLocator with TestTopicComponents
  } { server =>
    implicit val system = server.actorSystem
    implicit val mat = server.materializer

    // subscribe to the published topic through a regular service client
    val client: PublishService = server.serviceClient.implement[PublishService]
    val source = client.events().subscribe.atMostOnceSource
    source
      .runWith(TestSink.probe[PubMessage])
      .request(1)
      .expectNext should ===(PubMessage("msg 1"))
  }
}
To start the application with a stubbed broker, mix TestTopicComponents into your test application.
Use ServiceTest to create a client for your Service; with that client you can subscribe to the published topics. Finally, after interacting with the Service to cause the emission of some events, you can assert that the events were published on the Topic.
The producer end is responsible for describing the public API and providing the serialization mappings for all messages exchanged (both in ServiceCalls and TopicCalls). The tests that verify the proper behavior of the publishing operations should also test that the messages can be serialized and deserialized.
§Testing subscription
Testing the consumption of messages requires starting the Service under test with a stub of the upstream Service producing data into the topic. The following snippet demonstrates how to achieve it.
- An in-memory Topic is required, along with a means to send messages into it. Using the ProducerStubFactory you can obtain a ProducerStub for a given topic name.
- With the producerStub instance you can build a service stub that replaces the production-ready upstream service. The stub must use the topic bound to the ProducerStub created in the previous step.
- Use the ProducerStub in the tests to send messages into the topic, and interact normally with the service under test to verify the Service code.
class AnotherServiceSpec extends WordSpec with Matchers with Eventually with ScalaFutures {
  var producerStub: ProducerStub[GreetingMessage] = _

  "The AnotherService" should {
    "publish updates on greetings message" in
      ServiceTest.withServer(ServiceTest.defaultSetup) { ctx =>
        new AnotherApplication(ctx) with LocalServiceLocator {
          // (1) create an in-memory topic and bind it to a producer stub
          val stubFactory = new ProducerStubFactory(actorSystem, materializer)
          producerStub = stubFactory.producer[GreetingMessage](HelloService.TOPIC_NAME)

          // (2) override the default Hello service with our service stub,
          // which gets the producer stub injected
          override lazy val helloService = new HelloServiceStub(producerStub)
        }
      } { server =>
        // (3) produce a message in the stubbed topic via its producer
        producerStub.send(GreetingMessage("Hi there!"))

        // create a service client to assert the message was consumed
        eventually(timeout(Span(5, Seconds))) {
          // Async specs cannot be used here because eventually only retries when an exception is raised.
          // If the future fails on the first attempt, eventually won't retry even though a later attempt would succeed.
          // See https://github.com/lagom/lagom/issues/876 for details.
          val futureResp = server.serviceClient.implement[AnotherService].foo.invoke()
          whenReady(futureResp) { resp =>
            resp should ===("Hi there!")
          }
        }
      }
  }
}
// (2) a Service stub that uses the in-memory topic bound to
// our producer stub
class HelloServiceStub(stub: ProducerStub[GreetingMessage]) extends HelloService {
  override def greetingsTopic(): Topic[GreetingMessage] = stub.topic

  // not exercised by this test, so left unimplemented
  override def hello(id: String): ServiceCall[NotUsed, String] = ???
  override def useGreeting(id: String): ServiceCall[GreetingMessage, Done] = ???
}
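The comments inside the eventually block above explain why the test waits on the future instead of returning it. The following sketch illustrates that reasoning using only the Scala standard library. The retry function is a hypothetical, simplified stand-in for eventually (it is not ScalaTest's implementation), and flakyCall is a made-up service call that fails twice before succeeding.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.util.{Failure, Success, Try}

object RetryDemo {
  // Hypothetical, simplified stand-in for eventually:
  // it re-runs the block only when the block throws.
  def retry[T](attempts: Int)(block: => T): T =
    Try(block) match {
      case Success(value)             => value
      case Failure(_) if attempts > 1 => retry(attempts - 1)(block)
      case Failure(error)             => throw error
    }

  // Hypothetical service call that fails on the first two invocations.
  def flakyCall(calls: AtomicInteger): Future[String] =
    if (calls.incrementAndGet() < 3) Future.failed(new IllegalStateException("not consumed yet"))
    else Future.successful("Hi there!")

  // Returning the Future: the block never throws, so there is no retry.
  def asyncStyle(): (Future[String], Int) = {
    val calls = new AtomicInteger(0)
    val result = retry(5)(flakyCall(calls))
    (result, calls.get)
  }

  // Waiting on the Future: the failure is rethrown inside the block, so retry repeats it.
  def blockingStyle(): (String, Int) = {
    val calls = new AtomicInteger(0)
    val result = retry(5)(Await.result(flakyCall(calls), 1.second))
    (result, calls.get)
  }
}
```

In this sketch asyncStyle invokes the call once and hands back a failed future, while blockingStyle invokes it three times and returns the successful response. Calling whenReady inside eventually in the test above has the same effect as the blocking variant here.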
When testing a subscription, the code under test may include a Service that is itself a producer. In that situation, the Application used for the unit tests must differ from the Application used in production: the Application used in unit tests must not mix in LagomKafkaComponents and should use TestTopicComponents instead.
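One way to arrange this is to keep the broker out of the base Application and choose it at the point where the Application is instantiated. The sketch below shows only the shape of that wiring in plain Scala. Every trait and class in it is a hypothetical stand-in rather than Lagom's API: ExternalBrokerComponents plays the role of a production mixin such as LagomKafkaComponents, and InMemoryBrokerComponents plays the role of TestTopicComponents.

```scala
// Hypothetical stand-ins: none of these types are Lagom's API.
trait TopicPublisher {
  def publish(message: String): Unit
  def published: List[String]
}

trait BrokerComponents {
  def topicPublisher: TopicPublisher
}

// Plays the role of a production mixin that needs a running broker.
trait ExternalBrokerComponents extends BrokerComponents {
  lazy val topicPublisher: TopicPublisher = new TopicPublisher {
    def publish(message: String): Unit =
      throw new IllegalStateException("requires a running broker")
    def published: List[String] = Nil
  }
}

// Plays the role of a test mixin that keeps messages in memory.
trait InMemoryBrokerComponents extends BrokerComponents {
  lazy val topicPublisher: TopicPublisher = new TopicPublisher {
    private var buffer = List.empty[String]
    def publish(message: String): Unit = buffer = buffer :+ message
    def published: List[String] = buffer
  }
}

// The base application declares that it needs a broker but commits to none.
abstract class GreeterApplication { self: BrokerComponents =>
  def greet(name: String): Unit = topicPublisher.publish(s"Hello, $name")
}

// Production and test differ only in the mixin they select.
class ProductionGreeterApplication extends GreeterApplication with ExternalBrokerComponents
class TestGreeterApplication extends GreeterApplication with InMemoryBrokerComponents
```

With this split, the production loader instantiates the variant that carries the real broker mixin, and each test instantiates the base Application with the in-memory mixin, as the publish test does with new PublishApplication(ctx) with LocalServiceLocator with TestTopicComponents.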