Sending data over MQTT#
MQTT (Message Queuing Telemetry Transport) is a protocol that is widely used to send messages from sensors to be logged. We will demonstrate how to connect a Pico to an MQTT broker.
Requirements
A Pico
A Wifi adapter (we are using a Pico W which has built in Wifi)
Set-up#
Since the Wifi module is built in - we don’t need to worry about wiring anything to the Pico W.
The only step we need to first take is to create a settings.toml file in the CIRCUITPY drive. This is a type of text file which will hold all our sensitive network information separate to the main code.py file.
The information that we will hold here is your WiFi’s SSID (its name) and password, as well as the IP address which you are making the request to.
INATORNAME = "" # Name of the device, will be used when sending data
INATORTOPIC = "" # The topic to subscribe to on the MQTT broker
ACQUIRETIME = # The value (in seconds) of how often to acquire data and log for
RECONTIME = # The value (in seconds) of how often to attempt to reconnect to the wifi network if lost
WIFI_SSID = "" # Fill in with your WiFi network's name
WIFI_PASSWORD = "" # Fill in with your WiFi network's password
IP_ADDRESS = "" # Fill in with your Raspberry Pi's IP address
PORT = # Fill in the port you are connecting to the MQTT Broker
URL = "" # Fill in the URL you wish to connect to
MQTT_USERNAME = "" #The user name on the MQTT broker that can publish
MQTT_KEY = "" #The password for the MQTT User
BROKER = "" #The address for the MQTT broker
Warning
Remember this information is stored in plain text and so can be read by anyone with access to the Pico
There may be other information you want to include here like the port or other URL extensions.
Note
Make sure to keep the speech marks when putting your own network details into the settings.toml. This ensures that CircuitPython recognises the variables as strings.
The port number is best left as an integer
Code set-up#
Now we can program our Pico to connect to WiFi and send data to an MQTT broker
Below is an example for sending temperature and time data to a broker.
Importing libraries#
Several libraries are required
adafruit_requests
adafruit_connection_manager
adafruit_minimqtt
adafruit_ticks
all four are in the circuitpython library package
a fifth useful library is adafruit_logging which can be helpful to diagnose connection problems
import os
import time
import json
import supervisor
import microcontroller
import adafruit_connection_manager
import wifi
import adafruit_requests
import adafruit_minimqtt.adafruit_minimqtt as MQTT
import adafruit_logging as logging
#Function for Wifi Connection
def conn_wifi(wifi_ssid, wifi_password):
    """Try to join a WiFi network and report whether the attempt succeeded.

    Args:
        wifi_ssid: Name (SSID) of the network to join.
        wifi_password: Password for that network.

    Returns:
        True if wifi.radio.connect() completed, False if it raised an
        OSError (the error is printed before returning).
    """
    print(f"\nConnecting to {wifi_ssid} network...")
    try:
        wifi.radio.connect(wifi_ssid, wifi_password)
    except OSError as err:
        print(f"❌ OSError: {err}")
        return False
    return True
# Pull the device and network configuration out of settings.toml
inatorname = os.getenv("INATORNAME")
ssid = os.getenv("WIFI_SSID")
password = os.getenv("WIFI_PASSWORD")
ip = os.getenv("IP_ADDRESS")

# Base MQTT topic; the device name is appended to it once WiFi is up
mqtt_topic = os.getenv("INATORTOPIC")

# Timing values, both in seconds - they should be numbers in settings.toml
accTime = os.getenv("ACQUIRETIME")  # how often to acquire and transmit data
recTime = os.getenv("RECONTIME")  # how often to retry the WiFi connection

# Assume there is no connection until conn_wifi() reports otherwise
wifi_con = False

# Socket pool, SSL context and requests session, all bound to the WiFi radio
pool = adafruit_connection_manager.get_radio_socketpool(wifi.radio)
ssl_context = adafruit_connection_manager.get_radio_ssl_context(wifi.radio)
requests = adafruit_requests.Session(pool, ssl_context)

