Aspecto blog

On microservices, OpenTelemetry, and anything in between

Distributed Tracing for Kafka with OpenTelemetry in Python

OpenTelemetry Kafka and Python

Share this post

Share on facebook
Share on twitter
Share on linkedin

In this tutorial, I will cover Apache Kafka, OpenTelemetry, and how they play out together with practical examples in Python from 0 to 1.

You will learn how to enable OpenTelemetry tracing in Python to generate spans and visualize traces for various Kafka operations. 

What to Expect

This guide has two parts:

  1. A “hello world” kind of intro to OpenTelemetry and Kafka in Python in which you will end up seeing the outputs in a console. 
  2. In the second part, you will learn to visualize traces to troubleshoot your microservices.

To generate spans for the different Kafka operations (e.g., consume and produce), we will use OpenTelemetry libraries.

If you’re already familiar with Kafka and OpenTelemetry, feel free to jump right to the practical part of this guide. 

What is Apache Kafka?

Kafka is an open-source distributed event stream platform, where we can send events to topics and consume them. You can learn more about Kafka here.

What is OpenTelemetry?

In a world where microservices are becoming a standard architecture and distributed messaging systems like Kafka play a core part in enabling the independent microservices to communicate, the world needed a way to visualize and troubleshoot the complexity of a distributed architecture.

Led by the CNCF (Cloud Native Computing Foundation), OpenTelemetry is an open-source project, a set of APIs and SDKs, that allows us to collect, export, and generate traces, logs, and metrics.

With OpenTelemetry, we gather data from the events happening within the different components in our systems (including all sorts of message brokers like Kafka), which ultimately help us understand our software’s performance and behavior and visualize our microservices architecture.

For this specific guide, we will focus on traces, you can learn more about OpenTelemetry in this friendly guide for developers.

OpenTelemetry Traces

A trace tracks the progression of a single request, as it is handled by the different microservices/ moving parts.

The atomic unit of work in a trace is called a span, for example, a call to a database, external service, send message to Kafka topic, etc.

Each trace contains a collection of 1 or more spans and more information like how much time the entire trace took to complete.

OpenTelemetry Tracing Spans and Traces

What are OpenTelemetry Instrumentations?

So now you know what traces and spans mean, but how do you create them?

That can be done using the SDKs provided to us by OpenTelemetry, which bundles useful instrumentations.

What’s an instrumentation? A piece of code that is responsible for creating spans and traces and sending them to some backend, for later visualizing them.

OpenTelemetry contains instrumentations for a lot of different libraries, including ones for Kafka.

How to Use Kafka with OpenTelemetry in Python

By now you should have all the theory you need, so let’s start writing some code.

* This guide assumes you are running Kafka in your local (if you don’t, visit this quick guide).

The Practical Part

First, let’s install the relevant libraries:

pip install opentelemetry-api
pip install opentelemetry-sdk
pip install opentelemetry-instrumentation-kafka-python
pip install kafka-python

Then, let’s add our tracing.py file:

from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import (
    SimpleSpanProcessor,
    ConsoleSpanExporter,
)
from opentelemetry.instrumentation.kafka import KafkaInstrumentor
def instrument(*args, **kwargs):
    provider = TracerProvider()
    simple_processor = SimpleSpanProcessor(ConsoleSpanExporter())
    provider.add_span_processor(simple_processor)
    trace.set_tracer_provider(provider)
    KafkaInstrumentor().instrument()

This file is responsible for using OpenTelemetry SDKs to enable the creation of spans and traces.

Notice we have a console span exporter which means we will be sending the spans to the console output.

As you can see we use the KafkaInstrumentor to create spans for Kafka-related operations, like consume and produce.

This is all the OpenTelemetry code we need to create Kafka spans. 

Let’s create the producer file:

from tracing import instrument
instrument()
from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers='localhost:9092')
for _ in range(1):
   producer.send('foobar', b'some_message_bytes')
   producer.flush()

And create the consumer file:

from tracing import instrument
instrument()
from kafka import KafkaConsumer
consumer = KafkaConsumer('foobar', bootstrap_servers='localhost:9092')
for msg in consumer:
   print(msg)

Notice that both files have these 2 lines:

from tracing import instrument
instrument()

This is the calling of the instrument function and enables the creation of the spans when we run the consumer and producer files using the following commands:

python producer.py
python consumer.py

Since we use console exporter – we can see the spans of the consumer and producer on the console output:

