Echanges de chaînes de caractères entre deux NUCLEO-WB55

Ce tutoriel montre comment utiliser un service BLE construit de toutes pièces permettant à un central et un périphérique d’échanger des séquences d’octets qui pourront être interprétées comme des commandes, des valeurs numériques ou encore des séquences de caractères affichables (ie du texte). On désigne abusivement ce service par BLE UART car il apporte une fonctionnalité similaire à l’UART.

Les scripts de ce tutoriel sont adaptés des exemples ble_simple_central.py et ble_simple_peripheral.py disponibles dans le dépôt Github de MicroPython, ici.

Plus précisément, nous allons mettre en oeuvre le BLE UART avec :

  • Un périphérique (serveur) qui va partager des données de température, pression et humidité au sein d’une caractéristique TX qui expose une chaîne de caractères. Il sert également une autre caractéristique exposant une chaîne de caractères, RX, dans laquelle le client pourra écrire sa réponse ;
  • Un central (client) qui va lire le contenu de la caractéristique TX, décoder les messages en provenance du périphérique qu’elle contient et envoyer en retour à ce dernier un message via la caractéristique RX (en modifiant les octets qui conviennent dans celle-ci).

Le schéma de principe suivant résume le cas d’usage que nous allons réaliser :


BLE UART use case


Il montre le protocole GATT en action après la phase d’advertising GAP et la connexion entre le central et un périphérique. Le périphérique “partage” avec le central deux caractéristiques :

  • UART_TX dotée de l’attribut “Notify”. Le central (client) peut lire son contenu quand bon lui semble et est informé (NOTIFY) des modifications que le périphérique (serveur) apporte à celle-ci.
  • UART_RX dotée le l’attributs “Write”. Le central (client) peut donc y écrire (WRITE) du contenu quand bon lui semble. L’usage conjoint de ces deux caractéristiques permet de simuler un port série de faible débit avec sa ligne d’émission (RX) et de transmission (TX), ce qui justifie le nom d’UART attribué à ce service.

Matériel requis

Deux cartes NUCLEO-WB55, dont une équipée avec une carte d’extension X-NUCLEO-IKS01A3 ; on aura donc :

  • Central : NUCLEO-WB55
  • Périphérique : NUCLEO-WB55 avec une carte d’extension X-NUCLEO-IKS01A3

Les codes MicroPython pour le central

Les scripts présentés ci-après sont disponibles dans la zone de téléchargement.

Deux fichiers scripts MicroPython seront nécessaires pour le central :

  • Le script permettant de construire les trames d’advertising, intitulé ble_advertising.py. Nous ne détaillerons pas son contenu, vous pouvez le copier directement dans le répertoire PYBFLASH.
  • Le script du programme principal, main.py, qui intègre la classe BLECentral simulant un UART côté client à l’aide du protocole GATT.

Le script du programme principal

Éditez le script main.py contenu dans le répertoire PYBFLASH du disque USB virtuel associé à la NUCLEO-WB55 qui fera office de central.

# Ce script montre comment créer un central UART c'est à dire comment :
# 1 - Détecter un périphérique exécutant le service UART et exposant deux caractéristiques : TX et RX.
# 2 - Se connecter à ce périphérique pour recevoir, sous forme de caractères encodés UTF-8, un message notifié par TX.
# 3 - Répondre au périphérique en écrivant 20 octets max. dans RX.
# Dans cet exemple :
#   - le périphérique envoie une chaîne de caractères contenant la représentation affichable de valeurs
#     de température, pression et humidité qu'il a mesurées.
#   - le central reçoit cette chaîne, la découpe et affiche les mesures sur le terminal série de l'USB USER
#   - le central renvoie au périphérique un simple accusé de réception. On notera que ce message retour ne peut pas être 
#     plus long que 20 caractères du fait d'une limitation de gattc_write dans sa version actuelle (?).

# Source : https://github.com/micropython/micropython/blob/master/examples/bluetooth/ble_simple_central.py

import bluetooth # Classes "primitives du BLE"
from ble_advertising import decode_services, decode_name # Pour décoder les messages reçus