# Join the Wi-Fi network
wifi_con = conn_wifi(ssid, password)
if wifi_con:
    print("✅ Wifi!\n")
    print("IP-Adress is", wifi.radio.ipv4_address)

    # Full topic this device publishes to: <INATORTOPIC>/<INATORNAME>
    mqtt_topic = f"{mqtt_topic}/{inatorname}"

    # Callbacks that the MQTT client invokes when events occur
    def connect(mqtt_client, userdata, flags, rc):
        """Called when the mqtt_client has connected to the broker."""
        print("Connected to MQTT Broker!")
        print(f"Flags: {flags}\n RC: {rc}")

    def disconnect(mqtt_client, userdata, rc):
        """Called when the mqtt_client disconnects from the broker."""
        print("Disconnected from MQTT Broker!")

    def subscribe(mqtt_client, userdata, topic, granted_qos):
        """Called when the mqtt_client subscribes to a new feed."""
        print(f"Subscribed to {topic} with QOS level {granted_qos}")

    def unsubscribe(mqtt_client, userdata, topic, pid):
        """Called when the mqtt_client unsubscribes from a feed."""
        print(f"Unsubscribed from {topic} with PID {pid}")

    def publish(mqtt_client, userdata, topic, pid):
        """Called when the mqtt_client publishes data to a feed."""
        print(f"Published to {topic} with PID {pid}")

    def message(client, topic, message):
        """Called when a message arrives on a subscribed topic."""
        print(f"New message on topic {topic}: {message}")

    # Set up the MQTT client; broker details come from settings.toml
    mqtt_client = MQTT.MQTT(
        broker=os.getenv("BROKER"),
        port=os.getenv("PORT"),
        username=os.getenv("MQTT_USERNAME"),
        password=os.getenv("MQTT_KEY"),
        socket_pool=pool,
        is_ssl=True,
        keep_alive=3600,
        ssl_context=ssl_context,
    )

    # Set up logging if having connection difficulties
    # mqtt_client.logger = logging.getLogger('test')
    # mqtt_client.logger.setLevel(logging.DEBUG)

    # Connect callback handlers to mqtt_client
    mqtt_client.on_connect = connect
    mqtt_client.on_disconnect = disconnect
    mqtt_client.on_subscribe = subscribe
    mqtt_client.on_unsubscribe = unsubscribe
    mqtt_client.on_publish = publish
    mqtt_client.on_message = message

    print(f"Attempting to connect to {mqtt_client.broker}:{mqtt_client.port}")
    mqtt_client.connect()

    while True:
        if wifi.radio.connected:
            try:
                temp = microcontroller.cpu.temperature
                # Monotonic clock reading, not wall-clock time
                timetrial = time.monotonic()

                # Payload to send, serialised with JSON syntax
                json_dict = {
                    "inator": inatorname,
                    "cpu_temp": temp,
                    "time": timetrial,
                }
                # json.dumps() already returns a str, so no extra str() needed
                json_data = json.dumps(json_dict)
                print(f" | ✅ Sending JSON ('key':'value'): {json_data}")
                mqtt_client.publish(mqtt_topic, json_data)
                print("| ✅ Sent")

                # Wait accTime seconds before the next acquisition
                time.sleep(accTime)
            except (ValueError, RuntimeError) as e:
                print(f"Error {e}")
                # Attempt to reconnect, as the MQTT connection may be down
                # NOTE(review): confirm which exception types the library
                # actually raises when the connection drops
                mqtt_client.reconnect()
            except Exception as e:
                # This is a broad catch; it would be better to catch specifics.
                # Anything unexpected reloads the running program.
                print(f"Error {e}")
                supervisor.reload()
        else:
            # recTime is configured in seconds (see RECONTIME in settings.toml)
            print(f"Wifi Disconnected - will attempt to reconnect every {recTime} seconds")
            wifi_con = conn_wifi(ssid, password)
            if not wifi_con:
                # Wait recTime seconds before the next connection attempt
                time.sleep(recTime)
            else:
                # WiFi is back: re-establish the MQTT session as well
                mqtt_client.reconnect()
else:
    print("❌ Wifi!\n")
    print("Connection attempt failed")
Something to note is the mqtt_client.reconnect() command. This is to deal with situations where the connection to the Broker is lost or reset. This reconnects and also re-subscribes to the topics.
More complicated examples, with more sensors connected can be found in the project repo under Labinator - Pico - MQTT - examples
Receiving Messages#
In this example we are only publishing to a topic via MQTT. We can also receive messages for a topic via MQTT.
Below is an example that can control 3 lights, a red, yellow and green. It is very similar to before. The main addition is some code to control the LEDs in the on_message property of the MQTT connection. Then in the main loop of the programme a connection is made to the MQTT connection’s loop property. This programme will now monitor a given MQTT topic and if the correct message is sent change the LEDs accordingly
import os
import microcontroller
import adafruit_connection_manager
import wifi
import adafruit_requests
import time
import adafruit_minimqtt.adafruit_minimqtt as MQTT
import adafruit_logging as logging
import json
import board
import digitalio
# On-board LED plus the three external indicator lamps
led = digitalio.DigitalInOut(board.LED)
green_lt = digitalio.DigitalInOut(board.GP19)
yell_lt = digitalio.DigitalInOut(board.GP20)
red_lt = digitalio.DigitalInOut(board.GP18)

# Every lamp pin is driven as an output
for _lamp in (led, green_lt, yell_lt, red_lt):
    _lamp.direction = digitalio.Direction.OUTPUT
def conn_wifi(wifi_ssid, wifi_password):
    """Try to join a WiFi network and report whether the attempt succeeded.

    Args:
        wifi_ssid: Name (SSID) of the network to join.
        wifi_password: Password for that network.

    Returns:
        True if wifi.radio.connect() completed, False if it raised an
        OSError (the error is printed before returning).
    """
    print(f"\nConnecting to {wifi_ssid} network...")
    try:
        wifi.radio.connect(wifi_ssid, wifi_password)
    except OSError as err:
        print(f"❌ OSError: {err}")
        return False
    return True
# Pull the device and network configuration out of settings.toml
# (PORT is read directly when the MQTT client is built)
inatorname = os.getenv("INATORNAME")
ssid = os.getenv("WIFI_SSID")
password = os.getenv("WIFI_PASSWORD")
ip = os.getenv("IP_ADDRESS")

