Sending data over MQTT

Sending data over MQTT#

MQTT (Message Queuing Telemetry Transport) is a protocol that is widely used to send messages from sensors so that they can be logged. We will demonstrate how to connect a Pico to an MQTT broker.

Requirements

  1. A Pico

  2. A WiFi adapter (we are using a Pico W, which has built-in WiFi)

Set-up#

Since the Wifi module is built in - we don’t need to worry about wiring anything to the Pico W.

The only step we need to take first is to create a settings.toml file on the CIRCUITPY drive. This is a type of text file which will hold all our sensitive network information, separate from the main code.py file.

The information that we will hold here is your WiFi’s SSID (its name) and password, as well as the IP address which you are making the request to.

INATORNAME = ""        # Name of the device, will be used when sending data
INATORTOPIC = ""       # The topic to subscribe to on the MQTT broker
ACQUIRETIME =          # The value (in seconds) of how often to acquire data and log for
RECONTIME =            # The value (in seconds) of how often to attempt to reconnect to the wifi network if lost
WIFI_SSID = ""         # Fill in with your WiFi network's name 
WIFI_PASSWORD = ""     # Fill in with your WiFi network's password 
IP_ADDRESS = ""        # Fill in with your Raspberry Pi's IP address
PORT =               # Fill in the port you are connecting to the MQTT Broker
URL = ""               # Fill in the URL you wish to connect to
MQTT_USERNAME = ""  #The user name on the MQTT broker that can publish
MQTT_KEY = ""       #The password for the MQTT User
BROKER = ""         #The address for the MQTT broker

Warning

Remember this information is stored in plain text and so can be read by anyone with access to the Pico

There may be other information you want to include here like the port or other URL extensions.

Note

Make sure to keep the speech marks when putting your own network details into the settings.toml. This ensures that CircuitPython recognises the variables as strings. The port number is best left as an integer

Code set-up#

Now we can program our Pico to connect to WiFi and send data to an MQTT broker.

Below is an example for sending temperature and time data to a broker.

Importing libraries#

Several libraries are required

  1. adafruit_requests

  2. adafruit_connection_manager

  3. adafruit_minimqtt

  4. adafruit_ticks

All four are in the CircuitPython library package.

A fifth useful library is adafruit_logging, which can be helpful for diagnosing connection problems.

import os
import time
import json
import supervisor

import microcontroller
import adafruit_connection_manager
import wifi
import adafruit_requests
import adafruit_minimqtt.adafruit_minimqtt as MQTT
import adafruit_logging as logging

#Function for Wifi Connection
def conn_wifi(wifi_ssid, wifi_password):
    """Try to join a Wi-Fi network and report whether it worked.

    Args:
        wifi_ssid: Name of the network to join.
        wifi_password: Password for that network.

    Returns:
        True if ``wifi.radio.connect`` finished without raising ``OSError``,
        otherwise False (the error is printed, not re-raised).
    """
    print(f"\nConnecting to {wifi_ssid} network...")
    try:
        wifi.radio.connect(wifi_ssid, wifi_password)
    except OSError as err:
        print(f"❌ OSError: {err}")
        return False
    return True


# Fallback intervals (seconds) used when settings.toml has no usable value.
# NOTE(review): these defaults are arbitrary - adjust to suit the deployment.
DEFAULT_ACQUIRE_SECONDS = 60
DEFAULT_RECONNECT_SECONDS = 30


def read_interval(key, fallback):
    """Return the interval in seconds stored under *key* in settings.toml.

    The result is later handed to ``time.sleep()``, so it must be a
    non-negative number. A numeric value is passed through unchanged and a
    quoted number (e.g. ``"10"``) is converted with ``float()``.

    Args:
        key: Name of the setting to read with ``os.getenv``.
        fallback: Number of seconds to use when the setting is missing,
            is not a number, or is negative.

    Returns:
        The configured interval, or *fallback* (after printing a warning).
    """
    raw = os.getenv(key)
    try:
        seconds = raw if isinstance(raw, (int, float)) else float(raw)
    except (TypeError, ValueError):
        # TypeError: key missing (None); ValueError: text that is not a number
        seconds = None
    if seconds is None or seconds < 0:
        print(f"{key} missing or invalid in settings.toml - using {fallback} seconds")
        return fallback
    return seconds