# Constantes requises pour construire le service GATT BLE UART
# Voir : https://docs.micropython.org/en/latest/library/ubluetooth.html
_IRQ_SCAN_RESULT = const(5)
_IRQ_SCAN_DONE = const(6)
_IRQ_PERIPHERAL_CONNECT = const(7)
_IRQ_PERIPHERAL_DISCONNECT = const(8)
_IRQ_GATTC_SERVICE_RESULT = const(9)
_IRQ_GATTC_SERVICE_DONE = const(10)
_IRQ_GATTC_CHARACTERISTIC_RESULT = const(11)
_IRQ_GATTC_CHARACTERISTIC_DONE = const(12)
_IRQ_GATTC_WRITE_DONE = const(17)
_IRQ_GATTC_NOTIFY = const(18)

# Objet connectables avec advertising scannable
_ADV_IND = const(0x00)
_ADV_DIRECT_IND = const(0x01)

# Paramètres pour fixer le rapport cyclique du scan GAP
_SCAN_DURATION_MS = const(2000)
_SCAN_INTERVAL_US = const(30000)
_SCAN_WINDOW_US = const(30000)

# Définition du service UART avec ses deux caractéristiques RX et TX
_UART_SERVICE_UUID = bluetooth.UUID("6E400001-B5A3-F393-E0A9-E50E24DCCA9E")
_UART_RX_CHAR_UUID = bluetooth.UUID("6E400002-B5A3-F393-E0A9-E50E24DCCA9E")
_UART_TX_CHAR_UUID = bluetooth.UUID("6E400003-B5A3-F393-E0A9-E50E24DCCA9E")

# Variables globales partagées par les fonctions asynchrones qui répondent aux évènements (callback) 
adresse_MAC = 0 # Adresse matérielle de la radio BLE du central
AR_central_requis = 0 # Est-ce que le central doit envoyer un accusé de réception au périphérique ?

