Echange de la température et de l’humidité entre deux NUCLEO-WB55

Ce tutoriel montre comment construire un couple périphérique - central avec deux cartes NUCLEO-WB55 pour leur faire échanger des données environnementales (en l’occurence température et humidité) avec un service et des caractéristiques construits selon le standard Bluetooth SIG. Le périphérique se limitera à publier des informations sur ses services et à les notifier au central qui les affichera.

Les scripts de ce tutoriel sont adaptés des exemples ble_temperature.py et ble_temperature_central.py disponibles dans le dépôt Github de MicroPython, ici.

Plus précisément, nous allons mettre en oeuvre un service environnemental avec :

  • Un périphérique (serveur) qui va partager des données de température et pression par un service muni de deux caractéristiques ;
  • Un central (client) qui va se connecter aux caractéristiques, décoder les messages que le périphérique y inscrit et lui envoyer en retour un accusé de réception (en écrivant les octets qui conviennent dans les caractéristiques mises à sa disposition).

Le schéma de principe suivant résume le cas d’usage que nous allons réaliser :


BLE UART use case


Le périphérique partage deux caractéristiques (humidité et température) avec le central, ayant les attributs :

  • Read : Le central peut lire la valeur de la caractéristique à tout moment
  • Notify : Le périphérique signale au central les modifications de la valeur de la caractéristique aux moments où elles sont réalisées
  • Indicate : Comme Notify mais le périphérique demande au central de lui envoyer un accusé de réception.

Matériel requis

Deux cartes NUCLEO-WB55, une qui jouera le rôle du central, une autre celui du périphérique. Dans ce tutoriel, nous n’utiliserons pas de capteurs de température et d’humidité, nous les simulerons avec un générateur de nombres aléatoires afin d’alléger (un peu) les scripts.

Les codes MicroPython pour le central

Les scripts présentés ci-après sont disponibles dans la zone de téléchargement.

Deux fichiers scripts MicroPython seront nécessaires pour le central :

  • Le script permettant de construire les trames d’advertising, intitulé ble_advertising.py. Nous ne détaillerons pas son contenu, vous pouvez le copier directement dans le répertoire PYBFLASH.
  • Le script du programme principal, main.py, qui intègre la classe BLEEnvironmentCentral mettant en oeuvre le protocole GATT.

Le script du programme principal

Éditez le script main.py contenu dans le répertoire PYBFLASH du disque USB virtuel associé à la NUCLEO-WB55 qui fera office de central.

# Exemple de mise en oeuvre d'un central BLE à l'écoute de deux caractéristiques d'un périphérique construites 
# selon le standard Bluetooth SIG.
# Le central se connecte à un périphérique qui partage des données de température et d'humidité et les affiche.
# Amélioration possible : ajouter un service qui peut être écrit par le central de sorte à pouvoir lui envoyer 
# des informations en retour.
# Code adapté de : https://github.com/micropython/micropython/blob/master/examples/bluetooth/ble_temperature_central.py 

import bluetooth # Pour gérer le BLE
from struct import unpack # Pour extraire les octets des payloads des caractéristiques
from time import sleep_ms # Pour générer des temporisations en millisecondes
from ble_advertising import decode_services, decode_name # Pour décoder le contenu des trames d'advertising
from micropython import const # Pour déclarer des constantes (entières)

# Constantes utilisées pour GATT
# Voir : https://docs.micropython.org/en/latest/library/ubluetooth.html
_IRQ_SCAN_RESULT = const(5)
_IRQ_SCAN_DONE = const(6)
_IRQ_PERIPHERAL_CONNECT = const(7)
_IRQ_PERIPHERAL_DISCONNECT = const(8)
_IRQ_GATTC_SERVICE_RESULT = const(9)
_IRQ_GATTC_SERVICE_DONE = const(10)
_IRQ_GATTC_CHARACTERISTIC_RESULT = const(11)
_IRQ_GATTC_CHARACTERISTIC_DONE = const(12)
_IRQ_GATTC_READ_RESULT = const(15)
_IRQ_GATTC_READ_DONE = const(16)
_IRQ_GATTC_NOTIFY = const(18)