# Timing values, both in seconds
accTime = os.getenv("ACQUIRETIME")  # how often to acquire and transmit data
recTime = os.getenv("RECONTIME")  # how often to retry the WiFi connection

# Assume there is no connection until conn_wifi() reports otherwise
wifi_con = False

# Socket pool, SSL context and requests session, all bound to the WiFi radio
pool = adafruit_connection_manager.get_radio_socketpool(wifi.radio)
ssl_context = adafruit_connection_manager.get_radio_ssl_context(wifi.radio)
requests = adafruit_requests.Session(pool, ssl_context)

# Join the Wi-Fi network
wifi_con = conn_wifi(ssid, password)

# Lamp state after the connection attempt: only yellow lit
yell_lt.value = True
green_lt.value = False
red_lt.value = False
if wifi_con:
    print("✅ Wifi!\n")
    green_lt.value = True
    print("IP-Adress is", wifi.radio.ipv4_address)

    # MQTT topic this Pico subscribes to for lamp commands
    mqtt_topic = "/pan"

    # Callbacks that the MQTT client invokes when events occur
    def connect(mqtt_client, userdata, flags, rc):
        """Called when the mqtt_client has connected to the broker."""
        print("Connected to MQTT Broker!")
        print(f"Flags: {flags}\n RC: {rc}")

    def disconnect(mqtt_client, userdata, rc):
        """Called when the mqtt_client disconnects from the broker."""
        print("Disconnected from MQTT Broker!")

    def subscribe(mqtt_client, userdata, topic, granted_qos):
        """Called when the mqtt_client subscribes to a new feed."""
        print(f"Subscribed to {topic} with QOS level {granted_qos}")

    def unsubscribe(mqtt_client, userdata, topic, pid):
        """Called when the mqtt_client unsubscribes from a feed."""
        print(f"Unsubscribed from {topic} with PID {pid}")

    def publish(mqtt_client, userdata, topic, pid):
        """Called when the mqtt_client publishes data to a feed."""
        print(f"Published to {topic} with PID {pid}")

    def message(client, topic, message):
        """Set the lamps according to the received message.

        "panic" lights red, "warn" lights yellow, and any other message
        lights green. The on-board LED is on for "panic" and "warn" only.
        """
        print(f"New message on topic {topic}: {message}")
        if message == "panic":
            led.value = True
            green_lt.value = False
            yell_lt.value = False
            red_lt.value = True
        elif message == "warn":
            led.value = True
            green_lt.value = False
            yell_lt.value = True
            red_lt.value = False
        else:
            led.value = False
            green_lt.value = True
            yell_lt.value = False
            red_lt.value = False

    # Set up the MQTT client; broker details come from settings.toml
    mqtt_client = MQTT.MQTT(
        broker=os.getenv("BROKER"),
        port=os.getenv("PORT"),
        username=os.getenv("MQTT_USERNAME"),
        password=os.getenv("MQTT_KEY"),
        socket_pool=pool,
        is_ssl=True,
        keep_alive=3600,
        ssl_context=ssl_context,
    )

    # Set up logging if having connection difficulties
    # mqtt_client.logger = logging.getLogger('test')
    # mqtt_client.logger.setLevel(logging.DEBUG)

    # Connect callback handlers to mqtt_client
    mqtt_client.on_connect = connect
    mqtt_client.on_disconnect = disconnect
    mqtt_client.on_subscribe = subscribe
    mqtt_client.on_unsubscribe = unsubscribe
    mqtt_client.on_publish = publish
    mqtt_client.on_message = message

    print(f"Attempting to connect to {mqtt_client.broker}:{mqtt_client.port}")
    mqtt_client.connect()

    print(f"Subscribing to {mqtt_topic}")
    mqtt_client.subscribe(mqtt_topic)

    # Connected and subscribed: show green only
    green_lt.value = True
    yell_lt.value = False
    red_lt.value = False

    last_send = time.monotonic()
    while True:
        if wifi.radio.connected:
            try:
                # Poll the message queue; incoming messages reach message()
                mqtt_client.loop()
                # Timer placeholder: nothing is sent here, the timestamp is
                # only refreshed every accTime seconds
                if time.monotonic() - last_send > accTime:
                    last_send = time.monotonic()
            except Exception as e:
                # This is a broad catch; it would be better to catch specifics
                # (maybe RuntimeError). Show red to signal the problem.
                red_lt.value = True
                green_lt.value = False
                print(f"Error {e}")
        else:
            # recTime is configured in seconds (see RECONTIME in settings.toml)
            print(f"Wifi Disconnected - will attempt to reconnect every {recTime} seconds")
            wifi_con = conn_wifi(ssid, password)
            if not wifi_con:
                # Wait recTime seconds before the next connection attempt
                time.sleep(recTime)
            else:
                # WiFi is back: re-establish the MQTT session so that messages
                # are received again (reconnect also re-subscribes to topics)
                mqtt_client.reconnect()
else:
    print("❌ Wifi!\n")
    print("Connection attempt failed")