Utiliser le protocole MQTT avec ThingsBoard

L’ensemble des sketchs de notre projet peuvent être téléchargés ici.

Notre sketch est prêt pour l’amélioration suivante qui lui permettra d’envoyer des trames à un serveur MQTT intégré à l’environnement ThingsBoard. La station deviendra un objet connecté et nous pourrons consulter l’historique de ses mesures depuis le navigateur de n’importe quel terminal (smartphone, PC, MAC…) connecté à Internet.

Si vous avez une bonne affinité avec l’informatique, vous pourrez installer vous-même « Thingsboard Community » (qui est open source) sur un ordinateur-serveur, connecté à Internet via un DNS. Vous trouverez un ensemble de tutoriels pour ce faire sur ce site.

Pour une mise en œuvre presque immédiate vous pourrez autrement créer gratuitement un compte de démo chez ThingsBoard et y configurer un tableau de bord pour afficher vos mesures dans une jolie interface graphique. Le site « Steve’s Internet Guide » explique très bien comment faire.

C’est cette deuxième approche que nous retenons pour ce tutoriel. Suivez ces instructions1 et vous obtiendrez un jeton d’accès (en anglais « access token ») depuis le site Internet de ThingsBoard. Une fois ce jeton intégré au sketch de notre station il l’identifiera de façon unique dans l’infrastructure ThingsBoard et l’autorisera à s’y connecter et à y poster ses mesures.

La figure 2.1 montre un tableau de bord réalisé avec ThingsBoard qui affiche les données postées par notre station météo. Il ne m’a fallu que quelques minutes pour le construire, ce qui est assez enthousiasmant lorsqu’on mesure combien de travail nécessiterait la création d’une interface graphique de cette qualité sur la station elle-même !


tableau de bord ThingsBoard


Le sketch « thingsboard_posting.ino » contient les modifications apportées à « time_management.ino » pour la publication sur ThingsBoard. Il utilise la bibliothèque Arduino PubSubClient qui contient toutes les classes nécessaires à l’implémentation d’un client MQTT.

Dans les déclarations globales nous avons apporté les modifications suivantes :

  • Nous avons renommé la directive RECORD_PAYLOAD_SIZE en MQTT_PAYLOAD_SIZE ;
  • Nous avons renommé le tableau Record_Payload en MQTT_Payload ;
  • Nous avons ajouté les déclarations relatives au protocole MQTT et à ThingsBoard :
0132 // Déclarations pour le protocole MQTT
0133 #include <WiFiEspClient.h>
0134 #include <PubSubClient.h>
0135 
0136 WiFiEspClient espClient;
0137 PubSubClient MQTT_Client(espClient); // Pointeur vers la classe PubSubClient
0138 
0139 #define MQTT_PAYLOAD_SIZE (165)
0140 #define MQTT_TOKEN_SIZE (21)
0141 #define MQTT_KEEP_ALIVE (30)
0142 #define MQTT_SOCK_TOUT (30)
0143 #define MQTT_RECONNECT_TOUT (10)
0144 #define MQTT_DEVICE "Station_Meteo"
0145 #define MQTT_PUB_TOPIC "v1/devices/me/telemetry"
0146 #define MQTT_PORT (1883)
0147 #define MQTT_PAYLOAD_SIZE (165)
0148 #define TB_SERVER "demo.thingsboard.io" // URL du serveur Thingsboard
0149 #define MQTT_TOKEN "my_access_token" // Jeton d'accès Thingsboard
0150 
0151 char MQTTtoken[] = MQTT_TOKEN;
0152 char TBServer[] = TB_SERVER;
0153 char MQTT_Payload[MQTT_PAYLOAD_SIZE] = {0};

Dans la fonction setup nous avons apporté les modifications suivantes :

  • Suppression de l’appel à LogFile_Delete qui n’a plus lieu d’être compte tenu des autres modifications que nous allons apporter au sketch ;
  • Appel à la fonction de paramétrage / initialisation du service MQTT (ligne 204) :
0203   // Initialise le service MQTT
0204   Initialize_MQTT(&Serial);

