How to write a C MQTT client using Mosquitto

Introduction

How to write a C MQTT client using Mosquitto The 2018 version, based upon this excellent post by Kevin Boone:

Writing an MQTT client C for ActiveMQ from the ground up

The article above is a good and easy starting point, but it hasn’t been updated for 2 years so when you run it with the latest version of Mosquitto, it doesn’t work – and it’s a bit hacky (using “sleep” to avoid a concurrency problem).

So I analyzed the latest mosquitto_pub code from mosquitto repository itself to see how it’s working, and this article is the result.

What’s changed

  • There’s a queue inside mosquitto, `mosquitto_loop` must be called for it to be processed. Alternatively, you can also use the `mosquitto_loop_start` and `mosquitto_loop_stop`
  • I added asynchronous (callback) processing to wait for calls to complete, instead of the ole’ sleep function
  • It’s 2018! Everyone is adopting HTTPS. Accordingly, your MQTT traffic shouldn’t be left bare for all to see! Let’s use TLS to encrypt the traffic

The code

How to use callback

I want to publish just once message, so my flow is the following

  • On connect complete -> publish a message
  • On publish complete -> start to disconnect
  • On disconnect complete -> exit the loop and return control to the main thread. If you don’t wait for this, data may not even get sent!

To do this, I set up 3 “hooks” (callback function), like this

mosquitto_connect_callback_set(mosq, my_connect_callback);

mosquitto_disconnect_callback_set(mosq, my_disconnect_callback);

mosquitto_publish_callback_set(mosq, my_publish_callback);
And then write the callback functions to execute my flow
void my_connect_callback(struct mosquitto *mosq, void *obj, int result)
{
    int rc = MOSQ_ERR_SUCCESS;
    if(!result){
        printf("Sending message...\n");
        rc = mosquitto_publish(mosq, &mid_sent, MQTT_TOPIC, strlen(text), text, qos, retain);
        if(rc){
            switch(rc){
                case MOSQ_ERR_INVAL:
                    fprintf(stderr, "Error: Invalid input. Does your topic contain '+' or '#'?\n");
                    break;
                case MOSQ_ERR_NOMEM:
                    fprintf(stderr, "Error: Out of memory when trying to publish message.\n");
                    break;
                case MOSQ_ERR_NO_CONN:
                    fprintf(stderr, "Error: Client not connected when trying to publish.\n");
                    break;
                case MOSQ_ERR_PROTOCOL:
                    fprintf(stderr, "Error: Protocol error when communicating with broker.\n");
                    break;
                case MOSQ_ERR_PAYLOAD_SIZE:
                    fprintf(stderr, "Error: Message payload is too large.\n");
                    break;
            }
            mosquitto_disconnect(mosq);
        }
    } else {
        if(result){
            fprintf(stderr, "%s\n", mosquitto_connack_string(result));
        }
    }
}

void my_disconnect_callback(struct mosquitto *mosq, void *obj, int rc)
{
    printf("Disconnected!\n");
    connected = false;
}

void my_publish_callback(struct mosquitto *mosq, void *obj, int mid)
{
    printf("Published!\n");
    if(disconnect_sent == false){
        mosquitto_disconnect(mosq);
        disconnect_sent = true;
    }
}

How to process the queue with mosquitto_loop

In the main trunk of your code, do this

int rc;
do {
//network동작 끝나기 전에 모스키토 동작을 막기위해 잠깐 딜레이가 필요
  rc = mosquitto_loop(mosq, -1, 1);
} while (rc == MOSQ_ERR_SUCCESS && connected);

How to add TLS to the connection process

Before connecting, set TLS options with mosquitto_tls_set

mosquitto_username_pw_set(mosq, MQTT_USERNAME, MQTT_PASSWORD);
mosquitto_tls_set(mosq, "ca-cert.pem", NULL, NULL, NULL, NULL);
int ret = mosquitto_connect(mosq, MQTT_HOSTNAME, MQTT_PORT, 0);

Complete publish – subscribe sample

Available at https://github.com/thanhphu/mosquitto-sample. Happy cloning!

 

Leave a Reply

Your email address will not be published. Required fields are marked *