# Classe pour gérer le Central BLE
class BLECentral:

	# Initialisation 
	def __init__(self, ble):
		self._ble = ble
		self._ble.active(True)
		self._ble.irq(self._irq)
		self._reset()
	
	# Réinitialisation (appelée lors des déconnexions)
	def _reset(self):
		# Efface le cache des adresses et des noms des scans
		self._name = None
		self._addr_type = None
		self._addr = None
		
		# Fonctions de réponses (callback) à la complétion de différents évènements
		self._scan_callback = None
		self._conn_callback = None
		self._read_callback = None

		# Fonction de réponse du central aux notifications des périphériques 
		self._notify_callback = None

		# Adresses et caractéristiques du périphérique connecté
		self._conn_handle = None
		self._start_handle = None
		self._end_handle = None
		self._tx_handle = None
		self._rx_handle = None

	# Interruptions de gestion des évènements
	def _irq(self, event, data):
	
		# Evènement "Résultat de scan"
		if event == _IRQ_SCAN_RESULT:
			# Lecture du contenu de la trame d'advertising
			addr_type, addr, adv_type, rssi, adv_data = data
			# Si l'advertising signale un service UART
			if adv_type in (_ADV_IND, _ADV_DIRECT_IND) and _UART_SERVICE_UUID in decode_services(adv_data):
				# Un périphérique potentiel est identifié, référence le et arrète le scan.
				self._addr_type = addr_type
				self._addr = bytes(addr) # Note: le tampon addr a pour propriétaire l'appelant, donc il faut le copier.
				self._name = decode_name(adv_data) or "?"
				self._ble.gap_scan(None)

		# Evènement "Scan terminé"
		elif event == _IRQ_SCAN_DONE:
			if self._scan_callback:
				if self._addr:
					# Un périphérique a été détecté (et le scan a été explicitement interrompu en conséquence)
					self._scan_callback(self._addr_type, self._addr, self._name)
					self._scan_callback = None
				else:
					# Le scan a dépassé son délai de "time-out".
					self._scan_callback(None, None, None)

		# Evènement "Connexion réussie"
		elif event == _IRQ_PERIPHERAL_CONNECT:
			conn_handle, addr_type, addr = data
			if addr_type == self._addr_type and addr == self._addr:
				self._conn_handle = conn_handle
				self._ble.gattc_discover_services(self._conn_handle)

		# Evènement "Déconnexion" (initié par le central ou par le périphérique)
		elif event == _IRQ_PERIPHERAL_DISCONNECT:
			conn_handle, _, _ = data
			if conn_handle == self._conn_handle:
				# Si déconnexion initiée par le central, le reset a déjà été fait
				self._reset()

		# Evènement "Le périphérique connecté a notifié un service au central"
		elif event == _IRQ_GATTC_SERVICE_RESULT:
			conn_handle, start_handle, end_handle, uuid = data
			if conn_handle == self._conn_handle and uuid == _UART_SERVICE_UUID:
				self._start_handle, self._end_handle = start_handle, end_handle

		# Evènement "Recherche de services terminée"
		elif event == _IRQ_GATTC_SERVICE_DONE:
			if self._start_handle and self._end_handle:
				self._ble.gattc_discover_characteristics(
					self._conn_handle, self._start_handle, self._end_handle
				)
			else:
				print("Le service UART est introuvable.")

		# Evènement "Le périphérique connecté a notifié une caractéristique au central"
		elif event == _IRQ_GATTC_CHARACTERISTIC_RESULT:
			conn_handle, def_handle, value_handle, properties, uuid = data
			if conn_handle == self._conn_handle and uuid == _UART_RX_CHAR_UUID:
				self._rx_handle = value_handle
			if conn_handle == self._conn_handle and uuid == _UART_TX_CHAR_UUID:
				self._tx_handle = value_handle

		# Evènement "Recherche de caractéristiques terminée"
		elif event == _IRQ_GATTC_CHARACTERISTIC_DONE:
			if self._tx_handle is not None and self._rx_handle is not None:
				# Nous avons terminé la connexion et la découverte de périphériques, 
				# génère le callback de connexion.
				if self._conn_callback:
					self._conn_callback()
			else:
				print("Caractéristique UART RX introuvable.")

		# Evènement "Accusé de réception du périphérique", 
		# qui survient lorsque le central envoie un message, si on a explicitement demandé un AR
		elif event == _IRQ_GATTC_WRITE_DONE:
			conn_handle, value_handle, status = data
			print("Ecriture dans RX réalisée")

		# Evènement "Réponse aux notifications du périphérique" sur la caractéristique TX
		elif event == _IRQ_GATTC_NOTIFY:
			conn_handle, value_handle, notify_data = data
			if conn_handle == self._conn_handle and value_handle == self._tx_handle:
				if self._notify_callback:
					self._notify_callback(notify_data)

	# Revoie True si nous sommes connectés au service UART.
	def is_connected(self):
		return (
			self._conn_handle is not None
			and self._tx_handle is not None
			and self._rx_handle is not None
		)

	# Recherche un périphérique qui propose le service UART
	def scan(self, callback=None):
		self._addr_type = None
		self._addr = None
		self._scan_callback = callback
		# Scanne pendant _SCAN_DURATION_MS, pendant des durées de _SCAN_WINDOWS_US espacées de _SCAN_INTERVAL_US
		self._ble.gap_scan(_SCAN_DURATION_MS, _SCAN_INTERVAL_US, _SCAN_WINDOW_US)

	# Se connecte au périphérique spécifié
	# Si aucun périphérique spécifié, utilise les adresses mises en cache après un scan
	def connect(self, addr_type=None, addr=None, callback=None):
		self._addr_type = addr_type or self._addr_type
		self._addr = addr or self._addr
		self._conn_callback = callback
		if self._addr_type is None or self._addr is None:
			return False
		self._ble.gap_connect(self._addr_type, self._addr)
		return True

	# Se déconnecte du périphérique
	def disconnect(self):
		if not self._conn_handle:
			return
		self._ble.gap_disconnect(self._conn_handle)
		self._reset()

	# Envoie des données sur l'UART (écriture dans la caractéristique RX)
	# Cette méthode permet au central d'envoyer un message au périphérique connecté.

	def write(self, v, response = False):
		
		if not self.is_connected():
			return
		self._ble.gattc_write(self._conn_handle, self._rx_handle, v, 1 if response else 0)
		
		# Confirme que l'accusé de réception a bien été envoyé
		global AR_central_requis
		AR_central_requis = 0

	# Active le gestionnaire des évènements de réception sur l'UART
	def on_notify(self, callback):
		self._notify_callback = callback