# Objet connectables avec advertising scannable
_ADV_IND = const(0x00)
_ADV_DIRECT_IND = const(0x01)

# Paramètres pour fixer le rapport cyclique du scan GAP
_SCAN_DURATION_MS = const(2000)
_SCAN_INTERVAL_US = const(30000)
_SCAN_WINDOW_US = const(30000)

# Identifiant unique du service environnemental
# org.bluetooth.service.environmental_sensing
_ENV_SENSE_UUID = bluetooth.UUID(0x181A)

# Identifiant unique de la caractéristique de température
# org.bluetooth.characteristic.temperature
_TEMP_UUID = bluetooth.UUID(0x2A6E)

# Identifiant unique de la caractéristique d'humidité
# org.bluetooth.characteristic.humidity
_HUMI_UUID = bluetooth.UUID(0x2A6F)

# Classe pour créer un central BLE environnemental
class BLEEnvironmentCentral:

	# Initialisations
	def __init__(self, ble):
		self._ble = ble
		self._ble.active(True)
		self._ble.irq(self._irq)
		self._reset()

	# Effacement des données en mémoire cache
	def _reset(self):
		
		# Noms et adresses mises en mémoire cache après une étape de scan des périphériques
		self._name = None
		self._addr_type = None
		self._addr = None

		# Valeurs de caractéristiques mises en mémoire cache
		self._temp_value = None
		self._humi_value = None

		# "Callbacks" appelés suite à la validation de différentes opérations
		# Ils sont à usage unique et reprennent la valeur "None" après leur premier appel
		self._scan_callback = None
		self._conn_callback = None
		self._read_callback = None

		# Callback persistant pour traiter les nouvelles données notifiées par le périphérique
		self._notify_callback = None

		# Périphérique connecté
		self._conn_handle = None
		self._start_handle = None
		self._end_handle = None
		self._temp_handle = None
		self._humi_handle = None

	# Gestion des évènements
	def _irq(self, event, data):
		
		# Le scan des périphériques a permis d'identifier un serveur de données potentiel
		if event == _IRQ_SCAN_RESULT:
			addr_type, addr, adv_type, rssi, adv_data = data
			if adv_type in (_ADV_IND, _ADV_DIRECT_IND) and _ENV_SENSE_UUID in decode_services(adv_data):
				# Un périphérique potentiel a été trouvé, mémorise-le et arrète de scanner
				self._addr_type = addr_type
				# Note: le buffer addr est la propriété de l'appelant donc il est nécessaire de le copier.
				self._addr = bytes(addr)
				self._name = decode_name(adv_data) or "?"
				# Arrêt du scan
				self._ble.gap_scan(None)

		# Le scan a pris fin
		elif event == _IRQ_SCAN_DONE:
			if self._scan_callback:
				if self._addr:
					# Un périphérique a été identifié par le scan, qui a été explicitement arrêté
					self._scan_callback(self._addr_type, self._addr, self._name)
					self._scan_callback = None
				else:
					# Le scan a été interrompu car il a atteint son délai de time-out sans recenser 
					# un périphérique
					self._scan_callback(None, None, None)

		# Connexion réussie à un périphérique
		elif event == _IRQ_PERIPHERAL_CONNECT:
			conn_handle, addr_type, addr = data
			if addr_type == self._addr_type and addr == self._addr:
				self._conn_handle = conn_handle
				# On recherche les services mis à disposition par le périphérique
				self._ble.gattc_discover_services(self._conn_handle)

		# Déconnexion du périphérique (à l'initiative du périphérique ou bien du central)
		elif event == _IRQ_PERIPHERAL_DISCONNECT:
			conn_handle, _, _ = data
			if conn_handle == self._conn_handle:
				# Si la déconnexion a été générée par le central, alors le reset a déjà eu lieu
				self._reset()

		# Le périphérique auquel le central est connecté à renvoyé des informations sur l'un de ses services
		elif event == _IRQ_GATTC_SERVICE_RESULT:
			conn_handle, start_handle, end_handle, uuid = data
			if conn_handle == self._conn_handle and uuid == _ENV_SENSE_UUID:
				self._start_handle, self._end_handle = start_handle, end_handle

		# La recherche de services est terminée
		elif event == _IRQ_GATTC_SERVICE_DONE:
			if self._start_handle and self._end_handle:
				self._ble.gattc_discover_characteristics(self._conn_handle, self._start_handle, self._end_handle)
			else:
				print("Echec lors de la recherche de services environnementaux.")

		# Le périphérique connecté a renvoyé des informations sur l'une de ses caractéristiques
		elif event == _IRQ_GATTC_CHARACTERISTIC_RESULT:
			conn_handle, def_handle, value_handle, properties, uuid = data
			if conn_handle == self._conn_handle :
				# S'il sagit de la température, attribue lui l'adresse "_temp_handle"
				if uuid == _TEMP_UUID:
					self._temp_handle = value_handle
				# S'il sagit de l'humidité, attribue lui l'adresse "_humi_handle"
				elif uuid == _HUMI_UUID :
					self._humi_handle = value_handle
		
		# La recherche de caractéristiques est terminée
		elif event == _IRQ_GATTC_CHARACTERISTIC_DONE:
			if self._temp_handle:
				# Nous avons terminé toutes les étapes de connexion, appel du callback "connect"
				if self._conn_callback:
					self._conn_callback()
			else:
				print("Aucune caractéristique de température ou d'humidité n'a été trouvée.")

		# Le serveur a pu lire une caractéristique
		elif event == _IRQ_GATTC_READ_RESULT:
			conn_handle, value_handle, char_data = data
			
			# S'il s'agit de la caractéristique de température, appelle _update_temp_value
			if conn_handle == self._conn_handle and value_handle == self._temp_handle:
				self._update_temp_value(char_data)
				if self._read_callback:
					self._read_callback(self._temp_value)
					self._read_callback = None
			
			# S'il s'agit de la caractéristique d'humidité, appelle _update_humi_value
			elif conn_handle == self._conn_handle and value_handle == self._humi_handle:
				self._update_humi_value(char_data)
				if self._read_callback:
					self._read_callback(self._humi_value)
					self._read_callback = None

		# Lecture d'une caractéristique
		elif event == _IRQ_GATTC_READ_DONE:
			conn_handle, value_handle, status = data

		# Une caractéristique se notifie au central et modifie périodiquement sa valeur
		elif event == _IRQ_GATTC_NOTIFY:
			conn_handle, value_handle, notify_data = data
			
			# S'il s'agit de la caractéristique de température, appelle _update_temp_value
			if conn_handle == self._conn_handle and value_handle == self._temp_handle:
				self._update_temp_value(notify_data)
				if self._notify_callback:
					self._notify_callback(self._temp_value)
			
			# S'il s'agit de la caractéristique d'humidité, appelle _update_humi_value
			elif conn_handle == self._conn_handle and value_handle == self._humi_handle:
				self._update_humi_value(notify_data)
				if self._notify_callback:
					self._notify_callback(self._humi_value)

	# Renvoie vrai si des caractéristiques ont été découvertes et que le central s'y est connecté
	def is_connected(self):
		return self._conn_handle is not None and self._temp_handle is not None and self._humi_handle is not None

	# Recherche un (futur) périphérique qui publie un service environnemental (GAP)
	def scan(self, callback = None):
		self._addr_type = None
		self._addr = None
		self._scan_callback = callback
		
		# Scanne pendant _SCAN_DURATION_MS, pendant des durées de _SCAN_WINDOWS_US espacées de _SCAN_INTERVAL_US
		self._ble.gap_scan(_SCAN_DURATION_MS, _SCAN_INTERVAL_US, _SCAN_WINDOW_US)

	# Se connecte au périphérique spécifié (autrement, utilise une adresse en mémoire cache)
	def connect(self, addr_type=None, addr=None, callback=None):
		self._addr_type = addr_type or self._addr_type
		self._addr = addr or self._addr
		self._conn_callback = callback
		if self._addr_type is None or self._addr is None:
			return False
		self._ble.gap_connect(self._addr_type, self._addr)
		return True

	# Se déconnecte du périphérique
	def disconnect(self):
		if not self._conn_handle:
			return
		self._ble.gap_disconnect(self._conn_handle)
		self._reset()

	# Réalise une lecture asynchrone des données de température, qui sera traitée par callback
	def read_temp(self, callback):
		if not self.is_connected():
			return
		self._read_callback = callback
		self._ble.gattc_read(self._conn_handle, self._temp_handle)

	# Réalise une lecture asynchrone des données d'humidité, qui sera traitée par callback
	def read_humi(self, callback):
		if not self.is_connected():
			return
		self._read_callback = callback
		self._ble.gattc_read(self._conn_handle, self._humi_handle)

	# Callback pour répondre aux notifications d'un périphérique
	def on_notify(self, callback):
		self._notify_callback = callback

	# Callback des notifications de la caractéristique de température
	# Les valeurs sont codées par des nombres à virgule flottante.
	def _update_temp_value(self, data):
		self._temp_value = unpack("<f", data)[0]
		print("Le central reçoit :")
		print(" - Température (°C) : %6.1f" % self._temp_value)
		return self._temp_value

	# Callback des notifications de la caractéristique d'humidité
	# Les valeurs sont codées par des nombres à virgule flottante.
	def _update_humi_value(self, data):
		self._humi_value = unpack("<f", data)[0]
		print("Le central reçoit :")
		print(" - Humidité relative (%%) : %6.1f" % self._humi_value)
		return self._humi_value

	# Réalise une lecture directe des données de température
	def temp_value(self):
		return self._temp_value

	# Réalise une lecture directe des données d'humidité
	def humi_value(self):
		return self._humi_value