# Retrieve variables from settings.toml
inatorname = os.getenv("INATORNAME")
ssid = os.getenv("WIFI_SSID")
password = os.getenv("WIFI_PASSWORD")
ip = os.getenv("IP_ADDRESS")
# MQTT topic to publish data from Pico to MQTT Cloud Broker
mqtt_topic = os.getenv("INATORTOPIC")
# How often (seconds) to acquire and transmit data
accTime = read_interval("ACQUIRETIME", DEFAULT_ACQUIRE_SECONDS)
# How often (seconds) to retry the Wi-Fi connection after it is lost
recTime = read_interval("RECONTIME", DEFAULT_RECONNECT_SECONDS)

# Start from "not connected"; conn_wifi() below supplies the real result
wifi_con = False

# Initialize socket pool, SSL context and requests session from the Wi-Fi radio.
# The pool and SSL context are also passed to the MQTT client further down.
# NOTE(review): `requests` is not referenced again in this example.
pool = adafruit_connection_manager.get_radio_socketpool(wifi.radio)
ssl_context = adafruit_connection_manager.get_radio_ssl_context(wifi.radio)
requests = adafruit_requests.Session(pool, ssl_context)

# Connect to the Wi-Fi network
wifi_con = conn_wifi(ssid, password)

# Everything MQTT-related only runs when the initial Wi-Fi connection succeeded
if wifi_con:
    print("✅ Wifi!\n")
    print("IP-Adress is", wifi.radio.ipv4_address)

    # Full topic this Pico publishes to: "<INATORTOPIC>/<INATORNAME>"
    mqtt_topic = f"{mqtt_topic}/{inatorname}"

    # Define callback methods which are called when events occur
    def connect(mqtt_client, userdata, flags, rc):
        """Report a successful connection to the broker.

        Prints a confirmation line followed by the ``flags`` and ``rc``
        values the callback was invoked with.
        """
        print("Connected to MQTT Broker!")
        print(f"Flags: {flags}\n RC: {rc}")

    def disconnect(mqtt_client, userdata, rc):
        """Report that the client has disconnected from the broker."""
        print("Disconnected from MQTT Broker!")

    def subscribe(mqtt_client, userdata, topic, granted_qos):
        """Report a new subscription: prints the topic and granted QoS level."""
        print(f"Subscribed to {topic} with QOS level {granted_qos}")

    def unsubscribe(mqtt_client, userdata, topic, pid):
        """Report an unsubscription: prints the topic and the PID passed in."""
        print(f"Unsubscribed from {topic} with PID {pid}")

    def publish(mqtt_client, userdata, topic, pid):
        """Report a completed publish: prints the topic and the PID passed in."""
        print(f"Published to {topic} with PID {pid}")

    def message(client, topic, message):
        """Print the topic and payload of a message received from the broker."""
        print(f"New message on topic {topic}: {message}")

    # Set up the MQTT client.
    # Broker address, port and credentials are all read from settings.toml.
    broker_host = os.getenv("BROKER")
    broker_port = os.getenv("PORT")
    mqtt_user = os.getenv("MQTT_USERNAME")
    mqtt_key = os.getenv("MQTT_KEY")
    mqtt_client = MQTT.MQTT(
        broker=broker_host,
        port=broker_port,
        username=mqtt_user,
        password=mqtt_key,
        socket_pool=pool,
        is_ssl=True,
        keep_alive=3600,
        ssl_context=ssl_context,
    )
    # Set up logging if having connection difficulties
    # mqtt_client.logger = logging.getLogger('test')
    # mqtt_client.logger.setLevel(logging.DEBUG)

    # Attach each callback defined above to its matching client event
    for event_name, handler in (
        ("on_connect", connect),
        ("on_disconnect", disconnect),
        ("on_subscribe", subscribe),
        ("on_unsubscribe", unsubscribe),
        ("on_publish", publish),
        ("on_message", message),
    ):
        setattr(mqtt_client, event_name, handler)

    print(f"Attempting to connect to {mqtt_client.broker}:{mqtt_client.port}")
    mqtt_client.connect()

    # Main loop: publish one reading every accTime seconds while Wi-Fi is up,
    # otherwise keep trying to rejoin the network every recTime seconds.
    while True:
        if wifi.radio.connected:
            try:
                # One reading: device name, CPU temperature and the monotonic
                # clock value at the moment of the reading
                json_dict = {
                    "inator": inatorname,
                    "cpu_temp": microcontroller.cpu.temperature,
                    "time": time.monotonic(),
                }
                json_data = json.dumps(json_dict)
                print(f" | ✅ Sending JSON ('key':'value'): {json_data}")
                # json.dumps() already returns a str, so no str() wrapper
                mqtt_client.publish(mqtt_topic, json_data)
                print("| ✅ Sent")
                time.sleep(accTime)
            except (ValueError, RuntimeError) as e:
                print(f"Error {e}")
                # Attempt to re-establish the MQTT session.
                # NOTE(review): EBADF is reported as an OSError, which this
                # clause does not match; it is handled by the reload branch
                # below - confirm that is the intended recovery path.
                try:
                    mqtt_client.reconnect()
                except Exception as reconnect_error:
                    # A failed reconnect must not escape the handler and stop
                    # the script; recover the same way as any other error.
                    print(f"Error {reconnect_error}")
                    supervisor.reload()
            except Exception as e:
                # Anything unexpected: restart the program from scratch
                print(f"Error {e}")
                supervisor.reload()
        else:
            # recTime is in seconds (it is passed straight to time.sleep)
            print(f"Wifi Disconnected - will attempt to reconnect every {recTime} seconds")
            wifi_con = conn_wifi(ssid, password)
            if not wifi_con:
                # wait recTime seconds before the next connection attempt
                time.sleep(recTime)
            else:
                # Wi-Fi is back, so re-establish the MQTT session as well
                try:
                    mqtt_client.reconnect()
                except Exception as reconnect_error:
                    print(f"Error {reconnect_error}")
                    supervisor.reload()
