Published
- 8 min read
CPE241 Week 10
MQTT (Message Queuing Telemetry Transport)
- Lightweight Protocol: Designed for low-bandwidth and high-latency networks.
- Publish/Subscribe Model: Clients can publish messages to topics and subscribe to receive them, promoting decoupled communication.
- Quality of Service (QoS):
- QoS 0: At most once (fire-and-forget).
- QoS 1: At least once (guaranteed delivery).
- QoS 2: Exactly once (no duplicates).
- Retained Messages: Last message on a topic can be retained by the broker for new subscribers.
- Last Will and Testament (LWT): Clients can specify a message for the broker to send if they disconnect unexpectedly.
- Security: Can be secured using TLS/SSL for encrypted communication.
- Common Use Cases: Smart homes, remote monitoring, sensor networks, and fleet management.
MQTT Architecture Diagram
Here’s a diagram illustrating the MQTT architecture:
graph TD
A[Client A] -->|Publish to Topic| B[MQTT Broker]
B -->|Retained Message| A
A -->|Subscribe to Topic| B
B -->|Send Message to Subscribers| C[Client B]
B -->|Send Message to Subscribers| D[Client C]
A -->|Last Will Message| B
B -->|Notify on Disconnect| E[Client B]
subgraph QoS Levels
QoS0[QoS 0: At most once]
QoS1[QoS 1: At least once]
QoS2[QoS 2: Exactly once]
end
B --> QoS0
B --> QoS1
B --> QoS2
MQTT
NodeMcu
#include <ESP8266WiFi.h>
#include <PubSubClient.h>
const char* ssid = "your_ssid"; // WiFi SSID
const char* password = "your_password"; // WiFi Password
const char* mqtt_server = "broker.mqtt-dashboard.com"; // MQTT Broker
WiFiClient espClient;
PubSubClient client(espClient);
long lastMsg = 0;
#define MSG_BUFFER_SIZE (50)
char msg[MSG_BUFFER_SIZE];
int value = 0;
// Function to connect to WiFi
void setup_wifi() {
delay(10);
Serial.println();
Serial.print("Connecting to ");
Serial.println(ssid);
WiFi.begin(ssid, password);
while (WiFi.status() != WL_CONNECTED) {
delay(500);
Serial.print(".");
}
randomSeed(micros());
Serial.println("");
Serial.println("WiFi connected");
Serial.print("IP address: ");
Serial.println(WiFi.localIP());
}
// Callback function for incoming messages
void callback(char* topic, byte* payload, unsigned int length) {
Serial.print("Message arrived [");
Serial.print(topic);
Serial.print("] ");
for (int i = 0; i < length; i++) {
Serial.print((char)payload[i]);
}
Serial.println();
// Control the built-in LED based on the payload
if ((char)payload[0] == '1') {
digitalWrite(BUILTIN_LED, LOW);
} else {
digitalWrite(BUILTIN_LED, HIGH);
}
}
// Function to reconnect to the MQTT broker
void reconnect() {
while (!client.connected()) {
Serial.print("Attempting MQTT connection...");
String clientId = "ESP8266Client-";
clientId += String(random(0xffff), HEX);
if (client.connect(clientId.c_str())) {
Serial.println("connected");
client.publish("outTopic", "hello world");
client.subscribe("inTopic");
} else {
Serial.print("failed, rc=");
Serial.print(client.state());
Serial.println(" try again in 5 seconds");
delay(5000);
}
}
}
// Setup function
void setup() {
pinMode(BUILTIN_LED, OUTPUT);
Serial.begin(115200);
setup_wifi();
client.setServer(mqtt_server, 1883);
client.setCallback(callback);
}
// Main loop function
void loop() {
if (!client.connected()) {
reconnect();
}
client.loop();
long now = millis();
if (now - lastMsg > 2000) {
lastMsg = now;
++value;
snprintf(msg, MSG_BUFFER_SIZE, "hello world #%ld", value);
Serial.print("Publish message: ");
Serial.println(msg);
client.publish("outTopic", msg);
}
}
Publisher
#include <ESP8266WiFi.h>
#include <PubSubClient.h>
const char* ssid = "........"; // WiFi SSID
const char* password = "........"; // WiFi Password
const char* mqtt_server = "broker.mqtt-dashboard.com"; // MQTT Broker
WiFiClient espClient;
PubSubClient client(espClient);
long lastMsg = 0;
#define MSG_BUFFER_SIZE (50)
char msg[MSG_BUFFER_SIZE];
int value = 0;
// Function to connect to WiFi
void setup_wifi() {
delay(10);
Serial.println();
Serial.print("Connecting to ");
Serial.println(ssid);
WiFi.begin(ssid, password);
while (WiFi.status() != WL_CONNECTED) {
delay(500);
Serial.print(".");
}
randomSeed(micros());
Serial.println("");
Serial.println("WiFi connected");
Serial.print("IP address: ");
Serial.println(WiFi.localIP());
}
// Callback function for incoming messages
void callback(char* topic, byte* payload, unsigned int length) {
Serial.print("Message arrived [");
Serial.print(topic);
Serial.print("] ");
for (int i = 0; i < length; i++) {
Serial.print((char)payload[i]);
}
Serial.println();
}
// Function to reconnect to the MQTT broker
void reconnect() {
while (!client.connected()) {
Serial.print("Attempting MQTT connection...");
// Create a random client ID
String clientId = "ESP8266Client-";
clientId += String(random(0xffff), HEX);
// Attempt to connect
if (client.connect(clientId.c_str())) {
Serial.println("connected");
client.publish("outTopic", "hello world"); // Publish an announcement
// client.subscribe("inTopic"); // Resubscribe removed
} else {
Serial.print("failed, rc=");
Serial.print(client.state());
Serial.println(" try again in 5 seconds");
delay(5000);
}
}
}
// Setup function
void setup() {
Serial.begin(115200); // Start serial communication at 115200 baud
setup_wifi(); // Connect to WiFi
client.setServer(mqtt_server, 1883); // Set MQTT server and port
client.setCallback(callback); // Set the callback function for incoming messages
}
// Main loop function
void loop() {
if (!client.connected()) {
reconnect(); // Reconnect if not connected
}
client.loop(); // Process incoming messages
long now = millis();
if (now - lastMsg > 2000) { // Publish a message every 2 seconds
lastMsg = now;
++value;
snprintf(msg, MSG_BUFFER_SIZE, "hello world #%ld", value);
Serial.print("Publish message: ");
Serial.println(msg);
client.publish("outTopic", msg); // Publish the message
}
}
subscriber
#include <ESP8266WiFi.h>
#include <PubSubClient.h>
const char* ssid = "........"; // WiFi SSID
const char* password = "........"; // WiFi Password
const char* mqtt_server = "broker.mqtt-dashboard.com"; // MQTT Broker
WiFiClient espClient;
PubSubClient client(espClient);
long lastMsg = 0;
#define MSG_BUFFER_SIZE (50)
char msg[MSG_BUFFER_SIZE];
int value = 0;
// Function to connect to WiFi
void setup_wifi() {
delay(10);
Serial.println();
Serial.print("Connecting to ");
Serial.println(ssid);
WiFi.begin(ssid, password);
while (WiFi.status() != WL_CONNECTED) {
delay(500);
Serial.print(".");
}
randomSeed(micros());
Serial.println("");
Serial.println("WiFi connected");
Serial.print("IP address: ");
Serial.println(WiFi.localIP());
}
// Callback function for incoming messages
void callback(char* topic, byte* payload, unsigned int length) {
Serial.print("Message arrived [");
Serial.print(topic);
Serial.print("] ");
for (int i = 0; i < length; i++) {
Serial.print((char)payload[i]);
}
Serial.println();
}
// Function to reconnect to the MQTT broker
void reconnect() {
while (!client.connected()) {
Serial.print("Attempting MQTT connection...");
// Create a random client ID
String clientId = "ESP8266Client-";
clientId += String(random(0xffff), HEX);
// Attempt to connect
if (client.connect(clientId.c_str())) {
Serial.println("connected");
// client.publish("outTopic", "hello world"); // Removed
// client.subscribe("inTopic"); // Removed
} else {
Serial.print("failed, rc=");
Serial.print(client.state());
Serial.println(" try again in 5 seconds");
delay(5000);
}
}
}
// Setup function
void setup() {
// pinMode(BUILTIN_LED, OUTPUT); // Removed for Publisher #2
Serial.begin(115200); // Start serial communication at 115200 baud
setup_wifi(); // Connect to WiFi
client.setServer(mqtt_server, 1883); // Set MQTT server and port
client.setCallback(callback); // Set the callback function for incoming messages
}
// Main loop function
void loop() {
if (!client.connected()) {
reconnect(); // Reconnect if not connected
}
client.loop(); // Process incoming messages
}
2 Topic
Publisher
#include <ESP8266WiFi.h>
#include <PubSubClient.h>
// WiFi credentials
const char* ssid = "your_SSID"; // Replace with your SSID
const char* password = "your_PASSWORD"; // Replace with your password
// MQTT server details
const char* mqtt_server = "broker.mqtt-dashboard.com";
WiFiClient espClient;
PubSubClient client(espClient);
unsigned long lastMsg = 0;
#define MSG_BUFFER_SIZE (50)
char temp[MSG_BUFFER_SIZE];
char humid[MSG_BUFFER_SIZE];
int value = 0;
void setup_wifi() {
delay(10);
Serial.println();
Serial.print("Connecting to ");
Serial.println(ssid);
WiFi.mode(WIFI_STA);
WiFi.begin(ssid, password);
while (WiFi.status() != WL_CONNECTED) {
delay(500);
Serial.print(".");
}
randomSeed(micros());
Serial.println("");
Serial.println("WiFi connected");
Serial.println("IP address: ");
Serial.println(WiFi.localIP());
}
void callback(char* topic, byte* payload, unsigned int length) {
Serial.print("Message arrived [");
Serial.print(topic);
Serial.print("] ");
for (int i = 0; i < length; i++) {
Serial.print((char)payload[i]);
}
Serial.println();
}
void reconnect() {
while (!client.connected()) {
Serial.print("Attempting MQTT connection...");
String clientId = "ESP8266Client-";
clientId += String(random(0xffff), HEX);
// Attempt to connect
if (client.connect(clientId.c_str())) {
Serial.println("connected");
// Resend last published values
client.publish("Dht22/temp", temp);
client.publish("Dht22/humid", humid);
} else {
Serial.print("failed, rc=");
Serial.print(client.state());
Serial.println(" try again in 5 seconds");
// Wait 5 seconds before retrying
delay(5000);
}
}
}
void setup() {
Serial.begin(115200);
setup_wifi();
client.setServer(mqtt_server, 1883);
client.setCallback(callback);
}
void loop() {
if (!client.connected()) {
reconnect();
}
client.loop();
// Generate random humidity and temperature values
float h = random(0, 100);
float t = random(0, 100);
unsigned long now = millis();
// Publish every 2 seconds
if (now - lastMsg > 2000) {
lastMsg = now;
snprintf(temp, MSG_BUFFER_SIZE, "%.2f", t);
snprintf(humid, MSG_BUFFER_SIZE, "%.2f", h);
Serial.print("Publish message: Temp= ");
Serial.println(temp);
Serial.print("Publish message: Humid= ");
Serial.println(humid);
client.publish("Dht22/temp", temp);
client.publish("Dht22/humid", humid);
}
}
Subscriber
#include <ESP8266WiFi.h>
#include <PubSubClient.h>
#include <Wire.h>
#include <Adafruit_GFX.h>
#include <Adafruit_SSD1306.h>
// WiFi credentials
const char* ssid = "your_SSID"; // Replace with your SSID
const char* password = "your_PASSWORD"; // Replace with your password
// MQTT server details
const char* mqtt_server = "broker.mqtt-dashboard.com";
WiFiClient espClient;
PubSubClient client(espClient);
// Global variables
long lastMsg = 0;
#define MSG_BUFFER_SIZE (50)
char msg[MSG_BUFFER_SIZE];
String humid;
String temp;
#define OLED_RESET -1
Adafruit_SSD1306 OLED(OLED_RESET);
// Function to display data on the OLED
void oleddisplay(String topic, char* msgpayload) {
if (topic == "Dht22/temp") {
temp = String(msgpayload);
}
if (topic == "Dht22/humid") {
humid = String(msgpayload);
}
// Update the OLED display
OLED.clearDisplay();
OLED.setTextColor(WHITE);
OLED.setTextSize(1);
OLED.setCursor(0, 0);
OLED.println("DHT22");
OLED.setCursor(0, 10);
OLED.print("Temp: ");
OLED.println(temp);
OLED.setCursor(0, 20);
OLED.print("Humid: ");
OLED.println(humid);
OLED.display();
}
// Function to connect to WiFi
void setup_wifi() {
delay(10);
Serial.println();
Serial.print("Connecting to ");
Serial.println(ssid);
WiFi.begin(ssid, password);
while (WiFi.status() != WL_CONNECTED) {
delay(500);
Serial.print(".");
}
randomSeed(micros());
Serial.println("");
Serial.println("WiFi connected");
Serial.println("IP address: ");
Serial.println(WiFi.localIP());
}
// MQTT callback function
void callback(char* topic, byte* payload, unsigned int length) {
Serial.print("Message arrived [");
Serial.print(topic);
Serial.print("] ");
for (int i = 0; i < length; i++) {
msg[i] = (char)payload[i];
}
msg[length] = '\0'; // Null-terminate the string
Serial.println(msg);
oleddisplay(topic, msg);
}
// Function to reconnect to MQTT
void reconnect() {
while (!client.connected()) {
Serial.print("Attempting MQTT connection...");
String clientId = "ESP8266Client-";
clientId += String(random(0xffff), HEX);
if (client.connect(clientId.c_str())) {
Serial.println("connected");
client.subscribe("ryuface-Dht22/temp");
client.subscribe("ryuface-Dht22/humid");
} else {
Serial.print("failed, rc=");
Serial.print(client.state());
Serial.println(" try again in 5 seconds");
delay(5000);
}
}
}
// Setup function
void setup() {
Serial.begin(115200);
OLED.begin(SSD1306_SWITCHCAPVCC, 0x3C);
setup_wifi();
client.setServer(mqtt_server, 1883);
client.setCallback(callback);
}
// Main loop
void loop() {
if (!client.connected()) {
reconnect();
}
client.loop();
}