Distributed Systems With Node.js: Part 7 Resilience

Matthew MacFarquhar
11 min read · May 7, 2024

Introduction

In this series, I will be working through this book on Distributed Systems using Node.js. The book is quite large and in depth, but in these articles I will be distilling the big-picture pieces of the book into bite-size tutorials and walkthroughs (for those following along in the book, this is actually chapter 8).

In this section, we will learn how to deal with failures in our system to make our application more resilient. We will make our application stateless and idempotent (so it can be retried) and talk about retrying calls and connections to upstream services.

The code for this demo is found in this repo at the commit here

Things can go wrong during business logic execution. Some of these issues can be well tested and planned for in code (like a lost DB connection or a failing upstream service). However, we cannot always ensure our resilience efforts will catch every issue that may cause a process to crash; some are completely out of our hands, like a natural disaster or someone tripping over a power cord in a data center.

This unpredictability is why we should strive to make API calls…

idempotent — meaning they can be retried as many times as we like and have the same effect on our system state as if the call had been made only once.

stateless — so there is no data to lose if we hit some runtime error in our code.

Caching

Caching is useful when our application needs data that would normally require some extra computation to retrieve. We store the request-response pair as a key-value entry in a cache to avoid that extra computation, deciding that the last value we computed is good enough to reuse.

In Memory Cache

The most basic form of a cache is a local map, but it is a bit of an anti-pattern since maps can bloat if not cleaned up properly and lead to a memory leak in your service.

If you want to use an in-process cache, you are better off using an established library which will automatically take care of the cleanup logic for you.

#!/usr/bin/env node

const fetch = require('node-fetch');
const server = require('fastify')();
// lru-cache (older v5/v6-style options): cap total size at 4096 "units",
// measure each entry by payload + key length, expire entries after 10 minutes
const lru = new (require('lru-cache'))({
  max: 4096,
  length: (payload, key) => payload.length + key.length,
  maxAge: 10 * 60 * 1_000
});
const PORT = process.env.PORT || 3000;

server.get('/account/:account', async (req, res) => {
  return getAccount(req.params.account);
});
server.listen(PORT, () => console.log(`http://localhost:${PORT}`));

async function getAccount(account) {
  const cached = lru.get(account);
  if (cached) {
    console.log('cache hit');
    return JSON.parse(cached);
  }
  console.log('cache miss');
  const result = await fetch(`https://api.github.com/users/${account}`);
  const body = await result.text();
  lru.set(account, body);
  return JSON.parse(body);
}

Our in-process cache specifies a max size; once we reach that size, the least recently used item will be removed from the cache to make way for new items. This example makes requests to GitHub’s users API to get user info, and will short-circuit the request if we already have the information in the cache.

This is perhaps fine for a single-process prototype; however, there are two main issues here for production applications.

  1. The service is stateful and the cache only exists in memory, so if our process exits for some reason, we lose our cache.
  2. The cache only exists on a single process; if we had multiple worker nodes behind this URL, they wouldn’t be able to share cached information.

Distributed Cache

Let’s move to the more productionized solution of running memcached as a separate process (in production you would even run a memcached cluster on separate machines).

We can spin up a memcached image on port 11211 with this docker command. We have given it 64MB of memory to store data in.

docker run --name distnode-memcached -p 11211:11211 -it --rm memcached:1.6-alpine memcached -m 64 -vv

Our service code now looks like this:

const fetch = require('node-fetch');
const server = require('fastify')();
const memcache = require('memjs').Client.create('localhost:11211');
const PORT = process.env.PORT || 3000;

server.get('/account/:account', async (req, res) => {
  return getAccount(req.params.account);
});
server.listen(PORT, () => console.log(`http://localhost:${PORT}`));

async function getAccount(account) {
  // memjs returns {value}, where value is a Buffer or null
  const {value: cached} = await memcache.get(account);
  if (cached) {
    console.log('cache hit');
    return JSON.parse(cached);
  }
  console.log('cache miss');
  const result = await fetch(`https://api.github.com/users/${account}`);
  const body = await result.text();
  await memcache.set(account, body, {});
  return JSON.parse(body);
}

Instead of using the lru-cache functions to get and set values, we are now making network requests to memcached. This is slower than a pure in-memory cache; however, the cache can now outlive a single service process and can be shared among multiple service workers. With this caching approach, we can remove state from our services while still maintaining a speed we could not achieve if we always defaulted to making the API request or querying a database for information.

Versioning Cache Entries

When caching, it is also important to version cached data to account for data structure migrations. Perhaps the structure of users is changing from v1 to v2, where we are now adding a date of birth. If we don’t include versions in our cache key names, we will be stuck serving the v1 structure even though we may want v2 and it is available. A good idea is to correlate cache versions with schema changes, so our first cache key would be ‘user-12312-v1’ and the next version’s cache key would be ‘user-12312-v2’. This way we can cache both versions side by side (eventually all entries will be v2 as the dial-up to v2 completes system-wide).
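
A minimal sketch of this, layered on the memjs client from above (the CACHE_VERSION constant and cacheKey helper are my own names, not from the book):

const memcache = require('memjs').Client.create('localhost:11211');

// Hypothetical convention: bump CACHE_VERSION whenever the shape of the
// cached payload changes (e.g. when the user schema gains date of birth)
const CACHE_VERSION = 'v2';

function cacheKey(account) {
  return `user-${account}-${CACHE_VERSION}`;
}

async function getCachedAccount(account) {
  // Old v1 entries are simply never read again and expire out of
  // memcached on their own; all new reads and writes use v2 keys
  const {value: cached} = await memcache.get(cacheKey(account));
  return cached ? JSON.parse(cached) : null;
}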

Summary

  • In-memory caches are faster than distributed ones, but they are lost if the node process dies and cannot be shared among workers in a distributed setting
  • Using a cache server like memcached offers long-lived, shared caching among multiple worker nodes, allowing us to remove some state from our application’s process
  • It is a good idea to add some version to your cache keys to account for any schema changes in data items which may occur

Database Resilience

When our cache misses, we need to go hit the source of truth (a Postgres database in this case). In this section, we will discuss how to make our database connections as resilient as possible with reconnections and pooling. We will also touch on how to effectively migrate schema changes so as not to cause an outage.

Reconnecting

If our database is down, our application should be considered down and unhealthy. In a distributed system, if a service is unhealthy for a sustained amount of time, a good reverse proxy may take the process down and restart it. We should also try to reconnect to the database, so that we can recover and become healthy again.

Let’s first spin up our postgres instance using Docker

docker run --name distnode-postgres -it --rm -p 5432:5432 -e POSTGRES_PASSWORD=password -e POSTGRES_USER=user -e POSTGRES_DB=dbconn postgres:12.3

We can now configure a db module our application will use to interface with postgres.

const {Client} = require('pg');
const {EventEmitter} = require('events');

class DatabaseReconnection extends EventEmitter {
  #client = null;
  #conn = null;
  #kill = false;
  connected = false;

  constructor(conn) {
    super();
    this.#conn = conn;
  }

  connect() {
    // Tear down any existing client before creating a new one
    if (this.#client) {
      this.#client.end();
    }
    if (this.#kill) {
      return;
    }

    const client = new Client(this.#conn);

    client.on('error', (err) => this.emit('error', err));

    // 'end' fires whenever the connection closes; unless we disconnected
    // intentionally, schedule a reconnect attempt
    client.once('end', () => {
      if (this.connected) {
        this.emit('disconnect');
        this.connected = false;
      }
      if (this.#kill) {
        return;
      }
      setTimeout(() => this.connect(), this.#conn.retry || 1_000);
    });

    client.connect((err) => {
      this.connected = !err;
      if (!err) {
        this.emit('connect');
      }
    });
    this.#client = client;
    this.emit('reconnect');
  }

  async query(q, p) {
    // Fail fast rather than queueing queries on a dead connection
    if (this.#kill || !this.connected) {
      throw new Error('disconnected');
    }
    return this.#client.query(q, p);
  }

  disconnect() {
    this.#kill = true;
    this.#client.end();
  }
}

module.exports = DatabaseReconnection;

Our database client is an event emitter, allowing it to notify consumers of the state changes it goes through.

To query our database, we create a function which forwards a query to the Postgres client if we are connected.

In our connect logic, we establish a connection to Postgres, but we also add a listener on the client disconnection event — ‘end’.

When the client connection ends, and it was not done intentionally (indicated by the kill flag), we automatically try to reconnect. Now, even if some transient disconnection causes our application to be temporarily unhealthy, we may still be able to recover and reconnect to our database.

Our application code will look like this

#!/usr/bin/env node

const DatabaseReconnection = require('./db.js');
const db = new DatabaseReconnection({
  host: 'localhost',
  port: 5432,
  user: 'user',
  password: 'password',
  database: 'dbconn',
  retry: 1_000
});
db.connect();
db.on('error', (err) => console.error('db error', err.message));
db.on('reconnect', () => console.log('reconnecting...'));
db.on('connect', () => console.log('connect'));
db.on('disconnect', () => console.log('disconnected'));

const server = require('fastify')();

server.get('/foo/:foo_id', async (req, res) => {
  try {
    const dbRes = await db.query('SELECT NOW() AS time, $1 AS echo', [req.params.foo_id]);
    return dbRes.rows[0];
  } catch (e) {
    // Surface database outages as 503 Service Unavailable
    res.statusCode = 503;
    return e;
  }
});

// Report unhealthy while the database connection is down
server.get('/health', async (req, res) => {
  if (!db.connected) {
    throw new Error('no db connection');
  }
  return 'OK';
});

server.listen(3000, () => console.log('localhost:3000'));

From here, we can take action on the different events our DatabaseReconnection emits; in our case we will just log them. We also configure our health check to verify that we are connected to the database before reporting healthy. This endpoint could be used by HAProxy or nginx to determine whether this worker process is still healthy enough to receive traffic.
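
For instance, an HAProxy backend could poll that endpoint and pull the worker out of rotation after a few failed checks (a sketch; the backend and server names are made up):

backend web_workers
  option httpchk GET /health
  http-check expect status 200
  server worker1 127.0.0.1:3000 check inter 2s fall 3 rise 2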

Database Pooling

Another option for resilient database connections is to create a database pool for our process. The pool contains multiple open connections to the database, allowing us to grab a live one whenever we need to run a query. This means that we will always (barring a catastrophic database failure) have a connection to the database. It also allows us to parallelize queries, since each query can go through a separate connection.

const {Pool} = require('pg');
const db = new Pool({
  host: 'localhost',
  port: 5432,
  user: 'user',
  password: 'password',
  database: 'dbconn',
  max: 10
});
// No explicit connect call needed: the pool lazily opens
// connections as queries demand them

(async () => {
  const start = Date.now();
  // Each query checks out its own connection from the pool,
  // so the two 2-second sleeps run in parallel
  await Promise.all([
    db.query('SELECT pg_sleep(2);'),
    db.query('SELECT pg_sleep(2);')
  ]);
  console.log(`took ${(Date.now() - start) / 1000} seconds`);
  db.end();
})();

The one thing to be careful about with the pool approach is that databases like Postgres have a maximum number of connections they can keep open at any one time (Postgres defaults to 100). Allowing a single process to take up many of these connections instead of just one forces you to scale your database up sooner than you would if each process kept a single connection: with max: 10, just ten worker processes would exhaust that default limit.

Schema Migrations

The last database-related topic we touch upon for resilience is schema migration. SQL is heavily dependent on maintaining a standard structure across all entries in a database. At times, table structures need to change, and it is much easier to have this managed via code than performed ad hoc by some database systems admin.

The book goes through using Knex to manage creating our migrations which we can use to build up and roll back our schema as needed.

The Knex tool will take care of creating the file structure for you to write your migration logic; each migration consists of an up and a down.
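
For reference, the Knex CLI drives this workflow (the migration name here is just an example):

npx knex migrate:make create_users    # scaffold a timestamped migration file
npx knex migrate:latest               # apply all pending up migrations
npx knex migrate:rollback             # run the down of the most recent batch

A scaffolded migration then gets filled in with the up and down logic: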

module.exports.up = async (knex) => {
  await knex.schema.createTable('users', (table) => {
    table.increments('id').unsigned().primary();
    table.string('username', 24).unique().notNullable();
  });

  await knex('users').insert([
    {username: 'mattmacf'},
    {username: 'steve'},
    {username: 'bob'}
  ]);
};

module.exports.down = async (knex) => {
  await knex.schema.dropTable('users');
};

This up will create a users table and then add some fake users; the down simply drops the table.

We can also write our ups and downs using pure SQL instead of the Knex functions.

module.exports.up = async (knex) => {
  await knex.raw(`CREATE TABLE groups (id SERIAL PRIMARY KEY, name VARCHAR(24) UNIQUE NOT NULL)`);
  await knex.raw(`INSERT INTO groups (id, name) VALUES (1, 'Basic'), (2, 'Mods'), (3, 'Admins')`);
  await knex.raw(`ALTER TABLE users ADD COLUMN group_id INTEGER NOT NULL REFERENCES groups (id) DEFAULT 1`);
};

module.exports.down = async (knex) => {
  await knex.raw(`ALTER TABLE users DROP COLUMN group_id`);
  await knex.raw(`DROP TABLE groups`);
};

This one creates a groups table and adds a group_id column to our users table.

The book also talks about a good approach for deploying a complex migration that alters previous business logic alongside application changes, using the example of collapsing the firstName and lastName fields into a single fullName field.

  1. Starting the Transition: Run a migration to add the fullName column. Update the application to populate fullName with live data as it flows through, and stop populating first+last. When retrieving data, try to use fullName but fall back to first+last if it is not present (see the sketch below).
  2. Backfill: Run an update to hydrate the fullName field from the first and last name values wherever fullName is null.
  3. Complete the Transition: Run a migration to remove the first and last name columns, and update application logic to no longer fall back to first+last.

addition of schema element -> application uses new schema element but falls back to old -> backfill new schema element -> remove old schema element -> remove fallback to old in application
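
As a sketch of the read path in step 1 (the column names here are assumptions):

// Prefer the new column; fall back to the old ones until the
// backfill in step 2 completes and step 3 removes this branch
function displayName(row) {
  return row.fullname ?? `${row.firstname} ${row.lastname}`;
}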

Summary

  • We can make database connections resilient by adding reconnection functionality or using a database pool
  • Schema migrations are an important part of managing SQL data, and schema changes should be committed to code
  • We should consider our connection to a critical database when reporting our process’s health to a reverse proxy

Upstream Services

Another issue that may arise during our application’s lifecycle is that an upstream service may fail. We must decide how to deal with a broken dependent service so that our application can still function in a degraded manner (if possible).

When to Retry

We want to ensure we only retry if doing so will not break the system state (i.e. we are calling an idempotent API), or if we know the server never received our message, as indicated by ECONNREFUSED or ENOTFOUND errors.

In general, the GET, PUT, and DELETE functionality of a service should be idempotent and should allow you to re-run requests without creating system state issues. POST is usually not idempotent and may lead to errors if we are unable to determine for sure whether the upstream service has committed our POST (this can happen because of some 5xx issue on the server side, or because we lost the connection to an ECONNRESET and never got back the response).
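
One way to centralize that decision is a small helper like this (a sketch; the function name is mine, not the book’s):

// Decide whether a failed outbound request can be safely retried
function isSafeToRetry(err, method) {
  // ECONNREFUSED / ENOTFOUND mean the server never received the
  // request, so even a POST is safe to send again
  if (err.code === 'ECONNREFUSED' || err.code === 'ENOTFOUND') {
    return true;
  }
  // Otherwise the server may have already committed the work
  // (e.g. an ECONNRESET mid-response), so only retry methods
  // that should be idempotent
  return ['GET', 'PUT', 'DELETE'].includes(method);
}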

Exponential Backoff

When we are retrying a request to a downed upstream service, it is usually best practice to add extra time between each of our attempts. Sending requests at increasing, capped intervals (100 ms | 200 ms | 300 ms | 500 ms | 500 ms) will give the service more time to recover than if we barraged it with 5 retries spaced only 100 ms apart.
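
A minimal retry loop with exponential backoff and a delay cap might look like this (the numbers and names are illustrative):

const fetch = require('node-fetch');

async function fetchWithBackoff(url, attempts = 5) {
  for (let attempt = 0; attempt < attempts; attempt++) {
    try {
      return await fetch(url);
    } catch (err) {
      if (attempt === attempts - 1) throw err;
      // 100ms, 200ms, 400ms, 800ms, ... capped at 2 seconds
      const delay = Math.min(100 * 2 ** attempt, 2_000);
      await new Promise((resolve) => setTimeout(resolve, delay));
    }
  }
}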

Circuit Breaker

Sometimes catastrophic failures happen and the upstream service may not be able to recover by itself in a timely manner. A circuit breaker is a rule in our application code that says “if upstream service X sends five 5xx errors within 30 seconds, consider it down and short circuit calling this upstream service.” If the upstream service is not completely necessary for the application to run then this option may help reduce our application’s perceived latency by avoiding all the retries on the downed upstream service.
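
A toy version of that rule could look like this (the thresholds and names are illustrative, not the book’s implementation):

const fetch = require('node-fetch');

// Consider the upstream down after 5 failures within 30 seconds
class CircuitBreaker {
  #failures = [];

  isOpen() {
    const cutoff = Date.now() - 30_000;
    this.#failures = this.#failures.filter((t) => t > cutoff);
    return this.#failures.length >= 5;
  }

  recordFailure() {
    this.#failures.push(Date.now());
  }
}

const breaker = new CircuitBreaker();

async function callUpstream(url) {
  // Short circuit instead of waiting on a service we believe is down
  if (breaker.isOpen()) {
    throw new Error('circuit open, skipping upstream call');
  }
  try {
    const res = await fetch(url);
    if (res.status >= 500) breaker.recordFailure();
    return res;
  } catch (err) {
    breaker.recordFailure();
    throw err;
  }
}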

Chaos

Chaotic issues that may occur while a node is running are hard, and sometimes even impossible, to account for. Therefore, we must strive to make our applications resilient to these kinds of failures. To test that resilience, a common approach is to inject manual chaos into non-prod stages.

We may do this by adding functionality that only runs in non-prod environments to do things like the following (see the sketch after this list)…

  • Call a function that has a 1/100 chance of disconnecting our database connection, to ensure we properly reconnect
  • Set some random time in the future where our process will exit using setTimeout, to ensure our reverse proxy is able to restart downed worker processes effectively
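
A sketch of both ideas, gated on the environment (NODE_ENV is standard, but chaosDisconnect() is a hypothetical method you would add to the DatabaseReconnection class to sever the socket without setting the kill flag):

if (process.env.NODE_ENV !== 'production') {
  // Every second, a ~1/100 chance of severing the database socket;
  // chaosDisconnect() is hypothetical and should end the connection
  // without the kill flag so the reconnect logic above kicks in
  setInterval(() => {
    if (Math.random() < 0.01) db.chaosDisconnect();
  }, 1_000);

  // Exit at a random point within the next hour to verify the
  // reverse proxy / supervisor restarts downed workers
  setTimeout(() => process.exit(1), Math.random() * 60 * 60 * 1_000);
}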

Summary

  • Only retry if you know that subsequent calls will not cause issues in the application’s state
  • Use exponential backoff when performing retries, and if a service is down for a sustained amount of time, use a circuit breaker to skip calling it
  • Adding manual chaos to your non-prod deployments is a good way to stress test how resilient your application is

Conclusion

In this section, we explored how process failures are sometimes unpredictable, and why we must build our applications so that they do their best to survive those failures. We touched on making our services idempotent and stateless, recovering from database disconnections, and dealing with flaky upstream services.


Matthew MacFarquhar

I am a software engineer working for Amazon, living in SF/NYC.