else:
    # Initial Wi-Fi connection failed: report it; no MQTT setup is attempted
    print("❌ Wifi!\n")
    print("Connection attempt failed")

Something to note is the mqtt_client.reconnect() command. This is to deal with situations where the connection to the Broker is lost or reset. This reconnects and also re-subscribes to the topics.

More complicated examples, with more sensors connected can be found in the project repo under Labinator - Pico - MQTT - examples

Receiving Messages#

In this example we are only publishing to a topic via MQTT. We can also receive messages for a topic via MQTT.

Below is an example that can control three lights: red, yellow and green. It is very similar to the previous example. The main addition is some code to control the LEDs in the function assigned to the MQTT client's on_message property. Then, in the main loop of the programme, the MQTT client's loop() method is called to poll for new messages. This programme will now monitor a given MQTT topic and, if the correct message is sent, change the LEDs accordingly.


import os
import microcontroller
import adafruit_connection_manager
import wifi
import adafruit_requests
import time
import adafruit_minimqtt.adafruit_minimqtt as MQTT
import adafruit_logging as logging
import json
import board
import digitalio

def _output_pin(pin):
    """Claim *pin* as a digital output and return its DigitalInOut object."""
    gpio = digitalio.DigitalInOut(pin)
    gpio.direction = digitalio.Direction.OUTPUT
    return gpio


# On-board LED, then the green, yellow and red lights
led = _output_pin(board.LED)
green_lt = _output_pin(board.GP19)
yell_lt = _output_pin(board.GP20)
red_lt = _output_pin(board.GP18)

