Distributed Systems With Node.js: Part 4 Observability
Introduction
In this series, I will be working through this book on Distributed Systems using Node.js. The book is quite large and in-depth, but in these articles I will be distilling its big-picture pieces into bite-sized tutorials and walkthroughs.
In this section, we will figure out how to monitor our applications: creating searchable logs and slick dashboards with an ELK stack, tracking metrics and server heartbeats with Graphite and Grafana, and tracing our requests across our microservices with Zipkin.
The code for this demo is found in this repo at the commit here
Logging
Logging is perhaps the most important thing you can do to make bugs easy to track down. In a local environment you might use console.log or print. Once our application is in production, however, it is difficult to get at the terminal to view that output. Keeping logs only on the running instance also means that when the instance goes down we lose them, which is exactly when we need them to figure out why it went down. The ELK stack is a popular way to capture and view logs in distributed systems.
ELK
ELK stands for Elasticsearch, Logstash and Kibana. Our application will dump logs to Logstash which will then flow into Elasticsearch for long term persistence and we will use Kibana’s UI to create dashboards and run queries against the Elasticsearch data.
Application -> Logstash
The first step in building our ELK stack is to get data out of the application. We send logs over UDP because it has less overhead than TCP (there is no connection to set up and no acknowledgements to wait for), and we don't mind if a log or two gets dropped, since shipping logs to Logstash should not be our application's main concern.
For the purposes of learning, we build our own Logstash module here, but you would be better off using an established library in production code.
const client = require('dgram').createSocket('udp4');
const host = require('os').hostname();
// LOGSTASH is expected in "host:port" form, e.g. LOGSTASH=localhost:7777
const [LS_HOST, LS_PORT] = process.env.LOGSTASH.split(':');
const NODE_ENV = process.env.NODE_ENV;

module.exports = function (severity, type, fields) {
  // Wrap the caller's fields in metadata we can query later in Elasticsearch
  const payload = JSON.stringify({
    '@timestamp': (new Date()).toISOString(),
    '@version': 1,
    app: 'web-api',
    environment: NODE_ENV,
    severity: severity,
    type: type,
    fields: fields,
    host: host
  });
  console.log(payload);
  // Fire and forget: UDP gives us no delivery guarantee
  client.send(payload, LS_PORT, LS_HOST);
};
We have a single function that takes a severity, a type and a set of fields from the caller, and adds some extra metadata that we can query against once the data flows into Elasticsearch. We then send this payload from our application to our Logstash instance over UDP.
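The module above throws at startup if LOGSTASH is unset and silently ignores send errors. Here is a minimal sketch of a slightly more defensive variant. The names parseTarget, buildPayload and createLogger, and the fallback address 127.0.0.1:7777, are my own assumptions for illustration and not something the book prescribes.

```javascript
const dgram = require('dgram');
const os = require('os');

// Split "host:port"; the fallback address is a hypothetical local default.
function parseTarget(value, fallback = '127.0.0.1:7777') {
  const [host, port] = (value || fallback).split(':');
  return { host, port: Number(port) };
}

// Build the JSON document; `now` is injectable so the output is predictable.
function buildPayload(severity, type, fields, now = new Date()) {
  return JSON.stringify({
    '@timestamp': now.toISOString(),
    '@version': 1,
    app: 'web-api',
    environment: process.env.NODE_ENV,
    severity,
    type,
    fields,
    host: os.hostname()
  });
}

// Create a log function bound to one Logstash target.
function createLogger(target = process.env.LOGSTASH) {
  const { host, port } = parseTarget(target);
  const client = dgram.createSocket('udp4');
  client.unref(); // an idle logging socket should not keep the process alive
  return (severity, type, fields) => {
    client.send(buildPayload(severity, type, fields), port, host, (err) => {
      if (err) console.error('log send failed:', err.message);
    });
  };
}
```

Usage would look like const log = createLogger(); followed by log('info', 'listen', {port: 3000}). Splitting the payload builder from the socket code also makes the log format easy to unit test without any network traffic.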
#!/usr/bin/env node
const server = require('fastify')();
const fetch = require('node-fetch');
const HOST = process.env.HOST || '127.0.0.1';
const PORT = process.env.PORT || 3000;
const TARGET = process.env.TARGET || '127.0.0.1:4000';
const log = require("./logstash");

(async () => {
  await server.register(require('middie'));

  // Runs for every incoming request, before the route handler
  server.use((req, res, next) => {
    log('info', 'request-incoming', {
      path: req.url,
      method: req.method,
      ip: req.ip,
      ua: req.headers['user-agent'] || null
    });
    next();
  });

  // Runs whenever a route handler throws
  server.setErrorHandler(async (error, req) => {
    log('error', 'request-failure', {
      stack: error.stack,
      path: req.url,
      method: req.method
    });
    return {error: error.message}
  });

  server.get('/', async () => {
    const url = `http://${TARGET}/recipes/42`;
    log('info', 'request-outgoing', {url, svc: 'recipe-api'});
    const res = await fetch(url);
    const producer_data = await res.json();
    return {
      consumer_pid: process.pid,
      producer_data: producer_data
    };
  });

  server.get('/error', async () => {throw new Error('oh no')})

  server.listen(PORT, HOST, () => {
    log('verbose', 'listen', {host: HOST, port: PORT});
  })
})();
We have added a few logs across our application at different severities. We also set up middleware for our service which is invoked on every incoming request. This means we don't have to remember to call a logging function at the beginning of each route handler: the server pipes every request through the middleware before routing it to the correct handler.
Logstash -> Elasticsearch
Our ELK stack comes as a pre-built Docker image. In a production setup we might run these processes on separate host machines, but this all-in-one image approach works for now!
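To make the "pipes our requests through the middleware" idea concrete, here is a minimal sketch of how a Connect-style middleware chain can be run. This is not the actual implementation inside middie or Fastify; runMiddleware and the sample stack are hypothetical names used only for illustration.

```javascript
// Run each middleware in order; a middleware hands control on by calling next().
function runMiddleware(stack, req, res, done) {
  let index = 0;
  function next(err) {
    if (err) return done(err);      // stop the chain on the first error
    const fn = stack[index++];
    if (!fn) return done(null);     // chain exhausted: move on to the route handler
    fn(req, res, next);
  }
  next();
}

// A logging middleware followed by one that tags the request.
const seen = [];
const stack = [
  (req, res, next) => { seen.push(`log ${req.method} ${req.url}`); next(); },
  (req, res, next) => { req.tagged = true; next(); }
];
```

Calling runMiddleware(stack, req, res, done) invokes done only after every middleware has called next(), which is why the route handler never has to know that logging happened.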
input {
  udp {
    id => "nodejs_udp_logs"
    port => 7777
    codec => json
  }
}
output {
  elasticsearch {
    hosts => ["localhost:9200"]
    document_type => "nodeLog"
    manage_template => false
    index => "nodejs-%{+YYYY.MM.dd}"
  }
}
The Docker image uses a .conf file to tell Logstash to listen for JSON messages over UDP on port 7777. When it receives a message, it persists it to Elasticsearch on port 9200. By using Logstash as a middleman, our application can quickly dump logs over UDP and let Logstash handle persisting them to Elasticsearch over TCP, possibly on a whole other machine. This saves valuable CPU resources on our application machine since it doesn't need to communicate with Elasticsearch directly.
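The index setting in the config, nodejs-%{+YYYY.MM.dd}, means each day's logs land in their own Elasticsearch index, named from the event's @timestamp. As far as I understand, Logstash evaluates that date in UTC. The sketch below mimics the naming so we can predict which index a given log will end up in; indexNameFor is a hypothetical helper, not part of Logstash.

```javascript
// Derive the daily index name from an event's @timestamp (interpreted in UTC).
function indexNameFor(timestamp, prefix = 'nodejs') {
  const d = new Date(timestamp);
  const yyyy = d.getUTCFullYear();
  const mm = String(d.getUTCMonth() + 1).padStart(2, '0');
  const dd = String(d.getUTCDate()).padStart(2, '0');
  return `${prefix}-${yyyy}.${mm}.${dd}`;
}
```

Daily indices are convenient because old logs can be dropped by deleting whole indices, and a Kibana index pattern such as nodejs-* covers all of them.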
Elasticsearch -> Kibana
Kibana is a nice visualizer and dashboard builder for our data that is stored in Elasticsearch. We expose access to the UI via port 5601 on our pre-built Docker image.
Summary
- We send logs from our app over UDP, which is cheaper than TCP but does not guarantee delivery
- Logstash persists data into Elasticsearch
- Using Elasticsearch, we can build useful dashboards in Kibana
Metrics
To track our metrics, we will use a different stack. StatsD provides a simple protocol for receiving metrics from our application (similar to Logstash's role in ELK), Graphite stores the metrics data in a hierarchical structure (taking on the role of Elasticsearch) and Grafana lets us build cool dashboards.
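StatsD's protocol really is simple: each metric is a short line of text of the form name:value|type, usually sent over UDP. The sketch below formats such lines by hand so we can see what a client library puts on the wire; formatMetric and the TYPES table are hypothetical names for illustration, and a real client also handles batching and sample rates.

```javascript
// StatsD type suffixes: counter, timer (milliseconds) and gauge.
const TYPES = { counter: 'c', timing: 'ms', gauge: 'g' };

// Produce one StatsD line, e.g. "web-api.server.conn:12|g".
function formatMetric(prefix, name, value, kind) {
  const suffix = TYPES[kind];
  if (!suffix) throw new Error(`unknown metric kind: ${kind}`);
  return `${prefix}.${name}:${value}|${suffix}`;
}
```

The dots in the metric name are what later become the levels of the hierarchy in Graphite.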
#!/usr/bin/env node
const server = require('fastify')();
const fetch = require('node-fetch');
const HOST = process.env.HOST || '127.0.0.1';
const PORT = process.env.PORT || 3000;
const TARGET = process.env.TARGET || '127.0.0.1:4000';
const SDC = require('statsd-client');
const statsd = new SDC({host: '127.0.0.1', port: 8125, prefix: 'web-api'});

(async () => {
  await server.register(require('middie'));
  // Records a count and timing for every inbound request, per URL
  server.use(statsd.helpers.getExpressMiddleware('inbound', {timeByUrl: true}));

  server.get('/', async () => {
    const begin = new Date();
    const res = await fetch(`http://${TARGET}/recipes/42`);
    statsd.timing('outbound.recipe-api.request-time', begin);
    statsd.increment('outbound.recipe-api.request-count', 1);
    const producer_data = await res.json();
    return {
      consumer_pid: process.pid,
      producer_data: producer_data
    };
  });

  server.get('/error', async () => {throw new Error('oh no')})

  server.listen(PORT, HOST, () => {
    console.log(`Consumer running at http://${HOST}:${PORT}`)
  })
})();

const v8 = require('v8');
const fs = require('fs');

// Report process health every 10 seconds
setInterval(() => {
  statsd.gauge('server.conn', server.server._connections);

  const m = process.memoryUsage();
  statsd.gauge('server.memory.used', m.heapUsed);
  statsd.gauge('server.memory.limit', m.heapTotal);

  const h = v8.getHeapStatistics();
  statsd.gauge('server.heap.size', h.used_heap_size);
  statsd.gauge('server.heap.limit', h.heap_size_limit);

  // Open file descriptors (/proc is only available on Linux)
  fs.readdir('/proc/self/fd', (err, list) => {
    if (err) return;
    statsd.gauge('server.descriptors', list.length);
  });

  // Event loop lag: how late a zero-delay timer actually fires
  const begin = new Date();
  setTimeout(() => {statsd.timing('eventLag', begin);}, 0);
}, 10_000)
We track a couple of business metrics around request counts and latency using the middleware provided by statsd-client. We also track metrics on the health of the Node.js process itself, including event loop lag (how late a zero-delay timer fires), heap size and the number of open file descriptors.
Our tracked metrics are hierarchical, so we can build dashboards around queries like myService.latency.*.p90 to see the P90 latency of every endpoint. This allows us to build highly focused dashboards with ease and minimal configuration changes. We can also use Grafana to set up alerts that email us if a metric breaches a pre-defined threshold.
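To illustrate how a wildcard query selects metrics from the hierarchy, here is a minimal sketch of Graphite-style matching where * stands for exactly one dot-separated segment. Graphite's real query language supports more (braces, character classes, functions); matchMetrics and the sample metric names are assumptions made up for this example.

```javascript
// Escape characters that have a special meaning in a regular expression.
function escapeRegExp(s) {
  return s.replace(/[.*+?^${}()|[\]\\]/g, '\\$&');
}

// Convert a pattern to a RegExp; `*` matches within a single path segment.
function patternToRegExp(pattern) {
  const body = pattern
    .split('.')
    .map((seg) => seg.split('*').map(escapeRegExp).join('[^.]*'))
    .join('\\.');
  return new RegExp(`^${body}$`);
}

// Return the metric names selected by the pattern.
function matchMetrics(pattern, names) {
  const re = patternToRegExp(pattern);
  return names.filter((name) => re.test(name));
}
```

Because the wildcard stops at a dot, myService.latency.*.p90 picks up a newly added endpoint automatically without also matching deeper or unrelated branches of the tree.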
Summary
- Graphite allows us to track hierarchical metrics
- It is a good idea to capture business metrics as well as instance health metrics
Request Tracing
In a distributed system, services often call other services, creating a tree of requests that fans out from the original one. Associating a request in one system with a request in another, and tracing the full lifecycle of the original request, is often a manual and tedious effort. There are services that ease this problem, and Zipkin is the one we will explore. We will be using zipkin-lite, which makes us configure more things manually than zipkin would, but that teaches us more about the internals of the process, so it is a great learning resource.
Initiator of Request
#!/usr/bin/env node
const server = require('fastify')();
const fetch = require('node-fetch');
const HOST = process.env.HOST || '127.0.0.1';
const PORT = process.env.PORT || 3000;
const TARGET = process.env.TARGET || '127.0.0.1:4000';
// Address of the Zipkin server (read from ZIPKIN, not TARGET)
const ZIPKIN = process.env.ZIPKIN || '127.0.0.1:9411';
const Zipkin = require('zipkin-lite');
const zipkin = new Zipkin({
  zipkinHost: ZIPKIN,
  serviceName: 'web-api',
  servicePort: PORT,
  serviceIp: HOST,
  init: 'short'
});

server.addHook('onRequest', zipkin.onRequest());
server.addHook('onResponse', zipkin.onResponse());

server.get('/', async (req) => {
  req.zipkin.setName('get_root');
  const url = `http://${TARGET}/recipes/42`;
  // Wrap the outgoing call so it is recorded as part of this trace
  const zreq = req.zipkin.prepare();
  const res = await fetch(url, {headers: zreq.headers});
  zreq.complete('GET', url);
  const producer_data = await res.json();
  return {
    consumer_pid: process.pid,
    producer_data: producer_data,
    trace: req.zipkin.trace
  };
});

server.listen(PORT, HOST, () => {
  console.log(`Consumer running at http://${HOST}:${PORT}`)
})
With Zipkin, every request we receive and response we provide also involves making a request to the running Zipkin instance to notify it of the request and response. This allows Zipkin to do things like differentiate between server and client perceived latency.
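As a small worked example of client versus server perceived latency: if the caller and the callee each report a span for the same hop, the difference between the two durations is roughly the time spent on the network and in queues. The span objects below are simplified and the numbers are made up; durations are in microseconds, which is the unit used by Zipkin's v2 span format. transitOverheadMs is a hypothetical helper.

```javascript
// Two simplified span records describing the same hop from both sides.
const clientSpan = { kind: 'CLIENT', name: 'get', duration: 12500 };
const serverSpan = { kind: 'SERVER', name: 'get_recipe', duration: 9000 };

// Time the caller waited that the callee did not spend handling the request.
function transitOverheadMs(client, server) {
  return (client.duration - server.duration) / 1000;
}
```

Here the caller waited 12.5 ms while the callee only worked for 9 ms, so about 3.5 ms went to everything in between.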
Since this API is the public entry point to our system, we include the init field when we create our Zipkin object. Zipkin hooks into our service's requests and responses using addHook, and provides us with headers to pass to upstream services, which are used to trace the request throughout our system.
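The headers in question follow Zipkin's B3 propagation convention. The sketch below shows the idea of starting a trace at the entry point and deriving headers for an outgoing call. It is not zipkin-lite's internal code; startTrace, outgoingHeaders and newId are hypothetical helpers, and the sampled flag is hard-coded as an assumption.

```javascript
const crypto = require('crypto');

// A 64-bit identifier rendered as 16 lowercase hex characters.
const newId = () => crypto.randomBytes(8).toString('hex');

// Start a new trace at the edge of the system (what an entry point does).
function startTrace() {
  return { traceId: newId(), spanId: newId(), parentSpanId: null };
}

// Headers for an outgoing call: same trace, fresh span, current span as parent.
function outgoingHeaders(ctx) {
  return {
    'X-B3-TraceId': ctx.traceId,
    'X-B3-SpanId': newId(),
    'X-B3-ParentSpanId': ctx.spanId,
    'X-B3-Sampled': '1'
  };
}
```

The trace ID stays constant across every hop, while the parent span ID links each call to the request that caused it. That pair is what lets Zipkin rebuild the request tree.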
Upstream Source
#!/usr/bin/env node
const server = require('fastify')();
const HOST = process.env.HOST || '127.0.0.1';
const PORT = process.env.PORT || 4000;
// Address of the Zipkin server (read from ZIPKIN, not TARGET)
const ZIPKIN = process.env.ZIPKIN || '127.0.0.1:9411';
const Zipkin = require('zipkin-lite');
const zipkin = new Zipkin({
  zipkinHost: ZIPKIN,
  serviceName: 'recipe-api',
  servicePort: PORT,
  serviceIp: HOST
});

server.addHook('onRequest', zipkin.onRequest());
server.addHook('onResponse', zipkin.onResponse());

console.log(`worker pid=${process.pid}`);

server.get('/recipes/:id', async (req, res) => {
  console.log(`worker request pid=${process.pid}`);
  req.zipkin.setName('get_recipe');
  const id = Number(req.params.id);
  if (id !== 42) {
    res.statusCode = 404;
    return { error: 'not_found' }
  }
  return {
    producer_pid: process.pid,
    recipe: {
      id: id,
      name: "Chicken Tikka Masala",
      steps: "Throw it in a pot...",
      ingredients: [
        {id:1, name: "Chicken", quantity: "1 lb"},
        {id:2, name: "Sauce", quantity: "2 cups"},
      ]
    }
  }
});

server.listen(PORT, HOST, () => {
  console.log(`Producer running at http://${HOST}:${PORT}`);
})
Since this is not the entry point to our system, we do not use init in our Zipkin object. We have the same hooks as the entry point, but since this is the end of the request route, we do not need to surround any outgoing calls with req.zipkin.prepare() and zreq.complete().
Now we can go to the Zipkin UI running on localhost to visualize our requests, either in a flame-chart-like view or as a microservice dependency graph.
There are tons of logging and metrics solutions at all major cloud providers. Request tracing, however, is not something I had seen before, and it would be a very nice addition to any project that has a complex, distributed setup.
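On the receiving side, a service that does not start traces only needs to read the trace context it was handed. The sketch below parses B3 headers from an incoming request; again this is an illustration rather than zipkin-lite's own code, and contextFromHeaders is a hypothetical name.

```javascript
// Node lower-cases incoming header names, so the B3 headers are read in lower case.
function contextFromHeaders(headers) {
  const traceId = headers['x-b3-traceid'];
  if (!traceId) return null; // the caller did not start a trace
  return {
    traceId,
    spanId: headers['x-b3-spanid'],
    parentSpanId: headers['x-b3-parentspanid'] || null,
    sampled: headers['x-b3-sampled'] === '1'
  };
}
```

Returning null when there is no trace ID mirrors the idea behind leaving out init: an internal service records spans for traces it is given but never invents a new one.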
Summary
- Tracing and associating requests with one another throughout a distributed system is quite difficult
- Zipkin uses request headers, plus extra calls from each service to the Zipkin server, to let us trace and associate requests
Conclusion
In this section, we explored how to productionize logging, metrics tracking and request tracing. Many of the tools mentioned are replaceable, but the general idea of the application instance quickly handing off metrics and logs while a separate machine does the indexing and persisting is a good pattern to follow regardless of the tools used. Request tracing was new to me and is a very cool feature that I will certainly include in my next projects, if they ever reach a scale where the complexity calls for it. In the next section, we will look into containers for our apps, specifically using Docker.