Echanges de chaînes de caractères entre deux NUCLEO-WB55

Ce tutoriel montre comment utiliser un service BLE construit de toutes pièces permettant à un central et un périphérique d’échanger des séquences d’octets qui pourront être interprétées comme des commandes, des valeurs numériques ou encore des séquences de caractères affichables (i.e. du texte). On désigne abusivement ce service par BLE UART car il apporte une fonctionnalité similaire à l’UART.

Les scripts de ce tutoriel sont adaptés des exemples ble_simple_central.py et ble_simple_peripheral.py disponibles dans le dépôt GitHub de MicroPython, ici.

Plus précisément, nous allons mettre en œuvre le BLE UART avec :

  • Un périphérique (serveur) qui va partager des données de température, pression et humidité au sein d’une caractéristique TX qui expose une chaîne de caractères. Il publie également une autre caractéristique exposant une chaîne de caractères RX, dans laquelle le client pourra écrire sa réponse ;
  • Un central (client) qui va lire le contenu de la caractéristique TX, décoder les messages en provenance du périphérique qu’elle contient et envoyer en retour à ce dernier un message en écrivant dans la caractéristique RX.

Le schéma de principe suivant résume ce que nous allons réaliser :


BLE UART use case


Il montre le protocole GATT en action après la phase d’advertising GAP et la connexion entre le central et un périphérique. Le périphérique “partage” avec le central deux caractéristiques :

  • UART_TX dotée de l’attribut “Notify”. Le central (client) peut lire son contenu quand bon lui semble et est informé (NOTIFY) des modifications que le périphérique (serveur) réalise sur celle-ci.
  • UART_RX dotée le l’attribut “Write”. Le central (client) peut donc y écrire (WRITE) du contenu quand bon lui semble. L’usage conjoint de ces deux caractéristiques permet de simuler un port série de faible débit avec sa ligne d’émission (RX) et de transmission (TX), ce qui justifie le nom d’UART attribué à ce service.

Matériel requis

  • Central : Une carte NUCLEO-WB55
  • Périphérique : Une carte NUCLEO-WB55 avec une carte d’extension X-NUCLEO-IKS01A3

Attention, il est possible que vous deviez mettre à jour le firmware BLE HCI de vos cartes NUCLEO-WB55, la procédure est expliquée ici.

Les codes MicroPython pour le central

Vous pouvez télécharger les scripts MicroPython de ce tutoriel (entre autres) en cliquant ici.

Deux fichiers scripts MicroPython seront nécessaires pour le central :

  • Le script permettant de construire les trames d’advertising, intitulé ble_advertising.py. Nous ne détaillerons pas son contenu, vous pouvez le copier directement dans le répertoire PYBFLASH.
  • Le script du programme principal, main.py, qui intègre la classe BLECentral simulant un UART côté client à l’aide du protocole GATT.

Le script du programme principal

Éditez le script main.py contenu dans le répertoire PYBFLASH du disque USB virtuel associé à la NUCLEO-WB55 qui fera office de central et copiez-y le code suivant :

# Ce script montre comment créer un central UART c'est à dire comment :
# 1 - Détecter un périphérique exécutant le service UART et exposant deux caractéristiques : TX et RX.
# 2 - Se connecter à ce périphérique pour recevoir, sous forme de caractères encodés UTF-8, un message notifié par TX.
# 3 - Répondre au périphérique en écrivantdans RX.
# Dans cet exemple :
#   - le périphérique envoie une chaîne de caractères contenant la représentation affichable de valeurs
#     de température, pression et humidité qu'il a mesurées.
#   - le central reçoit cette chaîne, la découpe et affiche les mesures sur le terminal série de l'USB USER
#   - le central renvoie au périphérique un simple accusé de réception. 
# Source : https://github.com/micropython/micropython/blob/master/examples/bluetooth/ble_simple_central.py

# Nom du périphérique auquel on veut se connecter, doit être le même que celui déclaré 
# dans le script "main.py" du peripheral, afin que la connexion s'établisse.
# ATTENTION : NE PAS DEPASSER 8 CARACTERES !
_TARGET_PERIPHERAL_NAME = "ID000001"
_NBCHAR = const(8)

import bluetooth # Classes "primitives du BLE"
from ble_advertising import decode_services, decode_name # Pour décoder les messages reçus
from binascii import hexlify # Pour convertir une donnée binaire en sa représentation hexadécimale
import ubinascii # Pour convertir des informations binaires en texte