def conn_wifi(wifi_ssid, wifi_password):
    """Attempt to join the named Wi-Fi network.

    Args:
        wifi_ssid: Name of the network to join.
        wifi_password: Password for that network.

    Returns:
        True when ``wifi.radio.connect`` raised no ``OSError``; False when it
        did (the error is printed rather than propagated).
    """
    print(f"\nConnecting to {wifi_ssid} network...")
    try:
        wifi.radio.connect(wifi_ssid, wifi_password)
    except OSError as err:
        print(f"❌ OSError: {err}")
        return False
    else:
        return True

# Fallback intervals (seconds) used when settings.toml has no usable value.
# NOTE(review): these defaults are arbitrary - adjust to suit the deployment.
DEFAULT_ACQUIRE_SECONDS = 60
DEFAULT_RECONNECT_SECONDS = 30


def read_interval(key, fallback):
    """Return the interval in seconds stored under *key* in settings.toml.

    The result is later compared against ``time.monotonic()`` differences or
    handed to ``time.sleep()``, so it must be a non-negative number. A
    numeric value is passed through unchanged and a quoted number
    (e.g. ``"10"``) is converted with ``float()``.

    Args:
        key: Name of the setting to read with ``os.getenv``.
        fallback: Number of seconds to use when the setting is missing,
            is not a number, or is negative.

    Returns:
        The configured interval, or *fallback* (after printing a warning).
    """
    raw = os.getenv(key)
    try:
        seconds = raw if isinstance(raw, (int, float)) else float(raw)
    except (TypeError, ValueError):
        # TypeError: key missing (None); ValueError: text that is not a number
        seconds = None
    if seconds is None or seconds < 0:
        print(f"{key} missing or invalid in settings.toml - using {fallback} seconds")
        return fallback
    return seconds


# Retrieve variables from settings.toml
inatorname = os.getenv("INATORNAME")
ssid = os.getenv("WIFI_SSID")
password = os.getenv("WIFI_PASSWORD")
ip = os.getenv('IP_ADDRESS')
#port=os.getenv('PORT')
# How often (seconds) to acquire and transmit data
accTime = read_interval('ACQUIRETIME', DEFAULT_ACQUIRE_SECONDS)
# How often (seconds) to retry the Wi-Fi connection after it is lost
recTime = read_interval('RECONTIME', DEFAULT_RECONNECT_SECONDS)

# Start from "not connected"; conn_wifi() below supplies the real result
wifi_con = False

# Initialize socket pool, SSL context and requests session from the Wi-Fi radio.
# The pool and SSL context are also passed to the MQTT client further down.
# NOTE(review): `requests` is not referenced again in this example.
pool = adafruit_connection_manager.get_radio_socketpool(wifi.radio)
ssl_context = adafruit_connection_manager.get_radio_ssl_context(wifi.radio)
requests = adafruit_requests.Session(pool, ssl_context)

