Aspecto blog

On microservices, OpenTelemetry, and anything in between

How to Write Integration Tests for Kafka in NodeJS

Writing integration tests is usually not such a fun task.

You need to replicate a real environment and test the connections between the different parts of the system.

You cannot rely on mocking as you would in a unit test since you need to simulate the actual environment.

In this tutorial, I’ll show you how to write a simple test that produces and consumes a message in Kafka and validates the content of the message, in under 30 lines of test code, using an open-source library called Malabi.

Preparing The Project

First, let’s create a simple Express app with a single route that produces a Kafka message and then consumes it. I do it this way so it’s easy to see everything at once in this post; in real life, these would probably be different endpoints.

We are going to use:

  1. Mocha – A JavaScript test framework running on Node.js
  2. Malabi – This library introduces a new way of testing services called Trace-based testing (more on that, below)
  3. KafkaJS – The library we use to consume and produce Kafka messages

I created the app using express-generator:

npx express-generator kafkamalabi

Perform relevant npm installs:

npm install kafkajs malabi
npm install --save-dev mocha axios

Let’s modify the routes/index.js file to look like this:

const express = require('express');
const router = express.Router();
const { Kafka } = require('kafkajs')
const kafka = new Kafka({
 clientId: 'my-app',
 brokers: ['localhost:9092'],
})
const sendKafkaMessage = async () => {
 const producer = kafka.producer()
 await producer.connect()
 await producer.send({
   topic: 'test-topic',
   messages: [
     { value: 'Content' },
   ],
 })
 await producer.disconnect()
}
const consumeMessages = async () => {
 const consumer = kafka.consumer({ groupId: 'test-group' })
 await consumer.connect()
 await consumer.subscribe({ topic: 'test-topic', fromBeginning: true })
 await consumer.run({
   eachMessage: async ({ topic, partition, message }) => {
     console.log({
       value: message.value.toString(),
     })
   },
 })
}
/* GET home page. */
router.get('/', async function(req, res, next) {
 await sendKafkaMessage();
 await consumeMessages();
 res.render('index', { title: 'Express' });
});
module.exports = router;

To run this, you need a Kafka broker running locally on port 9092. I set one up using Zookeeper and Docker. You can follow along using this guide or the Kafka-docker GitHub repo.

Now run the service:

npm start

Then, go to localhost:3000 in your favorite browser. In your terminal, you should see the following message:

{ value: 'Content' }

Now that our microservice is set up, let’s see how we can test it.

What do we want to test?

When writing an integration test for Kafka in NodeJS (or in any other language), we want to verify that a specific message was actually sent to a real Kafka instance, and that the same message was then read back with the correct content.

In our case, that means making sure Kafka receives a message with the value ‘Content’ and that a message with the same content is consumed.

If I were to do this with existing tools, I would probably end up writing custom code that writes to and reads from Kafka and asserts on the values. But that’s too complex.

I want an easy way of knowing what real values the actual Kafka instance has received.

Enter Malabi open source.

What is Open-Source Malabi?

Malabi is an open-source library that introduces a new way of testing services: Trace-based testing.

Trace-based testing is a method that improves assertion capabilities by leveraging trace data and making it accessible when setting expectations in a test.

Malabi makes it easy to write this kind of test. Before we dive into how it works, I’ll show you the end result: the short test I promised, which I added in a test.js file under a new folder called test.

const { instrument, malabi } = require('malabi');
instrument({
 serviceName: 'tests-runner',
});
const axios = require('axios');
const assert = require('assert');
describe('Kafka integration test', function () {
 it('should produce a kafka message and consume it with correct data', async function () {
   // This is the payload of the message produced to (and later consumed from) Kafka
   const MESSAGE_PAYLOAD = "Content";
   const TOPIC_NAME = 'test-topic';
   const telemetryRepo = await malabi(async () => {
     // This endpoint performs the calls to Kafka, which produces a message and later consumes it.
     await axios(`http://localhost:3000/`);
   });
   // assert on what produce did
   const producerSpan = telemetryRepo.spans.messaging().first;
   assert.equal(producerSpan.attr('messaging.destination'), TOPIC_NAME);
   assert.equal(producerSpan.attr('messaging.payload'), MESSAGE_PAYLOAD);
   // assert on what consume did
   const consumerSpan = telemetryRepo.spans.messaging().second;
   assert.equal(consumerSpan.attr('messaging.destination'), TOPIC_NAME);
   assert.equal(consumerSpan.attr('messaging.payload'), MESSAGE_PAYLOAD);
 });
});

Now that you see how simple the code is, I’ll explain exactly what happens here.

Malabi uses OpenTelemetry to generate data about what operations happen during a test run.

I won’t dive too deep under the hood, but OpenTelemetry is another open-source project that allows us to gather information about what happens in our microservices, for example, which HTTP calls, DB operations, or messaging operations are happening.

This information is represented as spans (each operation like a DB save has a corresponding span), and a group/tree of spans creates a trace.
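
To make the span and trace vocabulary concrete, here is a minimal sketch using plain objects. The field names (traceId, spanId, parentSpanId, attributes) are simplified assumptions for illustration rather than OpenTelemetry's actual SDK types, and buildTraceTree is a hypothetical helper. In a real trace, the parent/child layout may also differ from the one shown here.

```javascript
// Simplified, hypothetical span records; real OpenTelemetry spans carry more fields.
const spans = [
  { traceId: 't1', spanId: 'a', parentSpanId: null, name: 'GET /',
    attributes: { 'http.method': 'GET' } },
  { traceId: 't1', spanId: 'b', parentSpanId: 'a', name: 'test-topic send',
    attributes: { 'messaging.system': 'kafka', 'messaging.destination': 'test-topic' } },
  { traceId: 't1', spanId: 'c', parentSpanId: 'a', name: 'test-topic process',
    attributes: { 'messaging.system': 'kafka', 'messaging.destination': 'test-topic' } },
];

// Hypothetical helper: nest each span under its parent to form the trace tree.
function buildTraceTree(flatSpans) {
  const nodes = new Map(flatSpans.map((s) => [s.spanId, { ...s, children: [] }]));
  const roots = [];
  for (const node of nodes.values()) {
    const parent = nodes.get(node.parentSpanId);
    if (parent) {
      parent.children.push(node);
    } else {
      roots.push(node); // no known parent: this span is the root of the trace
    }
  }
  return roots;
}

const trace = buildTraceTree(spans);
console.log(trace[0].name, '->', trace[0].children.map((c) => c.name));
```

Each operation is one span, and the parent links are what turn a flat list of spans into a single trace.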

Using Malabi, we can generate a specific trace for each logical test unit (such as an it block), and make assertions on this data.

The main magic happens here:

   const telemetryRepo = await malabi(async () => {
     // This endpoint performs the calls to Kafka, which produces a message and later consumes it.
     await axios(`http://localhost:3000/`);
   });

The malabi function handles all the internal OpenTelemetry logic and returns the data about all the operations that were triggered by the code inside the callback function.

In our use case, the call to localhost:3000 triggered a Kafka produce and consume, so the info about those operations would be inside the telemetryRepo object and thus the assertions you see above.

Also, notice the instrument function that enables Malabi to collect information from the test runner.

But for the above code to work, we need to add some code inside our service to enable data collection from that service.

To enable this, let’s add the following code at the top of the app.js file (the entry point of the service).

const { instrument, serveMalabiFromHttpApp } = require('malabi'); // 1
const instrumentationConfig = { // 2
 serviceName: 'service-under-test',
};
instrument(instrumentationConfig); // 3
serveMalabiFromHttpApp(18393, instrumentationConfig); // 4

Let’s review line by line:

1. Import relevant functions from Malabi

2. The instrumentation config. This holds the name of the service so that Malabi knows to attach this information to the created spans. In our case, we called it simply ‘service-under-test’

3. Instrument function: As mentioned, this is the call that enables the data collection from the service operations.

4. serveMalabiFromHttpApp: By default, Malabi stores the data about the operations in memory. This data needs to be made accessible to the test runner for assertions, so Malabi exposes an endpoint at the specified port (18393 in this case). Under the hood, the malabi function calls this endpoint to retrieve the information to assert on.
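
The pattern in step 4 (keep data in memory, expose it over HTTP, let the test runner fetch it) can be sketched with Node's built-in http module. This is only an illustration of the idea under assumed names: collectedSpans, fetchSpans, and the JSON shape are hypothetical and say nothing about Malabi's actual endpoint or payload format.

```javascript
const http = require('node:http');

// Hypothetical in-memory store standing in for spans collected inside the service.
const collectedSpans = [
  { name: 'produce', attributes: { 'messaging.destination': 'test-topic' } },
];

// Service side: expose the stored spans as JSON over HTTP.
const server = http.createServer((req, res) => {
  res.setHeader('Content-Type', 'application/json');
  res.end(JSON.stringify(collectedSpans));
});

// Test-runner side: fetch the spans so they can be asserted on.
function fetchSpans(port) {
  return new Promise((resolve, reject) => {
    const options = { host: '127.0.0.1', port, path: '/', agent: false };
    http.get(options, (res) => {
      let body = '';
      res.setEncoding('utf8');
      res.on('data', (chunk) => { body += chunk; });
      res.on('end', () => {
        try {
          resolve(JSON.parse(body));
        } catch (err) {
          reject(err);
        }
      });
    }).on('error', reject);
  });
}

// Port 0 lets the OS pick a free port for this sketch; the setup above uses 18393.
const spansFetched = new Promise((resolve, reject) => {
  server.listen(0, '127.0.0.1', async () => {
    try {
      const spans = await fetchSpans(server.address().port);
      console.log(spans[0].attributes['messaging.destination']);
      resolve(spans);
    } catch (err) {
      reject(err);
    } finally {
      server.close(); // stop listening so the process can exit
    }
  });
});
```

Keeping the data in the service and pulling it on demand means the test runner needs no direct access to Kafka or to the service's internals.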

Now that we have everything set up, let’s run the test. Update package.json (under scripts) so the test can be run with Mocha:

"test": "MALABI_ENDPOINT_PORT_OR_URL=18393 mocha \"./test/*.js\" --timeout 10000"

Explaining the different parts:

1. MALABI_ENDPOINT_PORT_OR_URL: This tells the test runner to look for an exposed endpoint at port 18393.

2. The "./test/*.js" part: Tells Mocha where to look for tests.

3. The timeout modification: In my case, the test took longer than Mocha’s default timeout of 2 seconds, so I’ve increased the timeout to 10 seconds to allow the test to complete.

If you haven’t already, make sure the service (and Kafka) is running before you run the tests. Here are the complete run commands, each in its own terminal:

npm run start
npm run test

And our results:

[Screenshot: output of the integration test for Kafka in NodeJS]

And that is a wrap! You now know how to use Malabi to generate information about a service producing and consuming Kafka messages during a test run. And you know how to use this data to make assertions in your integration test.

I hope you found this insightful. We would love to hear your feedback, and if something is not working as expected, feel free to reach out to me!