# Constantes requises pour construire le service GATT BLE UART
# Voir : https://docs.micropython.org/en/latest/library/ubluetooth.html
_IRQ_SCAN_RESULT = const(5)
_IRQ_SCAN_DONE = const(6)
_IRQ_advertising_payload_CONNECT = const(7)
_IRQ_advertising_payload_DISCONNECT = const(8)
_IRQ_GATTC_SERVICE_RESULT = const(9)
_IRQ_GATTC_SERVICE_DONE = const(10)
_IRQ_GATTC_CHARACTERISTIC_RESULT = const(11)
_IRQ_GATTC_CHARACTERISTIC_DONE = const(12)
_IRQ_GATTC_WRITE_DONE = const(17)
_IRQ_GATTC_NOTIFY = const(18)
_IRQ_MTU_EXCHANGED = const(21)

# Objet connectables avec advertising scannable
_ADV_IND = const(0x00)
_ADV_DIRECT_IND = const(0x01)

# Paramètres pour fixer le rapport cyclique du scan GAP
_SCAN_DURATION_MS = const(30000)
_SCAN_INTERVAL_US = const(30000)
_SCAN_WINDOW_US = const(30000)

# Définition du service UART avec ses deux caractéristiques RX et TX
_UART_SERVICE_UUID = bluetooth.UUID("6E400001-B5A3-F393-E0A9-E50E24DCCA9E")
_UART_RX_CHAR_UUID = bluetooth.UUID("6E400002-B5A3-F393-E0A9-E50E24DCCA9E")
_UART_TX_CHAR_UUID = bluetooth.UUID("6E400003-B5A3-F393-E0A9-E50E24DCCA9E")

# Variables globales partagées par les fonctions asynchrones qui répondent aux évènements (callback) 
MAC_address = 0 # Adresse matérielle de la radio BLE du central
Central_ACK_required = 0 # Est-ce que le central doit envoyer un accusé de réception au périphérique ?

# Nombre maximum d'octets qui pourront être écrits dans RX et TX.
# Ceci correspond au nombre de caractère maximum des messages échangés.
_MAX_NB_BYTES = const(128)

