Home

Published

- 8 min read

CPE241 Week 10

img of CPE241 Week 10

MQTT (Message Queuing Telemetry Transport)

  • Lightweight Protocol: Designed for low-bandwidth and high-latency networks.
  • Publish/Subscribe Model: Clients can publish messages to topics and subscribe to receive them, promoting decoupled communication.
  • Quality of Service (QoS):
    • QoS 0: At most once (fire-and-forget).
    • QoS 1: At least once (guaranteed delivery).
    • QoS 2: Exactly once (no duplicates).
  • Retained Messages: Last message on a topic can be retained by the broker for new subscribers.
  • Last Will and Testament (LWT): Clients can specify a message for the broker to send if they disconnect unexpectedly.
  • Security: Can be secured using TLS/SSL for encrypted communication.
  • Common Use Cases: Smart homes, remote monitoring, sensor networks, and fleet management.

MQTT Architecture Diagram

Here’s a diagram illustrating the MQTT architecture:

   graph TD
    A[Client A] -->|Publish to Topic| B[MQTT Broker]
    B -->|Retained Message| A
    A -->|Subscribe to Topic| B
    B -->|Send Message to Subscribers| C[Client B]
    B -->|Send Message to Subscribers| D[Client C]
    A -->|Last Will Message| B
    B -->|Notify on Disconnect| E[Client B]

    subgraph QoS Levels
        QoS0[QoS 0: At most once]
        QoS1[QoS 1: At least once]
        QoS2[QoS 2: Exactly once]
    end

    B --> QoS0
    B --> QoS1
    B --> QoS2

MQTT

NodeMcu

   #include <ESP8266WiFi.h>
#include <PubSubClient.h>

const char* ssid = "your_ssid"; // WiFi SSID
const char* password = "your_password"; // WiFi Password
const char* mqtt_server = "broker.mqtt-dashboard.com"; // MQTT Broker

WiFiClient espClient;
PubSubClient client(espClient);

long lastMsg = 0;
#define MSG_BUFFER_SIZE (50)
char msg[MSG_BUFFER_SIZE];
int value = 0;

// Function to connect to WiFi
void setup_wifi() {
    delay(10);
    Serial.println();
    Serial.print("Connecting to ");
    Serial.println(ssid);
    WiFi.begin(ssid, password);
    while (WiFi.status() != WL_CONNECTED) {
        delay(500);
        Serial.print(".");
    }
    randomSeed(micros());
    Serial.println("");
    Serial.println("WiFi connected");
    Serial.print("IP address: ");
    Serial.println(WiFi.localIP());
}

// Callback function for incoming messages
void callback(char* topic, byte* payload, unsigned int length) {
    Serial.print("Message arrived [");
    Serial.print(topic);
    Serial.print("] ");
    for (int i = 0; i < length; i++) {
        Serial.print((char)payload[i]);
    }
    Serial.println();

    // Control the built-in LED based on the payload
    if ((char)payload[0] == '1') {
        digitalWrite(BUILTIN_LED, LOW);
    } else {
        digitalWrite(BUILTIN_LED, HIGH);
    }
}

// Function to reconnect to the MQTT broker
void reconnect() {
    while (!client.connected()) {
        Serial.print("Attempting MQTT connection...");
        String clientId = "ESP8266Client-";
        clientId += String(random(0xffff), HEX);
        
        if (client.connect(clientId.c_str())) {
            Serial.println("connected");
            client.publish("outTopic", "hello world");
            client.subscribe("inTopic");
        } else {
            Serial.print("failed, rc=");
            Serial.print(client.state());
            Serial.println(" try again in 5 seconds");
            delay(5000);
        }
    }
}

// Setup function
void setup() {
    pinMode(BUILTIN_LED, OUTPUT);
    Serial.begin(115200);
    setup_wifi();
    client.setServer(mqtt_server, 1883);
    client.setCallback(callback);
}

