Publishing Data Streams from Python

In this tutorial, we show how to publish Data Stream messages to the IoTFlows console using a simple Python script. If you want to skip the instructions and go straight to the answer, jump to Example Code.

Installing the Paho MQTT Library

You can install the MQTT client with pip by running the following command in your terminal window:

sudo pip3 install paho-mqtt

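NOTE: The version of pip you need depends on the version of Python installed on your computer. For older versions, see Intro to MQTT Python Client. The code in this tutorial uses the paho-mqtt 1.x API; paho-mqtt 2.0 and later add a callback_api_version argument to the Client constructor, so if the example fails on a newer release you may need to pin an older one (for example, pip3 install "paho-mqtt<2").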

Client Methods

The Paho MQTT client has several methods, including:

  • publish()

  • subscribe()

  • unsubscribe()

  • connect()

  • disconnect()

These methods are associated with callbacks, which we will look at later in the example code.
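As a rough illustration of how callbacks pair with these methods, the sketch below defines handlers using the paho-mqtt 1.x callback signatures and attaches them to a client object. The helper names describe_connack and attach_callbacks are made up for this example; the return-code meanings are the standard MQTT 3.1.1 CONNACK codes.

```python
# Meanings of the return code passed to on_connect (MQTT 3.1.1 CONNACK codes).
CONNACK_CODES = {
    0: "connection accepted",
    1: "refused: unacceptable protocol version",
    2: "refused: identifier rejected",
    3: "refused: server unavailable",
    4: "refused: bad user name or password",
    5: "refused: not authorized",
}


def describe_connack(rc):
    """Return a readable description of an on_connect return code."""
    return CONNACK_CODES.get(rc, "unknown return code " + str(rc))


def on_connect(client, userdata, flags, rc):
    # Called once the broker has answered a connect() request.
    print("connect: " + describe_connack(rc))


def on_disconnect(client, userdata, rc):
    # Called after disconnect(); a non-zero rc means the connection dropped unexpectedly.
    print("disconnect, rc=" + str(rc))


def on_publish(client, userdata, mid):
    # Called when a message passed to publish() has been sent; mid is its message id.
    print("published message id " + str(mid))


def attach_callbacks(client):
    """Attach the handlers above to a paho-mqtt 1.x style client (hypothetical helper)."""
    client.on_connect = on_connect
    client.on_disconnect = on_disconnect
    client.on_publish = on_publish
    return client
```

Callbacks only fire while the client's network loop is running, for example through loop_forever() as in the example code.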

Importing Client Class

First, we import the client class into our Python code and shorten its name to paho:

import paho.mqtt.client as paho

Creating Client Instance

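We begin by creating global variables for the client and the client id. The Client constructor takes five parameters, all of them optional; they are shown below with their defaults. If you pass a client id, it must be unique among the clients connected to the broker; if you leave it empty, Paho generates a random one.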

Client(client_id="", clean_session=True, userdata=None, protocol=MQTTv311, transport="tcp")

The client id is created in the IoTFlows console, as we have seen before, and is shown on the device page of the device you will be streaming to. You can create a dedicated client for a Python application, similar to our Node-RED example.

To create a new client, instantiate the object as shown below:

client = paho.Client() 

Connecting to IoTFlows Broker

In order to publish messages to the IoTFlows console, you need to establish a connection with the IoTFlows broker.

To do this, use the connect method of the Python MQTT client.

The connect method can be called with 4 parameters. Its declaration is shown below with the default values:

connect(host, port=1883, keepalive=60, bind_address="")

NOTE: You only need to provide the host (broker name or IP address); the other parameters fall back to their defaults.

To connect to the IoTFlows broker over a secure port, we create the connection as follows. TLS must be enabled with tls_set() before this call, as the example code does.

client.connect("mqtts.connect.iotflows.com", 443, 60)
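The example code later in this tutorial turns certificate checking off with cert_reqs=ssl.CERT_NONE. As a possible alternative, the sketch below builds a verifying TLS context from the standard library and hands it to the client through Paho's tls_set_context() method. It assumes the IoTFlows broker presents a certificate that chains to a CA trusted by your system, which this tutorial does not confirm; enable_verified_tls is a hypothetical helper name.

```python
import ssl


def enable_verified_tls(client):
    """Configure TLS with certificate and hostname verification.

    Hypothetical helper: `client` is expected to be a paho-mqtt Client,
    and this must run before client.connect().
    """
    # create_default_context() loads the system CA store and enables
    # both certificate validation and hostname checking.
    context = ssl.create_default_context()
    client.tls_set_context(context)
    return context
```

With a real client, you would call enable_verified_tls(client) and then client.connect("mqtts.connect.iotflows.com", 443, 60). If the TLS handshake fails, the assumption about the broker's certificate probably does not hold for your setup.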

Publishing Messages

Once a connection is made, use the publish method. It accepts 4 parameters, which are shown below:

publish(topic, payload=None, qos=0, retain=False)

NOTE: Paho only requires the topic, since the payload defaults to None (an empty message). In practice you will supply both the topic and the payload.

The payload is the message you want to publish.
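To make the parameters concrete, here is a small sketch that serializes a dictionary to JSON and publishes it. The helper name publish_json and the choice of qos=1 are assumptions made for illustration. The publish() method returns an object whose rc attribute is 0 (MQTT_ERR_SUCCESS) when the client accepted the message for sending.

```python
import json


def publish_json(client, topic, message, qos=1, retain=False):
    """Publish a dict as a JSON string; return True if the client accepted it.

    Hypothetical helper: `client` is expected to be a connected paho-mqtt Client.
    """
    payload = json.dumps(message)
    info = client.publish(topic, payload, qos=qos, retain=retain)
    # rc == 0 (MQTT_ERR_SUCCESS) means the message was queued for sending,
    # not that the broker has acknowledged it yet.
    return info.rc == 0
```

A call might look like publish_json(client, 'YOUR-DATA-STREAM-MQTT-TOPIC', {"data": 21.5}), with the topic placeholder replaced by your own Data Stream topic.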

Example Code

Putting everything together, the code below publishes a simulated temperature value to the IoTFlows console. The parameters you need to change are:

  • YOUR-DEVICE-CLIENT-ID: This is the Client ID that is displayed on your devices page in the IoTFlows console. See the image below or go to Creating a Device Client for more details.

  • YOUR-DEVICE-CLIENT-ID-PASSWORD: This is the password for the generated Client ID on the devices page in the IoTFlows console. Go to Creating a Device Client for more details.

  • YOUR-DEVICE-DATA-STREAM-ID: This is the Data Stream ID that is displayed on the devices page in the IoTFlows console. See the image below or go to Creating a Data Stream for more details.

  • YOUR-DATA-STREAM-MQTT-TOPIC: This is the Data Stream MQTT Topic that is displayed under each Data Stream. See the image below for more details.

  • YOUR-DATA: This is the data that will be published to the IoTFlows console table.
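Separately from the full listing that follows, one way to keep these values in one place is sketched here: a small settings class that builds the JSON message in the same shape as the example code and refuses to build it while any YOUR-... placeholder is still present. The class name StreamSettings and its methods are invented for this sketch.

```python
import json
from dataclasses import dataclass, fields


@dataclass
class StreamSettings:
    """Hypothetical container for the values you copy from the IoTFlows console."""
    client_id: str
    password: str
    data_stream_id: str
    topic: str

    def unfilled(self):
        """Return the names of fields that still hold a YOUR-... placeholder."""
        return [f.name for f in fields(self)
                if getattr(self, f.name).startswith("YOUR-")]

    def build_message(self, data):
        """Return the JSON payload for one reading, in the tutorial's message shape."""
        missing = self.unfilled()
        if missing:
            raise ValueError("replace placeholders for: " + ", ".join(missing))
        return json.dumps({
            "data": data,
            "data_stream_id": self.data_stream_id,
            "client_id": self.client_id,
        })
```

Failing early on a leftover placeholder gives a clearer error than a rejected connection or a message that never shows up in the console.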

import threading
import ssl, json
import paho.mqtt.client as paho # needs to be installed: sudo pip3 install paho-mqtt

# ---------------------------------------------------------------------- MQTT Callback Functions
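def on_connect(client, userdata, flags, rc):
    # rc == 0 means the broker accepted the connection
    if rc != 0:
        print("Connection refused, return code: " + str(rc))
        return
    print("Connected to the cloud!")

    # start publishing to the cloud
    startPublishTemperature()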

def on_log(client, userdata, level, buf):
    pass  # logging is disabled; print(buf) here to debug the connection

def on_message(client, userdata, message):  
    topic       = str(message.topic)
    payload     = str(message.payload.decode("utf-8"))
    print()
    print('MQTT new message received.')
    print('Topic: ' + topic)
    print('Payload: ' + payload)  
    payloadJson = json.loads(payload)  

def startPublishTemperature():
    global client, client_id
    print("I'm about to publish!")
    message = {"data": "YOUR-DATA", "data_stream_id": "YOUR-DEVICE-DATA-STREAM-ID", "client_id": client_id}
    print(message)
    jsonMessage = json.dumps(message)
    client.publish('YOUR-DATA-STREAM-MQTT-TOPIC', jsonMessage)
    print("Just published a message to the cloud!")
    threading.Timer(2, startPublishTemperature).start()  # Publish every 2 seconds

# ---------------------------------------------------------------------- Main Function
def main():    
    global client, client_id
    client_id = "YOUR-DEVICE-CLIENT-ID"
    print("Starting!")

    # Defining the mqtt connection  
    client = paho.Client() 
    client.on_message = on_message
    client.on_log = on_log
    client.on_connect = on_connect

    # Setting the username password
    client.username_pw_set(username=client_id, password='YOUR-DEVICE-CLIENT-ID-PASSWORD')

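    # Connecting to the broker
    # NOTE: cert_reqs=ssl.CERT_NONE disables verification of the broker's certificate
    client.tls_set(cert_reqs=ssl.CERT_NONE, tls_version=ssl.PROTOCOL_TLS)
    client.connect("mqtts.connect.iotflows.com", 443, 60)
    client.loop_forever()  # Process network traffic (and keep publishing) until interrupted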

    

if __name__ == "__main__":
    main()
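In the listing above, startPublishTemperature() re-arms a threading.Timer on every call and is started from on_connect, so a reconnect would start a second chain of timers. One possible alternative, sketched below with only the standard library, runs a single background thread that can be started repeatedly without duplicating itself and can be stopped cleanly. PeriodicPublisher and publish_fn are names made up for this sketch; publish_fn stands in for whatever function calls client.publish().

```python
import threading


class PeriodicPublisher:
    """Call publish_fn() every `interval` seconds on one background thread."""

    def __init__(self, publish_fn, interval=2.0):
        self._publish_fn = publish_fn
        self._interval = interval
        self._stop = threading.Event()
        self._thread = None
        self._lock = threading.Lock()

    def start(self):
        # Safe to call from on_connect repeatedly: only one worker thread runs at a time.
        with self._lock:
            if self._thread is not None and self._thread.is_alive():
                return False
            self._stop.clear()
            self._thread = threading.Thread(target=self._run, daemon=True)
            self._thread.start()
            return True

    def stop(self):
        # Wake the worker immediately and wait for it to finish.
        self._stop.set()
        if self._thread is not None:
            self._thread.join()

    def _run(self):
        # Event.wait() returns False on timeout (publish again) and True once stop() is called.
        while not self._stop.wait(self._interval):
            self._publish_fn()
```

In this sketch the first message goes out one interval after start(), and calling start() again while the worker is alive does nothing, which is what makes it safe to call on every reconnect.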

Running code

Now that you have everything in place, run the following command in your terminal to start your Python script. In our case, the file is named PublishToIoTFlows.py:

sudo python3 PublishToIoTFlows.py 

Below is the example data coming from the Python script