# Classe pour gérer le Central BLE
class BLECentral:

	# Initialisation
	def __init__(self, ble):
		self._ble = ble
		self._ble.active(True)
		self._ble.irq(self._irq)
		self._ble.config(mtu=_MAX_NB_BYTES)
		self._reset()
		
		# Affiche l'adresse MAC de l'objet
		dummy, byte_mac = self._ble.config('mac')
		hex_mac = hexlify(byte_mac) 
		print("Mon adresse MAC : %s" %hex_mac.decode("ascii"))

	# Réinitialisation (appelée lors des déconnexions)
	def _reset(self):
		
		# Efface le cache des adresses et des noms des scans
		self._name = None
		self._addr_type = None
		self._addr = None
		
		# Fonctions de réponses (callback) à la complétion de différents évènements
		self._scan_callback = None
		self._conn_callback = None
		self._read_callback = None

		# Fonction de réponse du central aux notifications des périphériques 
		self._notify_callback = None

		# Adresses et caractéristiques du périphérique connecté
		self._conn_handle = None
		self._start_handle = None
		self._end_handle = None
		self._tx_handle = None
		self._rx_handle = None

	# Interruptions de gestion des évènements
	def _irq(self, event, data):

		# Evènement "Résultat de scan"
		if event == _IRQ_SCAN_RESULT:
		
			# Lecture du contenu de la trame d'advertising
			addr_type, addr, adv_type, rssi, adv_data = data

			# Si l'advertising signale un périphérique proposant un service UART
			if adv_type in (_ADV_IND, _ADV_DIRECT_IND) and _UART_SERVICE_UUID in decode_services(adv_data):
			
				# S'il s'agit du périphérique associé, référence le et arrète le scan.
				self._name = decode_name(adv_data) or "?"
				if self._name[0:_NBCHAR] == _TARGET_PERIPHERAL_NAME[0:_NBCHAR]:
					self._addr_type = addr_type
					self._addr = bytes(addr) # Note: le tampon addr a pour propriétaire l'appelant, donc il faut le copier.
					self._name = decode_name(adv_data) or "?"
					self._ble.gap_scan(None)

		# Evènement "Scan terminé"
		elif event == _IRQ_SCAN_DONE:
			if self._scan_callback:
				if self._addr:
					# Le périphérique associé a été détecté 
					# (et le scan a été explicitement interrompu en conséquence)
					self._scan_callback(self._addr_type, self._addr, self._name)
					self._scan_callback = None
					print("Scan terminé, succès : %s trouvé" %_TARGET_PERIPHERAL_NAME)
				else:
					# Le scan a dépassé son délai de "time-out" avant de
					# trouver le périphérique associé
					self._scan_callback(None, None, None)
					print("Scan terminé, échec : %s non trouvé après %s s" %(_TARGET_PERIPHERAL_NAME,_SCAN_DURATION_MS/1000))
					print("Relancez le scan avec *[CTRL]-[C]* puis *[CTRL]-[D]*.")

		# Evènement "Connexion réussie"
		elif event == _IRQ_advertising_payload_CONNECT:
			conn_handle, addr_type, addr = data
			if addr_type == self._addr_type and addr == self._addr:
				self._conn_handle = conn_handle
				self._ble.gattc_exchange_mtu(self._conn_handle)
				self._ble.gattc_discover_services(self._conn_handle)
				print("Connecté à %s" %_TARGET_PERIPHERAL_NAME)

		# Evènement "Déconnexion" (initié par le central ou par le périphérique)
		elif event == _IRQ_advertising_payload_DISCONNECT:
			print("Déconnecté de %s" %_TARGET_PERIPHERAL_NAME)
			conn_handle, _, _ = data
			if conn_handle == self._conn_handle:
				# Si déconnexion initiée par le central, le reset a déjà été fait
				self._reset()

		# Evènement "Le périphérique connecté a notifié un service au central"
		elif event == _IRQ_GATTC_SERVICE_RESULT:
			conn_handle, start_handle, end_handle, uuid = data
			if conn_handle == self._conn_handle and uuid == _UART_SERVICE_UUID:
				self._start_handle, self._end_handle = start_handle, end_handle

		# Evènement "Recherche de services terminée"
		elif event == _IRQ_GATTC_SERVICE_DONE:
			if self._start_handle and self._end_handle:
				self._ble.gattc_discover_characteristics(
					self._conn_handle, self._start_handle, self._end_handle
				)
			else:
				print("Le service UART est introuvable.")

		# Evènement "Le périphérique connecté a notifié une caractéristique au central"
		elif event == _IRQ_GATTC_CHARACTERISTIC_RESULT:
			conn_handle, def_handle, value_handle, properties, uuid = data
			if conn_handle == self._conn_handle and uuid == _UART_RX_CHAR_UUID:
				self._rx_handle = value_handle
			if conn_handle == self._conn_handle and uuid == _UART_TX_CHAR_UUID:
				self._tx_handle = value_handle

		# Evènement "Recherche de caractéristiques terminée"
		elif event == _IRQ_GATTC_CHARACTERISTIC_DONE:
			if self._tx_handle is not None and self._rx_handle is not None:
				# Nous avons terminé la connexion et la découverte de périphériques, 
				# génère le callback de connexion.
				if self._conn_callback:
					self._conn_callback()
			else:
				print("Caractéristique UART RX introuvable.")

		# Evènement "Accusé de réception du périphérique", 
		# qui survient lorsque le central envoie un message, si on a explicitement demandé un AR
		elif event == _IRQ_GATTC_WRITE_DONE:
			conn_handle, value_handle, status = data
			print("Ecriture dans RX réalisée")

		# Evènement "Réponse aux notifications du périphérique" sur la caractéristique TX
		elif event == _IRQ_GATTC_NOTIFY:
			conn_handle, value_handle, notify_data = data
			if conn_handle == self._conn_handle and value_handle == self._tx_handle:
				if self._notify_callback:
					self._notify_callback(notify_data)

		# Evènement de modification de la taille de la payload
		elif event == _IRQ_MTU_EXCHANGED:
			print("La taille maximum des messages est désormais " + str(_MAX_NB_BYTES) + " octets")

	# Revoie True si nous sommes connectés au service UART.
	def is_connected(self):
		return (
			self._conn_handle is not None
			and self._tx_handle is not None
			and self._rx_handle is not None
		)

	# Recherche un périphérique qui propose le service UART
	def scan(self, callback=None):
		print("Scan démarré, recherche %s" %_TARGET_PERIPHERAL_NAME)
		self._addr_type = None
		self._addr = None
		self._scan_callback = callback
		# Scanne pendant _SCAN_DURATION_MS, pendant des durées de _SCAN_WINDOWS_US espacées de _SCAN_INTERVAL_US
		self._ble.gap_scan(_SCAN_DURATION_MS, _SCAN_INTERVAL_US, _SCAN_WINDOW_US)

	# Arrête le scan
	# Se connecte au périphérique spécifié
	# Si aucun périphérique spécifié, utilise les adresses mises en cache après un scan
	def connect(self, addr_type=None, addr=None, callback=None):

		self._addr_type = addr_type or self._addr_type
		self._addr = addr or self._addr
		self._conn_callback = callback
		if self._addr_type is None or self._addr is None:
			return False
		self._ble.gap_connect(self._addr_type, self._addr)
		return True
		

	# Se déconnecte du périphérique
	def disconnect(self):
		if not self._conn_handle:
			return
		self._ble.gap_disconnect(self._conn_handle)
		self._reset()

	# Envoie des données sur l'UART (écriture dans la caractéristique RX)
	# Cette méthode permet au central d'envoyer un message au périphérique connecté.
	def write(self, v, response = False):
		
		if not self.is_connected():
			return

		self._ble.gattc_write(self._conn_handle, self._rx_handle, v, 1 if response else 0)
		
		# Confirme que l'accusé de réception a bien été envoyé
		global Central_ACK_required
		Central_ACK_required = 0

	# Active le gestionnaire des évènements de réception sur l'UART
	def on_notify(self, callback):
		self._notify_callback = callback