# Gestionnaire de l'évènement de réception qui répond à une notification lorsque la caractéristique TX
# est modifiée.
def on_receipt(v):
	
	# Conversion en octets de la charge utile la caractéristique TX
	b = bytes(v)

	# On convertit les octets reçus en caractères codés au format UTF-8
	payload = b.decode('utf-8')

	print("Message recu de " + str(adresse_MAC) + " : ", payload)
	
	# On sépare les mesures grâce à l'instruction split
	temp, humi, press = payload.split("|")

	# On affiche les valeurs de température, de pression et d'humidité relative
	print("Température = " + temp + " °C")
	print("Humidité relative = " + humi + " %")
	print("Pression = " + press + " hPa")

	# Le central a bien reçu un message du périphérique, donc il doit lui envoyer un accusé de réception
	global AR_central_requis
	AR_central_requis = 1

# Création d'une instance de la classe central
ble = bluetooth.BLE()
central = BLECentral(ble)

aucun_peripherique = 0 # Vaudra 1 si un périphérique est détecté

# Gestionnaire des évènements de scan
def on_scan(addr_type, addr, name):

	import ubinascii # Pour convertir des informations binaires en texte
	from ubinascii import hexlify # Pour convertir un nombre hexadécimal en sa représentation binaire affichable

	if addr_type is not None:
		global adresse_MAC
		b = bytes(addr)
		print("Périphérique trouvé : ", name)
		adresse_MAC = hexlify(b).decode('utf-8')
		central.connect()
	else:
		global aucun_peripherique
		aucun_peripherique = 1
		print("Aucun périphérique trouvé.")

# Programme principal
def demo():

	print("Central BLE")

	import time # Pour gérér le temps et les temporisations

	aucun_peripherique = 0

	#Capture les évènements de scan
	central.scan(callback=on_scan)

	# Attente de connexion...
	while not central.is_connected():
		time.sleep_ms(100)
		if aucun_peripherique == 1:
			return
	print("Connecté")
 
	# Capture les évènements de réception. La notification provient de la caractéristique TX.
	central.on_notify(on_receipt)

	# Envoi d'un message d'accusé de réception du central au périphérique
	while central.is_connected():
		if AR_central_requis == 1:
			try: # Essaie d'envoyer un message
				v = "AR du central"
				central.write(v)
			except: # En cas d'échec...
				print("Echec d'émission de la réponse du central")

	# Pour le cas où le central se retrouverait déconnecté
	print("Déconnecté")

# Si le nom du script est "main", exécute la fonction "demo()"
if __name__ == "__main__":
	demo()

Les codes MicroPython pour le périphérique

Les scripts présentés ci-après sont disponibles dans la zone de téléchargement.

Deux fichiers scripts MicroPython seront nécessaires pour le périphérique :

  • Le script permettant de construire les trames d’advertising, intitulé ble_advertising.py. Nous ne détaillerons pas son contenu, vous pouvez le copier directement dans le répertoire PYBFLASH.
  • Le script du programme principal, main.py, qui intègre la classe BLEPeripheral simulant un UART côté serveur à l’aide du protocole GATT.

Le script du programme principal

Éditez le script main.py contenu dans le répertoire PYBFLASH du disque USB virtuel associé à la NUCLEO-WB55 qui fera office de périphérique.