// Main loop function
void loop() {
    if (!client.connected()) {
        reconnect();
    }
    client.loop();

    long now = millis();
    if (now - lastMsg > 2000) {
        lastMsg = now;
        ++value;
        snprintf(msg, MSG_BUFFER_SIZE, "hello world #%ld", value);
        Serial.print("Publish message: ");
        Serial.println(msg);
        client.publish("outTopic", msg);
    }
}

Publisher

   #include <ESP8266WiFi.h>
#include <PubSubClient.h>

const char* ssid = "........"; // WiFi SSID
const char* password = "........"; // WiFi Password
const char* mqtt_server = "broker.mqtt-dashboard.com"; // MQTT Broker

WiFiClient espClient;
PubSubClient client(espClient);

long lastMsg = 0;
#define MSG_BUFFER_SIZE (50)
char msg[MSG_BUFFER_SIZE];
int value = 0;

// Function to connect to WiFi
void setup_wifi() {
    delay(10);
    Serial.println();
    Serial.print("Connecting to ");
    Serial.println(ssid);
    WiFi.begin(ssid, password);
    while (WiFi.status() != WL_CONNECTED) {
        delay(500);
        Serial.print(".");
    }
    randomSeed(micros());
    Serial.println("");
    Serial.println("WiFi connected");
    Serial.print("IP address: ");
    Serial.println(WiFi.localIP());
}

// Callback function for incoming messages
void callback(char* topic, byte* payload, unsigned int length) {
    Serial.print("Message arrived [");
    Serial.print(topic);
    Serial.print("] ");
    for (int i = 0; i < length; i++) {
        Serial.print((char)payload[i]);
    }
    Serial.println();
}

// Function to reconnect to the MQTT broker
void reconnect() {
    while (!client.connected()) {
        Serial.print("Attempting MQTT connection...");
        
        // Create a random client ID
        String clientId = "ESP8266Client-";
        clientId += String(random(0xffff), HEX);
        
        // Attempt to connect
        if (client.connect(clientId.c_str())) {
            Serial.println("connected");
            client.publish("outTopic", "hello world"); // Publish an announcement
            // client.subscribe("inTopic"); // Resubscribe removed
        } else {
            Serial.print("failed, rc=");
            Serial.print(client.state());
            Serial.println(" try again in 5 seconds");
            delay(5000);
        }
    }
}

// Setup function
void setup() {
    Serial.begin(115200); // Start serial communication at 115200 baud
    setup_wifi(); // Connect to WiFi
    client.setServer(mqtt_server, 1883); // Set MQTT server and port
    client.setCallback(callback); // Set the callback function for incoming messages
}

// Main loop function
void loop() {
    if (!client.connected()) {
        reconnect(); // Reconnect if not connected
    }
    client.loop(); // Process incoming messages

    long now = millis();
    if (now - lastMsg > 2000) { // Publish a message every 2 seconds
        lastMsg = now;
        ++value;
        snprintf(msg, MSG_BUFFER_SIZE, "hello world #%ld", value);
        Serial.print("Publish message: ");
        Serial.println(msg);
        client.publish("outTopic", msg); // Publish the message
    }
}

subscriber

   #include <ESP8266WiFi.h>
#include <PubSubClient.h>

const char* ssid = "........"; // WiFi SSID
const char* password = "........"; // WiFi Password
const char* mqtt_server = "broker.mqtt-dashboard.com"; // MQTT Broker

WiFiClient espClient;
PubSubClient client(espClient);

long lastMsg = 0;
#define MSG_BUFFER_SIZE (50)
char msg[MSG_BUFFER_SIZE];
int value = 0;

// Function to connect to WiFi
void setup_wifi() {
    delay(10);
    Serial.println();
    Serial.print("Connecting to ");
    Serial.println(ssid);
    WiFi.begin(ssid, password);
    while (WiFi.status() != WL_CONNECTED) {
        delay(500);
        Serial.print(".");
    }
    randomSeed(micros());
    Serial.println("");
    Serial.println("WiFi connected");
    Serial.print("IP address: ");
    Serial.println(WiFi.localIP());
}