# Gestionnaire de l'évènement de réception qui répond à une notification lorsque la caractéristique TX
# est modifiée.
def on_receipt(v):
	
	global MAC_address
	
	# Conversion en octets de la charge utile la caractéristique TX
	b = bytes(v)

	# On convertit les octets reçus en caractères codés au format UTF-8
	payload = b.decode('utf-8')

	print("Message recu de " + str(MAC_address) + " : ", payload)
	
	# On sépare les mesures grâce à l'instruction split
	temp, humi, press = payload.split("|")

	# On affiche les valeurs de température, de pression et d'humidité relative
	print("Température = " + temp + " °C")
	print("Humidité relative = " + humi + " %")
	print("Pression = " + press + " hPa")

	# Le central a bien reçu un message du périphérique, donc il doit lui envoyer un accusé de réception
	global Central_ACK_required
	Central_ACK_required = 1

# Création d'une instance de la classe central
ble = bluetooth.BLE()
central = BLECentral(ble)

# Gestionnaire des évènements de scan
def on_scan(addr_type, addr, name):
	
	# Si le périphérique associé est identifié
	if addr_type is not None:

		# Extrait son adresse MAC
		global MAC_address
		b = bytes(addr)
		MAC_address = hexlify(b).decode("ascii")
		
		# Connexion avec le central
		central.connect()

# Programme principal
def demo():

	print("Central BLE associé au périphérique %s" %_TARGET_PERIPHERAL_NAME)

	import time # Pour gérér le temps et les temporisations
		
	# Capture les évènements de scan
	central.scan(callback = on_scan)
	
	# Scanne jusqu'à trouver le périphérique associé
	while not central.is_connected():
		time.sleep_ms(100)
	
	# Capture les évènements de réception. La notification provient de la caractéristique TX.
	central.on_notify(on_receipt)

	# Envoi d'un message d'accusé de réception du central au périphérique
	while central.is_connected():
	
		global Central_ACK_required
	
		if Central_ACK_required == 1:
			try:  # Essaie d'envoyer un message
				v = "AR du central " + MAC_address 
				central.write(v)
			except:  # En cas d'échec...
				print("Echec d'émission de la réponse du central")

# Si le nom du script est "main", exécute la fonction "demo()"
if __name__ == "__main__":
	demo()

Les codes MicroPython pour le périphérique

Vous pouvez télécharger les scripts MicroPython de ce tutoriel (entre autres) en cliquant ici.

Deux fichiers scripts MicroPython seront nécessaires pour le périphérique :

  • Le script permettant de construire les trames d’advertising, intitulé ble_advertising.py. Nous ne détaillerons pas son contenu, vous pouvez le copier directement dans le répertoire PYBFLASH.
  • Le script du programme principal, main.py, qui intègre la classe BLEPeripheral simulant un UART côté serveur à l’aide du protocole GATT.