{
    "name": "foobar receive",
    "context": {
        "trace_id": "0x436c0e9807b2f61b4a72bc63e5205954",
        "span_id": "0xdda310172f7297cd",
        "trace_state": "[]"
    },
    "kind": "SpanKind.CONSUMER",
    "parent_id": "0x711a6a5d5bb977d8",
    "start_time": "2022-03-02T10:40:36.695519Z",
    "end_time": "2022-03-02T10:40:36.720864Z",
    "status": {
        "status_code": "UNSET"
    },
    "attributes": {
        "messaging.system": "kafka",
        "messaging.destination": "foobar",
        "messaging.kafka.partition": 0,
        "messaging.url": "\"localhost:9092\""
    },
    "events": [],
    "links": [],
    "resource": {
        "telemetry.sdk.language": "python",
        "telemetry.sdk.name": "opentelemetry",
        "telemetry.sdk.version": "1.9.1",
        "service.name": "unknown_service"
    }
}

{
    "name": "foobar send",
    "context": {
        "trace_id": "0x436c0e9807b2f61b4a72bc63e5205954",
        "span_id": "0x711a6a5d5bb977d8",
        "trace_state": "[]"
    },
    "kind": "SpanKind.PRODUCER",
    "parent_id": null,
    "start_time": "2022-03-02T10:40:36.609135Z",
    "end_time": "2022-03-02T10:40:36.610410Z",
    "status": {
        "status_code": "UNSET"
    },
    "attributes": {
        "messaging.system": "kafka",
        "messaging.destination": "foobar",
        "messaging.url": "\"localhost:9092\""
    },
    "events": [],
    "links": [],
    "resource": {
        "telemetry.sdk.language": "python",
        "telemetry.sdk.name": "opentelemetry",
        "telemetry.sdk.version": "1.9.1",
        "service.name": "unknown_service"
    }
}

But I have a feeling that you want to get a better visualization than just a console log. If you do – read on 🙂

Visualizing OpenTelemetry Tracing Data 

For this article, I will be using Aspecto to visualize my traces. You can follow along by quickly creating a free account.

Aspecto has built-in support for visualizing async messaging platforms like Kafka so it fits perfectly with this guide’s purpose.

Once you’re signed in, you get an API key and all you need to do is change a few lines of code, environment variables and install some packages. I’ll be showing you how to do that below.

The end result should look like this:

Single trace view. OpenTelemetry tracing for Kafka operations in the Aspecto Platform.
Payload view. OpenTelemetry tracing for Kafka operations in the Aspecto Platform.

First, let’s install:

pip install opentelemetry-exporter-otlp-proto-grpc

Then let’s modify our tracing.py file so that it uses the otlp exporter instead of the console exporter, to send data to Aspecto:

from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import (
   BatchSpanProcessor,
   SimpleSpanProcessor,
   ConsoleSpanExporter,
)
from opentelemetry.instrumentation.kafka import KafkaInstrumentor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
def instrument(*args, **kwargs):
   provider = TracerProvider()
   simple_processor = SimpleSpanProcessor(OTLPSpanExporter())
   provider.add_span_processor(simple_processor)
   trace.set_tracer_provider(provider)
   KafkaInstrumentor().instrument()

To run the consumer

OTEL_SERVICE_NAME=kafka-consumer OTEL_EXPORTER_OTLP_TRACES_ENDPOINT=https://otelcol.aspecto.io:4317 OTEL_EXPORTER_OTLP_HEADERS=Authorization=YOUR_ASPECTO_API_KEY_HERE python consumer.py

To run the producer

OTEL_SERVICE_NAME=kafka-producer OTEL_EXPORTER_OTLP_TRACES_ENDPOINT=https://otelcol.aspecto.io:4317 OTEL_EXPORTER_OTLP_HEADERS=Authorization=YOUR_ASPECTO_API_KEY_HERE python producer.py

And that’s it! When you enter the Aspecto platform and click on Trace Search, after a minute or two you should be able to see your traces. 

Feel free to click on the chat button and contact us, we’d be glad to help you figure everything out and answer any questions you may have.

P.S. If you prefer to use an open-source to visualize your traces, check out Jaeger Tracing. Follow this guide to learn how to use OpenTelemetry to send traces to Jaeger. Keep in mind that it takes a bit more time to set up the required environment.

Hope this has been a useful guide for you, feel free to reach out if you have any questions!

Spread the word

Share on facebook
Share on twitter
Share on linkedin
Subscribe for more distributed applications tutorials and insights that will help you boost microservices troubleshooting.