// Callback function for incoming messages
void callback(char* topic, byte* payload, unsigned int length) {
    Serial.print("Message arrived [");
    Serial.print(topic);
    Serial.print("] ");
    for (int i = 0; i < length; i++) {
        Serial.print((char)payload[i]);
    }
    Serial.println();
}

// Function to reconnect to the MQTT broker
void reconnect() {
    while (!client.connected()) {
        Serial.print("Attempting MQTT connection...");
        
        // Create a random client ID
        String clientId = "ESP8266Client-";
        clientId += String(random(0xffff), HEX);
        
        // Attempt to connect
        if (client.connect(clientId.c_str())) {
            Serial.println("connected");
            // client.publish("outTopic", "hello world"); // Removed
            // client.subscribe("inTopic"); // Removed
        } else {
            Serial.print("failed, rc=");
            Serial.print(client.state());
            Serial.println(" try again in 5 seconds");
            delay(5000);
        }
    }
}

// Setup function
void setup() {
    // pinMode(BUILTIN_LED, OUTPUT); // Removed for Publisher #2
    Serial.begin(115200); // Start serial communication at 115200 baud
    setup_wifi(); // Connect to WiFi
    client.setServer(mqtt_server, 1883); // Set MQTT server and port
    client.setCallback(callback); // Set the callback function for incoming messages
}

// Main loop function
void loop() {
    if (!client.connected()) {
        reconnect(); // Reconnect if not connected
    }
    client.loop(); // Process incoming messages
}

2 Topic

Publisher

   #include <ESP8266WiFi.h>
#include <PubSubClient.h>

// WiFi credentials
const char* ssid = "your_SSID"; // Replace with your SSID
const char* password = "your_PASSWORD"; // Replace with your password

// MQTT server details
const char* mqtt_server = "broker.mqtt-dashboard.com";
WiFiClient espClient;
PubSubClient client(espClient);

unsigned long lastMsg = 0;
#define MSG_BUFFER_SIZE (50)
char temp[MSG_BUFFER_SIZE];
char humid[MSG_BUFFER_SIZE];
int value = 0;

void setup_wifi() {
    delay(10);
    Serial.println();
    Serial.print("Connecting to ");
    Serial.println(ssid);
    
    WiFi.mode(WIFI_STA);
    WiFi.begin(ssid, password);
    
    while (WiFi.status() != WL_CONNECTED) {
        delay(500);
        Serial.print(".");
    }
    
    randomSeed(micros());
    Serial.println("");
    Serial.println("WiFi connected");
    Serial.println("IP address: ");
    Serial.println(WiFi.localIP());
}

void callback(char* topic, byte* payload, unsigned int length) {
    Serial.print("Message arrived [");
    Serial.print(topic);
    Serial.print("] ");
    
    for (int i = 0; i < length; i++) {
        Serial.print((char)payload[i]);
    }
    
    Serial.println();
}

void reconnect() {
    while (!client.connected()) {
        Serial.print("Attempting MQTT connection...");
        String clientId = "ESP8266Client-";
        clientId += String(random(0xffff), HEX);
        
        // Attempt to connect
        if (client.connect(clientId.c_str())) {
            Serial.println("connected");
            // Resend last published values
            client.publish("Dht22/temp", temp);
            client.publish("Dht22/humid", humid);
        } else {
            Serial.print("failed, rc=");
            Serial.print(client.state());
            Serial.println(" try again in 5 seconds");
            // Wait 5 seconds before retrying
            delay(5000);
        }
    }
}

void setup() {
    Serial.begin(115200);
    setup_wifi();
    client.setServer(mqtt_server, 1883);
    client.setCallback(callback);
}