Le script du programme principal

Éditez le script main.py contenu dans le répertoire PYBFLASH du disque USB virtuel associé à la NUCLEO-WB55 qui fera office de périphérique et enregistrez-y ce code :

# Ce script montre comment créer un periphérique UART avec le Nordic Uart Service.
# Il va :
# 1 - Exposer deux caractéristiques TX et RX pour échanger des données.
# 2 - Se connecter à un central et lui notifier des messages dans TX.
# 3 - Lire les données écrites en retour par le central dans RX. 
# Le périphérique envoie au central une chaîne de caractères contenant la température, l'humidité et
# la pression mesurées avec une carte d'extension X-NUCLEO-IKS01A3.
# Sources :
#	https://docs.micropython.org/en/latest/library/ubluetooth.html
# 	https://github.com/micropython/micropython/blob/master/examples/bluetooth/ble_uart_peripheral.py

# Nom du périphérique, doit être le même que celui déclaré dans le script "main.py" du central 
# afin que la connexion s'établisse.
# ATTENTION : NE PAS DEPASSER 8 CARACTERES !
_MY_NAME = "ID000001"

import bluetooth # Classes "primitives du BLE"
from ble_advertising import advertising_payload # Pour construire la trame d'advertising
from binascii import hexlify # Convertit une donnée binaire en sa représentation hexadécimale

# Constantes requises pour construire le service BLE UART
_IRQ_CENTRAL_CONNECT = const(1)
_IRQ_CENTRAL_DISCONNECT = const(2)
_IRQ_GATTS_WRITE = const(3)
_IRQ_MTU_EXCHANGED = const(21)

_FLAG_WRITE = const(0x0008)
_FLAG_NOTIFY = const(0x0010)

# Définition du service UART avec ses deux caractéristiques RX et TX

_UART_UUID = bluetooth.UUID("6E400001-B5A3-F393-E0A9-E50E24DCCA9E")
_UART_TX = (
	bluetooth.UUID("6E400003-B5A3-F393-E0A9-E50E24DCCA9E"),
	_FLAG_NOTIFY, # Cette caractéristique notifiera le central des modifications que lui apportera le périphérique
)
_UART_RX = (
	bluetooth.UUID("6E400002-B5A3-F393-E0A9-E50E24DCCA9E"),
	_FLAG_WRITE,  # Le central pourra écrire dans cette caractéristique
)
_UART_SERVICE = (
	_UART_UUID,
	(_UART_TX, _UART_RX),
)

# Nombre maximum d'octets (de caractères) qui peuvent être échangés par les caractéristiques TX & RX
_MAX_NB_BYTES = const(128)

