Self-note: Resume / recover a MongoDB change stream

Introduction

  • Sometimes the replicator needs to be restarted
  • We cannot afford to lose even one or two entries in the time series: that would throw the statistics off and, in exceptional cases, lose the max and min values

Resume & recover 

Change streams are resumable by specifying a resumeAfter token when opening the cursor. For the resumeAfter token, use the _id value of the change stream event document. Passing the _id value to the change stream attempts to resume notifications starting after the specified operation.

IMPORTANT

Here, resumeToken contains the _id of a change stream notification. The resumeAfter option takes a parameter that must resolve to a resume token. Passing resumeToken to resumeAfter directs the change stream to attempt to resume notifications starting after the specified operation.

If the featureCompatibilityVersion (fcv) is set to "4.0" or greater, newly opened change streams return a hex-encoded string for the resume token data, i.e. the _id._data value. This makes resume tokens comparable and sortable. If the fcv is 3.6, newly opened change streams return BinData for the resume token data.

IMPORTANT

The fcv value at the time the cursor is opened determines the resume token data type. That is, changing the fcv does not affect the resume tokens of change streams that were opened before the change.

Regardless of the fcv value, a 4.0 replica set or a sharded cluster can resume a change stream using either the BinData or string resume token.

As such, a 4.0 deployment can use a resume token from a change stream opened on a collection from a 3.6 deployment.
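Because hex-string tokens can be compared and sorted, the replicator can refuse to overwrite a newer checkpoint with an older one. The following is a minimal sketch under the assumption that both tokens are hex strings (fcv 4.0 or greater) from the same deployment; compareResumeTokens and laterToken are hypothetical helper names, not driver APIs.

```javascript
// Hypothetical helpers (not part of the MongoDB driver).
// Assumption: both tokens have a hex-string _data (fcv >= 4.0) from the
// same deployment, so plain string order matches event order.
function compareResumeTokens(a, b) {
  if (typeof a._data !== 'string' || typeof b._data !== 'string') {
    throw new TypeError('only hex-string (fcv >= 4.0) resume tokens are comparable');
  }
  if (a._data === b._data) return 0;
  return a._data < b._data ? -1 : 1;
}

// Return whichever token is later, so a stale token never replaces
// a newer checkpoint.
function laterToken(current, candidate) {
  if (!current) return candidate;
  return compareResumeTokens(candidate, current) > 0 ? candidate : current;
}
```

BinData tokens from a 3.6 fcv are rejected rather than compared, because the sortability guarantee only applies to the hex-string form.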

Implementation

Structure of a Mongo change event

{
   _id : { <BSON Object> },
   "operationType" : "<operation>",
   "fullDocument" : { <document> },
   "ns" : {
      "db" : "<database>",
      "coll" : "<collection>"
   },
   "documentKey" : { "_id" : <ObjectId> },
   "updateDescription" : {
      "updatedFields" : { <document> },
      "removedFields" : [ "<field>", ... ]
   },
   "clusterTime" : <Timestamp>,
   "txnNumber" : <NumberLong>,
   "lsid" : {
      "id" : <UUID>,
      "uid" : <BinData>
   }
}

Pay attention to _id

Metadata related to the operation.

Use this document as a resumeToken for the resumeAfter parameter when resuming a change stream.
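The format of _id._data decides how the token can be stored, so it helps to check which one a stream actually produces. Here is a minimal sketch, assuming the Node driver decodes BinData into a bson Binary object (whose _bsontype is 'Binary'); resumeTokenKind is a hypothetical name.

```javascript
// Hypothetical helper: report which resume-token format a change event carries.
//   fcv >= 4.0 -> _id._data is a hex string (plain JSON can store it)
//   fcv 3.6    -> _id._data is BinData (needs BSON to round-trip)
function resumeTokenKind(changeEvent) {
  const token = changeEvent && changeEvent._id;
  if (!token || token._data === undefined) {
    throw new Error('change event has no _id._data resume token');
  }
  const data = token._data;
  if (typeof data === 'string') return 'hex-string';
  // Assumption: BinData arrives as a bson Binary (_bsontype 'Binary');
  // a raw Buffer is accepted as well.
  if (Buffer.isBuffer(data) || (data && data._bsontype === 'Binary')) return 'bindata';
  return 'unknown';
}
```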


This field is a BSON document, so it

  • Can't be saved with JSON.stringify
  • Can't be cast to ObjectID (wrong format, different from the MongoDB documentation)

Solution: include the bson package and serialize the token with it

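const fs = require('fs');
// bson >= 4.x exposes serialize() / deserialize() at the module level
const BSON = require('bson');

// ID_FILE, lastId and logger are defined elsewhere in the replicator.
// lastId holds the _id (resume token) of the most recent change event.

function saveId(cb) {
  if (!lastId) {
    return cb && cb(null); // nothing to save yet
  }
  const lastIdBuffer = BSON.serialize(lastId);
  fs.writeFile(ID_FILE, lastIdBuffer, (err) => {
    if (err) {
      logger.error('[saveId]', err);
    }
    return cb && cb(err);
  });
}

function loadId(cb) {
  fs.readFile(ID_FILE, (err, data) => {
    let resumeToken;
    if (!err && data) {
      // Rebuild the resume token document from the raw BSON bytes
      resumeToken = BSON.deserialize(data);
    }
    // err is ENOENT on the very first start, before any token was saved
    return cb && cb(err, resumeToken);
  });
}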

  • Every second, write the latest _id to disk
  • Reload the _id object from disk on startup
  • Use it to resume the change stream with the resumeAfter option

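const pipeline = [
  { $match: { 'ns.db': config.MONGO_DB_NAME } },
];

// resumeToken comes from loadId(); it is undefined on the very first start
const changeStreamOptions = {};
if (resumeToken) {
  changeStreamOptions.resumeAfter = resumeToken;
}

const changeStream = db.watch(pipeline, changeStreamOptions);
changeStream.on('change', (change) => {
  lastId = change._id; // remember the resume token so saveId() can persist it
  // ... replicate the change ...
});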
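The "write every second, reload on startup" steps can also be wrapped into one small helper. The following is a sketch using only Node's standard library. createCheckpointer and its options are hypothetical names. It defaults to JSON, which only suits hex-string tokens (fcv 4.0 or greater). For BinData tokens, pass BSON.serialize and BSON.deserialize from the bson package instead.

```javascript
const fs = require('fs');

// Hypothetical checkpoint helper. Assumption: JSON is enough because the
// token is a hex string; swap in BSON.serialize/deserialize for BinData.
function createCheckpointer({
  file,
  serialize = (token) => Buffer.from(JSON.stringify(token)),
  deserialize = (buf) => JSON.parse(buf.toString('utf8')),
}) {
  let lastSaved = null;

  // Write to a temp file, then rename it over the real one, so a crash
  // mid-write does not leave a truncated token on disk (rename is atomic
  // on POSIX filesystems).
  function save(token) {
    if (!token || token === lastSaved) return false; // unchanged since last write
    const tmp = `${file}.tmp`;
    fs.writeFileSync(tmp, serialize(token));
    fs.renameSync(tmp, file);
    lastSaved = token;
    return true;
  }

  // Returns the saved token, or undefined on the first start (no file yet).
  function load() {
    try {
      return deserialize(fs.readFileSync(file));
    } catch (err) {
      if (err.code === 'ENOENT') return undefined;
      throw err;
    }
  }

  // Every intervalMs, persist whatever getToken() returns if it changed.
  function start(getToken, intervalMs = 1000) {
    const timer = setInterval(() => save(getToken()), intervalMs);
    timer.unref(); // don't keep the process alive just for checkpoints
    return () => clearInterval(timer);
  }

  return { save, load, start };
}
```

Typical use would be calling `load()` before opening the stream, passing the result as resumeAfter, then calling `start(() => lastId)`. The identity check in `save()` works because every change event carries a fresh _id object.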

Reference

db.watch(pipeline, options)

New in version 4.0: Requires featureCompatibilityVersion (fCV) set to "4.0" or greater. For more information on fCV, see setFeatureCompatibilityVersion.

Opens a change stream cursor for a database to report on all its non-system collections.

pipeline (array): Optional. A sequence of one or more aggregation stages that filter or modify the change stream output. See Aggregation for complete documentation on the aggregation framework.

options (document): Optional. Additional options that modify the behavior of db.watch(). You must pass an empty array [] to the pipeline parameter if you are not specifying a pipeline but are passing the options document.

The options document can contain the following fields and values:

  • resumeAfter: Optional. Directs db.watch() to attempt resuming notifications starting after the operation specified in the resume token. Each change stream event document includes a resume token as the _id field. Pass the entire _id field of the change event document that represents the operation you want to resume after. resumeAfter is mutually exclusive with startAtOperationTime.
  • fullDocument: Optional. By default, db.watch() returns only the delta of the fields modified by an update operation, instead of the entire updated document. Set fullDocument to "updateLookup" to direct db.watch() to look up the most current majority-committed version of the updated document. db.watch() then returns a fullDocument field with the looked-up document in addition to the updateDescription delta.
  • batchSize: Optional. Specifies the maximum number of change events to return in each batch of the response from the MongoDB cluster. Has the same functionality as cursor.batchSize().
  • maxAwaitTimeMS: Optional. The maximum amount of time in milliseconds the server waits for new data changes to report to the change stream cursor before returning an empty batch. Defaults to 1000 milliseconds.
  • startAtOperationTime: Optional. The starting point for the change stream. If the specified starting point is in the past, it must be in the time range of the oplog. To check the time range of the oplog, see rs.printReplicationInfo(). startAtOperationTime is mutually exclusive with resumeAfter.

SEE ALSO

db.collection.watch() and Mongo.watch()
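The option rules in this reference can be enforced before calling watch(). Two rules matter most: resumeAfter and startAtOperationTime exclude each other, and a pipeline array must always be passed alongside options. The following is a minimal sketch. buildWatchOptions and watchArgs are hypothetical helpers. Only the option names come from the reference.

```javascript
// Hypothetical helper: build the options document for db.watch().
function buildWatchOptions({
  resumeToken,
  startAtOperationTime,
  fullDocument,
  batchSize,
  maxAwaitTimeMS,
} = {}) {
  if (resumeToken && startAtOperationTime) {
    throw new Error('resumeAfter and startAtOperationTime are mutually exclusive');
  }
  const options = {};
  if (resumeToken) options.resumeAfter = resumeToken; // the entire _id document
  if (startAtOperationTime) options.startAtOperationTime = startAtOperationTime;
  if (fullDocument) options.fullDocument = fullDocument; // e.g. 'updateLookup'
  if (batchSize !== undefined) options.batchSize = batchSize;
  if (maxAwaitTimeMS !== undefined) options.maxAwaitTimeMS = maxAwaitTimeMS; // server default: 1000
  return options;
}

// Always hand watch() a pipeline array, even an empty one, when passing options.
function watchArgs(pipeline, opts) {
  return [Array.isArray(pipeline) ? pipeline : [], buildWatchOptions(opts)];
}
```

Usage would look like `db.watch(...watchArgs([], { resumeToken, fullDocument: 'updateLookup' }))`.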

To generate a new ObjectId, call ObjectId() with no argument.

To create an ObjectId from a known value, call ObjectId() with a unique 24-character hexadecimal string.

To get the hexadecimal string back, access the str attribute of the ObjectId() object.

Create a new ObjectID instance

class ObjectID(id)
Arguments: id (string): Can be a 24-character hex string, a 12-byte binary string or a Number.
Returns: object instance of ObjectID.

Return the ObjectID id as a 24-character hex string representation

toHexString()
Returns: string, the 24-character hex string representation.

Examples

Generate a 24 character hex string representation of the ObjectID
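This is why a resume token cannot be cast to ObjectID: an ObjectID hex string is exactly 24 characters, while a resume token's _data is typically much longer. The following is a minimal sketch that needs no driver. It validates the 24-character form and reads the creation time stored in the first 4 bytes (seconds since the Unix epoch). isObjectIdHex and objectIdTimestamp are hypothetical names.

```javascript
// Hypothetical helpers; no MongoDB driver needed.
const OBJECT_ID_HEX = /^[0-9a-fA-F]{24}$/;

// True only for a 24-character hex string (12 bytes).
function isObjectIdHex(value) {
  return typeof value === 'string' && OBJECT_ID_HEX.test(value);
}

// The first 4 bytes (8 hex characters) hold the creation time in seconds.
function objectIdTimestamp(hex) {
  if (!isObjectIdHex(hex)) {
    throw new TypeError(`not a 24-character ObjectId hex string: ${hex}`);
  }
  const seconds = parseInt(hex.slice(0, 8), 16);
  return new Date(seconds * 1000);
}
```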

Error mapping file into docker container

Specifically, this happens with Docker on Windows (the VirtualBox version) used together with WSL. When I run ‘docker-compose up’, I get this error:

ERROR: for 125f20ccead0_lempr_web_1 Cannot start service web: OCI runtime create failed: container_linux.go:348: starting container process caused "process_linux.go:402: container init caused \"rootfs_linux.go:58: mounting \\\"/d/Code/lempr/web/nginx.conf\\\" to rootfs \\\"/mnt/sda1/var/lib/docker/aufs/mnt/065f6aacb3ee01072637ae0544f50a9b772df662ea8e7e3dc2d663d87d7e1a4f\\\" at \\\"/mnt/sda1/var/lib/docker/aufs/mnt/065f6aacb3ee01072637ae0544f50a9b772df662ea8e7e3dc2d663d87d7e1a4f/etc/nginx/nginx.conf\\\" caused \\\"not a directory\\\"\"": unknown: Are you trying to mount a directory onto a file (or vice-versa)? Check if the specified host path exists and is the expected type

I checked that nginx.conf exists both outside and inside the container.

Then I tried mapping the file to a different location, and it got created as a directory inside the container!

I tried various suggestions from the internet:

  • Restart the host machine
  • Map /mnt/c to /c
  • Share the /c drive to the docker-machine VM inside virtual box
    • To do so, open virtualbox
    • Select the ‘default’ machine
    • Go to settings
    • Go to ‘shared folders’
    • Click add
    • Add C and D as shares
    • Attach a console to the machine to make sure the share works

To no avail. Meanwhile the same docker-compose works just fine on my Mac.

So I concluded that neither WSL's path mapping nor Docker for Windows' mapping can handle this use case, and that I should switch to a *nix OS for this task.

Windows seems to be unable to handle this setup:

Docker-Machine (in Virtualbox) -> Windows -> WSL

The problem seems to lie in mapping paths between the docker-machine VM and Windows, not in WSL, since running the same command from Docker's MINGW shell gives the same result as running it from WSL.

 

How to set up squid

On Ubuntu

Basic squid conf

Put the following in /etc/squid3/squid.conf instead of the super-bloated default config file:

auth_param basic program /usr/lib/squid3/basic_ncsa_auth /etc/squid3/passwords
auth_param basic realm proxy
acl authenticated proxy_auth REQUIRED
http_access allow authenticated

# Choose the port you want. Below we set it to default 3128.
http_port 3128

Setting up a user

sudo htpasswd -c /etc/squid3/passwords username_you_like

Enter a password twice for the chosen username (htpasswd is provided by the apache2-utils package), then restart squid:

sudo service squid3 restart

How to install tinyproxy, the comprehensive guide

Build from source, since the latest version in the Ubuntu repository doesn't support authentication yet:

git clone https://github.com/tinyproxy/tinyproxy.git
sudo apt-get install automake cmake asciidoc 

cd tinyproxy
./autogen.sh
make && sudo make install

Add authentication

vi /etc/tinyproxy.conf
-----
BasicAuth user password
Allow your.local.ip.address
-----
sudo /etc/init.d/tinyproxy restart
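Both squid (basic_ncsa_auth) and tinyproxy (BasicAuth) expect HTTP Basic credentials in the Proxy-Authorization header. The following is a minimal Node sketch for testing either proxy with a plain-HTTP URL. proxyAuthHeader and proxyRequestOptions are hypothetical names. The host, port and credentials are placeholders, not values from this setup. HTTPS targets would instead need a CONNECT tunnel, which this sketch does not cover.

```javascript
// Hypothetical helpers for a plain-HTTP request through an authenticating proxy.
function proxyAuthHeader(user, password) {
  // Basic auth is just base64("user:password"), i.e. not encrypted
  return 'Basic ' + Buffer.from(`${user}:${password}`).toString('base64');
}

function proxyRequestOptions({ proxyHost, proxyPort, user, password, url }) {
  const target = new URL(url);
  return {
    host: proxyHost,
    port: proxyPort,
    method: 'GET',
    path: target.href, // absolute URL as the request target, as HTTP proxies expect
    headers: {
      Host: target.host,
      'Proxy-Authorization': proxyAuthHeader(user, password),
    },
  };
}

// Example (placeholders; not run here):
// const http = require('http');
// http.get(proxyRequestOptions({
//   proxyHost: 'your.proxy.ip', proxyPort: 3128,
//   user: 'username_you_like', password: 'secret',
//   url: 'http://api.ipify.org/?format=json',
// }), (res) => res.pipe(process.stdout));
```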

How to install SOCKS5 on your VPS

I focus on Dante, as I find it the easiest to install. First things first: assuming you are using Ubuntu, do the usual

apt-get update && apt-get upgrade

Installing Dante from source on Ubuntu 16.04

The best way to install Dante is from its source package, which gives you the latest available version (currently 1.4.2). Copy the source link from the Dante download page and download the package with the ‘wget’ command below.

# cd /usr/src
# wget http://www.inet.no/dante/files/dante-1.4.2.tar.gz

Once the package has been downloaded, extract it in the current directory with the command below.

# tar -zxf dante-1.4.2.tar.gz

Change directory to the extracted folder to compile and install the package.

# cd dante-1.4.2/

Make sure the 'gcc' and 'make' utilities are installed on your system before compiling and installing Dante. You can install them with the command below.

# apt-get install gcc make

Now run configure with the required prefix and options, as shown.

# ./configure --prefix=/usr --sysconfdir=/etc --localstatedir=/var --disable-client --without-libwrap --without-bsdauth --without-gssapi --without-krb5 --without-upnp --without-pam

At the end of the configure step, you will see its configuration status, as shown below.

 Configure status:

Client:            Disabled, using --disable-client
Server:            Enabled
Preloading:        Enabled
Libwrap:           Disabled, using --without-libwrap
BSD Auth:          Disabled, using --without-bsdauth
PAM:               Disabled, using --without-pam
GSSAPI:            Not found/disabled
KRB5:              Not found/disabled
SASL:              Not found/disabled
UPNP:              Not found/disabled
Compatability:     issetugid setproctitle strlcpy strvis

                     Modules:

redirect:          Not found
bandwidth:         Not found
ldap:              Not found

After that, compile and install the package with ‘make’.

# make && make install

You can check the installed version of Dante with the command below.

# /usr/sbin/sockd -v
Dante v1.4.2.  Copyright (c) 1997 - 2014 Inferno Nettverk A/S, Norway

Configuring Dante-server service script

Now we create the start/stop (init.d) script for the Dante server. Create a new file in the ‘/etc/init.d/’ directory and place the following contents in it using your command-line editor.

# vim /etc/init.d/sockd

#! /bin/sh
### BEGIN INIT INFO
# Provides:          sockd
# Required-Start:    $remote_fs $syslog
# Required-Stop:     $remote_fs $syslog
# Default-Start:     2 3 4 5
# Default-Stop:      0 1 6
# Short-Description: Start the dante SOCKS server.
# Description:       SOCKS (v4 and v5) proxy server daemon (sockd).
#                    This server allows clients to connect to it and
#                    request proxying of TCP or UDP network traffic
#                    with extensive configuration possibilities.
### END INIT INFO
#
# dante SOCKS server init.d file. Based on /etc/init.d/skeleton:
# Version:  @(#)skeleton  1.8  03-Mar-1998  miquels@cistron.nl 
# Via: https://gitorious.org/dante/pkg-debian

PATH=/sbin:/usr/sbin:/bin:/usr/bin
NAME=sockd
DAEMON=/usr/sbin/$NAME
DAEMON_ARGS="-D"
PIDFILE=/var/run/$NAME.pid
SCRIPTNAME=/etc/init.d/$NAME
DESC="Dante SOCKS daemon"
CONFFILE=/etc/$NAME.conf

# Exit if the package is not installed
[ -x "$DAEMON" ] || exit 0

# Load the VERBOSE setting and other rcS variables
. /lib/init/vars.sh

# Define LSB log_* functions.
# Depend on lsb-base (>= 3.2-14) to ensure that this file is present
# and status_of_proc is working.
. /lib/lsb/init-functions

set -e

# This function makes sure that the Dante server can write to the pid-file.
touch_pidfile ()
{
  if [ -r $CONFFILE ]; then
    uid="`sed -n -e 's/[[:space:]]//g' -e 's/#.*//' -e '/^user\.privileged/{s/[^:]*://p;q;}' $CONFFILE`"
    if [ -n "$uid" ]; then
      touch $PIDFILE
      chown $uid $PIDFILE
    fi
  fi
}

case "$1" in
  start)
    if ! egrep -cve '^ *(#|$)' \
        -e '^(logoutput|user\.((not)?privileged|libwrap)):' \
        $CONFFILE > /dev/null
    then
        echo "Not starting $DESC: not configured."
        exit 0
    fi
    echo -n "Starting $DESC: "
    touch_pidfile
    start-stop-daemon --start --quiet --pidfile $PIDFILE --exec $DAEMON --test > /dev/null \
        || return 1
    start-stop-daemon --start --quiet --pidfile $PIDFILE --exec $DAEMON -- \
        $DAEMON_ARGS \
        || return 2
    echo "$NAME."
    ;;
  stop)
    echo -n "Stopping $DESC: "
    start-stop-daemon --stop --quiet --retry=TERM/30/KILL/5 --pidfile $PIDFILE --name $NAME
    RETVAL="$?"
    [ "$RETVAL" = 2 ] && return 2
    start-stop-daemon --stop --quiet --oknodo --retry=0/30/KILL/5 --exec $DAEMON
    [ "$?" = 2 ] && return 2
    echo "$NAME."
    ;;
  reload|force-reload)
    #
    #   If the daemon can reload its config files on the fly
    #   for example by sending it SIGHUP, do it here.
    #
    #   Make this a do-nothing entry, if the daemon responds to changes in its config file
    #   directly anyway.
    #
     echo "Reloading $DESC configuration files."
     start-stop-daemon --stop --signal 1 --quiet --pidfile \
        $PIDFILE --exec $DAEMON -- -D
  ;;
  restart)
    #
    #   If the "reload" option is implemented, move the "force-reload"
    #   option to the "reload" entry above. If not, "force-reload" is
    #   just the same as "restart".
    #
    echo -n "Restarting $DESC: "
    start-stop-daemon --stop --quiet --pidfile $PIDFILE --exec $DAEMON
    sleep 1
    touch_pidfile
    start-stop-daemon --start --quiet --pidfile $PIDFILE \
      --exec $DAEMON -- -D
    echo "$NAME."
    ;;
  status)
    status_of_proc "$DAEMON" "$NAME" && exit 0 || exit $?
    ;;
  *)
    N=/etc/init.d/$NAME
    # echo "Usage: $N {start|stop|restart|reload|force-reload}" >&2
    echo "Usage: $N {start|stop|restart|status|force-reload}" >&2
    exit 1
    ;;
esac

exit 0

Save and close the file with ':wq!', then make it executable and register it as a startup script with the commands below.

# chmod +x /etc/init.d/sockd
# update-rc.d sockd defaults

Dante-server Sockd Configuration

We start with the global server settings before configuring the more advanced access rules. Two types of rules have to be defined in the configuration file. Client rules specify which clients are granted access to the SOCKS server. SOCKS rules evaluate the actual application request; by adding port numbers or ranges, access to specific hosts or networks can be limited.

Let’s create the sockd configuration by placing the following parameters into the 'sockd.conf' file. Replace ens160 with the name of your server's network interface (see ip addr).

# vim /etc/sockd.conf
logoutput: /var/log/socks.log

internal: ens160 port = 1080
external: ens160

method: username
user.privileged: root
user.notprivileged: nobody

client pass {
        from: 0.0.0.0/0 to: 0.0.0.0/0
        log: error connect disconnect
}


client block {
        from: 0.0.0.0/0 to: 0.0.0.0/0
        log: connect error
}

pass {
        from: 0.0.0.0/0 to: 0.0.0.0/0
        log: error connect disconnect
}

block {
        from: 0.0.0.0/0 to: 0.0.0.0/0
        log: connect error
}

Starting Dante-server service

Once you have configured the Dante server's sockd configuration file, start the service and check that it started without errors.

# /etc/init.d/sockd start
# /etc/init.d/sockd status


Use the command below to check that it is listening on port ‘1080’.

# netstat -tulp
tcp        0      0 k-vm:socks              *:*                     LISTEN      70839/sockd

If the sockd service fails to start, check the log in ‘/var/log/socks.log’ and fix the 'sockd.conf' file accordingly.

To stop the 'sockd' service, kill its process or use the command below.

# /etc/init.d/sockd stop
[ ok ] Stopping sockd (via systemctl): sockd.service.

Create user for Dante

Dante's username method authenticates against local Unix accounts, and the password is sent in clear text, so it’s best to have a dedicated user for it.

useradd -M -s /usr/sbin/nologin proxyuser # create user without home and without shell login
passwd proxyuser # set its password
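To see why the password travels in clear text, look at what a SOCKS5 client sends for username/password authentication: the RFC 1928 method greeting, then the RFC 1929 sub-negotiation packet, which copies the password in verbatim. Below is a minimal sketch of those bytes. socks5Greeting and socks5UserPassRequest are hypothetical names. The sketch only builds packets and opens no connection.

```javascript
// Hypothetical helpers that build SOCKS5 authentication packets.
function socks5Greeting() {
  // VER=5, NMETHODS=1, METHODS=[0x02 = username/password]
  return Buffer.from([0x05, 0x01, 0x02]);
}

function socks5UserPassRequest(user, password) {
  const u = Buffer.from(user, 'utf8');
  const p = Buffer.from(password, 'utf8');
  if (u.length < 1 || u.length > 255 || p.length < 1 || p.length > 255) {
    throw new RangeError('username and password must be 1-255 bytes');
  }
  // VER=1 (sub-negotiation version), ULEN, UNAME, PLEN, PASSWD (plain bytes)
  return Buffer.concat([Buffer.from([0x01, u.length]), u, Buffer.from([p.length]), p]);
}
```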

Try the proxy

Do this on your client machine to make sure everything works

curl --max-time 5 -x socks5://proxyuser:passssssword@ip:port https://api.ipify.org\?format\=json && echo