Utiliser le protocole MQTT avec ThingsBoard
L’ensemble des sketchs de notre projet peuvent être téléchargés ici.
Notre sketch est prêt pour l’amélioration suivante qui lui permettra d’envoyer des trames à un serveur MQTT intégré à l’environnement ThingsBoard. La station deviendra un objet connecté et nous pourrons consulter l’historique de ses mesures depuis le navigateur de n’importe quel terminal (smartphone, PC, MAC…) connecté à Internet.
Si vous avez une bonne affinité avec l’informatique, vous pourrez installer vous-même « Thingsboard Community » (qui est open source) sur un ordinateur-serveur, connecté à Internet via un DNS. Vous trouverez un ensemble de tutoriels pour ce faire sur ce site.
Pour une mise en œuvre presque immédiate vous pourrez autrement créer gratuitement un compte de démo chez ThingsBoard et y configurer un tableau de bord pour afficher vos mesures dans une jolie interface graphique. Le site « Steve’s Internet Guide » explique très bien comment faire.
C’est cette deuxième approche que nous retenons pour ce tutoriel. Suivez ces instructions1 et vous obtiendrez un jeton d’accès (en anglais « access token ») depuis le site Internet de ThingsBoard. Une fois ce jeton intégré au sketch de notre station il l’identifiera de façon unique dans l’infrastructure ThingsBoard et l’autorisera à s’y connecter et à y poster ses mesures.
La figure 2.1 montre un tableau de bord réalisé avec ThingsBoard qui affiche les données postées par notre station météo. Il ne m’a fallu que quelques minutes pour le construire, ce qui est assez enthousiasmant lorsqu’on mesure combien de travail nécessiterait la création d’une interface graphique de cette qualité sur la station elle-même !
Le sketch « thingsboard_posting.ino » contient les modifications apportées à « time_management.ino » pour la publication sur ThingsBoard. Il utilise la bibliothèque Arduino PubSubClient qui contient toutes les classes nécessaires à l’implémentation d’un client MQTT.
Dans les déclarations globales nous avons apporté les modifications suivantes :
- Nous avons renommé la directive RECORD_PAYLOAD_SIZE en MQTT_PAYLOAD_SIZE ;
- Nous avons renommé le tableau Record_Payload en MQTT_Payload ;
- Nous avons ajouté les déclarations relatives au protocole MQTT et à ThingsBoard :
0132 // Déclarations pour le protocole MQTT
0133 #include <WiFiEspClient.h>
0134 #include <PubSubClient.h>
0135
0136 WiFiEspClient espClient;
0137 PubSubClient MQTT_Client(espClient); // Pointeur vers la classe PubSubClient
0138
0139 #define MQTT_PAYLOAD_SIZE (165)
0140 #define MQTT_TOKEN_SIZE (21)
0141 #define MQTT_KEEP_ALIVE (30)
0142 #define MQTT_SOCK_TOUT (30)
0143 #define MQTT_RECONNECT_TOUT (10)
0144 #define MQTT_DEVICE "Station_Meteo"
0145 #define MQTT_PUB_TOPIC "v1/devices/me/telemetry"
0146 #define MQTT_PORT (1883)
0147 #define MQTT_PAYLOAD_SIZE (165)
0148 #define TB_SERVER "demo.thingsboard.io" // URL du serveur Thingsboard
0149 #define MQTT_TOKEN "my_access_token" // Jeton d'accès Thingsboard
0150
0151 char MQTTtoken[] = MQTT_TOKEN;
0152 char TBServer[] = TB_SERVER;
0153 char MQTT_Payload[MQTT_PAYLOAD_SIZE] = {0};
Dans la fonction setup nous avons apporté les modifications suivantes :
- Suppression de l’appel à LogFile_Delete qui n’a plus lieu d’être compte tenu des autres modifications que nous allons apporter au sketch ;
- Appel à la fonction de paramétrage / initialisation du service MQTT (ligne 204) :
0203 // Initialise le service MQTT
0204 Initialize_MQTT(&Serial);
Le code de la fonction Initialize_MQTT est directement tiré des exemples de PubSubClient :
0847 void Initialize_MQTT(HardwareSerial *serial) {
0848
0849 // Définition du serveur et du port
0850 MQTT_Client.setServer(TBServer, MQTT_PORT);
0851
0852 // Autres paramétrages (d'après les exemples de la bibliothèque PubSubClient)
0853 MQTT_Client.setKeepAlive(MQTT_KEEP_ALIVE);
0854 MQTT_Client.setSocketTimeout(MQTT_SOCK_TOUT);
0855
0856 serial->println("MQTT server set to " + (String)TBServer + ":" + String(MQTT_PORT)) ;
0857 serial->println("MQTT publishing on topic " + (String)MQTT_PUB_TOPIC);
0858 }
Pour une meilleure vue d’ensemble des modifications cumulées jusqu’ici, le code de la fonction loop est reproduit intégralement ci-après. Les lignes ajoutées ou modifiées sont : 230, 231, 250 à 257, 273.
0218 void loop() {
0219
0220 if (rtc_mod == SET_RTC_MOD) { // Toutes les SET_RTC_MOD itérations...
0221
0222 rtc_mod = 0;
0223
0224 // Re-connexion du module Wi-Fi au réseau spécifié
0225 Connect_WiFi(&Serial);
0226
0227 // Acquisition NTP de la date et de l'heure de la mesure, recalage de la RTC
0228 Set_RTC_Time(&Serial);
0229
0230 // Poste à nouveau les trames MQTT éventuellement sauvegardées sur la carte SD
0231 Post_SD_Records(logfilename, &Serial, MQTT_Payload);
0232
0233 // Ré-arme l'IDWG
0234 IWatchdog.reload();
0235
0236 } // Clôture de if (rtc_mod == SET_RTC_MOD)
0237
0238 rtc_mod++;
0239
0240 if (rec_mod == SET_REC_MOD) { // Toutes les SET_REC_MOD itérations...
0241
0242 rec_mod = 0;
0243
0244 // Allume la LED utilisateur
0245 digitalWrite(LED_BUILTIN, HIGH);
0246
0247 // Lecture des capteurs environnementaux
0248 Read_Sensors(&Serial);
0249
0250 // Construit la trame MQTT (JSON)
0251 Build_Payload(&Serial, MQTT_Payload);
0252
0253 // Tente de publier la trame MQTT. Si la publication échoue, écrit la trame sur la
0254 // carte SD pour sa republication ultérieure.
0255 if (!Post_Payload(&Serial, MQTT_Payload)) {
0256 Serial.println("MQTT server did not respond : payload stored in SD card");
0257 SD_Write_Payload(logfilename, MQTT_Payload);
0258 }
0259
0260 // Eteint la LED utilisateur
0261 digitalWrite(LED_BUILTIN, LOW);
0262
0263 } // Clôture de if (rec_mod == SET_REC_MOD)
0264
0265 rec_mod++;
0266
0267 if (sd_card_listing) { // Liste des enregistrements sur la carte SD
0268 LogFile_List(logfilename, &Serial);
0269 sd_card_listing = false;
0270 }
0271
0272 // Entretient la connexion MQTT et gère d'éventuels callbacks.
0273 MQTT_Client.loop();
0274
0275 // Ré-arme l'IDWG
0276 IWatchdog.reload();
0277
0278 // Mise en veille (mode "STOP") avant l'itération suivante
0279 delay(5);
0280 LowPower.deepSleep(main_loop_delay);
0281 }
La ligne 251 appelle la fonction Build_Payload qui construit la trame MQTT dans le respect du format JSON imposé par l’API ThingsBoard. Nous utilisons un format de trame (voir plus loin) préfixé avec la date UTC de la mesure, pour que le serveur MQTT puisse par la suite trier les trames dans l’ordre de leur création plutôt que celui de leur réception. Cette précaution nous est imposée par la fonction Post_SD_records, qui enverra occasionnellement des trames ayant été mémorisées sur la carte SD (voir plus loin).
La ligne 255 appelle la fonction Post_Payload qui va publier la trame de mesures contenue dans MQTT_Payload sur le serveur MQTT de ThingsBoard, dans le topic MQTT_PUB_TOPIC (ligne 145).
Les lignes 231 et 257 ajoutent des appels aux fonctions Post_SD_Records et SD_Write_Payload qui donnent une réelle plus-value à la carte SD. Chaque fois que la boucle principale échouera à poster une trame chez ThingsBoard, elle l’écrira sur la carte SD avec SD_Write_Payload.
Dès la prochaine validation du test de la ligne 220, la fonction Post_SD_records lira toutes les trames enregistrées précédemment sur la carte SD, essaiera de les envoyer à nouveau chez ThingsBoard puis effacera finalement le fichier de logs et redémarrera la station par un software reset.
La ligne 273 maintient la connexion avec le serveur MQTT et réceptionne d’éventuels messages provenant d’un topic auquel notre station aurait éventuellement souscrit. Nous n’avons pas utilisé cette possibilité intéressante qui permettrait, par exemple, de piloter la station depuis une application Android sur téléphone comme l’explique ce tutoriel de l’excellent blog d’Eskimon.
Les codes source des fonctions SD_Write_Payload, Build_Payload, Post_Payload, Post_SD_Records sont donnés ci-après. La fonction MQTT_Reconnect, appelée par Post_Payload, s’ajoute à la liste.
0399 /*------------------------------------------------------------------------------*/
0400 /* Enregistrement de la trame MQTT sur la carte SD, dans le fichier logfilename */
0401 /*------------------------------------------------------------------------------*/
0402 void SD_Write_Payload(char* logfilename, char* MQTT_Payload) {
0403 logfile.println(MQTT_Payload); // Ecrit les mesures sur une ligne
0404 logfile.flush(); // Force l'écriture physique sur la carte SD
0405 }
0860 /*----------------------------------------------------------------------------*/
0861 /* Construction de la trame MQTT (format JSON) */
0862 /*----------------------------------------------------------------------------*/
0863 void Build_Payload(HardwareSerial *serial, char* MQTT_Payload) {
0864
0865 // Ecris la température dans la chaîne de caractères bufTemp avec une décimale
0866 char bufTemp[6];
0867 dtostrf(measure.temperature, 0, 1, bufTemp);
0868
0869 // Date et heure universelles au format UNIX (Epoch)
0870 uint32_t UTC_time = rtc.getEpoch();
0871 char bufEpoch[10];
0872
0873 // Ecris l'UTC au format UNIX dans bufEpoch
0874 ltoa(UTC_time, bufEpoch, 10);
0875
0876 // Ecris l'UTC au format "20/12/30 10:34:09" dans measure.datetime
0877 Make_Local_TimeStamp(measure.datetime, UTC_time, serial);
0878
0879 // Construit une trame MQTT conforme à l'API Thingsboard
0880 String payload = "{";
0881 payload += "\"ts\":";
0882 payload += bufEpoch;
0883 payload += "000";
0884 payload += ", ";
0885 payload += "\"values\":";
0886 payload += "{";
0887 payload += "\"Time\":\"";
0888 payload += measure.datetime;
0889 payload += "\", ";
0890 payload += "\"Temperature\":";
0891 payload += bufTemp;
0892 payload += ", ";
0893 payload += "\"Pressure\":";
0894 payload += measure.pressure;
0895 payload += ", ";
0896 payload += "\"Humidity\":";
0897 payload += measure.humidity;
0898 payload += ", ";
0899 payload += "\"CO2\":";
0900 payload += measure.co2;
0901 payload += "}";
0902 payload += "}";
0903
0904 payload.toCharArray( MQTT_Payload, MQTT_PAYLOAD_SIZE );
0905
0906 serial->println("MQTT post : " + payload);
0907 }
Voici la trame MQTT affichée par la ligne 906 sur le port série serial :
MQTT post : {"ts":1609506637000, "values":{"Time":"21/01/01 14:10:37", "Temperature":21.4, "Pressure":991, "Humidity":47, "CO2":852}}
0909 /*----------------------------------------------------------------------------*/
0910 /* Publication des mesures sur Thingsboard */
0911 /*----------------------------------------------------------------------------*/
0912 bool Post_Payload(HardwareSerial *serial, char* MQTT_Payload) {
0913
0914 bool ret;
0915 if (MQTT_Reconnect(serial)) {
0916 if (MQTT_Client.publish(MQTT_PUB_TOPIC, MQTT_Payload)) {
0917 ret = true;
0918 }
0919 else {
0920 ret = false;
0921 }
0922 }
0923 else {
0924 ret = false;
0925 }
0926 return ret;
0927 }
0929 /*----------------------------------------------------------------------------*/
0930 /* Connexion / reconnexion au serveur MQTT */
0931 /*----------------------------------------------------------------------------*/
0932 bool MQTT_Reconnect(HardwareSerial *serial) {
0933
0934 bool ret;
0935 uint8_t n = 0;
0936
0937 while (!MQTT_Client.connected()) {
0938
0939 if (!MQTT_Client.connect(MQTT_DEVICE, MQTTtoken, NULL)) {
0940
0941 IWatchdog.reload();
0942
0943 if (n == MQTT_RECONNECT_TOUT + 1) {
0944 ret = false;
0945 break;
0946 }
0947 n++;
0948 delay(DELAY_5S);
0949 }
0950 else {
0951 ret = true;
0952 }
0953 }
0954 return ret;
0955 }
0957 /*----------------------------------------------------------------------------*/
0958 /* Parcours le fichier logfile, poste ses enregistrements sur Thingsboard. */
0959 /* Efface le fichier une fois l'opération terminée. */
0960 /*----------------------------------------------------------------------------*/
0961 void Post_SD_Records(char* logfilename, HardwareSerial *serial, char* MQTT_Payload) {
0962
0963 if (logfile.size()) {
0964
0965 // Ferme logfile
0966 if (logfile) logfile.close();
0967
0968 // Ouvre logfile en lecture seule
0969 logfile = SD.open(logfilename, FILE_READ);
0970
0971 if (logfile) {
0972
0973 serial->println("Resending MQTT frames stored in SD card");
0974
0975 uint32_t n = 0;
0976 String buffer;
0977
0978 // Parcours logfile ligne par ligne (trames MQTT)
0979 while (logfile.available()) {
0980
0981 // Charge la trame dans buffer
0982 buffer = logfile.readStringUntil('\n');
0983
0984 // Place le contenu du buffer dans MQTT_Payload
0985 buffer.toCharArray(MQTT_Payload, MQTT_PAYLOAD_SIZE);
0986
0987 // Envoi du contenu de MQTT_Payload à Thingsboard
0988 if (!Post_Payload(serial, MQTT_Payload)) {
0989 serial->println("\nMQTT post ERROR : " + buffer);
0990 }
0991 else {
0992 serial->println("\nMQTT post : " + buffer);
0993 }
0994 // Re-chargement de l'IDWG
0995 IWatchdog.reload();
0996
0997 n++;
0998 }
0999
1000 if (n) serial->println("Number of MQTT frames sent : " + String(n));
1001
1002 // Ferme logfile et l'efface
1003 logfile.close();
1004 SD.remove(logfilename);
1005
1006 // Crée un nouveau fichier logfile (ouverture obligatoire)
1007 logfile = SD.open(logfilename, FILE_WRITE);
1008
1009 // Ferme immédiatement le nouveau fichier
1010 logfile.close();
1011
1012 // Software reset, pour ne pas segmenter la mémoire à force d'ouvertures et
1013 // fermetures de fichiers
1014 HAL_NVIC_SystemReset();
1015
1016 } else {
1017 serial->println("error opening " + (String)logfilename);
1018 while (true);
1019 }
1020 }
1021 }
-
Ignorez cependant la partie « Configuring The Python Client », dans notre cas le client sera la station météo avec son sketch STM32duino. ↩