# Ce script montre comment créer un periphérique UART  avec le Nordic Uart Service.
# Il va :
# 1 - Exposer deux caractéristiques TX et RX pour échanger des données.
# 2 - Se connecter à un central et lui notifier des messages dans TX.
# 3 - Lire les données écrites en retour par le central dans RX. 
# Le périphérique envoie au central une chaîne de caractères contenant la température, l'humidité et
# la pression mesurées avec une carte d'extension X-NUCLEO IKS01A3.
# Sources :
#	https://docs.micropython.org/en/latest/library/ubluetooth.html
# 	https://github.com/micropython/micropython/blob/master/examples/bluetooth/ble_uart_peripheral.py

import bluetooth # Classes "primitives du BLE"
from ble_advertising import advertising_payload # Pour construire la trame d'advertising
from binascii import hexlify # Convertit une donnée binaire en sa représentation hexadécimale

# Constantes requises pour construire le service BLE UART
_IRQ_CENTRAL_CONNECT = const(1)
_IRQ_CENTRAL_DISCONNECT = const(2)
_IRQ_GATTS_WRITE = const(3)
_FLAG_WRITE = const(0x0008)
_FLAG_NOTIFY = const(0x0010)

# Définition du service UART avec ses deux caractéristiques RX et TX

_UART_UUID = bluetooth.UUID("6E400001-B5A3-F393-E0A9-E50E24DCCA9E")
_UART_TX = (
	bluetooth.UUID("6E400003-B5A3-F393-E0A9-E50E24DCCA9E"),
	_FLAG_NOTIFY, # Cette caractéristique notifiera le central des modifications que lui apportera le périphérique
)
_UART_RX = (
	bluetooth.UUID("6E400002-B5A3-F393-E0A9-E50E24DCCA9E"),
	_FLAG_WRITE, # Le central pourra écrire dans cette caractéristique
)
_UART_SERVICE = (
	_UART_UUID,
	(_UART_TX, _UART_RX),
)

# Nombre maximum d'octets qui peuvent être échangés par la caractéristique RX
_MAX_NB_BYTES = const(100)

class BLEUART:

	# Initialisations
	def __init__(self, ble, name="mpy-uart", rxbuf=_MAX_NB_BYTES):
		self._ble = ble
		self._ble.active(True)
		self._ble.irq(self._irq)
		# Enregistrement du service
		((self._tx_handle, self._rx_handle),) = self._ble.gatts_register_services((_UART_SERVICE,))
		# Augmente la taille du tampon rx et active le mode "append"
		self._ble.gatts_set_buffer(self._rx_handle, rxbuf, True)
		self._connections = set()
		self._rx_buffer = bytearray()
		self._handler = None
		# Advertising du service (services=[_UART_UUID] est indispensable pour que le central identifie le service)
		self._payload = advertising_payload(name=name, services=[_UART_UUID])
		self._advertise()
		
		# Affiche l'adresse MAC de l'objet
		dummy, byte_mac = self._ble.config('mac')
		hex_mac = hexlify(byte_mac) 
		print("Adresse MAC : %s" %hex_mac.decode("ascii"))
		

	# Interruption pour gérer les réceptions
	def irq(self, handler):
		self._handler = handler

	# Surveille les connexions afin d'envoyer des notifications
	def _irq(self, event, data):

		# Si un central se connecte
		if event == _IRQ_CENTRAL_CONNECT:
			conn_handle, _, _ = data
			self._connections.add(conn_handle)
			print("Nouvelle connexion", conn_handle)

		# Si un central se déconnecte
		elif event == _IRQ_CENTRAL_DISCONNECT:
			conn_handle, _, _ = data
			print("Déconnecté", conn_handle)
			if conn_handle in self._connections:
				self._connections.remove(conn_handle)
			# Redémarre l'advertising pour permettre de nouvelles connexions
			self._advertise()

		# Lorsqu'un client écrit dans une caractéristique exposée par le serveur
		# (gestion des évènements de recéption depuis le central)
		elif event == _IRQ_GATTS_WRITE:
			conn_handle, value_handle = data
			if conn_handle in self._connections and value_handle == self._rx_handle:
				self._rx_buffer += self._ble.gatts_read(self._rx_handle)
				if self._handler:
					self._handler()
	
	# Appelée pour vérifier s'il y a des messages en attente de lecture dans RX
	def any(self):
		return len(self._rx_buffer)

	# Retourne les catactères reçus dans RX
	def read(self, sz=None):
		if not sz:
			sz = len(self._rx_buffer)
		result = self._rx_buffer[0:sz]
		self._rx_buffer = self._rx_buffer[sz:]
		return result

	# Ecrit dans TX un message à l'attention du central
	def write(self, data):
		for conn_handle in self._connections:
			self._ble.gatts_notify(conn_handle, self._tx_handle, data)

	# Mets fin à la connexion au port série simulé
	def close(self):
		for conn_handle in self._connections:
			self._ble.gap_disconnect(conn_handle)
		self._connections.clear()

	# Pour démarrer l'advertising, précise qu'un central pourra se connecter au périphérique
	def _advertise(self, interval_us=500000):
		self._ble.gap_advertise(interval_us, adv_data=self._payload, connectable = True)

	# Est-ce que le périphérique est connecté à un central ?
	def is_connected(self):
		return len(self._connections) > 0


