Echange de la température et de l’humidité entre deux NUCLEO-WB55
Ce tutoriel montre comment construire un couple périphérique - central avec deux cartes NUCLEO-WB55 pour leur faire échanger des données environnementales (en l’occurence température et humidité) avec un service et des caractéristiques construits selon le standard Bluetooth SIG. Le périphérique se limitera à publier des informations sur ses services et à les notifier au central qui les affichera.
Les scripts de ce tutoriel sont adaptés des exemples ble_temperature.py et ble_temperature_central.py disponibles dans le dépôt Github de MicroPython, ici.
Plus précisément, nous allons mettre en oeuvre un service environnemental avec :
- Un périphérique (serveur) qui va partager des données de température et pression par un service muni de deux caractéristiques ;
- Un central (client) qui va se connecter aux caractéristiques, décoder les messages que le périphérique y inscrit et lui envoyer en retour un accusé de réception (en écrivant les octets qui conviennent dans les caractéristiques mises à sa disposition).
Le schéma de principe suivant résume le cas d’usage que nous allons réaliser :

Le périphérique partage deux caractéristiques (humidité et température) avec le central, ayant les attributs :
- Read : Le central peut lire la valeur de la caractéristique à tout moment
- Notify : Le périphérique signale au central les modifications de la valeur de la caractéristique aux moments où elles sont réalisées
- Indicate : Comme Notify mais le périphérique demande au central de lui envoyer un accusé de réception.
Matériel requis
Deux cartes NUCLEO-WB55, une qui jouera le rôle du central, une autre celui du périphérique. Dans ce tutoriel, nous n’utiliserons pas de capteurs de température et d’humidité, nous les simulerons avec un générateur de nombres aléatoires afin d’alléger (un peu) les scripts.
Les codes MicroPython pour le central
Les scripts présentés ci-après sont disponibles dans la zone de téléchargement.
Deux fichiers scripts MicroPython seront nécessaires pour le central :
- Le script permettant de construire les trames d’advertising, intitulé ble_advertising.py. Nous ne détaillerons pas son contenu, vous pouvez le copier directement dans le répertoire PYBFLASH.
- Le script du programme principal, main.py, qui intègre la classe BLEEnvironmentCentral mettant en oeuvre le protocole GATT.
Le script du programme principal
Éditez le script main.py contenu dans le répertoire PYBFLASH du disque USB virtuel associé à la NUCLEO-WB55 qui fera office de central.
# Exemple de mise en oeuvre d'un central BLE à l'écoute de deux caractéristiques d'un périphérique construites
# selon le standard Bluetooth SIG.
# Le central se connecte à un périphérique qui partage des données de température et d'humidité et les affiche.
# Amélioration possible : ajouter un service qui peut être écrit par le central de sorte à pouvoir lui envoyer
# des informations en retour.
# Code adapté de : https://github.com/micropython/micropython/blob/master/examples/bluetooth/ble_temperature_central.py
import bluetooth # Pour gérer le BLE
from struct import unpack # Pour extraire les octets des payloads des caractéristiques
from time import sleep_ms # Pour générer des temporisations en millisecondes
from ble_advertising import decode_services, decode_name # Pour décoder le contenu des trames d'advertising
from micropython import const # Pour déclarer des constantes (entières)
# Constantes utilisées pour GATT
# Voir : https://docs.micropython.org/en/latest/library/ubluetooth.html
_IRQ_SCAN_RESULT = const(5)
_IRQ_SCAN_DONE = const(6)
_IRQ_PERIPHERAL_CONNECT = const(7)
_IRQ_PERIPHERAL_DISCONNECT = const(8)
_IRQ_GATTC_SERVICE_RESULT = const(9)
_IRQ_GATTC_SERVICE_DONE = const(10)
_IRQ_GATTC_CHARACTERISTIC_RESULT = const(11)
_IRQ_GATTC_CHARACTERISTIC_DONE = const(12)
_IRQ_GATTC_READ_RESULT = const(15)
_IRQ_GATTC_READ_DONE = const(16)
_IRQ_GATTC_NOTIFY = const(18)
# Objet connectables avec advertising scannable
_ADV_IND = const(0x00)
_ADV_DIRECT_IND = const(0x01)
# Paramètres pour fixer le rapport cyclique du scan GAP
_SCAN_DURATION_MS = const(2000)
_SCAN_INTERVAL_US = const(30000)
_SCAN_WINDOW_US = const(30000)
# Identifiant unique du service environnemental
# org.bluetooth.service.environmental_sensing
_ENV_SENSE_UUID = bluetooth.UUID(0x181A)
# Identifiant unique de la caractéristique de température
# org.bluetooth.characteristic.temperature
_TEMP_UUID = bluetooth.UUID(0x2A6E)
# Identifiant unique de la caractéristique d'humidité
# org.bluetooth.characteristic.humidity
_HUMI_UUID = bluetooth.UUID(0x2A6F)
# Classe pour créer un central BLE environnemental
class BLEEnvironmentCentral:
# Initialisations
def __init__(self, ble):
self._ble = ble
self._ble.active(True)
self._ble.irq(self._irq)
self._reset()
# Effacement des données en mémoire cache
def _reset(self):
# Noms et adresses mises en mémoire cache après une étape de scan des périphériques
self._name = None
self._addr_type = None
self._addr = None
# Valeurs de caractéristiques mises en mémoire cache
self._temp_value = None
self._humi_value = None
# "Callbacks" appelés suite à la validation de différentes opérations
# Ils sont à usage unique et reprennent la valeur "None" après leur premier appel
self._scan_callback = None
self._conn_callback = None
self._read_callback = None
# Callback persistant pour traiter les nouvelles données notifiées par le périphérique
self._notify_callback = None
# Périphérique connecté
self._conn_handle = None
self._start_handle = None
self._end_handle = None
self._temp_handle = None
self._humi_handle = None
# Gestion des évènements
def _irq(self, event, data):
# Le scan des périphériques a permis d'identifier un serveur de données potentiel
if event == _IRQ_SCAN_RESULT:
addr_type, addr, adv_type, rssi, adv_data = data
if adv_type in (_ADV_IND, _ADV_DIRECT_IND) and _ENV_SENSE_UUID in decode_services(adv_data):
# Un périphérique potentiel a été trouvé, mémorise-le et arrète de scanner
self._addr_type = addr_type
# Note: le buffer addr est la propriété de l'appelant donc il est nécessaire de le copier.
self._addr = bytes(addr)
self._name = decode_name(adv_data) or "?"
# Arrêt du scan
self._ble.gap_scan(None)
# Le scan a pris fin
elif event == _IRQ_SCAN_DONE:
if self._scan_callback:
if self._addr:
# Un périphérique a été identifié par le scan, qui a été explicitement arrêté
self._scan_callback(self._addr_type, self._addr, self._name)
self._scan_callback = None
else:
# Le scan a été interrompu car il a atteint son délai de time-out sans recenser
# un périphérique
self._scan_callback(None, None, None)
# Connexion réussie à un périphérique
elif event == _IRQ_PERIPHERAL_CONNECT:
conn_handle, addr_type, addr = data
if addr_type == self._addr_type and addr == self._addr:
self._conn_handle = conn_handle
# On recherche les services mis à disposition par le périphérique
self._ble.gattc_discover_services(self._conn_handle)
# Déconnexion du périphérique (à l'initiative du périphérique ou bien du central)
elif event == _IRQ_PERIPHERAL_DISCONNECT:
conn_handle, _, _ = data
if conn_handle == self._conn_handle:
# Si la déconnexion a été générée par le central, alors le reset a déjà eu lieu
self._reset()
# Le périphérique auquel le central est connecté à renvoyé des informations sur l'un de ses services
elif event == _IRQ_GATTC_SERVICE_RESULT:
conn_handle, start_handle, end_handle, uuid = data
if conn_handle == self._conn_handle and uuid == _ENV_SENSE_UUID:
self._start_handle, self._end_handle = start_handle, end_handle
# La recherche de services est terminée
elif event == _IRQ_GATTC_SERVICE_DONE:
if self._start_handle and self._end_handle:
self._ble.gattc_discover_characteristics(self._conn_handle, self._start_handle, self._end_handle)
else:
print("Echec lors de la recherche de services environnementaux.")
# Le périphérique connecté a renvoyé des informations sur l'une de ses caractéristiques
elif event == _IRQ_GATTC_CHARACTERISTIC_RESULT:
conn_handle, def_handle, value_handle, properties, uuid = data
if conn_handle == self._conn_handle :
# S'il sagit de la température, attribue lui l'adresse "_temp_handle"
if uuid == _TEMP_UUID:
self._temp_handle = value_handle
# S'il sagit de l'humidité, attribue lui l'adresse "_humi_handle"
elif uuid == _HUMI_UUID :
self._humi_handle = value_handle
# La recherche de caractéristiques est terminée
elif event == _IRQ_GATTC_CHARACTERISTIC_DONE:
if self._temp_handle:
# Nous avons terminé toutes les étapes de connexion, appel du callback "connect"
if self._conn_callback:
self._conn_callback()
else:
print("Aucune caractéristique de température ou d'humidité n'a été trouvée.")
# Le serveur a pu lire une caractéristique
elif event == _IRQ_GATTC_READ_RESULT:
conn_handle, value_handle, char_data = data
# S'il s'agit de la caractéristique de température, appelle _update_temp_value
if conn_handle == self._conn_handle and value_handle == self._temp_handle:
self._update_temp_value(char_data)
if self._read_callback:
self._read_callback(self._temp_value)
self._read_callback = None
# S'il s'agit de la caractéristique d'humidité, appelle _update_humi_value
elif conn_handle == self._conn_handle and value_handle == self._humi_handle:
self._update_humi_value(char_data)
if self._read_callback:
self._read_callback(self._humi_value)
self._read_callback = None
# Lecture d'une caractéristique
elif event == _IRQ_GATTC_READ_DONE:
conn_handle, value_handle, status = data
# Une caractéristique se notifie au central et modifie périodiquement sa valeur
elif event == _IRQ_GATTC_NOTIFY:
conn_handle, value_handle, notify_data = data
# S'il s'agit de la caractéristique de température, appelle _update_temp_value
if conn_handle == self._conn_handle and value_handle == self._temp_handle:
self._update_temp_value(notify_data)
if self._notify_callback:
self._notify_callback(self._temp_value)
# S'il s'agit de la caractéristique d'humidité, appelle _update_humi_value
elif conn_handle == self._conn_handle and value_handle == self._humi_handle:
self._update_humi_value(notify_data)
if self._notify_callback:
self._notify_callback(self._humi_value)
# Renvoie vrai si des caractéristiques ont été découvertes et que le central s'y est connecté
def is_connected(self):
return self._conn_handle is not None and self._temp_handle is not None and self._humi_handle is not None
# Recherche un (futur) périphérique qui publie un service environnemental (GAP)
def scan(self, callback = None):
self._addr_type = None
self._addr = None
self._scan_callback = callback
# Scanne pendant _SCAN_DURATION_MS, pendant des durées de _SCAN_WINDOWS_US espacées de _SCAN_INTERVAL_US
self._ble.gap_scan(_SCAN_DURATION_MS, _SCAN_INTERVAL_US, _SCAN_WINDOW_US)
# Se connecte au périphérique spécifié (autrement, utilise une adresse en mémoire cache)
def connect(self, addr_type=None, addr=None, callback=None):
self._addr_type = addr_type or self._addr_type
self._addr = addr or self._addr
self._conn_callback = callback
if self._addr_type is None or self._addr is None:
return False
self._ble.gap_connect(self._addr_type, self._addr)
return True
# Se déconnecte du périphérique
def disconnect(self):
if not self._conn_handle:
return
self._ble.gap_disconnect(self._conn_handle)
self._reset()
# Réalise une lecture asynchrone des données de température, qui sera traitée par callback
def read_temp(self, callback):
if not self.is_connected():
return
self._read_callback = callback
self._ble.gattc_read(self._conn_handle, self._temp_handle)
# Réalise une lecture asynchrone des données d'humidité, qui sera traitée par callback
def read_humi(self, callback):
if not self.is_connected():
return
self._read_callback = callback
self._ble.gattc_read(self._conn_handle, self._humi_handle)
# Callback pour répondre aux notifications d'un périphérique
def on_notify(self, callback):
self._notify_callback = callback
# Callback des notifications de la caractéristique de température
# Les valeurs sont codées par des nombres à virgule flottante.
def _update_temp_value(self, data):
self._temp_value = unpack("<f", data)[0]
print("Le central reçoit :")
print(" - Température (°C) : %6.1f" % self._temp_value)
return self._temp_value
# Callback des notifications de la caractéristique d'humidité
# Les valeurs sont codées par des nombres à virgule flottante.
def _update_humi_value(self, data):
self._humi_value = unpack("<f", data)[0]
print("Le central reçoit :")
print(" - Humidité relative (%%) : %6.1f" % self._humi_value)
return self._humi_value
# Réalise une lecture directe des données de température
def temp_value(self):
return self._temp_value
# Réalise une lecture directe des données d'humidité
def humi_value(self):
return self._humi_value
# Programme principal
def demo():
print("Central BLE")
# Instanciation du BLE
ble = bluetooth.BLE()
central = BLEEnvironmentCentral(ble)
not_found = False
# Fonction "callback" de scan des trames d'advertising
def on_scan(addr_type, addr, name):
if addr_type is not None:
print("Capteur trouvé :", addr_type, addr, name)
central.connect()
else:
nonlocal not_found
not_found = True
print("Aucun capteur trouvé.")
# Lance le scan des trames d'advertising
central.scan(callback=on_scan)
# Attente de connexion...
while not central.is_connected():
sleep_ms(100)
if not_found:
return
print("Connecté")
# Trois alternatives pour afficher les valeurs lues dans les caractéristiques
# du périphérique :
# Alternative 1 : Lecture des caractéristiques initiée par le central
#while central.is_connected():
# central.read_temp(callback=print)
# central.read_humi(callback=print)
# sleep_ms(2000)
# Alternative 2 : On affiche les valeurs de la notification la plus réçente
#while central.is_connected():
# print("Le central reçoit :")
# print(" - Température (°C) " + str(central.temp_value()))
# print(" - Humidité relative (%)" + str(central.humi_value()))
# sleep_ms(1000) # Temporisation d'une seconde
# Alternative 3 : on ne fait rien dans le programme principal, on laisse le central
# afficher de façon totalement asynchrone via les callbacks de notifications de la
# classe BLEEnvironmentCentral
while central.is_connected():
pyb.wfi() # Place le microcontrôleur en mode économie d'énergie
print("Déconnecté")
# Appel du programme principal si le nom du présent script est "main.py"
if __name__ == "__main__":
demo()
Les codes MicroPython pour le périphérique
Les scripts présentés ci-après sont disponibles dans la zone de téléchargement.
Deux fichiers scripts MicroPython seront nécessaires pour le périphérique :
- Le script permettant de construire les trames d’advertising, intitulé ble_advertising.py. Nous ne détaillerons pas son contenu, vous pouvez le copier directement dans le répertoire PYBFLASH.
- Le script du programme principal, main.py, qui intègre la classe BLEenvironment mettant en oeuvre le protocole GATT.
Le script du programme principal
Éditez le script main.py contenu dans le répertoire PYBFLASH du disque USB virtuel associé à la NUCLEO-WB55 qui fera office de périphérique.
# Cet exemple montre comment programmer un périphérique BLE avec le standard Bluetooth SIG
# pour envoyer des mesures de température et d'humidité à l'aide d'un service contenant deux
# caractéristiques.
# Les mesures sont simulées avec un générateur de nombres aléatoires puis mises à jour toutes
# les cinq secondes par le périphérique, et notifiées à la même fréquence à un central.
import bluetooth # Pour la gestion du BLE
import random # Pour la génération de valeurs aléatoires
from struct import pack # Pour construire les "payloads" des caractéristiques BLE, en aggrégeant des octets
from time import sleep_ms # Pour la gestion des temporisations en millisecondes
from ble_advertising import advertising_payload # Pour construire des trames d'advertising
from micropython import const # Pour la déclaration de constantes entières
# Constantes pour construire les services BLE
# Voir : https://docs.micropython.org/en/latest/library/ubluetooth.html
_IRQ_CENTRAL_CONNECT = const(1)
_IRQ_CENTRAL_DISCONNECT = const(2)
_IRQ_GATTS_INDICATE_DONE = const(20)
_FLAG_READ = const(0x0002)
_FLAG_NOTIFY = const(0x0010)
_FLAG_INDICATE = const(0x0020)
# Identifiant SIG du service de données environnementales.
# Voir org.bluetooth.service.environmental_sensing
_ENV_SENSE_UUID = bluetooth.UUID(0x181A)
# Identifiant SIG de la caractéristique de température.
# Voir org.bluetooth.characteristic.temperature
_TEMP_CHAR = (
bluetooth.UUID(0x2A6E),
# La caractéristique peut être lue, se notifier et "s'indiquer"
_FLAG_READ | _FLAG_NOTIFY | _FLAG_INDICATE,
)
# Identifiant SIG de la caractéristique d'humidité.
# Voir org.bluetooth.characteristic.temperature
_HUMI_CHAR = (
bluetooth.UUID(0x2A6F),
# La caractéristique peut être lue, se notifier et "s'indiquer"
_FLAG_READ | _FLAG_NOTIFY | _FLAG_INDICATE,
)
# Construction d'un service à deux caractéristiques.
_ENV_SENSE_SERVICE = (
_ENV_SENSE_UUID,
(_TEMP_CHAR,_HUMI_CHAR,),
)
# Icône associée à un advertiser (GAP) de données environnementales
# Voir org.bluetooth.characteristic.gap.appearance.xml
_ADV_APPEARANCE_GENERIC_ENVSENSOR = const(5696)
# Classe pour gérer le partage de données environnementales
class BLEenvironment:
# Initialisations
def __init__(self, ble, name="Nucleo-WB55"):
self._ble = ble
self._ble.active(True)
self._ble.irq(self._irq)
# Prévoit deux caractéristiques (température et humidité)
((self._temp_handle,self._humi_handle,),) = self._ble.gatts_register_services((_ENV_SENSE_SERVICE,))
self._connections = set()
self._payload = advertising_payload(
name=name, services=[_ENV_SENSE_UUID], appearance=_ADV_APPEARANCE_GENERIC_ENVSENSOR
)
self._advertise()
self._handler = None
# Gestion des évènements BLE
def _irq(self, event, data):
# Lorsqu'un central se connecte ...
if event == _IRQ_CENTRAL_CONNECT:
conn_handle, _, _ = data
self._connections.add(conn_handle)
print("Connecté")
# Lorsqu'un central se déconnecte ...
elif event == _IRQ_CENTRAL_DISCONNECT:
conn_handle, _, _ = data
self._connections.remove(conn_handle)
# Relance l'advertising pour de futures connexions
self._advertise()
print("Déconnecté")
# Lorsqu'un évènement indicate est validé, accuse sa réception
elif event == _IRQ_GATTS_INDICATE_DONE:
conn_handle, value_handle, status = data
def set_temp(self, temp_deg_c, notify=False, indicate=False):
# Ecrit la température au format float "<f" et la laisse en lecture à un éventuel central.
self._ble.gatts_write(self._temp_handle, pack("<f", temp_deg_c))
if notify or indicate:
for conn_handle in self._connections:
if notify:
# Notifie les centraux connectés
self._ble.gatts_notify(conn_handle, self._temp_handle)
if indicate:
# "Indicate" les centraux connectés (comme Notify, mais avec un accusé de réception)
self._ble.gatts_indicate(conn_handle, self._temp_handle)
def set_humi(self, humi_percent, notify=False, indicate=False):
# Ecrit l'humidité au format float "<f" et la laisse en lecture à un éventuel central.
self._ble.gatts_write(self._humi_handle, pack("<f", humi_percent))
if notify or indicate:
for conn_handle in self._connections:
if notify:
# Notifie les centraux connectés
self._ble.gatts_notify(conn_handle, self._humi_handle)
if indicate:
# "Indicate" les centraux connectés (comme Notify, mais avec un accusé de réception)
self._ble.gatts_indicate(conn_handle, self._humi_handle)
# Envoie des trames d'advertising toutes les 5 secondes, précise que l'on pourra se connecter à l'objet
def _advertise(self, interval_us=500000):
self._ble.gap_advertise(interval_us, adv_data=self._payload, connectable = True)
# Programme principal
print("Périphérique BLE")
ble = bluetooth.BLE()
ble_device = BLEenvironment(ble)
while True:
# valeur environnementales simulées
temperature = random.randint(-20, 90) # Valeur aléatoire entre -20 et 90 °C
humidite = random.randint(0, 100) # Valeur aléatoire entre 0 et 100 %
print("Le périphérique envoit :")
print(" - Température (°C) : " + str(temperature))
print(" - Humidité relative (%) : " + str(humidite))
# Envoi en BLE de la température en choisissant de notifier l'application
ble_device.set_temp(temperature, notify=True, indicate = False)
# Envoi en BLE de l'humidité en choisissant de notifier l'application
ble_device.set_humi(humidite, notify=True, indicate = False)
# Temporisation de cinq secondes
sleep_ms(5000)
Mise en oeuvre
Commencez par démarrer le script du périphérique (CTRL+D dans le terminal PuTTY), et ensuite le central (idem). Si tout se déroule correctement vous devriez observer les échanges ci-dessous (aux valeurs de température et humidité près) sur les deux terminaux :

Pour aller plus loin
Une première amélioration possible de cet exemple consisterait à faire exposer par le périphérique un service qui pourait être écrit par le central pour échanger des commandes. Si cet exercice vous tente, nous vous conseillons de vous inspirer de cet autre exemple.
Il pourrait être aussi intéressant de permettre au central de se connecter à deux périphériques (ou plus) simultanément. Pour cela, il faut généraliser le script du central en s’inspirant de ce qui est proposé sur l’exemple de mise en oeuvre du protocole GAP.