class BLEperipheral:

	# Initialisations
	def __init__(self, ble, name=_MY_NAME, charbuf=_MAX_NB_BYTES):
		self._ble = ble
		self._ble.active(True)
		self._ble.irq(self._irq)
		self._ble.config(mtu=_MAX_NB_BYTES)

		# Enregistrement du service
		((self._tx_handle, self._rx_handle),) = self._ble.gatts_register_services((_UART_SERVICE,))
		
		# Augmente la taille des tampons rx et tx, active le mode "append"
		self._ble.gatts_set_buffer(self._tx_handle, charbuf, True)
		self._ble.gatts_set_buffer(self._rx_handle, charbuf, True)
		
		self._ble.gatts_write(self._tx_handle, bytes(charbuf))
		self._ble.gatts_write(self._rx_handle, bytes(charbuf))
		
		self._connections = set()
		self._rx_buffer = bytearray()
		self._handler = None

		# Advertising du service (services=[_UART_UUID] est indispensable pour que le central identifie le service)
		self._payload = advertising_payload(name=name, services=[_UART_UUID])
		self._advertise()
		
		# Affiche l'adresse MAC de l'objet
		dummy, byte_mac = self._ble.config('mac')
		hex_mac = hexlify(byte_mac) 
		print("Mon adresse MAC : %s" %hex_mac.decode("ascii"))

	# Interruption pour gérer les réceptions
	def irq(self, handler):
		self._handler = handler

	# Surveille les connexions afin d'envoyer des notifications
	def _irq(self, event, data):

		# Si un central se connecte
		if event == _IRQ_CENTRAL_CONNECT:
			conn_handle, _, _ = data
			self._connections.add(conn_handle)
			print("New connection", conn_handle)

		# Si un central se déconnecte
		elif event == _IRQ_CENTRAL_DISCONNECT:
			conn_handle, _, _ = data
			print("Disconnected", conn_handle)
			if conn_handle in self._connections:
				self._connections.remove(conn_handle)
			# Redémarre l'advertising pour permettre de nouvelles connexions
			self._advertise()

		# Lorsqu'un client écrit dans une caractéristique exposée par le serveur
		# (gestion des évènements de recéption depuis le central)
		elif event == _IRQ_GATTS_WRITE:
			conn_handle, value_handle = data
			if conn_handle in self._connections and value_handle == self._rx_handle:
				self._rx_buffer += self._ble.gatts_read(self._rx_handle)
				if self._handler:
					self._handler()

		# Evènement de modification de la taille de la payload
		elif event == _IRQ_MTU_EXCHANGED:
			print("La taille maximum des messages est désormais " + str(_MAX_NB_BYTES) + " octets")

	# Appelée pour vérifier s'il y a des messages en attente de lecture dans RX
	def any(self):
		return len(self._rx_buffer)

	# Retourne les catactères reçus dans RX
	def read(self, sz=None):
		if not sz:
			sz = len(self._rx_buffer)
		result = self._rx_buffer[0:sz]
		self._rx_buffer = self._rx_buffer[sz:]
		return result

	# Ecrit dans TX un message à l'attention du central
	def write(self, data):
		for conn_handle in self._connections:
			self._ble.gatts_notify(conn_handle, self._tx_handle, data)

	# Mets fin à la connexion au port série simulé
	def close(self):
		for conn_handle in self._connections:
			self._ble.gap_disconnect(conn_handle)
		self._connections.clear()

	# Pour démarrer l'advertising, précise qu'un central pourra se connecter au périphérique
	def _advertise(self, interval_us=500000):
		self._ble.gap_advertise(interval_us, adv_data=self._payload, connectable = True)

	# Est-ce que le périphérique est connecté à un central ?
	def is_connected(self):
		return len(self._connections) > 0


# Valeur de l'altitude locale (en mètres)
LOCAL_ALTITUDE = const(470)

# Approximation de la pression corrigée au niveau de la mer (nécessite la valeur de l'altitude locale)
def SeeLevelPressure(pressure, altitude):
	return pressure * pow(1.0 - (altitude * 2.255808707E-5), -5.255)

# Est-ce qu'on un shield IKS01A3 / des capteurs HTS221 et LPS22 sur notre carte ?
# (1 si oui, 0 si non, dans ce deuxième cas on les simulera).
REAL_SENSORS = const(1)

# Programme principal
def demo():
	
	print("Périphérique BLE : %s" %_MY_NAME) 

	import time # Pour gérer les temporisations et la mesure du temps écoulé

	# Si on ne simule pas les capteurs
	if REAL_SENSORS:
		print("Capteurs HTS221 et LPS22 présents")
		from machine import I2C # Pour gérer le bus I2C
		import HTS221 # Pour gérer le capteur MEMS HTS221
		import LPS22 # Pour gérer le capteur MEMS LPS22

		# On utilise l'I2C n°1 de la carte NUCLEO-WB55 pour communiquer avec le capteur
		i2c = I2C(1) 

		# Instances des capteursIs
		sensor1 = HTS221.HTS221(i2c)
		sensor2 = LPS22.LPS22(i2c)

		# Pause d'une seconde pour laisser à l'I2C le temps de s'initialiser
		time.sleep_ms(1000)
		
	# Si on simule les capteurs
	else:
	
		print("Capteurs HTS221 et LPS22 absents, mesures simulées")
		import random # Pour la génération de valeurs aléatoires

	# Instanciation du BLE
	ble = bluetooth.BLE()
	uart = BLEperipheral(ble)

	# Gestionnaire de l'évènement de réception
	def on_rx():
		print("Données reçues du central : ", uart.read().decode().strip())

	# Réception (asynchrone) des données (i.e. réaction aux écritures du central dans RX).
	uart.irq(handler=on_rx)

	# Structure de gestion des erreurs pour les interruptions du clavier
	try:
		while True:

			#Si on ne simuple pas les capteurs
			if REAL_SENSORS:
				# Lecture des capteurs
				temp = sensor1.temperature()
				humi = sensor1.humidity()
				pres = sensor2.pressure()
			else:
				temp = random.randint(-1, 50)  # Valeur aléatoire entre -1 et 50 °C
				humi = random.randint(0, 100) # Valeur aléatoire entre 0 et 100 %
				pres = random.randint(900, 1013) # Valeur aléatoire entre 900 et 1013 hPa

			# Conversion en texte des valeurs renvoyées par les capteurs
			stemp = str(round(temp,1))
			shumi = str(int(humi))
			spres = str(int(SeeLevelPressure(pres, LOCAL_ALTITUDE)))

			# Affichage sur le port série de l'USB USER
			print("Température : " + stemp + " °C, Humidité relative : " + shumi + " %, Pression : " + spres + " hPa")

			if uart.is_connected():

				# On concatène les données :
				data = stemp + "|" + shumi + "|" + spres

				# On les envoie au central (i.e. on les notifie dans TX):
				uart.write(data)

				print("Données envoyées au central : " + data)

			# Temporisation de 5 secondes
			time.sleep_ms(5000)

	# En cas  d'interruption clavier (l'utilisateur appuie sur *[CTRL]-[C]*)
	except KeyboardInterrupt:
		pass # Ne quitte pas l'application et passe à la suite

	# Ferme l'UART actif
	uart.close()