# Programme principal

def demo():

	print("Central BLE")

	# Instanciation du BLE
	ble = bluetooth.BLE()
	central = BLEEnvironmentCentral(ble)

	not_found = False

	# Fonction "callback" de scan des trames d'advertising
	def on_scan(addr_type, addr, name):
		if addr_type is not None:
			print("Capteur trouvé :", addr_type, addr, name)
			central.connect()
		else:
			nonlocal not_found
			not_found = True
			print("Aucun capteur trouvé.")

	# Lance le scan des trames d'advertising
	central.scan(callback=on_scan)

	# Attente de connexion...
	while not central.is_connected():
		sleep_ms(100)
		if not_found:
			return

	print("Connecté")

	# Trois alternatives pour afficher les valeurs lues dans les caractéristiques
	# du périphérique :

	# Alternative 1 : Lecture des caractéristiques initiée par le central
	#while central.is_connected():
	#	central.read_temp(callback=print)
	#	central.read_humi(callback=print)
	#	sleep_ms(2000)

	# Alternative 2 : On affiche les valeurs de la notification la plus réçente
	#while central.is_connected():
	#	print("Le central reçoit :")
	#	print(" - Température (°C) " + str(central.temp_value()))
	#	print(" - Humidité relative (%)" + str(central.humi_value()))
	#	sleep_ms(1000) # Temporisation d'une seconde

	# Alternative 3 : on ne fait rien dans le programme principal, on laisse le central
	# afficher de façon totalement asynchrone via les callbacks de notifications de la
	# classe BLEEnvironmentCentral

	while central.is_connected():
		pyb.wfi() # Place le microcontrôleur en mode économie d'énergie

	print("Déconnecté")