Le code de la fonction Initialize_MQTT est directement tiré des exemples de PubSubClient :

0847 void Initialize_MQTT(HardwareSerial *serial) {
0848 
0849   // Définition du serveur et du port
0850   MQTT_Client.setServer(TBServer, MQTT_PORT);
0851 
0852   // Autres paramétrages (d'après les exemples de la bibliothèque PubSubClient)
0853   MQTT_Client.setKeepAlive(MQTT_KEEP_ALIVE);
0854   MQTT_Client.setSocketTimeout(MQTT_SOCK_TOUT);
0855 
0856   serial->println("MQTT server set to " + (String)TBServer + ":" + String(MQTT_PORT)) ;
0857   serial->println("MQTT publishing on topic " + (String)MQTT_PUB_TOPIC);
0858 }

Pour une meilleure vue d’ensemble des modifications cumulées jusqu’ici, le code de la fonction loop est reproduit intégralement ci-après. Les lignes ajoutées ou modifiées sont : 230, 231, 250 à 257, 273.

0218 void loop() {
0219 
0220   if (rtc_mod == SET_RTC_MOD) { // Toutes les SET_RTC_MOD itérations...
0221 
0222     rtc_mod = 0;
0223 
0224     // Re-connexion du module Wi-Fi au réseau spécifié
0225     Connect_WiFi(&Serial);
0226 
0227     // Acquisition NTP de la date et de l'heure de la mesure, recalage de la RTC
0228     Set_RTC_Time(&Serial);
0229 
0230     // Poste à nouveau les trames MQTT éventuellement sauvegardées sur la carte SD
0231     Post_SD_Records(logfilename, &Serial, MQTT_Payload);
0232 
0233     // Ré-arme l'IDWG
0234     IWatchdog.reload();
0235 
0236   } // Clôture de if (rtc_mod == SET_RTC_MOD)
0237 
0238   rtc_mod++;
0239 
0240   if (rec_mod == SET_REC_MOD) { // Toutes les SET_REC_MOD itérations...
0241 
0242     rec_mod = 0;
0243 
0244     // Allume la LED utilisateur
0245     digitalWrite(LED_BUILTIN, HIGH);
0246 
0247     // Lecture des capteurs environnementaux
0248     Read_Sensors(&Serial);
0249 
0250     // Construit la trame MQTT (JSON)
0251     Build_Payload(&Serial, MQTT_Payload);
0252 
0253     // Tente de publier la trame MQTT. Si la publication échoue, écrit la trame sur la 
0254     // carte SD pour sa republication ultérieure.
0255     if (!Post_Payload(&Serial, MQTT_Payload)) {
0256       Serial.println("MQTT server did not respond : payload stored in SD card");
0257       SD_Write_Payload(logfilename, MQTT_Payload);
0258     }
0259 
0260     // Eteint la LED utilisateur
0261     digitalWrite(LED_BUILTIN, LOW);
0262 
0263   } // Clôture de if (rec_mod == SET_REC_MOD)
0264 
0265   rec_mod++;
0266 
0267   if (sd_card_listing) { // Liste des enregistrements sur la carte SD
0268     LogFile_List(logfilename, &Serial);
0269     sd_card_listing = false;
0270   }
0271 
0272   // Entretient la connexion MQTT et gère d'éventuels callbacks.
0273   MQTT_Client.loop();
0274 
0275   // Ré-arme l'IDWG
0276   IWatchdog.reload();
0277 
0278   // Mise en veille (mode "STOP") avant l'itération suivante
0279   delay(5);
0280   LowPower.deepSleep(main_loop_delay);
0281 }

La ligne 251 appelle la fonction Build_Payload qui construit la trame MQTT dans le respect du format JSON imposé par l’API ThingsBoard. Nous utilisons un format de trame (voir plus loin) préfixé avec la date UTC de la mesure, pour que le serveur MQTT puisse par la suite trier les trames dans l’ordre de leur création plutôt que celui de leur réception. Cette précaution nous est imposée par la fonction Post_SD_records, qui enverra occasionnellement des trames ayant été mémorisées sur la carte SD (voir plus loin).

