MQTT on Pi4
Health Telemetry to PostgreSQL
This guide adds a production-grade MQTT stack to your server. You will run Mosquitto on 10.10.10.250, publish hardware and system health from the Pi4, consume telemetry into PostgreSQL, and operate the pipeline with security and resilience controls.
MQTT is not just pub/sub syntax. Treat it as a reliable edge event bus with contracts, security boundaries, and operational checks.
Create Folders, Files, and First MQTT Run
Objective: start from an empty host and build a working MQTT workspace with reproducible run and test commands.
Learning Focus: This lesson removes setup assumptions so every later MQTT lesson starts from a known-good baseline with clear folder layout, executable files, and validation checks.
Before broker hardening or telemetry ingestion, you need a stable developer loop: create project directories, create code files, run them, and verify expected behavior. That loop is what prevents confusion later when failures appear, because you know whether the problem is setup, security policy, or application logic.
Create workspace directories
Keep code, logs, and service artifacts in explicit locations so operations and troubleshooting stay predictable.
sudo mkdir -p /opt/mqtt/{app,bin,logs}
sudo chown -R $USER:$USER /opt/mqtt
cd /opt/mqtt/app
pwd
Create a Python virtual environment and install packages
This keeps MQTT dependencies isolated from system Python packages.
python3 -m venv /opt/mqtt/.venv
source /opt/mqtt/.venv/bin/activate
pip install --upgrade pip
pip install paho-mqtt psutil psycopg[binary]
pip freeze > /opt/mqtt/app/requirements.txt
Create your first publisher file
This is a minimal file-creation example that proves your tooling path before the larger health publisher in later lessons.
import paho.mqtt.publish as publish
publish.single(
    topic="test/hello",
    payload="hello_from_lesson0",
    hostname="127.0.0.1",
    port=1883,
)

Run and test
Use one terminal as the subscriber observer and another as the publisher executor. This confirms the file, runtime, and broker path in one loop. It assumes a broker is already listening on 127.0.0.1:1883; if you have not installed Mosquitto yet, return to this check after the broker install lesson.
# terminal A
mosquitto_sub -h 127.0.0.1 -t test/hello -v

# terminal B
source /opt/mqtt/.venv/bin/activate
python /opt/mqtt/app/hello_pub.py
python -m py_compile /opt/mqtt/app/hello_pub.py
Checkpoint: subscriber receives hello_from_lesson0 and py_compile returns no syntax errors.
Define MQTT System Architecture
Objective: define a contract-first telemetry flow before installing anything.
Learning Focus: This lesson builds practical engineering judgment, not just task completion. As you run each step, connect the action to runtime behavior, failure signals, and design trade-offs so you can adapt the pattern in real systems.
Teaching Lens
This lesson teaches separation of concerns so publishers, broker, and consumers can evolve independently.
This lesson also focuses on operational reasoning: what healthy behavior looks like, what failure signals look like, and how this step protects the reliability of the lessons that come next.
The critical engineering idea is that MQTT design starts with contracts, not scripts. When topic names and payload fields are defined first, your publisher can be rewritten in Rust later, your ingestor can move to another host, and your analytics code still works because the message contract remains stable.
In this lesson, you are learning to think in failure boundaries. A broker outage, a crashed publisher, and a slow SQL consumer are three different failures that should be visible and handled differently. That mindset is what turns MQTT from a demo transport into an operational telemetry bus.
Reference data flow
Use this blueprint to keep your implementation deterministic and testable.
Read this flow as a systems boundary map, not just a diagram. The publisher owns measurement timing and payload creation, the broker owns fan-out and delivery semantics, and the SQL ingestor owns persistence policy. When each boundary is explicit, debugging is faster because you can isolate faults by stage instead of treating MQTT as one opaque black box.
Pi4 Health Agent -> mqtt/pi4/health/json -> Mosquitto Broker
Pi4 Health Agent -> mqtt/pi4/health/lwt  -> Mosquitto Broker
SQL Ingestor     <- mqtt/pi4/health/#    <- Mosquitto Broker
SQL Ingestor     -> mqtt_messages table  -> PostgreSQL
Checkpoint: all topic names and payload fields are agreed before coding.
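Contract-first design becomes concrete when the agreed topic names live in one shared module that both publisher and ingestor import, so a rename is a single-file change. A minimal sketch of that idea; the module name and helper are illustrative, not part of the lessons:

```python
# contract.py -- hypothetical single source of truth for the telemetry contract.

HEALTH_TOPIC = "mqtt/pi4/health/json"   # periodic telemetry samples
LWT_TOPIC = "mqtt/pi4/health/lwt"       # retained liveness state
HEALTH_WILDCARD = "mqtt/pi4/health/#"   # consumer-side subscription

# Fields every health payload must carry, per the contract lesson.
REQUIRED_FIELDS = ("host", "ip", "cpu_pct", "mem_pct",
                   "temp_c", "disk_root_pct", "services", "ts")

def missing_fields(payload: dict) -> list:
    """Return contract fields absent from a decoded payload."""
    return [f for f in REQUIRED_FIELDS if f not in payload]
```

Because publisher and ingestor both import these names, a contract change is reviewed once instead of being hunted through scripts.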
Install and Baseline Mosquitto Broker
Objective: deploy broker runtime with startup persistence and basic local test capability.
Run
This sequence installs Mosquitto, enables service startup, and verifies listener readiness on port 1883.
The purpose of this first run is to establish a known-good control plane before adding credentials, TLS, and custom ACLs. If you cannot prove baseline broker health now, later security configuration errors will become ambiguous and cost you time during troubleshooting.
sudo apt-get update
sudo apt-get install -y mosquitto mosquitto-clients
sudo systemctl enable --now mosquitto
sudo systemctl status mosquitto --no-pager
ss -tulpn | grep 1883
Sanity publish and subscribe
Run subscriber first, then publish once from a second shell. This proves end-to-end broker function before security layering.
Starting the subscriber first teaches an important MQTT debugging habit: always observe the bus before injecting traffic. This lets you verify topic routing and payload visibility directly, so later failures are clearly tied to auth, ACL, or transport changes rather than core broker behavior.
mosquitto_sub -h 127.0.0.1 -t test/hello -v
mosquitto_pub -h 127.0.0.1 -t test/hello -m "broker_alive"
Checkpoint: subscriber receives the test payload with topic and value.
Add User Authentication and Topic ACL
Objective: enforce identity and per-topic rights so random clients cannot publish to critical channels.
Create users and ACL policy
This setup creates separate identities for publisher and SQL consumer and limits each to minimal privileges.
This block teaches principle-of-least-privilege applied to messaging systems. A publisher identity should never consume unrelated topics, and a consumer identity should not publish into control topics. These constraints reduce accidental misuse and make malicious behavior easier to detect in logs.
sudo mosquitto_passwd -c /etc/mosquitto/passwd pi4_publisher
sudo mosquitto_passwd /etc/mosquitto/passwd sql_ingestor

sudo tee /etc/mosquitto/acl >/dev/null <<'EOF'
user pi4_publisher
topic write mqtt/pi4/health/#

user sql_ingestor
topic read mqtt/pi4/health/#
EOF

sudo tee /etc/mosquitto/conf.d/security.conf >/dev/null <<'EOF'
allow_anonymous false
password_file /etc/mosquitto/passwd
acl_file /etc/mosquitto/acl
listener 1883 0.0.0.0
EOF

sudo systemctl restart mosquitto
sudo systemctl status mosquitto --no-pager
Do not reuse one account for all components. Split credentials by function to reduce blast radius.
Checkpoint: unauthenticated publish fails; authenticated publish succeeds.
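To reason about exactly what the ACL above grants, it helps to model Mosquitto's topic matching as a pure function. The sketch below implements standard MQTT wildcard semantics (`+` matches one level, `#` matches the remainder) as a mental model for checking the policy; it is not Mosquitto's actual implementation, and the helper names are illustrative:

```python
def topic_matches(pattern: str, topic: str) -> bool:
    """Standard MQTT matching: '+' matches one level, '#' matches the rest."""
    p_parts, t_parts = pattern.split("/"), topic.split("/")
    for i, p in enumerate(p_parts):
        if p == "#":
            return True
        if i >= len(t_parts):
            return False
        if p != "+" and p != t_parts[i]:
            return False
    return len(p_parts) == len(t_parts)

# The ACL from this lesson, expressed as (user, action, pattern) grants.
ACL = [
    ("pi4_publisher", "write", "mqtt/pi4/health/#"),
    ("sql_ingestor", "read", "mqtt/pi4/health/#"),
]

def allowed(user: str, action: str, topic: str) -> bool:
    """True only if an explicit grant covers this user, action, and topic."""
    return any(u == user and a == action and topic_matches(p, topic)
               for u, a, p in ACL)
```

Walking the model confirms the least-privilege intent: `pi4_publisher` cannot read, `sql_ingestor` cannot write, and neither touches topics outside `mqtt/pi4/health/#`.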
Design Topic and Payload Contracts
Objective: standardize message semantics so consumers can parse without fragile assumptions.
Contract example
Use stable fields and explicit units. Keep values machine-oriented, not prose strings.
Think of this payload as an API contract, not a casual message. Field names, units, and cardinality decisions here determine whether downstream SQL, dashboards, and alert rules remain stable as your implementation evolves from Python to Rust and from single-node to multi-node publishing.
Topic: mqtt/pi4/health/json
Payload:
{
"host": "pi-dns-core",
"ip": "10.10.10.250",
"cpu_pct": 17.2,
"mem_pct": 43.8,
"temp_c": 52.4,
"disk_root_pct": 61.1,
"services": {
"bind9": "active",
"postgresql": "active",
"nginx": "active",
"freeradius": "active"
},
"ts": "2026-05-12T12:34:56Z"
}

This lesson teaches contract stability over ad hoc payload changes.
You verify every publisher message includes required fields and timestamp.
Checkpoint: payload schema is fixed and documented for all producers and consumers.
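A consumer can enforce this contract mechanically before trusting a sample. The validator below is one way to express the field, type, and unit expectations as code; the schema table mirrors the documented payload, but the function name and structure are illustrative:

```python
import json

# Expected fields and their Python types, mirroring the documented contract.
SCHEMA = {
    "host": str, "ip": str, "cpu_pct": (int, float), "mem_pct": (int, float),
    "temp_c": (int, float), "disk_root_pct": (int, float),
    "services": dict, "ts": str,
}

def validate_health(body: str):
    """Decode one payload and return (ok, problems) against the contract."""
    try:
        d = json.loads(body)
    except json.JSONDecodeError as e:
        return False, [f"not JSON: {e}"]
    problems = [f"missing {k}" for k in SCHEMA if k not in d]
    problems += [f"bad type for {k}" for k, t in SCHEMA.items()
                 if k in d and not isinstance(d[k], t)]
    # Percentages are 0..100 by contract; out-of-range values signal a bad producer.
    for k in ("cpu_pct", "mem_pct", "disk_root_pct"):
        if isinstance(d.get(k), (int, float)) and not 0 <= d[k] <= 100:
            problems.append(f"{k} out of range")
    return not problems, problems
```

Running this check at the ingestion boundary turns silent schema drift into an explicit, loggable failure.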
Build Pi4 Health Publisher
Objective: publish real host health at intervals and emit Last Will status on disconnect.
Create publisher agent
This script reads system metrics, service states, and pushes JSON to MQTT every 10 seconds.
Read this agent as a periodic sampling pipeline. It collects host state, normalizes values into a single JSON contract, and emits to one telemetry topic at fixed cadence. The stable cadence matters because SQL queries and alert windows depend on predictable sample intervals.
import json
import socket
import time
import subprocess
from datetime import datetime, timezone
import psutil
import paho.mqtt.client as mqtt
BROKER = "127.0.0.1"
PORT = 1883
USER = "pi4_publisher"
PASS = "ChangePublisherPassword!"
TOPIC = "mqtt/pi4/health/json"
LWT_TOPIC = "mqtt/pi4/health/lwt"
SERVICES = ["bind9", "postgresql", "nginx", "freeradius"]
def svc_state(name: str) -> str:
    rc = subprocess.run(["systemctl", "is-active", "--quiet", name]).returncode
    return "active" if rc == 0 else "inactive"

# paho-mqtt >= 2.0 requires an explicit callback API version;
# on 1.x, use mqtt.Client(client_id="pi4-health-publisher") instead.
client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2, client_id="pi4-health-publisher")
client.username_pw_set(USER, PASS)
client.will_set(LWT_TOPIC, payload="offline", qos=1, retain=True)
client.connect(BROKER, PORT, keepalive=30)
client.loop_start()
client.publish(LWT_TOPIC, payload="online", qos=1, retain=True)

while True:
    # sensors_temperatures() returns lists of named tuples, so read .current
    # rather than treating entries as dicts.
    temps = psutil.sensors_temperatures().get("cpu_thermal", [])
    payload = {
        "host": socket.gethostname(),
        "ip": "10.10.10.250",
        "cpu_pct": psutil.cpu_percent(interval=0.5),
        "mem_pct": psutil.virtual_memory().percent,
        "temp_c": temps[0].current if temps else 0.0,
        "disk_root_pct": psutil.disk_usage("/").percent,
        "services": {s: svc_state(s) for s in SERVICES},
        "ts": datetime.now(timezone.utc).isoformat(),
    }
    client.publish(TOPIC, payload=json.dumps(payload), qos=1, retain=False)
    time.sleep(10)

Run as service
Service wrapper ensures restart behavior and predictable startup order.
Systemd conversion is a teaching step, not just packaging. It moves the publisher from an interactive script into an operable service with restart policy, dependency ordering, and journal visibility. That transition is what makes telemetry trustworthy after reboot and unattended failures.
sudo apt-get install -y python3-psutil python3-pip
python3 -m pip install --break-system-packages paho-mqtt
sudo mkdir -p /opt/mqtt

sudo tee /etc/systemd/system/pi4-health-pub.service >/dev/null <<'EOF'
[Unit]
Description=Pi4 MQTT health publisher
After=network-online.target mosquitto.service

[Service]
ExecStart=/usr/bin/python3 /opt/mqtt/pi4_health_pub.py
Restart=always
RestartSec=2
User=pi
Group=pi

[Install]
WantedBy=multi-user.target
EOF

sudo systemctl daemon-reload
sudo systemctl enable --now pi4-health-pub
sudo systemctl status pi4-health-pub --no-pager
Checkpoint: live health JSON messages appear on the broker topic every 10 seconds.
Consume MQTT into PostgreSQL
Objective: persist MQTT health messages in SQL for analytics and alerting.
Create SQL table and ingestor
This table stores broker metadata, topic, and payload for replay and debugging. Ingestor subscribes to wildcard health topics and inserts rows.
The key design decision here is raw payload preservation. By storing original JSON in JSONB, you keep forward compatibility when publishers add new keys. This avoids brittle migrations every time telemetry evolves and lets you reprocess historical messages with improved analytics logic later.
psql -U dns_user -d dns_analytics -h localhost <<'EOF'
CREATE TABLE IF NOT EXISTS mqtt_messages (
    id BIGSERIAL PRIMARY KEY,
    ts TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    topic TEXT NOT NULL,
    qos SMALLINT NOT NULL,
    payload JSONB NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_mqtt_messages_ts ON mqtt_messages (ts DESC);
CREATE INDEX IF NOT EXISTS idx_mqtt_messages_topic ON mqtt_messages (topic);
EOF

cat <<'EOF' > /opt/mqtt/mqtt_sql_ingestor.py
import json

import psycopg
import paho.mqtt.client as mqtt

MQTT_USER = "sql_ingestor"
MQTT_PASS = "ChangeIngestorPassword!"

conn = psycopg.connect("host=localhost dbname=dns_analytics user=dns_user password=ChangeThisPassword!")

def on_connect(client, userdata, flags, rc):
    client.subscribe("mqtt/pi4/health/#", qos=1)

def on_message(client, userdata, msg):
    body = msg.payload.decode("utf-8")
    try:
        payload = json.loads(body)
    except json.JSONDecodeError:
        payload = {"raw": body}
    with conn, conn.cursor() as cur:
        cur.execute(
            "INSERT INTO mqtt_messages(topic, qos, payload) VALUES (%s, %s, %s::jsonb)",
            (msg.topic, msg.qos, json.dumps(payload)),
        )

# paho-mqtt >= 2.0: the on_connect signature above matches CallbackAPIVersion.VERSION1;
# on paho-mqtt 1.x, use mqtt.Client(client_id="sql-mqtt-ingestor") instead.
client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION1, client_id="sql-mqtt-ingestor")
client.username_pw_set(MQTT_USER, MQTT_PASS)
client.on_connect = on_connect
client.on_message = on_message
client.connect("127.0.0.1", 1883, 30)
client.loop_forever()
EOF

python3 /opt/mqtt/mqtt_sql_ingestor.py
Checkpoint: new rows appear in mqtt_messages while publisher is running.
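The ingestor's insert logic reduces to a small pure mapping from MQTT message to SQL row, which is worth testing in isolation before wiring it to a live broker. A sketch of that mapping; the helper name is illustrative:

```python
import json

def to_row(topic: str, qos: int, raw: bytes):
    """Map one MQTT message to the (topic, qos, payload_json) insert tuple.

    Non-JSON bodies are wrapped as {"raw": ...} so nothing is dropped,
    matching the ingestor's fallback behavior.
    """
    body = raw.decode("utf-8", errors="replace")
    try:
        payload = json.loads(body)
    except json.JSONDecodeError:
        payload = {"raw": body}
    return topic, qos, json.dumps(payload)
```

Keeping this function pure means you can unit-test the JSON-fallback path without a broker or a database in the loop.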
Tune Retain, QoS, and Last Will Behavior
Objective: choose delivery semantics deliberately instead of defaulting everything.
Teaching Lens
This lesson teaches where message durability helps and where it creates noise.
Durability settings are architecture decisions, not defaults to copy. QoS 1 is appropriate for health state transitions because duplicates are acceptable but loss is not. For fast telemetry streams, retain can backfire by replaying stale data to new subscribers and creating false operational conclusions.
The deeper lesson is semantic clarity. A retained Last Will topic communicates current node presence, while non-retained periodic metrics communicate live time-series behavior. Keeping those roles separate prevents dashboards and automation from mixing historical snapshots with real-time state.
Recommended profile
Apply this profile to avoid stale telemetry floods and still keep reliable state transitions.
The configuration below encodes intent. Telemetry streams are transient observations and should not be retained, while liveness state must be immediately visible to late subscribers. Keeping those semantics separate prevents false positives in monitoring and prevents stale state from contaminating real-time decision logic.
mqtt/pi4/health/json -> qos=1, retain=false
mqtt/pi4/health/lwt  -> qos=1, retain=true
mqtt/pi4/alerts/#    -> qos=1, retain=true (latest alert state)
mqtt/pi4/debug/#     -> qos=0, retain=false
Checkpoint: subscriber restart receives current LWT state immediately and only fresh telemetry onward.
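The profile can be encoded as data so every publisher picks delivery semantics from one table instead of hard-coding flags at each call site. A sketch with illustrative names; prefix matching stands in for full MQTT wildcard handling:

```python
# (qos, retain) per topic prefix, most specific entry first.
DELIVERY_PROFILE = [
    ("mqtt/pi4/health/lwt", (1, True)),   # retained liveness state
    ("mqtt/pi4/health/", (1, False)),     # transient telemetry samples
    ("mqtt/pi4/alerts/", (1, True)),      # latest alert state survives restarts
    ("mqtt/pi4/debug/", (0, False)),      # best-effort noise
]

def delivery_for(topic: str):
    """Return (qos, retain) for a topic; default is QoS 1, no retain."""
    for prefix, profile in DELIVERY_PROFILE:
        if topic == prefix or topic.startswith(prefix):
            return profile
    return (1, False)
```

A publisher then calls `client.publish(topic, body, *delivery_for(topic))`-style plumbing, so changing retention policy is a one-line table edit rather than a code hunt.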
Enable TLS for Broker Clients
Objective: protect credentials and payload integrity when broker traffic leaves localhost paths.
Create local CA and broker certificate
This creates private PKI material for your home network and configures a TLS listener on 8883.
Do this step methodically because TLS failures often look like generic connection failures at the client side. By generating a local CA and broker certificate explicitly, you gain a trust chain you control and can rotate, rather than relying on insecure plaintext credentials for non-local traffic.
sudo mkdir -p /etc/mosquitto/certs
cd /etc/mosquitto/certs
sudo openssl genrsa -out ca.key 4096
sudo openssl req -x509 -new -nodes -key ca.key -sha256 -days 3650 -out ca.crt -subj "/CN=codeandcore-mqtt-ca"
sudo openssl genrsa -out broker.key 2048
sudo openssl req -new -key broker.key -out broker.csr -subj "/CN=10.10.10.250"
sudo openssl x509 -req -in broker.csr -CA ca.crt -CAkey ca.key -CAcreateserial -out broker.crt -days 825 -sha256

sudo tee /etc/mosquitto/conf.d/tls.conf >/dev/null <<'EOF'
listener 8883 0.0.0.0
cafile /etc/mosquitto/certs/ca.crt
certfile /etc/mosquitto/certs/broker.crt
keyfile /etc/mosquitto/certs/broker.key
require_certificate false
allow_anonymous false
EOF

sudo chown mosquitto:mosquitto /etc/mosquitto/certs/broker.key
sudo chmod 640 /etc/mosquitto/certs/broker.key
sudo systemctl restart mosquitto
Keep port 1883 for localhost clients only during the migration. Move remote clients to 8883, then close 1883 on the LAN.
Checkpoint: authenticated TLS clients connect successfully on port 8883.
Operate MQTT with Real Observability
Objective: create an operator runbook for outages, stalled consumers, and schema drift.
Runbook commands
These checks identify whether failures come from publisher, broker, consumer, or database layers.
Use this block as layered diagnosis, not random command sampling. Read from service health to bus visibility to database persistence in that order. This sequence narrows fault scope quickly and keeps incident response deterministic when telemetry appears to stall.
sudo systemctl status mosquitto --no-pager
sudo journalctl -u mosquitto -n 80 --no-pager
sudo systemctl status pi4-health-pub --no-pager
sudo systemctl status mqtt-sql-ingestor --no-pager
mosquitto_sub -h 127.0.0.1 -u sql_ingestor -P 'ChangeIngestorPassword!' -t mqtt/pi4/health/# -C 3 -v
psql -U dns_user -d dns_analytics -h localhost -c "SELECT topic, ts FROM mqtt_messages ORDER BY id DESC LIMIT 10;"
Create ingestor service
Run your SQL consumer as a systemd service so it auto-recovers from network or database interruptions.
Service-managing the ingestor closes the reliability loop. Without this step, broker uptime can be perfect while data silently stops landing in SQL after a transient exception. A managed unit with restart policy ensures ingestion continuity and makes failures visible through standard host observability tools.
sudo tee /etc/systemd/system/mqtt-sql-ingestor.service >/dev/null <<'EOF'
[Unit]
Description=MQTT to PostgreSQL ingestor
After=network-online.target mosquitto.service postgresql.service

[Service]
ExecStart=/usr/bin/python3 /opt/mqtt/mqtt_sql_ingestor.py
Restart=always
RestartSec=2
User=pi
Group=pi

[Install]
WantedBy=multi-user.target
EOF

sudo systemctl daemon-reload
sudo systemctl enable --now mqtt-sql-ingestor
sudo systemctl status mqtt-sql-ingestor --no-pager
Checkpoint: ingestor survives restart and rows keep landing during publisher runtime.
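Restart=always gives process-level recovery; inside the ingestor you can add connection-level recovery with capped exponential backoff so a brief broker or database outage does not burn through restart budget. A sketch of the schedule and retry loop; function names are illustrative:

```python
import time

def backoff_schedule(attempts: int, base: float = 1.0, cap: float = 60.0):
    """Delays of base, 2*base, 4*base, ... seconds, capped at `cap`."""
    return [min(base * (2 ** n), cap) for n in range(attempts)]

def run_with_reconnect(connect, max_attempts: int = 8, base: float = 1.0):
    """Call connect() until it succeeds, sleeping per the backoff schedule.

    OSError covers refused/unreachable sockets; widen the except clause
    for library-specific connection errors as needed.
    """
    for delay in backoff_schedule(max_attempts, base):
        try:
            return connect()
        except OSError:
            time.sleep(delay)
    raise RuntimeError("broker unreachable after retries")
```

Pairing this in-process retry with the systemd restart policy means transient faults are absorbed quietly while persistent faults still surface in the journal.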
Final Validation: End-to-End MQTT Health Pipeline
Objective: prove that publish, broker routing, SQL ingestion, and outage signaling all work together.
Execute final test sequence
This final lesson is intentionally operational. You are verifying both the data plane and the failure plane. The data plane proves that health messages move from publisher to broker to SQL. The failure plane proves that when the publisher dies, the system emits an offline signal instead of silently failing.
Run the commands as a single narrative sequence. Watch live topic output first, then validate SQL persistence, then simulate failure and recovery. This sequence teaches you how to diagnose real outages with evidence instead of assumptions.
sudo systemctl status mosquitto --no-pager
sudo systemctl status pi4-health-pub --no-pager
sudo systemctl status mqtt-sql-ingestor --no-pager
mosquitto_sub -h 127.0.0.1 -u sql_ingestor -P 'ChangeIngestorPassword!' -t mqtt/pi4/health/# -C 5 -v
psql -U dns_user -d dns_analytics -h localhost -c "SELECT topic, ts, payload->>'host' AS host FROM mqtt_messages ORDER BY id DESC LIMIT 10;"
sudo systemctl stop pi4-health-pub
mosquitto_sub -h 127.0.0.1 -u sql_ingestor -P 'ChangeIngestorPassword!' -t mqtt/pi4/health/lwt -C 1 -v
sudo systemctl start pi4-health-pub
mosquitto_sub -h 127.0.0.1 -u sql_ingestor -P 'ChangeIngestorPassword!' -t mqtt/pi4/health/lwt -C 1 -v
psql -U dns_user -d dns_analytics -h localhost -c "SELECT topic, ts FROM mqtt_messages ORDER BY id DESC LIMIT 10;"

You are complete when Pi4 health JSON is continuously published to MQTT, consumed into PostgreSQL, and Last Will state transitions are visible during failures.
Build a Rust MQTT Health Publisher
Objective: replace script-level publishing with a compiled Rust publisher that emits structured health telemetry.
Teaching Lens
This lesson teaches Rust crate composition for MQTT, host metrics, and JSON payload contracts.
The teaching focus here is architecture continuity: language can change while interfaces remain stable. You are moving from a Python publisher to a Rust publisher without changing topics or payload keys, which is exactly how production migrations should work.
You are also learning reliability-by-design in Rust. The publisher loop, Last Will semantics, and serialization path are explicit in code, so you can reason about behavior under load and failure instead of relying on implicit framework behavior.
Create Rust project and dependencies
Use this once to scaffold a dedicated publisher binary under your edge workspace.
This stage is a controlled migration pattern. You create a separate Rust binary with explicit dependencies so you can test parity with the existing publisher before cutover. Building in parallel preserves operational stability while you gain compiled performance and stronger type guarantees.
source $HOME/.cargo/env
cargo new --bin /opt/mqtt/rust-health-publisher
cd /opt/mqtt/rust-health-publisher
cargo add rumqttc serde_json anyhow chrono sysinfo
cargo add serde --features derive
Implement publisher
This implementation publishes health JSON to mqtt/pi4/health/json and uses LWT on mqtt/pi4/health/lwt. Keep the topic contract unchanged so your current consumer keeps working.
Read the Rust implementation as an explicit reliability model. Connection handling, keepalive, Last Will, serialization, and publish cadence are all directly visible in code. That transparency is one reason Rust works well in edge systems where hidden runtime behavior can be operationally expensive.
use anyhow::Result;
use chrono::Utc;
use rumqttc::{Client, LastWill, MqttOptions, QoS};
use serde::Serialize;
use std::thread;
use std::time::Duration;
use sysinfo::{Disks, System};
#[derive(Serialize)]
struct HealthPayload {
    host: String,
    ip: String,
    cpu_pct: f32,
    mem_pct: f32,
    temp_c: f32,
    disk_root_pct: f32,
    ts: String,
}

fn main() -> Result<()> {
    let mut mqttoptions = MqttOptions::new("pi4-rust-health-publisher", "127.0.0.1", 1883);
    mqttoptions.set_keep_alive(Duration::from_secs(30));
    mqttoptions.set_credentials("pi4_publisher", "ChangePublisherPassword!");
    mqttoptions.set_last_will(LastWill::new(
        "mqtt/pi4/health/lwt",
        "offline",
        QoS::AtLeastOnce,
        true,
    ));

    let (mut client, mut connection) = Client::new(mqttoptions, 10);
    // Drive the event loop in the background so publishes are actually flushed.
    thread::spawn(move || for _ in connection.iter() {});
    client.publish("mqtt/pi4/health/lwt", QoS::AtLeastOnce, true, "online")?;

    // Reuse one System so CPU usage is computed as a delta between refreshes;
    // a fresh System each iteration would report an unreliable first sample.
    let mut sys = System::new_all();
    loop {
        sys.refresh_all();
        let disks = Disks::new_with_refreshed_list();
        let cpu_pct = sys.global_cpu_usage();
        let mem_pct = if sys.total_memory() == 0 {
            0.0
        } else {
            (sys.used_memory() as f32 / sys.total_memory() as f32) * 100.0
        };
        let mut disk_root_pct = 0.0;
        for d in disks.list() {
            if d.mount_point().to_string_lossy() == "/" {
                let total = d.total_space() as f32;
                let avail = d.available_space() as f32;
                if total > 0.0 {
                    disk_root_pct = ((total - avail) / total) * 100.0;
                }
            }
        }
        let payload = HealthPayload {
            host: "pi-dns-core".to_string(),
            ip: "10.10.10.250".to_string(),
            cpu_pct,
            mem_pct,
            temp_c: 0.0,
            disk_root_pct,
            ts: Utc::now().to_rfc3339(),
        };
        let body = serde_json::to_string(&payload)?;
        client.publish("mqtt/pi4/health/json", QoS::AtLeastOnce, false, body)?;
        thread::sleep(Duration::from_secs(10));
    }
}

Compile and smoke test
Build in release mode and watch the broker topic directly to confirm payloads and cadence.
This smoke test validates behavior rather than just compilation. You are confirming that runtime publish cadence, authentication, and payload shape are all correct under the real broker path. If this test passes, migration risk is significantly reduced before service cutover.
cd /opt/mqtt/rust-health-publisher
cargo build --release

# terminal A (the publisher runs in the foreground)
./target/release/rust-health-publisher

# terminal B
mosquitto_sub -h 127.0.0.1 -u sql_ingestor -P 'ChangeIngestorPassword!' -t mqtt/pi4/health/json -C 2 -v
Checkpoint: Rust publisher sends valid JSON messages at expected intervals and existing ingestor can parse them.
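Before cutover you can verify payload parity mechanically: capture one message from each publisher and compare key sets. A sketch of that comparison; the function name is illustrative, and the sample strings in the test stand in for captured messages:

```python
import json

def contract_diff(python_body: str, rust_body: str):
    """Compare two JSON payloads' key sets.

    Returns (missing_in_rust, extra_in_rust); both empty means the
    migration preserved the message contract at the field level.
    """
    py_keys = set(json.loads(python_body))
    rs_keys = set(json.loads(rust_body))
    return sorted(py_keys - rs_keys), sorted(rs_keys - py_keys)
```

Note that the Rust struct in this lesson omits the services map, which this check would surface as a contract gap to close (or consciously accept) before retiring the Python publisher.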
Run Rust Publisher as a Managed Service
Objective: operate the Rust publisher as a production daemon with restart behavior and log visibility.
Install and service-wrap
This wraps your compiled binary in systemd and lets you phase out the Python publisher when ready.
Cutover should be deliberate: install binary, register service, disable old publisher, then verify continuity at both topic and SQL layers. Treat this as an operational migration with rollback awareness, not a simple replacement command sequence.
cd /opt/mqtt/rust-health-publisher
cargo build --release
sudo install -m 0755 target/release/rust-health-publisher /usr/local/bin/pi4-rust-health-publisher

sudo tee /etc/systemd/system/pi4-rust-health-pub.service >/dev/null <<'EOF'
[Unit]
Description=Pi4 Rust MQTT health publisher
After=network-online.target mosquitto.service

[Service]
ExecStart=/usr/local/bin/pi4-rust-health-publisher
Restart=always
RestartSec=2
User=pi
Group=pi
NoNewPrivileges=true

[Install]
WantedBy=multi-user.target
EOF

sudo systemctl daemon-reload
sudo systemctl disable --now pi4-health-pub
sudo systemctl enable --now pi4-rust-health-pub
sudo systemctl status pi4-rust-health-pub --no-pager
sudo journalctl -u pi4-rust-health-pub -n 60 --no-pager
You verify service restarts automatically, publishes after reboot, and keeps SQL ingestion flowing.
Checkpoint: Rust daemon is the active publisher in production and Python publisher is retired cleanly.