Self-note: Resume / recover a MongoDB change stream

Introduction

  • Sometimes the replicator needs to be restarted
  • We cannot afford to lose even one or two entries in the time series, since that would throw the statistics off and, in exceptional cases, lose the max and min values

Resume & recover 

Change streams are resumable by specifying a resumeAfter token when opening the cursor. For the resumeAfter token, use the _id value of the change stream event document. Passing the _id value to the change stream attempts to resume notifications starting after the specified operation.

IMPORTANT

When resuming, resumeToken contains the change stream notification id (the _id of the event). The resumeAfter option takes a parameter that must resolve to a resume token. Passing the resumeToken to the resumeAfter modifier directs the change stream to attempt to resume notifications starting after the operation specified.
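A minimal Node.js sketch of that resume step. It assumes source is a MongoDB driver Collection or Db exposing watch(pipeline, options), and that resumeToken is the _id of the last change event you processed. The helper name openChangeStream is hypothetical and not part of the driver.

```javascript
// Hypothetical helper: open a change stream, resuming after a saved token when one exists.
// `source` is assumed to be a driver Collection or Db exposing watch(pipeline, options).
function openChangeStream(source, resumeToken, pipeline = []) {
  const options = {};
  if (resumeToken) {
    // resumeAfter expects the whole _id document of the last processed event
    options.resumeAfter = resumeToken;
  }
  return source.watch(pipeline, options);
}

// Usage (assumed wiring, 'samples' is a hypothetical collection name):
// const stream = openChangeStream(db.collection('samples'), savedToken);
// stream.on('change', (change) => { /* ... */ });
```

Without a token the stream simply starts from "now", which is what you want on the very first run.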

If the featureCompatibilityVersion (fcv) is set to "4.0" or greater, newly opened change streams return a hex-encoded string for the resume token data, i.e. the _id._data value. This change allows for the ability to compare and sort the resume tokens. If the fcv is 3.6, newly opened change streams return a BinData for the resume token data.

IMPORTANT

The fcv value at the time the cursor is opened determines the resume token data type. That is, changing the fcv does not affect the resume tokens of change streams opened before the change.

Regardless of the fcv value, a 4.0 replica set or a sharded cluster can resume a change stream using either the BinData or string resume token.

As such, a 4.0 deployment can use a resume token from a change stream opened on a collection from a 3.6 deployment.
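Because fcv 4.0+ tokens carry their data as a string, they can be compared directly. A sketch of keeping only the newer of two tokens before persisting it, assuming both tokens come from the same deployment and that plain string comparison of _data reflects their order (which is what the hex encoding is meant to allow). Both function names are hypothetical.

```javascript
// Compare two resume tokens by their hex-encoded _data strings.
// Assumes fcv >= 4.0 tokens; BinData tokens (fcv 3.6) are rejected instead of compared.
function compareResumeTokens(a, b) {
  if (typeof a._data !== 'string' || typeof b._data !== 'string') {
    throw new TypeError('only hex-string resume tokens (fcv >= 4.0) can be compared this way');
  }
  if (a._data < b._data) return -1;
  if (a._data > b._data) return 1;
  return 0;
}

// Keep whichever token is further along, e.g. before overwriting the saved one.
function newerToken(a, b) {
  if (!a) return b;
  if (!b) return a;
  return compareResumeTokens(a, b) >= 0 ? a : b;
}
```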

Implementation

Structure of a Mongo change event

{
   _id : { <BSON Object> },
   "operationType" : "<operation>",
   "fullDocument" : { <document> },
   "ns" : {
      "db" : "<database>",
      "coll" : "<collection>"
   },
   "documentKey" : { "_id" : <ObjectId> },
   "updateDescription" : {
      "updatedFields" : { <document> },
      "removedFields" : [ "<field>", ... ]
   },
   "clusterTime" : <Timestamp>,
   "txnNumber" : <NumberLong>,
   "lsid" : {
      "id" : <UUID>,
      "uid" : <BinData>
   }
}
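A small sketch of reading the fields a replicator usually cares about from such an event. The function name summarizeChange is hypothetical. Field names follow the structure above; ns, documentKey and updateDescription may be missing depending on the operation type, so they are checked before use.

```javascript
// Pull the commonly needed fields out of a change event document.
function summarizeChange(change) {
  return {
    resumeToken: change._id, // persist this to resume later
    operation: change.operationType, // e.g. "insert", "update", "delete"
    namespace: change.ns ? `${change.ns.db}.${change.ns.coll}` : undefined,
    documentId: change.documentKey ? change.documentKey._id : undefined,
    updatedFields: change.updateDescription ? change.updateDescription.updatedFields : undefined,
  };
}
```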

Pay attention to _id

Metadata related to the operation.

Use this document as a resumeToken for the resumeAfter parameter when resuming a change stream.

This field is BSON, so it:

  • Can't be saved reliably with JSON.stringify (binary values such as BinData don't survive the round trip)
  • Can't be cast to ObjectID (wrong format, different from the MongoDB documentation)
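A quick illustration of the JSON problem using only Node built-ins. A Buffer stands in for BSON BinData here, which is an assumption: the driver's Binary type serializes to JSON differently, but it likewise comes back as a plain value rather than binary data.

```javascript
// Illustration: a Node Buffer stands in for a BSON BinData resume token (assumption).
const token = { _data: Buffer.from([0x82, 0x5b, 0x8f]) };

// Round trip through JSON, as a naive "save to file / load from file" would do
const restored = JSON.parse(JSON.stringify(token));

console.log(Buffer.isBuffer(token._data)); // true
console.log(Buffer.isBuffer(restored._data)); // false
console.log(restored._data); // { type: 'Buffer', data: [ 130, 91, 143 ] }
```

The restored value has the right bytes but the wrong type, so passing it back as resumeAfter would not work.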

Solution: use the bson package to serialize it

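const fs = require('fs');
const BSON = require('bson'); // bson 4.x and later expose serialize()/deserialize() directly

// ID_FILE, logger and lastId are defined elsewhere in the replicator;
// lastId holds the _id (resume token) of the latest change event.
function saveId(cb) {
  if (!lastId) {
    return cb && cb(null);
  }
  const lastIdBuffer = BSON.serialize(lastId);
  fs.writeFile(ID_FILE, lastIdBuffer, (err) => {
    if (err) {
      logger.error('[saveId]', err);
    }
    return cb && cb(err);
  });
}

function loadId(cb) {
  fs.readFile(ID_FILE, (err, data) => {
    if (err && err.code === 'ENOENT') {
      // No saved token yet (first run): the change stream starts from "now"
      return cb && cb(null, undefined);
    }
    let resumeToken;
    if (!err && data) {
      resumeToken = BSON.deserialize(data);
    }
    return cb && cb(err, resumeToken);
  });
}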
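One possible hardening, not part of the original setup: if the process dies while fs.writeFile is halfway through, the token file can be left truncated and the stream cannot resume. Writing to a temporary file and then renaming it over the real file avoids that on POSIX filesystems, where rename replaces the target atomically. writeFileAtomicSync is a hypothetical helper name.

```javascript
const fs = require('fs');
const path = require('path');

// Hypothetical helper: write to a temp file next to the target, then rename it into place.
// If the process dies mid-write, only the temp file is damaged; the previous token file survives.
function writeFileAtomicSync(filePath, data) {
  const tmpPath = path.join(path.dirname(filePath), `.${path.basename(filePath)}.tmp`);
  fs.writeFileSync(tmpPath, data);
  fs.renameSync(tmpPath, filePath); // atomic replace on POSIX filesystems
}
```

In saveId() this could replace the fs.writeFile call, e.g. writeFileAtomicSync(ID_FILE, BSON.serialize(lastId)), at the cost of one small synchronous write per second.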

  • Every second, write the latest _id to disk
  • Reload the _id object from disk on startup
  • Use it to resume the change stream:

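const pipeline = [
  {
    $match: { 'ns.db': config.MONGO_DB_NAME },
  },
];

const changeStreamOptions = {};
// resumeToken comes from loadId(); only resume when a token was actually saved
if (resumeToken) {
  changeStreamOptions.resumeAfter = resumeToken;
}

const changeStream = db.watch(pipeline, changeStreamOptions);
changeStream.on('change', (change) => {
  // Remember the latest resume token so saveId() can write it to disk
  lastId = change._id;
  // ... process the change ...
});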
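The "every second" write can stay cheap by only writing when a new token has arrived since the last write. A sketch with a hypothetical createTokenSaver helper; write is whatever function persists the token (for example a wrapper around saveId()).

```javascript
// Hypothetical helper: remember the latest token, write it only when it changed.
function createTokenSaver(write) {
  let latest = null;
  let dirty = false;
  return {
    // Call from the change handler with change._id
    update(token) {
      latest = token;
      dirty = true;
    },
    // Call on a timer. If write() throws, dirty stays true and the next flush retries.
    flush() {
      if (!dirty) return false;
      write(latest);
      dirty = false;
      return true;
    },
  };
}

// Assumed wiring (persistToken is your own write function):
// const saver = createTokenSaver(persistToken);
// changeStream.on('change', (change) => { saver.update(change._id); /* process change */ });
// setInterval(() => saver.flush(), 1000);
```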

Reference

db.watch(pipeline, options)

New in version 4.0: Requires featureCompatibilityVersion (fCV) set to "4.0" or greater. For more information on fCV, see setFeatureCompatibilityVersion.

Opens a change stream cursor for a database to report on all its non-system collections.

pipeline: A sequence of one or more aggregation stages. Change streams accept only a subset of stages, such as $match and $project. See Aggregation for complete documentation on the aggregation framework.

options: Optional. Additional options that modify the behavior of db.watch(). You must pass an empty array [] to the pipeline parameter if you are not specifying a pipeline but are passing the options document.

The options document can contain the following fields and values:

  • resumeAfter: Optional. Directs db.watch() to attempt resuming notifications starting after the operation specified in the resume token. Each change stream event document includes a resume token as the _id field. Pass the entire _id field of the change event document that represents the operation you want to resume after. resumeAfter is mutually exclusive with startAtOperationTime.
  • fullDocument: Optional. By default, db.watch() returns the delta of those fields modified by an update operation, instead of the entire updated document. Set fullDocument to "updateLookup" to direct db.watch() to look up the most current majority-committed version of the updated document. db.watch() then returns a fullDocument field with the document lookup in addition to the updateDescription delta.
  • batchSize: Optional. Specifies the maximum number of change events to return in each batch of the response from the MongoDB cluster. Has the same functionality as cursor.batchSize().
  • maxAwaitTimeMS: Optional. The maximum amount of time in milliseconds the server waits for new data changes to report to the change stream cursor before returning an empty batch. Defaults to 1000 milliseconds.
  • startAtOperationTime: Optional. The starting point for the change stream. If the specified starting point is in the past, it must be in the time range of the oplog. To check the time range of the oplog, see rs.printReplicationInfo(). startAtOperationTime is mutually exclusive with resumeAfter.
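A sketch of assembling the options document in Node.js while enforcing the mutual exclusion of resumeAfter and startAtOperationTime. buildWatchOptions is hypothetical; it only handles resumeAfter, fullDocument, batchSize, maxAwaitTimeMS and startAtOperationTime, and leaves anything unset to the server defaults.

```javascript
// Hypothetical helper: build a db.watch() options document from the fields listed above.
function buildWatchOptions({ resumeAfter, startAtOperationTime, fullDocument, batchSize, maxAwaitTimeMS } = {}) {
  if (resumeAfter && startAtOperationTime) {
    throw new Error('resumeAfter and startAtOperationTime are mutually exclusive');
  }
  const options = {};
  if (resumeAfter) options.resumeAfter = resumeAfter;
  if (startAtOperationTime) options.startAtOperationTime = startAtOperationTime;
  if (fullDocument) options.fullDocument = fullDocument; // e.g. 'updateLookup'
  if (batchSize !== undefined) options.batchSize = batchSize;
  if (maxAwaitTimeMS !== undefined) options.maxAwaitTimeMS = maxAwaitTimeMS; // server default: 1000 ms
  return options;
}

// Usage (assumed): pass [] when there is no pipeline but there are options
// const stream = db.watch([], buildWatchOptions({ resumeAfter: savedToken, fullDocument: 'updateLookup' }));
```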

SEE ALSO

db.collection.watch() and Mongo.watch()

To generate a new ObjectId in the mongo shell, call ObjectId() with no argument; it creates a new unique value. To create an ObjectId from a known value, pass ObjectId() a unique 24-character hexadecimal string. To get that hexadecimal string back, read the str attribute of the ObjectId() object.

Create a new ObjectID instance:

class ObjectID(id)
  • Arguments: id (string): can be a 24-character hex string, a 12-byte binary string or a Number.
  • Returns: an ObjectID instance.

Return the ObjectID id as a 24-character hex string:

toHexString()
  • Returns: string, the 24-character hex string representation.

Examples

Generate a 24-character hex string representation of the ObjectID
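A dependency-free sketch of what that 24-character string encodes: 12 bytes, of which the first 4 are the creation time in seconds since the Unix epoch. It also hints at why a resume token cannot be cast to an ObjectID: a token is a document whose data is not a 24-character hex string. The helper names and the sample id are illustrative.

```javascript
// An ObjectId is 12 bytes, usually written as 24 hex characters.
function isObjectIdHex(str) {
  return typeof str === 'string' && /^[0-9a-fA-F]{24}$/.test(str);
}

// The first 4 bytes are a big-endian count of seconds since the Unix epoch.
function objectIdTimestamp(hex) {
  if (!isObjectIdHex(hex)) {
    throw new TypeError(`not a 24-character hex ObjectId: ${hex}`);
  }
  const bytes = Buffer.from(hex, 'hex'); // 12 bytes
  return new Date(bytes.readUInt32BE(0) * 1000);
}

console.log(objectIdTimestamp('507f191e810c19729de860ea').toISOString()); // 2012-10-17T20:46:22.000Z
console.log(isObjectIdHex({ _data: 'placeholder' })); // false: a resume token is a document
```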

How to connect to MySQL / MariaDB using Node.JS (the right way)

Use a connection pool. It helps you:

  • Conserve resources: connections get recycled
  • Get better reliability: it automatically reconnects when there's a problem

How? Simple: instead of creating a single connection, create a pool. pool.query() is designed as a drop-in replacement for a single connection's query():

var mysql = require('mysql');
var pool  = mysql.createPool({
  connectionLimit : 10,
  host            : 'example.org',
  user            : 'bob',
  password        : 'secret',
  database        : 'my_db'
});

pool.query('SELECT 1 + 1 AS solution', function (error, results, fields) {
  if (error) throw error;
  console.log('The solution is: ', results[0].solution);
});

pool.query() is a shorthand for this getConnection(), query(), release() flow:

var mysql = require('mysql');
var pool  = mysql.createPool(...);

pool.getConnection(function(err, connection) {
  if (err) throw err; // not connected!

  // Use the connection
  connection.query('SELECT something FROM sometable', function (error, results, fields) {
    // When done with the connection, release it.
    connection.release();

    // Handle error after the release.
    if (error) throw error;

    // Don't use the connection here, it has been returned to the pool.
  });
});
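If you prefer promises over nested callbacks, the getConnection / release flow can be wrapped so the connection always goes back to the pool. A sketch assuming pool is a mysql pool (pool.getConnection(cb) and connection.release(), as used above); withConnection is a hypothetical name.

```javascript
// Hypothetical helper: borrow a connection, run `work`, and always release the connection,
// whether `work` returns a value, returns a promise, or throws.
function withConnection(pool, work) {
  return new Promise((resolve, reject) => {
    pool.getConnection((err, connection) => {
      if (err) return reject(err); // not connected!
      Promise.resolve()
        .then(() => work(connection))
        .then(
          (result) => {
            connection.release();
            resolve(result);
          },
          (error) => {
            connection.release(); // release before reporting, as in the callback version
            reject(error);
          }
        );
    });
  });
}

// Usage (assumed wiring):
// withConnection(pool, (connection) => new Promise((resolve, reject) => {
//   connection.query('SELECT something FROM sometable', (error, results) =>
//     (error ? reject(error) : resolve(results)));
// })).then((results) => console.log(results));
```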

How to install Node.JS

“Isn’t it just simply googling ‘how to install Node’?” you ask. Well, not quite.

  • The default Node installer requires administrator privileges
  • Installed that way, Node also needs admin privileges (a bad idea: every time you need to update Node you must use sudo)
  • You can't easily switch Node versions if you work on multiple projects

Introducing nvm (Node Version Manager), a script that installs Node in your local user directory, requiring no sudo while compromising nothing in quality.

How to use it? According to Zoltan:

On Mac

The best way to install Node.js on Mac is nvm.

https://github.com/creationix/nvm

You need the Command Line Tools on your Mac. Either install the full Xcode from the App Store or just use the small Command Line Tools installer:

$ xcode-select --install

(If you’ve just installed Xcode, don’t forget to launch it first and accept the Terms and Conditions.)

You can use the install script for nvm installation.

$ curl -o- https://raw.githubusercontent.com/creationix/nvm/v0.33.11/install.sh | bash

However, I would encourage you to use the manual installation process. Nothing special there. First, clone the whole repo into a subfolder of your home directory (~/.nvm). Second, add two extra lines to your shell startup script.

Please follow these steps on NVM Readme: https://github.com/creationix/nvm#git-install

Then relaunch your terminals. You may have to log out and log back in to activate the new settings.

List your installed node versions:

$ nvm list

List the available node versions in the cloud:

$ nvm ls-remote

You can combine these two commands to see only the last 9 lines of the huge list of versions:

$ nvm ls-remote | tail -n9

It is safe to choose one of the most recent LTS (long-term support) versions and install it with the following command:

$ nvm install 10.3.0

Set up this version as the default:

$ nvm use 10.3.0
$ nvm alias default 10.3.0

Check your node version with

$ node -v

You should see v10.3.0 if you installed the above version.
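If a project depends on a particular Node version, a tiny check at startup catches a shell where nvm has not switched versions yet. A sketch using only process.version; isAtLeast is a hypothetical helper and 10.3.0 is just the version used above.

```javascript
// Compare "vMAJOR.MINOR.PATCH" strings numerically (so 10.10.0 > 10.3.0); pre-release tags are ignored.
function isAtLeast(version, minimum) {
  const parse = (v) => v.replace(/^v/, '').split('-')[0].split('.').map(Number);
  const a = parse(version);
  const b = parse(minimum);
  for (let i = 0; i < 3; i += 1) {
    const x = a[i] || 0;
    const y = b[i] || 0;
    if (x !== y) return x > y;
  }
  return true;
}

if (!isAtLeast(process.version, '10.3.0')) {
  console.error(`Node ${process.version} is older than 10.3.0; try "nvm use 10.3.0"`);
  process.exitCode = 1;
}
```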

You can update your npm to the latest.

$ npm install -g npm

After the update, the npm version (npm -v) should be 6.1.0 or above.

A little extra tip: remember the following command, because it simplifies the update process. 😉

Let’s say you would like to stay on the stable LTS version and keep all the global packages you’ve already installed. Here is the solution:

$ nvm install 8 --reinstall-packages-from=8 --latest-npm

It updates Node.js to the latest version 8, installs the latest npm, and reinstalls all your previously installed global packages.

Alternatives for installing Node.js, but not suggested:

On Linux

Please avoid installing Node.js with apt-get on Ubuntu. If you have already installed Node.js with the built-in package manager, please remove it (sudo apt-get purge nodejs && sudo apt-get autoremove && sudo apt-get autoclean).

The installation process on Linux is the same as on OSX.

With the provided script:

$ curl -o- https://raw.githubusercontent.com/creationix/nvm/v0.33.11/install.sh | bash

(Please read the instructions under OSX section.)

$ nvm list
$ nvm ls-remote
$ nvm install 10.3.0
$ nvm use 10.3.0
$ nvm alias default 10.3.0
$ node -v
$ npm install -g npm
$ npm -v

One more thing! Don’t forget to run the following command, which increases the number of inotify watches:

$ echo fs.inotify.max_user_watches=524288 | sudo tee -a /etc/sysctl.conf && sudo sysctl -p

On Windows

On Windows, if you don’t need multiple Node.js versions, you can use the official installer.

Also install Git for Windows.

Additionally, don’t forget to read this guide, which is useful not just for Ember developers but for everybody who uses Node.js on Windows.

Also install and run ember-cli-windows:

$ npm install -g ember-cli-windows
$ ember-cli-windows

More here: https://github.com/felixrieseberg/ember-cli-windows

Always run your PowerShell or CMD.exe as Administrator.

Don’t forget to run these two commands in PowerShell (as Administrator):

$ Set-ExecutionPolicy Unrestricted -scope Process
$ ember-cli-windows

Log out of Windows and log back in.

Then upgrade npm and install the latest ember-cli:

$ npm install -g npm
$ npm install -g ember-cli

I suggest experimenting with different shells. Which worked best for you: PowerShell, Git Shell, or the original CMD.exe? Please share your Windows experience in a comment.

About mongo’s useNewUrlParser warning

Recently I got this warning when trying to create a new MongoDB application:

(node:28962) DeprecationWarning: current URL string parser is deprecated, and will be removed in a future version. To use the new parser, pass option { useNewUrlParser: true } to MongoClient.connect.

Turns out I’m among the first to use the new MongoDB driver, released on npm over the weekend.

The new URL parser has no big changes (yet). Currently it only forces you to put the port into the URL.

You can resolve the above warning by changing from

MongoClient.connect("mongodb://localhost:27017")

to

MongoClient.connect("mongodb://localhost:27017", { useNewUrlParser: true })

Solving MongoDB timeout

Sometimes you may encounter a MongoDB timeout error.

While it’s easy to blame it on an overloaded server, there may be other reasons, such as:

  • You changed something in your database access code
  • You are using a specific version of Mongoose (4.7.1 is known to have problems)
  • Your network condition isn’t good

You might try setting connectTimeoutMS and socketTimeoutMS to 0, hoping they never time out, but 0 doesn’t mean “never” with Mongo (from the FAQ):

The special meaning of 0

Setting connectTimeoutMS and socketTimeoutMS to the value 0 has a special meaning. On the face of it, it means never timeout. However this is a truth with some modifications. Setting it to 0 actually means apply the operating system default socket timeout value.

maxTimeMS is the option you are looking for

Most people try to set a low socketTimeoutMS value to abort server operations. As we have proved above this does not work. To work correctly you want to use the maxTimeMS setting on server operations. This will make MongoDB itself abort the operation if it runs for more than maxTimeMS milliseconds. A simple example is below performing a find operation.

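// Execute a find command; the server aborts it after 50 ms
col.find({ "$where": "sleep(100) || true" })
  .maxTimeMS(50)
  .count(function (err, count) {
    // err reports the timeout if the server aborted the operation
  });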

So what you can do is:

Increase the timeout values (connectTimeoutMS, socketTimeoutMS):

server: {
  socketOptions: {
    connectTimeoutMS: 60000,
    socketTimeoutMS: 60000,
    keepAlive: 1
  }
}

What those values mean:

  • 0: default socket timeout (20 secs for Linux)
  • 30s: default for MongoDB
  • 60s: longer timeout for our case

Add keepAlive to the connection

It’s a safeguard against timeouts. From the Mongoose docs:

A note about keepAlive

For long running applications, it is often prudent to enable keepAlive with a number of milliseconds. Without it, after some period of time you may start to see “connection closed” errors for what seems like no reason. If so, after reading this, you may decide to enable keepAlive:

options.server.socketOptions = options.replset.socketOptions = { keepAlive: 120 };
mongoose.connect(uri, options);
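Putting the two fixes together, a sketch that builds the timeout and keepAlive settings once for both single-server and replica-set connections. legacySocketOptions is a hypothetical name; the defaults are the 60-second timeouts and the keepAlive value quoted above. This is the old option layout shown in this post; newer Mongoose releases changed it, so check the docs for your version.

```javascript
// Hypothetical helper for the legacy option layout used above.
// One socketOptions object is shared by `server` and `replset`, like
// `options.server.socketOptions = options.replset.socketOptions = ...` in the quote.
function legacySocketOptions({ connectTimeoutMS = 60000, socketTimeoutMS = 60000, keepAlive = 120 } = {}) {
  const socketOptions = { connectTimeoutMS, socketTimeoutMS, keepAlive };
  return { server: { socketOptions }, replset: { socketOptions } };
}

// Usage (assumed): mongoose.connect(uri, legacySocketOptions());
```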

Hope this can help someone with the same question 🙂