La ligne 255 appelle la fonction Post_Payload qui va publier la trame de mesures contenue dans MQTT_Payload sur le serveur MQTT de ThingsBoard, dans le topic MQTT_PUB_TOPIC (ligne 145).

Les lignes 231 et 257 ajoutent des appels aux fonctions Post_SD_Records et SD_Write_Payload qui donnent une réelle plus-value à la carte SD. Chaque fois que la boucle principale échouera à poster une trame chez ThingsBoard, elle l’écrira sur la carte SD avec SD_Write_Payload.

Dès la prochaine validation du test de la ligne 220, la fonction Post_SD_records lira toutes les trames enregistrées précédemment sur la carte SD, essaiera de les envoyer à nouveau chez ThingsBoard puis effacera finalement le fichier de logs et redémarrera la station par un software reset.

La ligne 273 maintient la connexion avec le serveur MQTT et réceptionne d’éventuels messages provenant d’un topic auquel notre station aurait éventuellement souscrit. Nous n’avons pas utilisé cette possibilité intéressante qui permettrait, par exemple, de piloter la station depuis une application Android sur téléphone comme l’explique ce tutoriel de l’excellent blog d’Eskimon.

Les codes source des fonctions SD_Write_Payload, Build_Payload, Post_Payload, Post_SD_Records sont donnés ci-après. La fonction MQTT_Reconnect, appelée par Post_Payload, s’ajoute à la liste.

0399 /*------------------------------------------------------------------------------*/
0400 /* Enregistrement de la trame MQTT sur la carte SD, dans le fichier logfilename */
0401 /*------------------------------------------------------------------------------*/
0402 void SD_Write_Payload(char* logfilename, char* MQTT_Payload) {
0403   logfile.println(MQTT_Payload); // Ecrit les mesures sur une ligne
0404   logfile.flush(); // Force l'écriture physique sur la carte SD
0405 }
0860 /*----------------------------------------------------------------------------*/
0861 /* Construction de la trame MQTT (format JSON)                                */
0862 /*----------------------------------------------------------------------------*/
0863 void Build_Payload(HardwareSerial *serial, char* MQTT_Payload) {
0864 
0865   // Ecris la température dans la chaîne de caractères bufTemp avec une décimale
0866   char bufTemp[6];
0867   dtostrf(measure.temperature, 0, 1, bufTemp);
0868 
0869   // Date et heure universelles au format UNIX (Epoch)
0870   uint32_t UTC_time = rtc.getEpoch();
0871   char bufEpoch[10];
0872 
0873   // Ecris l'UTC au format UNIX dans bufEpoch
0874   ltoa(UTC_time, bufEpoch, 10);
0875 
0876   // Ecris l'UTC au format "20/12/30 10:34:09" dans measure.datetime
0877   Make_Local_TimeStamp(measure.datetime, UTC_time, serial);
0878 
0879   // Construit une trame MQTT conforme à l'API Thingsboard
0880   String payload = "{";
0881   payload += "\"ts\":";
0882   payload += bufEpoch;
0883   payload += "000";
0884   payload += ", ";
0885   payload += "\"values\":";
0886   payload += "{";
0887   payload += "\"Time\":\"";
0888   payload += measure.datetime;
0889   payload += "\", ";
0890   payload += "\"Temperature\":";
0891   payload += bufTemp;
0892   payload += ", ";
0893   payload += "\"Pressure\":";
0894   payload += measure.pressure;
0895   payload += ", ";
0896   payload += "\"Humidity\":";
0897   payload += measure.humidity;
0898   payload += ", ";
0899   payload += "\"CO2\":";
0900   payload += measure.co2;
0901   payload += "}";
0902   payload += "}";
0903 
0904   payload.toCharArray( MQTT_Payload, MQTT_PAYLOAD_SIZE );
0905 
0906   serial->println("MQTT post : " + payload);
0907 }

Voici la trame MQTT affichée par la ligne 906 sur le port série serial :

MQTT post : {"ts":1609506637000, "values":{"Time":"21/01/01 14:10:37", "Temperature":21.4, "Pressure":991, "Humidity":47, "CO2":852}}