# Appel du programme principal si le nom du présent script est "main.py"
if __name__ == "__main__":
	demo()

Les codes MicroPython pour le périphérique

Les scripts présentés ci-après sont disponibles dans la zone de téléchargement.

Deux fichiers scripts MicroPython seront nécessaires pour le périphérique :

  • Le script permettant de construire les trames d’advertising, intitulé ble_advertising.py. Nous ne détaillerons pas son contenu, vous pouvez le copier directement dans le répertoire PYBFLASH.
  • Le script du programme principal, main.py, qui intègre la classe BLEenvironment mettant en oeuvre le protocole GATT.

Le script du programme principal

Éditez le script main.py contenu dans le répertoire PYBFLASH du disque USB virtuel associé à la NUCLEO-WB55 qui fera office de périphérique.

# Cet exemple montre comment programmer un périphérique BLE avec le standard Bluetooth SIG
# pour envoyer des mesures de température et d'humidité à l'aide d'un service contenant deux
# caractéristiques.
# Les mesures sont simulées avec un générateur de nombres aléatoires puis mises à jour toutes 
# les cinq secondes par le périphérique, et notifiées à la même fréquence à un central.

import bluetooth # Pour la gestion du BLE
import random # Pour la génération de valeurs aléatoires
from struct import pack # Pour construire les "payloads" des caractéristiques BLE, en aggrégeant des octets
from time import sleep_ms # Pour la gestion des temporisations en millisecondes
from ble_advertising import advertising_payload # Pour construire des trames d'advertising
from micropython import const # Pour la déclaration de constantes entières

