Distributed tracing is essential to understanding and debugging a system, as it follows data moving both within and across process boundaries. If you’re just getting started with distributed tracing, our Introduction to Distributed Tracing is an excellent starting point.

In this blog post, we’ll be looking at implementing distributed tracing between microservices via a message broker, all with open-source tools. While we’ll be using Node.js, RabbitMQ, and Jaeger, this tutorial can be used with other languages, message queues, and tracing systems as well.

Prerequisites

To follow this tutorial and run the finished project, it’s best to use Docker and Docker Compose. If you’re on Mac or Windows, you simply need to install Docker Desktop; Linux users can find instructions on the official Install Docker Compose page.

The Application

The application that you’ll be looking at today is a very simplified version of the sort of analytics collection pipeline that you might see, for example, tracking website page views or email open rates.

This application is made up of two separate microservices. The first is an HTTP server that collects the User-Agent header of each visitor and pushes it to RabbitMQ for asynchronous processing so that it can respond quickly to the client. The response is a transparent one-pixel image, a common approach in web analytics because it allows the tracker to be loaded with a simple image tag.
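For example, a page being tracked could embed the pixel with a plain image tag (the host and port here are assumptions based on the local setup used later in this tutorial):

```html
<!-- Loads the 1x1 GIF from the collector service; the browser automatically
     sends its User-Agent header along with this request. -->
<img src="http://localhost:3000/track" alt="" width="1" height="1" />
```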

The second service represents an ETL-type pipeline that extracts each collected pixel data from RabbitMQ, transforms it by gathering additional data about the User-Agent from an external API, and finally loads it into a file. (In a real system, this would likely be a database or a data warehouse.)

You can download the starter project (without distributed tracing) here. [Add link here to where the starter project is stored (e.g., a public GitHub repository)]

Collector

Let’s start by looking at the code for the collector service, which uses Express.

const express = require("express");
const publish = require("../lib/rabbitmq").publish;
const app = express();
const port = process.env.PORT || 3000;
const pixel = Buffer.from(
  "R0lGODlhAQABAID/AP///wAAACwAAAAAAQABAAACAkQBADs=",
  "base64"
);
app.get("/track", async (req, res) => {
  res.set({
    "Content-Type": "image/gif",
    "Cache-Control": "private, no-cache, no-store, must-revalidate",
    Expires: "-1",
    Pragma: "no-cache"
  });
  
  try {
    await publish("pixels", {
      timestamp: new Date().toISOString(),
      ua: req.get("User-Agent")
    });
    res.status(200);
  } catch (e) {
    res.status(500);
  } finally {
    res.send(pixel);
  }
});

app.listen(port);

In this relatively simple application, you start an HTTP server with a single endpoint, GET /track. A small library removes the boilerplate of working with RabbitMQ: it provides publish and consume functions that automatically handle connecting to the message queue, creating the queue if it does not yet exist, and encoding and decoding your payload to and from JSON. If you’re curious how this works, take a look at the lib/rabbitmq.js file.
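As a rough illustration only, a helper like lib/rabbitmq.js might look something like the sketch below. This is an assumption, not the starter project’s actual code, and it uses the amqplib package:

```javascript
// Hypothetical sketch of a lib/rabbitmq.js-style helper: publish() and
// consume() hide connection setup, queue assertion, and JSON (de)serialization.
const encode = payload => Buffer.from(JSON.stringify(payload));
const decode = buf => JSON.parse(buf.toString());

let channelPromise;
const getChannel = () => {
  // Lazily open a single connection/channel and reuse it for all calls.
  if (!channelPromise) {
    const amqp = require("amqplib");
    channelPromise = amqp
      .connect(process.env.AMQP_URL || "amqp://localhost")
      .then(conn => conn.createChannel());
  }
  return channelPromise;
};

const publish = async (queue, payload) => {
  const ch = await getChannel();
  await ch.assertQueue(queue, { durable: true }); // create queue if missing
  ch.sendToQueue(queue, encode(payload), { persistent: true });
};

const consume = async (queue, handler) => {
  const ch = await getChannel();
  await ch.assertQueue(queue, { durable: true });
  return ch.consume(queue, msg => {
    if (!msg) return; // consumer was cancelled by the broker
    handler({
      payload: decode(msg.content),
      ack: () => ch.ack(msg),
      nack: () => ch.nack(msg)
    });
  });
};

module.exports = { publish, consume };
```

The important part is the shape of the message handed to the consumer callback: a decoded payload plus ack and nack functions bound to the underlying RabbitMQ message.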

For each request, set the response headers to return the correct content type (the transparent one-pixel GIF image). It’s also vital to set a few caching headers to make sure that browsers don’t cache this response; otherwise, you might not be able to track repeat visitors! Next up, publish a message to a queue named pixels that contains the timestamp of the request and the value of the User-Agent header. While the application will always respond with the image, the status code depends on whether or not publishing the message to the queue succeeded.

ETL

The ETL service is nothing more than a daemon that listens for new messages from the pixels queue. See below:

const consume = require("../lib/rabbitmq").consume;
const axios = require("axios");
const fs = require("fs-extra");
const outputFile = process.env.OUTPUT_FILE;

consume("pixels", async msg => {
  try {
    const data = await transform(msg.payload);
    await load(data);
    msg.ack();
  } catch (e) {
    msg.nack();
  }
}).catch(err => {
  process.exit(1);
});

const transform = async payload => {
  const uaData = await axios.get(
    `http://useragentstring.com/?getJSON=all&uas=${encodeURIComponent(
      payload.ua
    )}`
  );
  return { ...uaData.data, ua: payload.ua, timestamp: payload.timestamp };
};

const load = async data => {
  return fs.appendFile(outputFile, JSON.stringify(data) + "\n");
};

Here you call your consume function with a callback that is invoked for each new message in the queue. For each message, the application makes an HTTP request to the UserAgentString.com API with the collected User-Agent. Once you have the API response in JSON format, you merge in the original User-Agent value and timestamp before appending the result to a file.

It’s important to confirm whether each message was consumed successfully. With RabbitMQ, you call ack or nack to send a positive or negative acknowledgment for each message in the queue. This way, if something fails, the message is returned to the queue and becomes available to be consumed again.

Trying it out

To run the application locally, use the following command:

$ docker-compose up

After all the services have started, you can open http://localhost:3000/track in any browser. It shouldn’t be long until there is a new file in the project folder called data.txt. Inside of this, you can see your request, which looks something like the following:

{"agent_type":"Browser","agent_name":"Firefox","agent_version":"69.0","os_type":"Macintosh","os_name":"OSX","os_versionName":"","os_versionNumber":"10.14","os_producer":"","os_producerURL":"","linux_distibution":"Null","agent_language":"","agent_languageTag":"","ua":"Mozilla/5.0 (Macintosh; Intel Mac OS X 10.14; rv:69.0) Gecko/20100101 Firefox/69.0","timestamp":"2019-09-28T06:52:28.007Z"}

Adding Distributed Tracing

Now that we have a working application and a basic understanding of how it functions, let’s see what it takes to add distributed tracing to it using OpenTracing and Jaeger.

For testing and development purposes, Jaeger provides a handy all-in-one Docker image that uses in-memory data storage. If you look at the docker-compose.yml file, you’ll see that your starter project already contains this image; you just haven’t used it yet.
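The relevant service definition might look roughly like the following sketch. The exact contents of the starter project’s docker-compose.yml may differ; the service name and port mappings here are assumptions, apart from the UI port, which is used later in this tutorial:

```yaml
# Sketch of a Jaeger all-in-one service in docker-compose.yml
jaeger:
  image: jaegertracing/all-in-one
  ports:
    - "16686:16686"   # Jaeger Web UI
    - "6832:6832/udp" # agent port that the Node.js jaeger-client reports to
```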

To do so, you will first need to install OpenTracing and its Jaeger bindings:

$ npm install --save opentracing jaeger-client

Collector

Once installed, you can proceed with configuring the tracer by requiring the new dependencies and adding a new tracer variable at the top-level scope:

const jaeger = require("jaeger-client");
const opentracing = require("opentracing");
const tracer = jaeger.initTracerFromEnv({ serviceName: "collector" });

By using initTracerFromEnv, the Jaeger client reads the environment variables that you have configured in docker-compose.yml, all of which have the JAEGER_ prefix. The configuration that you’ve chosen collects every single trace. In a high-throughput environment, it’s best to set the sampling mode to “probabilistic” or “rate-limiting.” You can read about different sampling modes in the Sampling section of the Jaeger documentation.
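For reference, the JAEGER_-prefixed environment variables read by jaeger-client look something like the fragment below. The values shown are illustrative and may not match the starter project exactly:

```
JAEGER_AGENT_HOST=jaeger    # hostname of the Jaeger agent (the compose service)
JAEGER_SAMPLER_TYPE=const   # "const" with param 1 collects every single trace
JAEGER_SAMPLER_PARAM=1
# For high throughput, e.g. JAEGER_SAMPLER_TYPE=probabilistic with
# JAEGER_SAMPLER_PARAM=0.1 would keep roughly 10% of traces instead.
```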

Now that you have your tracer initialized, you can create your first span for the HTTP request by adding the following code to the beginning of your request handler:

const httpSpan = tracer.startSpan("http_request");
httpSpan.addTags({
  [opentracing.Tags.SPAN_KIND]: opentracing.Tags.SPAN_KIND_MESSAGING_PRODUCER,
  [opentracing.Tags.HTTP_METHOD]: req.method,
  [opentracing.Tags.HTTP_URL]: req.path
});
res.on("finish", () => {
  httpSpan.setTag(opentracing.Tags.HTTP_STATUS_CODE, res.statusCode);
  httpSpan.finish();
});

You annotate the span with tags, such as details of the HTTP request and response and the fact that it’s producing messages to a queue. While all of these are optional, they help you get more information from your traces. For each span, you must call the finish function, which records how long the span lasted. Here, you do this by listening for the finish event that Node.js emits once the response has been sent.

Go ahead and rebuild your containers, and re-run the application now to see your first trace:

$ docker-compose up --build

The Jaeger Web UI is running at http://localhost:16686, which, after making an HTTP request to your collector, provides you with the following trace information. Note that you’ll have to re-run this command (with the --build flag) every time you want changes in the code to take effect in the running application.

Jaeger UI trace view with single span

It would be great if you could also see how long it takes to publish the message to RabbitMQ. You can create a new span for this, right before the try/catch block where the message is published. Because publishing happens inside the HTTP request, you want your new span to be a child of the http_request span:

const enqueueSpan = tracer.startSpan("enqueue", {
  childOf: httpSpan
});

You need to finish your span regardless of what happens, and the beginning of the finally block is the perfect place to do this. You can also mark the span as having had an error if publishing the message failed. Do this by setting the ERROR tag and adding the error message as a log to the span:

enqueueSpan.setTag(opentracing.Tags.ERROR, true).log({ error: e });

One final thing you’ll want to do is propagate the trace context to the ETL service. To do so, modify the message payload to include it:

const traceContext = {};
tracer.inject(enqueueSpan, opentracing.FORMAT_TEXT_MAP, traceContext);
await publish("pixels", {
  timestamp: new Date().toISOString(),
  ua: req.get("User-Agent"),
  trace: traceContext
});

With all of this in mind, all the changes required to the collector service in order to add tracing are as follows:

const express = require("express");
const publish = require("../lib/rabbitmq").publish;
const jaeger = require("jaeger-client");
const opentracing = require("opentracing");
 
const app = express();
const port = process.env.PORT || 3000;
const pixel = Buffer.from(
  "R0lGODlhAQABAID/AP///wAAACwAAAAAAQABAAACAkQBADs=",
  "base64"
);
const tracer = jaeger.initTracerFromEnv({ serviceName: "collector" });
 
app.get("/track", async (req, res) => {
  const httpSpan = tracer.startSpan("http_request");
  httpSpan.addTags({
    [opentracing.Tags.SPAN_KIND]: opentracing.Tags.SPAN_KIND_MESSAGING_PRODUCER,
    [opentracing.Tags.HTTP_METHOD]: req.method,
    [opentracing.Tags.HTTP_URL]: req.path
  });

  res.on("finish", () => {
    httpSpan.setTag(opentracing.Tags.HTTP_STATUS_CODE, res.statusCode);
    httpSpan.finish();
  });

  res.set({
    "Content-Type": "image/gif",
    "Cache-Control": "private, no-cache, no-store, must-revalidate",
    Expires: "-1",
    Pragma: "no-cache"
  });
 
  const enqueueSpan = tracer.startSpan("enqueue", {
    childOf: httpSpan
  });

  const traceContext = {};
  tracer.inject(enqueueSpan, opentracing.FORMAT_TEXT_MAP, traceContext);

  try {
    await publish("pixels", {
      timestamp: new Date().toISOString(),
      ua: req.get("User-Agent"),
      trace: traceContext
    });

    res.status(200);
  } catch (e) {
    enqueueSpan.setTag(opentracing.Tags.ERROR, true).log({ error: e });
    res.status(500);
  } finally {
    enqueueSpan.finish();
    res.send(pixel);
  }
});
 
app.listen(port);

Now you can see the trace containing both spans, and if something goes wrong, you can also see the error message that caused it.


Jaeger UI trace view with multiple spans

Jaeger UI trace view with failed span showing error log

ETL

For the ETL service, the idea is the same: create new spans that are associated with previously created ones. The one key difference here is that the root span of each consumed message references the span context carried in the message payload.

Let’s look at the changes first and then break down what is happening:

const consume = require("../lib/rabbitmq").consume;
const axios = require("axios");
const fs = require("fs-extra");
const jaeger = require("jaeger-client");
const opentracing = require("opentracing");
 
const outputFile = process.env.OUTPUT_FILE;
 
const tracer = jaeger.initTracerFromEnv({ serviceName: "etl" });

consume("pixels", async msg => {
  const parentSpan = tracer.extract(
    opentracing.FORMAT_TEXT_MAP,
    msg.payload.trace
  );
  const etlSpan = tracer.startSpan("etl", {
    references: [opentracing.followsFrom(parentSpan)]
  });

  try {
    const transformSpan = tracer.startSpan("transform", {
      childOf: etlSpan
    });
    const data = await withSpan(transformSpan, () => transform(msg.payload));

    const loadSpan = tracer.startSpan("load", {
      childOf: etlSpan
    });
    await withSpan(loadSpan, () => load(data));

    msg.ack();
  } catch (e) {
    msg.nack();
  } finally {
    etlSpan.finish();
  }
}).catch(err => {
  process.exit(1);
});

const transform = async payload => {
  const uaData = await axios.get(
    `http://useragentstring.com/?getJSON=all&uas=${encodeURIComponent(
      payload.ua
    )}`
  );
  return { ...uaData.data, ua: payload.ua, timestamp: payload.timestamp };
};
 
const load = async data => {
  return fs.appendFile(outputFile, JSON.stringify(data) + "\n");
};

const withSpan = async (span, fn) => {
  try {
    return await fn();
  } catch (e) {
    span.setTag(opentracing.Tags.ERROR, true).log({ error: e.toString() });
    throw e;
  } finally {
    span.finish();
  }
};

Now, let’s go through the changes one by one:

  • Tracer initialization happens as before, but with a different service name.
  • When consuming a message, we first extract the parent span context that was injected by the collector service.
  • We create an etl span, which uses the followsFrom reference type instead of being a child of the parent span. This reference type is typically used for asynchronous spans; the etl span finishes only after everything else is done processing the message.
  • The withSpan helper function reduces duplication in calling async functions, tagging spans with any errors thrown and finishing the span after the completion of the async call.
  • You create a span for both the transform and load functions and wrap each call with the withSpan helper.

If you now rebuild and re-run your application and then make a request to the track endpoint, you’ll see the full trace from the HTTP request to the end of the ETL pipeline.

Jaeger UI trace view with spans across different services

Alternatives

While we used Jaeger in this blog post, there are many other distributed tracing systems that you can use. Some of the most well-known open-source ones are Zipkin and Elastic APM. With OpenTracing, you can freely switch between these systems without having to change more than your tracer initialization code. However, OpenTracing is not the only instrumentation library available. OpenCensus is another popular option that, while somewhat different, also aims to solve the observability problem. The two projects have since decided to join forces as OpenTelemetry, which is currently under heavy development.

If you’re looking to take advantage of distributed tracing without having to deal with all the manual code annotations that we’ve used here, you should consider Epsagon. In addition to getting automatic distributed tracing, users don’t have to manage or maintain any infrastructure. At any scale, not having to maintain Jaeger instances and Cassandra or Elasticsearch clusters to store tracing data is a massive benefit. You can get started for free with a simple five-minute setup.

Automated distributed tracing by Epsagon


Summary

In this article, you’ve learned how to set up distributed tracing through a message queue using an existing microservice application. While we focused on specific technologies, all of this knowledge is transferable to any other programming language or tracing system (commercial or otherwise).

If you’re using a microservice architecture, distributed tracing is essential. It allows you to monitor your application as a whole and quickly troubleshoot issues when something eventually goes wrong. Don’t wait until it’s too late.

Check out our latest blog posts:

Distribute Messages Between Java Microservices Using Kafka

Announcing: Epsagon, First Provider of Distributed Tracing for AWS AppSync

AWS EventBridge and Epsagon Automated Tracing Integrate

Distributed Tracing: the Right Framework and Getting Started