void loop() {
    if (!client.connected()) {
        reconnect();
    }
    
    client.loop();

    // Generate random humidity and temperature values
    float h = random(0, 100);
    float t = random(0, 100);
    unsigned long now = millis();

    // Publish every 2 seconds
    if (now - lastMsg > 2000) {
        lastMsg = now;
        snprintf(temp, MSG_BUFFER_SIZE, "%.2f", t);
        snprintf(humid, MSG_BUFFER_SIZE, "%.2f", h);

        Serial.print("Publish message: Temp= ");
        Serial.println(temp);
        Serial.print("Publish message: Humid= ");
        Serial.println(humid);
        
        client.publish("Dht22/temp", temp);
        client.publish("Dht22/humid", humid);
    }
}

Subscriber

   #include <ESP8266WiFi.h>
#include <PubSubClient.h>
#include <Wire.h>
#include <Adafruit_GFX.h>
#include <Adafruit_SSD1306.h>

// WiFi credentials
const char* ssid = "your_SSID"; // Replace with your SSID
const char* password = "your_PASSWORD"; // Replace with your password

// MQTT server details
const char* mqtt_server = "broker.mqtt-dashboard.com";
WiFiClient espClient;
PubSubClient client(espClient);

// Global variables
long lastMsg = 0;
#define MSG_BUFFER_SIZE (50)
char msg[MSG_BUFFER_SIZE];
String humid;
String temp;

#define OLED_RESET -1
Adafruit_SSD1306 OLED(OLED_RESET);

// Function to display data on the OLED
void oleddisplay(String topic, char* msgpayload) {
    if (topic == "Dht22/temp") {
        temp = String(msgpayload);
    }
    if (topic == "Dht22/humid") {
        humid = String(msgpayload);
    }

    // Update the OLED display
    OLED.clearDisplay();
    OLED.setTextColor(WHITE);
    OLED.setTextSize(1);
    OLED.setCursor(0, 0);
    OLED.println("DHT22");
    OLED.setCursor(0, 10);
    OLED.print("Temp: ");
    OLED.println(temp);
    OLED.setCursor(0, 20);
    OLED.print("Humid: ");
    OLED.println(humid);
    OLED.display();
}

// Function to connect to WiFi
void setup_wifi() {
    delay(10);
    Serial.println();
    Serial.print("Connecting to ");
    Serial.println(ssid);
    WiFi.begin(ssid, password);
    while (WiFi.status() != WL_CONNECTED) {
        delay(500);
        Serial.print(".");
    }
    randomSeed(micros());
    Serial.println("");
    Serial.println("WiFi connected");
    Serial.println("IP address: ");
    Serial.println(WiFi.localIP());
}

// MQTT callback function
void callback(char* topic, byte* payload, unsigned int length) {
    Serial.print("Message arrived [");
    Serial.print(topic);
    Serial.print("] ");
    for (int i = 0; i < length; i++) {
        msg[i] = (char)payload[i];
    }
    msg[length] = '\0'; // Null-terminate the string
    Serial.println(msg);
    oleddisplay(topic, msg);
}

// Function to reconnect to MQTT
void reconnect() {
  while (!client.connected()) {
    Serial.print("Attempting MQTT connection...");
    String clientId = "ESP8266Client-";
    clientId += String(random(0xffff), HEX);
    if (client.connect(clientId.c_str())) {
      Serial.println("connected");
      client.subscribe("ryuface-Dht22/temp");
      client.subscribe("ryuface-Dht22/humid");
    } else {
      Serial.print("failed, rc=");
      Serial.print(client.state());
      Serial.println(" try again in 5 seconds");
      delay(5000);
    }
  }
}

// Setup function
void setup() {
    Serial.begin(115200);
    OLED.begin(SSD1306_SWITCHCAPVCC, 0x3C);
    setup_wifi();
    client.setServer(mqtt_server, 1883);
    client.setCallback(callback);
}

// Main loop
void loop() {
    if (!client.connected()) {
        reconnect();
    }
    client.loop();
}