# Constantes pour construire les services BLE
# Voir : https://docs.micropython.org/en/latest/library/ubluetooth.html

_IRQ_CENTRAL_CONNECT = const(1)
_IRQ_CENTRAL_DISCONNECT = const(2)
_IRQ_GATTS_INDICATE_DONE = const(20)

_FLAG_READ = const(0x0002)
_FLAG_NOTIFY = const(0x0010)
_FLAG_INDICATE = const(0x0020)

# Identifiant SIG du service de données environnementales.

# Voir org.bluetooth.service.environmental_sensing
_ENV_SENSE_UUID = bluetooth.UUID(0x181A)

# Identifiant SIG de la caractéristique de température.
# Voir org.bluetooth.characteristic.temperature

_TEMP_CHAR = (
	bluetooth.UUID(0x2A6E),
	# La caractéristique peut être lue, se notifier et "s'indiquer"
	_FLAG_READ | _FLAG_NOTIFY | _FLAG_INDICATE,
)

# Identifiant SIG de la caractéristique d'humidité.
# Voir org.bluetooth.characteristic.temperature
_HUMI_CHAR = (
	bluetooth.UUID(0x2A6F),
	# La caractéristique peut être lue, se notifier et "s'indiquer"
	_FLAG_READ | _FLAG_NOTIFY | _FLAG_INDICATE,
)

# Construction d'un service à deux caractéristiques.
_ENV_SENSE_SERVICE = (
	_ENV_SENSE_UUID,
	(_TEMP_CHAR,_HUMI_CHAR,),
)

# Icône associée à un advertiser (GAP) de données environnementales
# Voir org.bluetooth.characteristic.gap.appearance.xml
_ADV_APPEARANCE_GENERIC_ENVSENSOR = const(5696)