# Si le nom du script est "main", exécute la fonction "demo()"
if __name__ == "__main__":
	demo()

Mise en œuvre

Assurez vous que l’identifiant _MY_NAME dans le script du périphérique et l’identifiant _TARGET_PERIPHERAL_NAME dans le script du central ont bien la même valeur (ici ID000001). Ceci permet de s’assurer que votre central va bien se connecter au périphérique que vous avez sélectionné dans le cas où d’autres périphériques BLE UART seraient présents à proximité. NB ; on aurait également pu choisir l’adresse MC du périphérique plutôt que le code arbitraire ID000001).

Commencez par démarrer le script du périphérique ([CTRL]-[D] dans le terminal PuTTY), et ensuite le script du central (idem). Si tout se déroule correctement vous devriez observer les échanges ci-dessous (aux valeurs de température, humidité et pression près) sur les deux terminaux :

Console PuTTY du périphérique :

MPY: sync filesystems
MPY: soft reboot
Périphérique BLE : ID000001
Capteurs HTS221 et LPS22 présents
Mon adresse MAC : 02020722281e
Température : 22.6 °C, Humidité relative : 55 %, Pression : 1016 hPa
New connection 2049
La taille maximum des messages est désormais 128 octets
Température : 22.6 °C, Humidité relative : 55 %, Pression : 1016 hPa
Données envoyées au central : 22.6|55|1016
Données reçues du central :
Température : 22.6 °C, Humidité relative : 55 %, Pression : 1016 hPa
Données envoyées au central : 22.6|55|1016
Données reçues du central :  AR du central 02020722281e
Température : 22.6 °C, Humidité relative : 55 %, Pression : 1016 hPa
Données envoyées au central : 22.6|55|1016
Données reçues du central :  AR du central 02020722281e
Température : 22.6 °C, Humidité relative : 55 %, Pression : 1016 hPa
Données envoyées au central : 22.6|55|1016
Données reçues du central :  AR du central 02020722281e
Disconnected 2049

Console PuTTY du central :

MPY: sync filesystems
MPY: soft reboot
Mon adresse MAC : 0202072226be
Central BLE associé au périphérique ID000001
Scan démarré, recherche ID000001
Scan terminé, succès : ID000001 trouvé
Connecté à ID000001
La taille maximum des messages est désormais 128 octets
Message recu de 02020722281e :  22.6|55|1016
Température = 22.6 °C
Humidité relative = 55 %
Pression = 1016 hPa
Message recu de 02020722281e :  22.6|55|1016
Température = 22.6 °C
Humidité relative = 55 %
Pression = 1016 hPa
Message recu de 02020722281e :  22.6|55|1016
Température = 22.6 °C
Humidité relative = 55 %
Pression = 1016 hPa
Message recu de 02020722281e :  22.6|55|1016
Température = 22.6 °C
Humidité relative = 55 %
Pression = 1016 hPa
Déconnecté de ID000001