How to write a C MQTT client using Mosquitto

Introduction

The 2018 version, based upon this excellent post by Kevin Boone:

Writing an MQTT client C for ActiveMQ from the ground up

The article above is a good and easy starting point, but it hasn’t been updated for 2 years, so it doesn’t work when you run it against the latest version of Mosquitto. It’s also a bit hacky (using “sleep” to avoid a concurrency problem).

So I analyzed the latest mosquitto_pub code from the mosquitto repository itself to see how it works, and this article is the result.

What’s changed

  • There’s a queue inside mosquitto, and `mosquitto_loop` must be called for it to be processed. Alternatively, use `mosquitto_loop_start` and `mosquitto_loop_stop`, which run the loop in a background thread
  • I added asynchronous (callback) processing to wait for calls to complete, instead of the ole’ sleep function
  • It’s 2018! Everyone is adopting HTTPS. Accordingly, your MQTT traffic shouldn’t be left bare for all to see! Let’s use TLS to encrypt the traffic

The code

How to use callback

I want to publish just one message, so my flow is the following:

  • On connect complete -> publish a message
  • On publish complete -> start to disconnect
  • On disconnect complete -> exit the loop and return control to the main thread. If you don’t wait for this, data may not even get sent!

To do this, I set up 3 “hooks” (callback functions), like this:

mosquitto_connect_callback_set(mosq, my_connect_callback);

mosquitto_disconnect_callback_set(mosq, my_disconnect_callback);

mosquitto_publish_callback_set(mosq, my_publish_callback);
And then write the callback functions to execute my flow:

/* Runs when the broker answers the connection request; result == 0 means success. */
void my_connect_callback(struct mosquitto *mosq, void *obj, int result)
{
    int rc = MOSQ_ERR_SUCCESS;
    if(!result){
        printf("Sending message...\n");
        /* mid_sent, text, qos and retain are globals defined elsewhere in the program */
        rc = mosquitto_publish(mosq, &mid_sent, MQTT_TOPIC, (int)strlen(text), text, qos, retain);
        if(rc){
            switch(rc){
                case MOSQ_ERR_INVAL:
                    fprintf(stderr, "Error: Invalid input. Does your topic contain '+' or '#'?\n");
                    break;
                case MOSQ_ERR_NOMEM:
                    fprintf(stderr, "Error: Out of memory when trying to publish message.\n");
                    break;
                case MOSQ_ERR_NO_CONN:
                    fprintf(stderr, "Error: Client not connected when trying to publish.\n");
                    break;
                case MOSQ_ERR_PROTOCOL:
                    fprintf(stderr, "Error: Protocol error when communicating with broker.\n");
                    break;
                case MOSQ_ERR_PAYLOAD_SIZE:
                    fprintf(stderr, "Error: Message payload is too large.\n");
                    break;
                default:
                    /* Any other error code: fall back to the library's own description */
                    fprintf(stderr, "Error: %s\n", mosquitto_strerror(rc));
                    break;
            }
            /* Publishing failed, so start the disconnect right away */
            mosquitto_disconnect(mosq);
        }
    } else {
        /* Connection refused: print the reason reported by the broker */
        fprintf(stderr, "%s\n", mosquitto_connack_string(result));
    }
}

void my_disconnect_callback(struct mosquitto *mosq, void *obj, int rc)
{
    printf("Disconnected!\n");
    connected = false;
}

void my_publish_callback(struct mosquitto *mosq, void *obj, int mid)
{
    printf("Published!\n");
    if(disconnect_sent == false){
        mosquitto_disconnect(mosq);
        disconnect_sent = true;
    }
}

How to process the queue with mosquitto_loop

In the main body of your code, do this:

int rc;
do {
  // A short wait is needed so mosquitto does not run ahead of the network
  // operation; a timeout of -1 selects the default of 1000 ms
  rc = mosquitto_loop(mosq, -1, 1);
} while (rc == MOSQ_ERR_SUCCESS && connected);

How to add TLS to the connection process

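Before connecting, set the TLS options with mosquitto_tls_set:

mosquitto_username_pw_set(mosq, MQTT_USERNAME, MQTT_PASSWORD);
/* Arguments after mosq: cafile, capath, certfile, keyfile, pw_callback.
   Only the CA certificate is given, so the broker is verified but no client certificate is sent. */
mosquitto_tls_set(mosq, "ca-cert.pem", NULL, NULL, NULL, NULL);
int ret = mosquitto_connect(mosq, MQTT_HOSTNAME, MQTT_PORT, 0);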

Complete publish – subscribe sample

Available at https://github.com/thanhphu/mosquitto-sample. Happy cloning!

 

Find and block unsafe content on your WordPress installation

For some reason, my WP installation decided to load some files via http instead of https.

To find out which file it was, I used the JavaScript console.

It’s wp-emoji in this case, and I don’t use emojis on my site, so I decided to block it by adding this to my theme’s functions.php:

 

remove_action( 'wp_head', 'print_emoji_detection_script', 7 );
remove_action( 'wp_print_styles', 'print_emoji_styles' );

 

How to turn on the quit confirmation prompt in Chrome

Recently Chrome 69 started to disrespect the keyboard mapping on my Mac, which results in the Cut key becoming the Quit key! I searched for the option to confirm before quitting but the results are all outdated… Unbelievable! So here’s how:

On Linux and Windows

Go to chrome://flags

Search for Warn before quitting

On Mac

Chrome Menu / Warn Before Quitting

Self-note: Resume / recover a MongoDB change stream

Introduction

  • Sometimes the replicator needs to be restarted
  • We cannot afford to lose one or two entries in the time series since it would throw the statistics off and, in exceptional cases, lose the max and min values

Resume & recover 

Change streams are resumable by specifying a resumeAfter token when opening the cursor. For the resumeAfter token, use the _id value of the change stream event document. Passing the _id value to the change stream attempts to resume notifications starting after the specified operation.

IMPORTANT

resumeToken contains the change stream notification id. resumeAfter takes a parameter that must resolve to a resume token. Passing resumeToken to the resumeAfter modifier directs the change stream to attempt to resume notifications starting after the specified operation.


If the featureCompatibilityVersion (fcv) is set to "4.0" or greater, newly opened change streams return a hex-encoded string for the resume token data, i.e. the _id._data value. This change allows for the ability to compare and sort the resume tokens. If the fcv is 3.6, newly opened change streams return a BinData for the resume token data.

IMPORTANT

The fcv value at the time of the cursor’s opening determines the resume token data type. That is, modifying the fcv does not affect the resume tokens of change streams that were already open before the fcv change.

Regardless of the fcv value, a 4.0 replica set or a sharded cluster can resume a change stream using either the BinData or string resume token.

As such, a 4.0 deployment can use a resume token from a change stream opened on a collection from a 3.6 deployment.
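
Since fcv 4.0 tokens are hex strings, picking the newest of several saved tokens can be done with an ordinary string comparison. The sketch below assumes that comparing the _id._data strings directly gives the intended order; the token values are made up and far shorter than real ones, and compareResumeTokens and newestToken are hypothetical helper names, not part of any MongoDB API.

```javascript
// Compare two resume tokens by their hex-encoded _data field.
// Assumption: plain string comparison of _data reflects the event order.
function compareResumeTokens(a, b) {
  if (typeof a._data !== 'string' || typeof b._data !== 'string') {
    throw new TypeError('expected hex-string resume tokens (fcv 4.0 or later)');
  }
  if (a._data < b._data) return -1;
  if (a._data > b._data) return 1;
  return 0;
}

// Return the newest token of a non-empty list without mutating the list.
function newestToken(tokens) {
  return [...tokens].sort(compareResumeTokens)[tokens.length - 1];
}

// Made-up tokens for illustration only.
const tokens = [{ _data: '825B9F00' }, { _data: '825B9E10' }, { _data: '825B9FA2' }];
console.log(newestToken(tokens)._data); // prints 825B9FA2
```

BinData tokens from an fcv 3.6 stream are rejected by the type check, since this comparison only applies to the string form.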

Implementation

Structure of a Mongo change event

{
   _id : { <BSON Object> },
   "operationType" : "<operation>",
   "fullDocument" : { <document> },
   "ns" : {
      "db" : "<database>",
      "coll" : "<collection>"
   },
   "documentKey" : { "_id" : <ObjectId> },
   "updateDescription" : {
      "updatedFields" : { <document> },
      "removedFields" : [ "<field>", ... ]
   },
   "clusterTime" : <Timestamp>,
   "txnNumber" : <NumberLong>,
   "lsid" : {
      "id" : <UUID>,
      "uid" : <BinData>
   }
}

Pay attention to _id

Metadata related to the operation.

Use this document as a resumeToken for the resumeAfter parameter when resuming a change stream.


This field is BSON, so it:

  • Can’t be saved with JSON.stringify
  • Can’t be cast to ObjectID (wrong format, different from what the MongoDB documentation shows)

Solution: Include bson

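const fs = require('fs');
const bson = require('bson');

// lastId, ID_FILE and logger are defined elsewhere in the program.

// Serialize the latest resume token (lastId) to BSON and write it to ID_FILE
function saveId(cb) {
  if (!lastId) {
    // Nothing to save yet; still report back to the caller
    return cb && cb(null);
  }
  const lastIdBuffer = bson.serialize(lastId);
  fs.writeFile(ID_FILE, lastIdBuffer, (err) => {
    if (err) {
      logger.error('[saveId]', err);
    }
    return cb && cb(err);
  });
}

// Read ID_FILE and deserialize it back into a resume token object
function loadId(cb) {
  fs.readFile(ID_FILE, (err, data) => {
    let token;
    if (!err && data) {
      token = bson.deserialize(data);
    }
    return cb && cb(err, token);
  });
}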

  • Every second, write the latest _id to disk
  • Reload the _id object from disk on startup
  • Use it to resume the change stream with:

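const pipeline = [
  {
    $match: { 'ns.db': config.MONGO_DB_NAME },
  }
];

// Only pass resumeAfter when a token was actually loaded from disk;
// on the very first run there is nothing to resume from.
let changeStreamOptions = {};
if (resumeToken) {
  changeStreamOptions['resumeAfter'] = resumeToken;
}

const changeStream = db.watch(pipeline, changeStreamOptions);
changeStream.on('change', (change) => {
  // Handle the change here, then remember its _id as the newest resume token
  lastId = change._id;
});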
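
The three steps above (save periodically, reload on startup, resume) can be sketched as one small helper. This is a sketch under assumptions, not the code from my replicator: createTokenStore and its methods are hypothetical names, and jsonCodec is only a stand-in so the block runs without the bson package. For real BSON tokens, pass a codec built from bson.serialize and bson.deserialize instead.

```javascript
const fs = require('fs');

// Stand-in codec so the sketch is self-contained. For BinData tokens use
// { serialize: bson.serialize, deserialize: bson.deserialize } instead.
const jsonCodec = {
  serialize: (token) => Buffer.from(JSON.stringify(token), 'utf8'),
  deserialize: (buffer) => JSON.parse(buffer.toString('utf8')),
};

function createTokenStore(file, codec = jsonCodec) {
  let latest = null;
  let dirty = false;

  const store = {
    // Call from the 'change' handler with change._id.
    update(token) {
      latest = token;
      dirty = true;
    },
    // Write the latest token if it changed since the last flush.
    // Writing to a temp file and renaming it avoids a half-written ID file.
    flush() {
      if (!dirty) return false;
      const tmp = `${file}.tmp`;
      fs.writeFileSync(tmp, codec.serialize(latest));
      fs.renameSync(tmp, file);
      dirty = false;
      return true;
    },
    // Return the saved token, or null when no file exists yet (first run).
    load() {
      try {
        return codec.deserialize(fs.readFileSync(file));
      } catch (err) {
        if (err.code === 'ENOENT') return null;
        throw err;
      }
    },
    // Flush every intervalMs; unref() keeps the timer from blocking exit.
    start(intervalMs = 1000) {
      const timer = setInterval(() => store.flush(), intervalMs);
      timer.unref();
      return timer;
    },
  };
  return store;
}
```

On startup, the value returned by load() would be used as the resume token, start() would begin the once-per-second writes, and the change handler would call update(change._id). Skipping the write when nothing changed keeps an idle replicator from touching the disk every second.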

Reference

db.watch(pipeline, options)

New in version 4.0: Requires featureCompatibilityVersion (fCV) set to "4.0" or greater. For more information on fCV, see setFeatureCompatibilityVersion.

Opens a change stream cursor for a database to report on all its non-system collections.

db.watch() takes two parameters:

  • pipeline: A sequence of one or more aggregation stages. See Aggregation for complete documentation on the aggregation framework.
  • options: Optional. Additional options that modify the behavior of db.watch(). You must pass an empty array [] to the pipeline parameter if you are not specifying a pipeline but are passing the options document.

The options document can contain the following fields and values:

  • resumeAfter: Optional. Directs db.watch() to attempt resuming notifications starting after the operation specified in the resume token. Each change stream event document includes a resume token as the _id field. Pass the entire _id field of the change event document that represents the operation you want to resume after. resumeAfter is mutually exclusive with startAtOperationTime.
  • fullDocument: Optional. By default, db.watch() returns the delta of those fields modified by an update operation, instead of the entire updated document. Set fullDocument to "updateLookup" to direct db.watch() to look up the most current majority-committed version of the updated document. db.watch() returns a fullDocument field with the document lookup in addition to the updateDescription delta.
  • batchSize: Optional. Specifies the maximum number of change events to return in each batch of the response from the MongoDB cluster. Has the same functionality as cursor.batchSize().
  • maxAwaitTimeMS: Optional. The maximum amount of time in milliseconds the server waits for new data changes to report to the change stream cursor before returning an empty batch. Defaults to 1000 milliseconds.
  • startAtOperationTime: Optional. The starting point for the change stream. If the specified starting point is in the past, it must be in the time range of the oplog. To check the time range of the oplog, see rs.printReplicationInfo(). startAtOperationTime is mutually exclusive with resumeAfter.
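
Two of the rules above are easy to trip over: resumeAfter and startAtOperationTime cannot be combined, and an options document needs an empty pipeline array in front of it. A minimal sketch of a guard for both follows; buildWatchArgs is a hypothetical helper name and the token value is made up.

```javascript
// Build the [pipeline, options] argument pair for db.watch().
function buildWatchArgs({ pipeline, resumeAfter, startAtOperationTime, fullDocument } = {}) {
  if (resumeAfter && startAtOperationTime) {
    throw new Error('resumeAfter and startAtOperationTime are mutually exclusive');
  }
  const options = {};
  // Only set the fields that were actually provided.
  if (resumeAfter) options.resumeAfter = resumeAfter;
  if (startAtOperationTime) options.startAtOperationTime = startAtOperationTime;
  if (fullDocument) options.fullDocument = fullDocument;
  // An empty array stands in for "no pipeline" so options can still be passed.
  return [pipeline || [], options];
}

// Made-up token for illustration only.
const [watchPipeline, watchOptions] = buildWatchArgs({
  resumeAfter: { _data: '825B9F00' },
  fullDocument: 'updateLookup',
});
console.log(JSON.stringify(watchPipeline), JSON.stringify(watchOptions));
```

The result would be spread into the call, for example db.watch(...buildWatchArgs({ resumeAfter: token })).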

SEE ALSO

db.collection.watch() and Mongo.watch()

To generate a new ObjectId, use ObjectId() with no argument. To generate one from a specific value, pass ObjectId() a unique hexadecimal string. To read an ObjectId back as a hexadecimal string, access the str attribute of the ObjectId() object.

Create a new ObjectID instance

class ObjectID()
  • Arguments: id (string), which can be a 24 byte hex string, 12 byte binary string or a Number.
  • Returns: object instance of ObjectID.

Return the ObjectID id as a 24 byte hex string representation

toHexString()
  • Returns: string, the 24 byte hex string representation.

Examples

Generate a 24 character hex string representation of the ObjectID
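
The 24-character hex form can be checked without the driver, which also shows why a resume token cannot be cast to an ObjectID. This is a sketch using only Node's Buffer; isObjectIdHex and hexToBytes are hypothetical helper names, not driver functions, and the sample value is made up.

```javascript
// A 24-character hex string encodes exactly 12 bytes.
function isObjectIdHex(value) {
  return typeof value === 'string' && /^[0-9a-fA-F]{24}$/.test(value);
}

// Decode a valid 24-character hex string into its 12 raw bytes.
function hexToBytes(hex) {
  if (!isObjectIdHex(hex)) {
    throw new TypeError('not a 24-character hex string');
  }
  return Buffer.from(hex, 'hex');
}

const sample = '0123456789abcdef01234567'; // made-up value
console.log(isObjectIdHex(sample), hexToBytes(sample).length); // prints true 12
console.log(isObjectIdHex({ _data: '825B9F00' }));             // prints false
```

A change event _id is a document rather than a 24-character hex string, so it fails this check.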