# Classe pour gérer le partage de données environnementales
class BLEenvironment:

	# Initialisations
	def __init__(self, ble, name="Nucleo-WB55"):
		self._ble = ble
		self._ble.active(True)
		self._ble.irq(self._irq)
		 # Prévoit deux caractéristiques (température et humidité)
		((self._temp_handle,self._humi_handle,),) = self._ble.gatts_register_services((_ENV_SENSE_SERVICE,))
		self._connections = set()
		self._payload = advertising_payload(
			name=name, services=[_ENV_SENSE_UUID], appearance=_ADV_APPEARANCE_GENERIC_ENVSENSOR
		)
		self._advertise()
		self._handler = None

	# Gestion des évènements BLE
	def _irq(self, event, data):
		# Lorsqu'un central se connecte ...
		if event == _IRQ_CENTRAL_CONNECT:
			conn_handle, _, _ = data
			self._connections.add(conn_handle)
			print("Connecté")
			
		# Lorsqu'un central se déconnecte ...
		elif event == _IRQ_CENTRAL_DISCONNECT:
			conn_handle, _, _ = data
			self._connections.remove(conn_handle)
			# Relance l'advertising pour de futures connexions
			self._advertise()
			print("Déconnecté")
		# Lorsqu'un évènement indicate est validé, accuse sa réception
		elif event == _IRQ_GATTS_INDICATE_DONE:
			conn_handle, value_handle, status = data

	def set_temp(self, temp_deg_c, notify=False, indicate=False):
		
		# Ecrit la température au format float "<f" et la laisse en lecture à un éventuel central.
		self._ble.gatts_write(self._temp_handle, pack("<f", temp_deg_c))
		if notify or indicate:
			for conn_handle in self._connections:
				if notify:
					# Notifie les centraux connectés
					self._ble.gatts_notify(conn_handle, self._temp_handle)
				if indicate:
					# "Indicate" les centraux connectés (comme Notify, mais avec un accusé de réception)
					self._ble.gatts_indicate(conn_handle, self._temp_handle)

	def set_humi(self, humi_percent, notify=False, indicate=False):
		
		# Ecrit l'humidité au format float "<f" et la laisse en lecture à un éventuel central.
		self._ble.gatts_write(self._humi_handle, pack("<f", humi_percent))
		if notify or indicate:
			for conn_handle in self._connections:
				if notify:
					# Notifie les centraux connectés
					self._ble.gatts_notify(conn_handle, self._humi_handle)
				if indicate:
					# "Indicate" les centraux connectés (comme Notify, mais avec un accusé de réception)
					self._ble.gatts_indicate(conn_handle, self._humi_handle)

	# Envoie des trames d'advertising toutes les 5 secondes, précise que l'on pourra se connecter à l'objet
	def _advertise(self, interval_us=500000):
		self._ble.gap_advertise(interval_us, adv_data=self._payload, connectable = True)


# Programme principal

print("Périphérique BLE")

ble = bluetooth.BLE()
ble_device = BLEenvironment(ble)

while True:

	# valeur environnementales simulées
	temperature = random.randint(-20, 90) # Valeur aléatoire entre -20 et 90 °C
	humidite = random.randint(0, 100) # Valeur aléatoire entre 0 et 100 %

	print("Le périphérique envoit :")
	print(" - Température (°C) : " + str(temperature))
	print(" - Humidité relative (%) : " + str(humidite))

	# Envoi en BLE de la température en choisissant de notifier l'application
	ble_device.set_temp(temperature, notify=True, indicate = False)
	
	# Envoi en BLE de l'humidité en choisissant de notifier l'application
	ble_device.set_humi(humidite, notify=True, indicate = False)

	# Temporisation de cinq secondes
	sleep_ms(5000)

Mise en oeuvre

Commencez par démarrer le script du périphérique (CTRL+D dans le terminal PuTTY), et ensuite le central (idem). Si tout se déroule correctement vous devriez observer les échanges ci-dessous (aux valeurs de température et humidité près) sur les deux terminaux :


Sortie UART BLE


Pour aller plus loin

Une première amélioration possible de cet exemple consisterait à faire exposer par le périphérique un service qui pourait être écrit par le central pour échanger des commandes. Si cet exercice vous tente, nous vous conseillons de vous inspirer de cet autre exemple.

Il pourrait être aussi intéressant de permettre au central de se connecter à deux périphériques (ou plus) simultanément. Pour cela, il faut généraliser le script du central en s’inspirant de ce qui est proposé sur l’exemple de mise en oeuvre du protocole GAP.