# Valeur de l'altitude locale (en mètres)
altitude_locale = const(485)

# Calcul de la pression corrigée au niveau de la mer (nécessite la valeur de l'altitude locale)
def PressionNivMer(pression, altitude):
	return pression * pow(1.0 - (altitude * 2.255808707E-5), -5.255)

# Programme principal
def demo():

	print("Périphérique BLE")

	from machine import I2C # Pour gérer le bus I2C
	import HTS221 # Pour gérer le capteur MEMS HTS221
	import LPS22 # Pour gérer le capteur MEMS LPS22
	import time # Pour gérer les temporisations et la mesure du temps écoulé

	# On utilise l'I2C n°1 de la carte NUCLEO-WB55 pour communiquer avec le capteur
	i2c = I2C(1) 

	# Instances des capteurs
	sensor1 = HTS221.HTS221(i2c)
	sensor2 = LPS22.LPS22(i2c)

	# Pause d'une seconde pour laisser à l'I2C le temps de s'initialiser
	time.sleep_ms(1000)

	# Instanciation du BLE
	ble = bluetooth.BLE()
	uart = BLEUART(ble)

	# Gestionnaire de l'évènement de réception
	def on_rx():
		print("Données reçues du central : ", uart.read().decode().strip())

	# Réception (asynchrone) des données (ie réaction aux écritures du central dans RX).
	uart.irq(handler=on_rx)

	# Structure de gestion des erreurs pour gérer les interruptions du clavier
	try:
		while True:
		
			# Lecture des capteurs
			temp = sensor1.temperature()
			humi = sensor1.humidity()
			pres = sensor2.pressure()

			# Conversion en texte des valeurs renvoyées par les capteurs
			stemp = str(round(temp,1))
			shumi = str(int(humi))
			spres = str(int(PressionNivMer(pres, altitude_locale)))

			# Affichage sur le port série de l'USB USER
			print("Température : " + stemp + " °C, Humidité relative : " + shumi + " %, Pression : " + spres + " hPa")

			if uart.is_connected():

				# On concatène les données :
				data = stemp + "|" + shumi + "|" + spres

				# On les envoie au central (ie on les notifie dans TX):
				uart.write(data)

				print("Données envoyées au central : " + data)

			# Temporisation de 5 secondes
			time.sleep_ms(5000)

	# En cas  d'interruption clavier (l'utilisateur appuie sur CTRL+C)
	except KeyboardInterrupt:
		pass # Ne quitte pas l'application et passe à la suite

	# Ferme l'UART actif
	uart.close()

# Si le nom du script est "main", exécute la fonction "demo()"
if __name__ == "__main__":
	demo()

Mise en oeuvre

Commencez par démarrer le script du périphérique (CTRL+D dans le terminal PuTTY), et ensuite le central (idem). Si tout se déroule correctement vous devriez observer les échanges ci-dessous (aux valeurs de température, humidité et pression près) sur les deux terminaux :


Sortie UART BLE