0909 /*----------------------------------------------------------------------------*/
0910 /* Publication des mesures sur Thingsboard                                    */
0911 /*----------------------------------------------------------------------------*/
0912 bool Post_Payload(HardwareSerial *serial, char* MQTT_Payload) {
0913 
0914   bool ret;
0915   if (MQTT_Reconnect(serial)) {
0916     if (MQTT_Client.publish(MQTT_PUB_TOPIC, MQTT_Payload)) {
0917       ret = true;
0918     }
0919     else {
0920       ret = false;
0921     }
0922   }
0923   else {
0924     ret = false;
0925   }
0926   return ret;
0927 }
0929 /*----------------------------------------------------------------------------*/
0930 /* Connexion / reconnexion au serveur MQTT                                    */
0931 /*----------------------------------------------------------------------------*/
0932 bool MQTT_Reconnect(HardwareSerial *serial) {
0933 
0934   bool ret;
0935   uint8_t n = 0;
0936 
0937   while (!MQTT_Client.connected()) {
0938 
0939     if (!MQTT_Client.connect(MQTT_DEVICE, MQTTtoken, NULL)) {
0940 
0941       IWatchdog.reload();
0942 
0943       if (n == MQTT_RECONNECT_TOUT + 1) {
0944         ret = false;
0945         break;
0946       }
0947       n++;
0948       delay(DELAY_5S);
0949     }
0950     else {
0951       ret = true;
0952     }
0953   }
0954   return ret;
0955 }
0957 /*----------------------------------------------------------------------------*/
0958 /* Parcours le fichier logfile, poste ses enregistrements sur Thingsboard.    */
0959 /* Efface le fichier une fois l'opération terminée.                           */
0960 /*----------------------------------------------------------------------------*/
0961 void Post_SD_Records(char* logfilename, HardwareSerial *serial, char* MQTT_Payload) {
0962 
0963   if (logfile.size()) {
0964 
0965     // Ferme logfile
0966     if (logfile) logfile.close();
0967     
0968     // Ouvre logfile en lecture seule
0969     logfile = SD.open(logfilename, FILE_READ);
0970 
0971     if (logfile) {
0972       
0973       serial->println("Resending MQTT frames stored in SD card");
0974       
0975       uint32_t n = 0;
0976       String buffer;
0977       
0978       // Parcours logfile ligne par ligne (trames MQTT)
0979       while (logfile.available()) {
0980 
0981         // Charge la trame dans buffer
0982         buffer = logfile.readStringUntil('\n');
0983 
0984         // Place le contenu du buffer dans MQTT_Payload
0985         buffer.toCharArray(MQTT_Payload, MQTT_PAYLOAD_SIZE);
0986 
0987         // Envoi du contenu de MQTT_Payload à Thingsboard
0988         if (!Post_Payload(serial, MQTT_Payload)) {
0989           serial->println("\nMQTT post ERROR : " + buffer);
0990         }
0991         else {
0992           serial->println("\nMQTT post : " + buffer);
0993         }
0994         // Re-chargement de l'IDWG
0995         IWatchdog.reload();
0996 
0997         n++;
0998       }
0999 
1000       if (n) serial->println("Number of MQTT frames sent : " + String(n));
1001 
1002       // Ferme logfile et l'efface
1003       logfile.close();
1004       SD.remove(logfilename);
1005 
1006       // Crée un nouveau fichier logfile (ouverture obligatoire)
1007       logfile = SD.open(logfilename, FILE_WRITE);
1008       
1009       // Ferme immédiatement le nouveau fichier
1010       logfile.close();
1011 
1012       // Software reset, pour ne pas segmenter la mémoire à force d'ouvertures et
1013       // fermetures de fichiers
1014       HAL_NVIC_SystemReset();
1015 
1016     } else {
1017       serial->println("error opening " + (String)logfilename);
1018       while (true);
1019     }
1020   }
1021 }
  1. Ignorez cependant la partie « Configuring The Python Client », dans notre cas le client sera la station météo avec son sketch STM32duino.