# Connect to the Wi-Fi network
wifi_con = conn_wifi(ssid,password)
# Light state once the Wi-Fi attempt has returned: yellow only.
# NOTE(review): these lines run after conn_wifi() has returned, so yellow is
# not lit during the attempt itself - confirm that is intended.
yell_lt.value = True
green_lt.value = False
red_lt.value = False
# Everything MQTT-related only runs when the initial Wi-Fi connection succeeded
if wifi_con:
    print("✅ Wifi!\n")
    # Green joins yellow to show that Wi-Fi is up
    green_lt.value = True
    print("IP-Adress is", wifi.radio.ipv4_address)

    # MQTT topic this Pico subscribes to for light-control messages
    mqtt_topic = "/pan"

    # Define callback methods which are called when events occur
    def connect(mqtt_client, userdata, flags, rc):
        """Report a successful connection to the broker.

        Prints a confirmation line followed by the ``flags`` and ``rc``
        values the callback was invoked with.
        """
        print("Connected to MQTT Broker!")
        print(f"Flags: {flags}\n RC: {rc}")

    def disconnect(mqtt_client, userdata, rc):
        """Report that the client has disconnected from the broker."""
        print("Disconnected from MQTT Broker!")

    def subscribe(mqtt_client, userdata, topic, granted_qos):
        """Report a new subscription: prints the topic and granted QoS level."""
        print(f"Subscribed to {topic} with QOS level {granted_qos}")

    def unsubscribe(mqtt_client, userdata, topic, pid):
        """Report an unsubscription: prints the topic and the PID passed in."""
        print(f"Unsubscribed from {topic} with PID {pid}")


    def publish(mqtt_client, userdata, topic, pid):
        """Report a completed publish: prints the topic and the PID passed in."""
        print(f"Published to {topic} with PID {pid}")

    def message(client, topic, message):
        """Print an incoming message and set the lights to match it.

        ``"panic"`` lights the on-board LED and red, ``"warn"`` lights the
        on-board LED and yellow, and any other payload lights green only.
        """
        print(f"New message on topic {topic}: {message}")
        # (led, green, yellow, red) states for each recognised command
        light_states = {
            "panic": (True, False, False, True),
            "warn": (True, False, True, False),
        }
        all_clear = (False, True, False, False)
        led_on, green_on, yellow_on, red_on = light_states.get(message, all_clear)
        led.value = led_on
        green_lt.value = green_on
        yell_lt.value = yellow_on
        red_lt.value = red_on

    # Set up the MQTT client.
    # Broker address, port and credentials are all read from settings.toml.
    broker_host = os.getenv('BROKER')
    broker_port = os.getenv('PORT')
    mqtt_user = os.getenv('MQTT_USERNAME')
    mqtt_key = os.getenv('MQTT_KEY')
    mqtt_client = MQTT.MQTT(
        broker=broker_host,
        port=broker_port,
        username=mqtt_user,
        password=mqtt_key,
        socket_pool=pool,
        is_ssl=True,
        keep_alive=3600,
        ssl_context=ssl_context,
    )
    # Set up logging if having connection difficulties
    # mqtt_client.logger = logging.getLogger('test')
    # mqtt_client.logger.setLevel(logging.DEBUG)

    # Attach each callback defined above to its matching client event
    for event_name, handler in (
        ("on_connect", connect),
        ("on_disconnect", disconnect),
        ("on_subscribe", subscribe),
        ("on_unsubscribe", unsubscribe),
        ("on_publish", publish),
        ("on_message", message),
    ):
        setattr(mqtt_client, event_name, handler)

    print(f"Attempting to connect to {mqtt_client.broker}:{mqtt_client.port}")
    mqtt_client.connect()
    print(f"Subscribing to {mqtt_topic}")
    mqtt_client.subscribe(mqtt_topic)
    # Connected and subscribed: show green only
    green_lt.value, yell_lt.value, red_lt.value = True, False, False

    last_send = time.monotonic()

    # Main loop: poll the broker for messages while Wi-Fi is up, otherwise
    # keep trying to rejoin the network every recTime seconds.
    while True:
        if wifi.radio.connected:
            try:
                # Poll the message queue; received messages are passed to the
                # on_message callback
                mqtt_client.loop()

                # NOTE(review): this timer only resets itself and nothing is
                # sent - presumably a placeholder for periodic publishing.
                if time.monotonic() - last_send > accTime:
                    last_send = time.monotonic()
            except Exception as e:
                # this is a broad error, would be better to catch specifics
                # maybe RuntimeError
                red_lt.value = True
                green_lt.value = False
                print(f"Error {e}")
        else:
            # recTime is in seconds (it is passed straight to time.sleep)
            print(f"Wifi Disconnected - will attempt to reconnect every {recTime} seconds")
            wifi_con = conn_wifi(ssid, password)
            if not wifi_con:
                # wait recTime seconds before the next connection attempt
                time.sleep(recTime)
            else:
                # Wi-Fi is back, so re-establish the MQTT session too;
                # otherwise the client stays disconnected and loop() keeps
                # failing.
                try:
                    mqtt_client.reconnect()
                except Exception as e:
                    red_lt.value = True
                    green_lt.value = False
                    print(f"Error {e}")
else:
    # Initial Wi-Fi connection failed: report it; no MQTT setup is attempted
    print("❌ Wifi!\n")
    print("Connection attempt failed")