Echanges de chaînes de caractères entre deux NUCLEO-WB55
Ce tutoriel montre comment utiliser un service BLE construit de toutes pièces permettant à un central et un périphérique d’échanger des séquences d’octets qui pourront être interprétées comme des commandes, des valeurs numériques ou encore des séquences de caractères affichables (i.e. du texte). On désigne abusivement ce service par BLE UART car il apporte une fonctionnalité similaire à l’UART.
Les scripts de ce tutoriel sont adaptés des exemples ble_simple_central.py et ble_simple_peripheral.py disponibles dans le dépôt GitHub de MicroPython, ici.
Plus précisément, nous allons mettre en œuvre le BLE UART avec :
- Un périphérique (serveur) qui va partager des données de température, pression et humidité au sein d’une caractéristique TX qui expose une chaîne de caractères. Il publie également une autre caractéristique exposant une chaîne de caractères RX, dans laquelle le client pourra écrire sa réponse ;
- Un central (client) qui va lire le contenu de la caractéristique TX, décoder les messages en provenance du périphérique qu’elle contient et envoyer en retour à ce dernier un message en écrivant dans la caractéristique RX.
Le schéma de principe suivant résume ce que nous allons réaliser :
Il montre le protocole GATT en action après la phase d’advertising GAP et la connexion entre le central et un périphérique. Le périphérique “partage” avec le central deux caractéristiques :
- UART_TX dotée de l’attribut “Notify”. Le central (client) peut lire son contenu quand bon lui semble et est informé (NOTIFY) des modifications que le périphérique (serveur) réalise sur celle-ci.
- UART_RX dotée le l’attribut “Write”. Le central (client) peut donc y écrire (WRITE) du contenu quand bon lui semble. L’usage conjoint de ces deux caractéristiques permet de simuler un port série de faible débit avec sa ligne d’émission (RX) et de transmission (TX), ce qui justifie le nom d’UART attribué à ce service.
Matériel requis
- Central : Une carte NUCLEO-WB55
- Périphérique : Une carte NUCLEO-WB55 avec une carte d’extension X-NUCLEO-IKS01A3
Attention, il est possible que vous deviez mettre à jour le firmware BLE HCI de vos cartes NUCLEO-WB55, la procédure est expliquée ici.
Les codes MicroPython pour le central
Vous pouvez télécharger les scripts MicroPython de ce tutoriel (entre autres) en cliquant ici.
Deux fichiers scripts MicroPython seront nécessaires pour le central :
- Le script permettant de construire les trames d’advertising, intitulé ble_advertising.py. Nous ne détaillerons pas son contenu, vous pouvez le copier directement dans le répertoire PYBFLASH.
- Le script du programme principal, main.py, qui intègre la classe BLECentral simulant un UART côté client à l’aide du protocole GATT.
Le script du programme principal
Éditez le script main.py contenu dans le répertoire PYBFLASH du disque USB virtuel associé à la NUCLEO-WB55 qui fera office de central et copiez-y le code suivant :
# Ce script montre comment créer un central UART c'est à dire comment :
# 1 - Détecter un périphérique exécutant le service UART et exposant deux caractéristiques : TX et RX.
# 2 - Se connecter à ce périphérique pour recevoir, sous forme de caractères encodés UTF-8, un message notifié par TX.
# 3 - Répondre au périphérique en écrivantdans RX.
# Dans cet exemple :
# - le périphérique envoie une chaîne de caractères contenant la représentation affichable de valeurs
# de température, pression et humidité qu'il a mesurées.
# - le central reçoit cette chaîne, la découpe et affiche les mesures sur le terminal série de l'USB USER
# - le central renvoie au périphérique un simple accusé de réception.
# Source : https://github.com/micropython/micropython/blob/master/examples/bluetooth/ble_simple_central.py
# Nom du périphérique auquel on veut se connecter, doit être le même que celui déclaré
# dans le script "main.py" du peripheral, afin que la connexion s'établisse.
# ATTENTION : NE PAS DEPASSER 8 CARACTERES !
_TARGET_PERIPHERAL_NAME = "ID000001"
_NBCHAR = const(8)
import bluetooth # Classes "primitives du BLE"
from ble_advertising import decode_services, decode_name # Pour décoder les messages reçus
from binascii import hexlify # Pour convertir une donnée binaire en sa représentation hexadécimale
import ubinascii # Pour convertir des informations binaires en texte
# Constantes requises pour construire le service GATT BLE UART
# Voir : https://docs.micropython.org/en/latest/library/ubluetooth.html
_IRQ_SCAN_RESULT = const(5)
_IRQ_SCAN_DONE = const(6)
_IRQ_advertising_payload_CONNECT = const(7)
_IRQ_advertising_payload_DISCONNECT = const(8)
_IRQ_GATTC_SERVICE_RESULT = const(9)
_IRQ_GATTC_SERVICE_DONE = const(10)
_IRQ_GATTC_CHARACTERISTIC_RESULT = const(11)
_IRQ_GATTC_CHARACTERISTIC_DONE = const(12)
_IRQ_GATTC_WRITE_DONE = const(17)
_IRQ_GATTC_NOTIFY = const(18)
_IRQ_MTU_EXCHANGED = const(21)
# Objet connectables avec advertising scannable
_ADV_IND = const(0x00)
_ADV_DIRECT_IND = const(0x01)
# Paramètres pour fixer le rapport cyclique du scan GAP
_SCAN_DURATION_MS = const(30000)
_SCAN_INTERVAL_US = const(30000)
_SCAN_WINDOW_US = const(30000)
# Définition du service UART avec ses deux caractéristiques RX et TX
_UART_SERVICE_UUID = bluetooth.UUID("6E400001-B5A3-F393-E0A9-E50E24DCCA9E")
_UART_RX_CHAR_UUID = bluetooth.UUID("6E400002-B5A3-F393-E0A9-E50E24DCCA9E")
_UART_TX_CHAR_UUID = bluetooth.UUID("6E400003-B5A3-F393-E0A9-E50E24DCCA9E")
# Variables globales partagées par les fonctions asynchrones qui répondent aux évènements (callback)
MAC_address = 0 # Adresse matérielle de la radio BLE du central
Central_ACK_required = 0 # Est-ce que le central doit envoyer un accusé de réception au périphérique ?
# Nombre maximum d'octets qui pourront être écrits dans RX et TX.
# Ceci correspond au nombre de caractère maximum des messages échangés.
_MAX_NB_BYTES = const(128)
# Classe pour gérer le Central BLE
class BLECentral:
# Initialisation
def __init__(self, ble):
self._ble = ble
self._ble.active(True)
self._ble.irq(self._irq)
self._ble.config(mtu=_MAX_NB_BYTES)
self._reset()
# Affiche l'adresse MAC de l'objet
dummy, byte_mac = self._ble.config('mac')
hex_mac = hexlify(byte_mac)
print("Mon adresse MAC : %s" %hex_mac.decode("ascii"))
# Réinitialisation (appelée lors des déconnexions)
def _reset(self):
# Efface le cache des adresses et des noms des scans
self._name = None
self._addr_type = None
self._addr = None
# Fonctions de réponses (callback) à la complétion de différents évènements
self._scan_callback = None
self._conn_callback = None
self._read_callback = None
# Fonction de réponse du central aux notifications des périphériques
self._notify_callback = None
# Adresses et caractéristiques du périphérique connecté
self._conn_handle = None
self._start_handle = None
self._end_handle = None
self._tx_handle = None
self._rx_handle = None
# Interruptions de gestion des évènements
def _irq(self, event, data):
# Evènement "Résultat de scan"
if event == _IRQ_SCAN_RESULT:
# Lecture du contenu de la trame d'advertising
addr_type, addr, adv_type, rssi, adv_data = data
# Si l'advertising signale un périphérique proposant un service UART
if adv_type in (_ADV_IND, _ADV_DIRECT_IND) and _UART_SERVICE_UUID in decode_services(adv_data):
# S'il s'agit du périphérique associé, référence le et arrète le scan.
self._name = decode_name(adv_data) or "?"
if self._name[0:_NBCHAR] == _TARGET_PERIPHERAL_NAME[0:_NBCHAR]:
self._addr_type = addr_type
self._addr = bytes(addr) # Note: le tampon addr a pour propriétaire l'appelant, donc il faut le copier.
self._name = decode_name(adv_data) or "?"
self._ble.gap_scan(None)
# Evènement "Scan terminé"
elif event == _IRQ_SCAN_DONE:
if self._scan_callback:
if self._addr:
# Le périphérique associé a été détecté
# (et le scan a été explicitement interrompu en conséquence)
self._scan_callback(self._addr_type, self._addr, self._name)
self._scan_callback = None
print("Scan terminé, succès : %s trouvé" %_TARGET_PERIPHERAL_NAME)
else:
# Le scan a dépassé son délai de "time-out" avant de
# trouver le périphérique associé
self._scan_callback(None, None, None)
print("Scan terminé, échec : %s non trouvé après %s s" %(_TARGET_PERIPHERAL_NAME,_SCAN_DURATION_MS/1000))
print("Relancez le scan avec *[CTRL]-[C]* puis *[CTRL]-[D]*.")
# Evènement "Connexion réussie"
elif event == _IRQ_advertising_payload_CONNECT:
conn_handle, addr_type, addr = data
if addr_type == self._addr_type and addr == self._addr:
self._conn_handle = conn_handle
self._ble.gattc_exchange_mtu(self._conn_handle)
self._ble.gattc_discover_services(self._conn_handle)
print("Connecté à %s" %_TARGET_PERIPHERAL_NAME)
# Evènement "Déconnexion" (initié par le central ou par le périphérique)
elif event == _IRQ_advertising_payload_DISCONNECT:
print("Déconnecté de %s" %_TARGET_PERIPHERAL_NAME)
conn_handle, _, _ = data
if conn_handle == self._conn_handle:
# Si déconnexion initiée par le central, le reset a déjà été fait
self._reset()
# Evènement "Le périphérique connecté a notifié un service au central"
elif event == _IRQ_GATTC_SERVICE_RESULT:
conn_handle, start_handle, end_handle, uuid = data
if conn_handle == self._conn_handle and uuid == _UART_SERVICE_UUID:
self._start_handle, self._end_handle = start_handle, end_handle
# Evènement "Recherche de services terminée"
elif event == _IRQ_GATTC_SERVICE_DONE:
if self._start_handle and self._end_handle:
self._ble.gattc_discover_characteristics(
self._conn_handle, self._start_handle, self._end_handle
)
else:
print("Le service UART est introuvable.")
# Evènement "Le périphérique connecté a notifié une caractéristique au central"
elif event == _IRQ_GATTC_CHARACTERISTIC_RESULT:
conn_handle, def_handle, value_handle, properties, uuid = data
if conn_handle == self._conn_handle and uuid == _UART_RX_CHAR_UUID:
self._rx_handle = value_handle
if conn_handle == self._conn_handle and uuid == _UART_TX_CHAR_UUID:
self._tx_handle = value_handle
# Evènement "Recherche de caractéristiques terminée"
elif event == _IRQ_GATTC_CHARACTERISTIC_DONE:
if self._tx_handle is not None and self._rx_handle is not None:
# Nous avons terminé la connexion et la découverte de périphériques,
# génère le callback de connexion.
if self._conn_callback:
self._conn_callback()
else:
print("Caractéristique UART RX introuvable.")
# Evènement "Accusé de réception du périphérique",
# qui survient lorsque le central envoie un message, si on a explicitement demandé un AR
elif event == _IRQ_GATTC_WRITE_DONE:
conn_handle, value_handle, status = data
print("Ecriture dans RX réalisée")
# Evènement "Réponse aux notifications du périphérique" sur la caractéristique TX
elif event == _IRQ_GATTC_NOTIFY:
conn_handle, value_handle, notify_data = data
if conn_handle == self._conn_handle and value_handle == self._tx_handle:
if self._notify_callback:
self._notify_callback(notify_data)
# Evènement de modification de la taille de la payload
elif event == _IRQ_MTU_EXCHANGED:
print("La taille maximum des messages est désormais " + str(_MAX_NB_BYTES) + " octets")
# Revoie True si nous sommes connectés au service UART.
def is_connected(self):
return (
self._conn_handle is not None
and self._tx_handle is not None
and self._rx_handle is not None
)
# Recherche un périphérique qui propose le service UART
def scan(self, callback=None):
print("Scan démarré, recherche %s" %_TARGET_PERIPHERAL_NAME)
self._addr_type = None
self._addr = None
self._scan_callback = callback
# Scanne pendant _SCAN_DURATION_MS, pendant des durées de _SCAN_WINDOWS_US espacées de _SCAN_INTERVAL_US
self._ble.gap_scan(_SCAN_DURATION_MS, _SCAN_INTERVAL_US, _SCAN_WINDOW_US)
# Arrête le scan
# Se connecte au périphérique spécifié
# Si aucun périphérique spécifié, utilise les adresses mises en cache après un scan
def connect(self, addr_type=None, addr=None, callback=None):
self._addr_type = addr_type or self._addr_type
self._addr = addr or self._addr
self._conn_callback = callback
if self._addr_type is None or self._addr is None:
return False
self._ble.gap_connect(self._addr_type, self._addr)
return True
# Se déconnecte du périphérique
def disconnect(self):
if not self._conn_handle:
return
self._ble.gap_disconnect(self._conn_handle)
self._reset()
# Envoie des données sur l'UART (écriture dans la caractéristique RX)
# Cette méthode permet au central d'envoyer un message au périphérique connecté.
def write(self, v, response = False):
if not self.is_connected():
return
self._ble.gattc_write(self._conn_handle, self._rx_handle, v, 1 if response else 0)
# Confirme que l'accusé de réception a bien été envoyé
global Central_ACK_required
Central_ACK_required = 0
# Active le gestionnaire des évènements de réception sur l'UART
def on_notify(self, callback):
self._notify_callback = callback
# Gestionnaire de l'évènement de réception qui répond à une notification lorsque la caractéristique TX
# est modifiée.
def on_receipt(v):
global MAC_address
# Conversion en octets de la charge utile la caractéristique TX
b = bytes(v)
# On convertit les octets reçus en caractères codés au format UTF-8
payload = b.decode('utf-8')
print("Message recu de " + str(MAC_address) + " : ", payload)
# On sépare les mesures grâce à l'instruction split
temp, humi, press = payload.split("|")
# On affiche les valeurs de température, de pression et d'humidité relative
print("Température = " + temp + " °C")
print("Humidité relative = " + humi + " %")
print("Pression = " + press + " hPa")
# Le central a bien reçu un message du périphérique, donc il doit lui envoyer un accusé de réception
global Central_ACK_required
Central_ACK_required = 1
# Création d'une instance de la classe central
ble = bluetooth.BLE()
central = BLECentral(ble)
# Gestionnaire des évènements de scan
def on_scan(addr_type, addr, name):
# Si le périphérique associé est identifié
if addr_type is not None:
# Extrait son adresse MAC
global MAC_address
b = bytes(addr)
MAC_address = hexlify(b).decode("ascii")
# Connexion avec le central
central.connect()
# Programme principal
def demo():
print("Central BLE associé au périphérique %s" %_TARGET_PERIPHERAL_NAME)
import time # Pour gérér le temps et les temporisations
# Capture les évènements de scan
central.scan(callback = on_scan)
# Scanne jusqu'à trouver le périphérique associé
while not central.is_connected():
time.sleep_ms(100)
# Capture les évènements de réception. La notification provient de la caractéristique TX.
central.on_notify(on_receipt)
# Envoi d'un message d'accusé de réception du central au périphérique
while central.is_connected():
global Central_ACK_required
if Central_ACK_required == 1:
try: # Essaie d'envoyer un message
v = "AR du central " + MAC_address
central.write(v)
except: # En cas d'échec...
print("Echec d'émission de la réponse du central")
# Si le nom du script est "main", exécute la fonction "demo()"
if __name__ == "__main__":
demo()
Les codes MicroPython pour le périphérique
Vous pouvez télécharger les scripts MicroPython de ce tutoriel (entre autres) en cliquant ici.
Deux fichiers scripts MicroPython seront nécessaires pour le périphérique :
- Le script permettant de construire les trames d’advertising, intitulé ble_advertising.py. Nous ne détaillerons pas son contenu, vous pouvez le copier directement dans le répertoire PYBFLASH.
- Le script du programme principal, main.py, qui intègre la classe BLEPeripheral simulant un UART côté serveur à l’aide du protocole GATT.
Le script du programme principal
Éditez le script main.py contenu dans le répertoire PYBFLASH du disque USB virtuel associé à la NUCLEO-WB55 qui fera office de périphérique et enregistrez-y ce code :
# Ce script montre comment créer un periphérique UART avec le Nordic Uart Service.
# Il va :
# 1 - Exposer deux caractéristiques TX et RX pour échanger des données.
# 2 - Se connecter à un central et lui notifier des messages dans TX.
# 3 - Lire les données écrites en retour par le central dans RX.
# Le périphérique envoie au central une chaîne de caractères contenant la température, l'humidité et
# la pression mesurées avec une carte d'extension X-NUCLEO-IKS01A3.
# Sources :
# https://docs.micropython.org/en/latest/library/ubluetooth.html
# https://github.com/micropython/micropython/blob/master/examples/bluetooth/ble_uart_peripheral.py
# Nom du périphérique, doit être le même que celui déclaré dans le script "main.py" du central
# afin que la connexion s'établisse.
# ATTENTION : NE PAS DEPASSER 8 CARACTERES !
_MY_NAME = "ID000001"
import bluetooth # Classes "primitives du BLE"
from ble_advertising import advertising_payload # Pour construire la trame d'advertising
from binascii import hexlify # Convertit une donnée binaire en sa représentation hexadécimale
# Constantes requises pour construire le service BLE UART
_IRQ_CENTRAL_CONNECT = const(1)
_IRQ_CENTRAL_DISCONNECT = const(2)
_IRQ_GATTS_WRITE = const(3)
_IRQ_MTU_EXCHANGED = const(21)
_FLAG_WRITE = const(0x0008)
_FLAG_NOTIFY = const(0x0010)
# Définition du service UART avec ses deux caractéristiques RX et TX
_UART_UUID = bluetooth.UUID("6E400001-B5A3-F393-E0A9-E50E24DCCA9E")
_UART_TX = (
bluetooth.UUID("6E400003-B5A3-F393-E0A9-E50E24DCCA9E"),
_FLAG_NOTIFY, # Cette caractéristique notifiera le central des modifications que lui apportera le périphérique
)
_UART_RX = (
bluetooth.UUID("6E400002-B5A3-F393-E0A9-E50E24DCCA9E"),
_FLAG_WRITE, # Le central pourra écrire dans cette caractéristique
)
_UART_SERVICE = (
_UART_UUID,
(_UART_TX, _UART_RX),
)
# Nombre maximum d'octets (de caractères) qui peuvent être échangés par les caractéristiques TX & RX
_MAX_NB_BYTES = const(128)
class BLEperipheral:
# Initialisations
def __init__(self, ble, name=_MY_NAME, charbuf=_MAX_NB_BYTES):
self._ble = ble
self._ble.active(True)
self._ble.irq(self._irq)
self._ble.config(mtu=_MAX_NB_BYTES)
# Enregistrement du service
((self._tx_handle, self._rx_handle),) = self._ble.gatts_register_services((_UART_SERVICE,))
# Augmente la taille des tampons rx et tx, active le mode "append"
self._ble.gatts_set_buffer(self._tx_handle, charbuf, True)
self._ble.gatts_set_buffer(self._rx_handle, charbuf, True)
self._ble.gatts_write(self._tx_handle, bytes(charbuf))
self._ble.gatts_write(self._rx_handle, bytes(charbuf))
self._connections = set()
self._rx_buffer = bytearray()
self._handler = None
# Advertising du service (services=[_UART_UUID] est indispensable pour que le central identifie le service)
self._payload = advertising_payload(name=name, services=[_UART_UUID])
self._advertise()
# Affiche l'adresse MAC de l'objet
dummy, byte_mac = self._ble.config('mac')
hex_mac = hexlify(byte_mac)
print("Mon adresse MAC : %s" %hex_mac.decode("ascii"))
# Interruption pour gérer les réceptions
def irq(self, handler):
self._handler = handler
# Surveille les connexions afin d'envoyer des notifications
def _irq(self, event, data):
# Si un central se connecte
if event == _IRQ_CENTRAL_CONNECT:
conn_handle, _, _ = data
self._connections.add(conn_handle)
print("New connection", conn_handle)
# Si un central se déconnecte
elif event == _IRQ_CENTRAL_DISCONNECT:
conn_handle, _, _ = data
print("Disconnected", conn_handle)
if conn_handle in self._connections:
self._connections.remove(conn_handle)
# Redémarre l'advertising pour permettre de nouvelles connexions
self._advertise()
# Lorsqu'un client écrit dans une caractéristique exposée par le serveur
# (gestion des évènements de recéption depuis le central)
elif event == _IRQ_GATTS_WRITE:
conn_handle, value_handle = data
if conn_handle in self._connections and value_handle == self._rx_handle:
self._rx_buffer += self._ble.gatts_read(self._rx_handle)
if self._handler:
self._handler()
# Evènement de modification de la taille de la payload
elif event == _IRQ_MTU_EXCHANGED:
print("La taille maximum des messages est désormais " + str(_MAX_NB_BYTES) + " octets")
# Appelée pour vérifier s'il y a des messages en attente de lecture dans RX
def any(self):
return len(self._rx_buffer)
# Retourne les catactères reçus dans RX
def read(self, sz=None):
if not sz:
sz = len(self._rx_buffer)
result = self._rx_buffer[0:sz]
self._rx_buffer = self._rx_buffer[sz:]
return result
# Ecrit dans TX un message à l'attention du central
def write(self, data):
for conn_handle in self._connections:
self._ble.gatts_notify(conn_handle, self._tx_handle, data)
# Mets fin à la connexion au port série simulé
def close(self):
for conn_handle in self._connections:
self._ble.gap_disconnect(conn_handle)
self._connections.clear()
# Pour démarrer l'advertising, précise qu'un central pourra se connecter au périphérique
def _advertise(self, interval_us=500000):
self._ble.gap_advertise(interval_us, adv_data=self._payload, connectable = True)
# Est-ce que le périphérique est connecté à un central ?
def is_connected(self):
return len(self._connections) > 0
# Valeur de l'altitude locale (en mètres)
LOCAL_ALTITUDE = const(470)
# Approximation de la pression corrigée au niveau de la mer (nécessite la valeur de l'altitude locale)
def SeeLevelPressure(pressure, altitude):
return pressure * pow(1.0 - (altitude * 2.255808707E-5), -5.255)
# Est-ce qu'on un shield IKS01A3 / des capteurs HTS221 et LPS22 sur notre carte ?
# (1 si oui, 0 si non, dans ce deuxième cas on les simulera).
REAL_SENSORS = const(1)
# Programme principal
def demo():
print("Périphérique BLE : %s" %_MY_NAME)
import time # Pour gérer les temporisations et la mesure du temps écoulé
# Si on ne simule pas les capteurs
if REAL_SENSORS:
print("Capteurs HTS221 et LPS22 présents")
from machine import I2C # Pour gérer le bus I2C
import HTS221 # Pour gérer le capteur MEMS HTS221
import LPS22 # Pour gérer le capteur MEMS LPS22
# On utilise l'I2C n°1 de la carte NUCLEO-WB55 pour communiquer avec le capteur
i2c = I2C(1)
# Instances des capteursIs
sensor1 = HTS221.HTS221(i2c)
sensor2 = LPS22.LPS22(i2c)
# Pause d'une seconde pour laisser à l'I2C le temps de s'initialiser
time.sleep_ms(1000)
# Si on simule les capteurs
else:
print("Capteurs HTS221 et LPS22 absents, mesures simulées")
import random # Pour la génération de valeurs aléatoires
# Instanciation du BLE
ble = bluetooth.BLE()
uart = BLEperipheral(ble)
# Gestionnaire de l'évènement de réception
def on_rx():
print("Données reçues du central : ", uart.read().decode().strip())
# Réception (asynchrone) des données (i.e. réaction aux écritures du central dans RX).
uart.irq(handler=on_rx)
# Structure de gestion des erreurs pour les interruptions du clavier
try:
while True:
#Si on ne simuple pas les capteurs
if REAL_SENSORS:
# Lecture des capteurs
temp = sensor1.temperature()
humi = sensor1.humidity()
pres = sensor2.pressure()
else:
temp = random.randint(-1, 50) # Valeur aléatoire entre -1 et 50 °C
humi = random.randint(0, 100) # Valeur aléatoire entre 0 et 100 %
pres = random.randint(900, 1013) # Valeur aléatoire entre 900 et 1013 hPa
# Conversion en texte des valeurs renvoyées par les capteurs
stemp = str(round(temp,1))
shumi = str(int(humi))
spres = str(int(SeeLevelPressure(pres, LOCAL_ALTITUDE)))
# Affichage sur le port série de l'USB USER
print("Température : " + stemp + " °C, Humidité relative : " + shumi + " %, Pression : " + spres + " hPa")
if uart.is_connected():
# On concatène les données :
data = stemp + "|" + shumi + "|" + spres
# On les envoie au central (i.e. on les notifie dans TX):
uart.write(data)
print("Données envoyées au central : " + data)
# Temporisation de 5 secondes
time.sleep_ms(5000)
# En cas d'interruption clavier (l'utilisateur appuie sur *[CTRL]-[C]*)
except KeyboardInterrupt:
pass # Ne quitte pas l'application et passe à la suite
# Ferme l'UART actif
uart.close()
# Si le nom du script est "main", exécute la fonction "demo()"
if __name__ == "__main__":
demo()
Mise en œuvre
Assurez vous que l’identifiant _MY_NAME dans le script du périphérique et l’identifiant _TARGET_PERIPHERAL_NAME dans le script du central ont bien la même valeur (ici ID000001). Ceci permet de s’assurer que votre central va bien se connecter au périphérique que vous avez sélectionné dans le cas où d’autres périphériques BLE UART seraient présents à proximité. NB ; on aurait également pu choisir l’adresse MC du périphérique plutôt que le code arbitraire ID000001).
Commencez par démarrer le script du périphérique ([CTRL]-[D] dans le terminal PuTTY), et ensuite le script du central (idem). Si tout se déroule correctement vous devriez observer les échanges ci-dessous (aux valeurs de température, humidité et pression près) sur les deux terminaux :
Console PuTTY du périphérique :
MPY: sync filesystems
MPY: soft reboot
Périphérique BLE : ID000001
Capteurs HTS221 et LPS22 présents
Mon adresse MAC : 02020722281e
Température : 22.6 °C, Humidité relative : 55 %, Pression : 1016 hPa
New connection 2049
La taille maximum des messages est désormais 128 octets
Température : 22.6 °C, Humidité relative : 55 %, Pression : 1016 hPa
Données envoyées au central : 22.6|55|1016
Données reçues du central :
Température : 22.6 °C, Humidité relative : 55 %, Pression : 1016 hPa
Données envoyées au central : 22.6|55|1016
Données reçues du central : AR du central 02020722281e
Température : 22.6 °C, Humidité relative : 55 %, Pression : 1016 hPa
Données envoyées au central : 22.6|55|1016
Données reçues du central : AR du central 02020722281e
Température : 22.6 °C, Humidité relative : 55 %, Pression : 1016 hPa
Données envoyées au central : 22.6|55|1016
Données reçues du central : AR du central 02020722281e
Disconnected 2049
Console PuTTY du central :
MPY: sync filesystems
MPY: soft reboot
Mon adresse MAC : 0202072226be
Central BLE associé au périphérique ID000001
Scan démarré, recherche ID000001
Scan terminé, succès : ID000001 trouvé
Connecté à ID000001
La taille maximum des messages est désormais 128 octets
Message recu de 02020722281e : 22.6|55|1016
Température = 22.6 °C
Humidité relative = 55 %
Pression = 1016 hPa
Message recu de 02020722281e : 22.6|55|1016
Température = 22.6 °C
Humidité relative = 55 %
Pression = 1016 hPa
Message recu de 02020722281e : 22.6|55|1016
Température = 22.6 °C
Humidité relative = 55 %
Pression = 1016 hPa
Message recu de 02020722281e : 22.6|55|1016
Température = 22.6 °C
Humidité relative = 55 %
Pression = 1016 hPa
Déconnecté de ID000001