# agent2/skills/mqtt_publish.py

"""
Skill: MQTT_PUBLISH / MQTT_SUBSCRIBE
Lets the agent communicate with other agents over MQTT.
Commands:
    MQTT_PUBLISH: <topic> | <message>
        → Publishes a message to an MQTT topic.
    MQTT_SUBSCRIBE: <topic>
        → Subscribes to the topic and returns every message received during a 2-second wait (no cache file is read).
"""
import json
import paho.mqtt.publish as publish
import paho.mqtt.client as mqtt
from pathlib import Path
# --- Filesystem locations -------------------------------------------------
# JSON configuration file that _get_cfg() opens to find the broker settings.
CONFIG_FILE = Path("/opt/agent/config/config.json")
# NOTE(review): no function visible in this file references CACHE_DIR;
# confirm whether another module uses it before removing.
CACHE_DIR = Path("/opt/agent/mqtt_cache")

# --- Skill registration metadata ------------------------------------------
# NOTE(review): SKILL_NAME / TRIGGER / TRIGGERS are presumably read by the
# agent's skill loader, which is not shown here — verify against the loader.
SKILL_NAME = "mqtt"
# No single trigger is declared; the commands are listed in TRIGGERS instead.
TRIGGER = None
# Command prefix -> name of the function in this module that handles it.
TRIGGERS = {
    "MQTT_PUBLISH:": "mqtt_publish",
    "MQTT_SUBSCRIBE:": "mqtt_subscribe",
}
def _get_cfg():
with open(CONFIG_FILE) as f:
cfg = json.load(f)
return (
cfg.get("mqtt_host", "localhost"),
int(cfg.get("mqtt_port", 1883)),
cfg.get("mqtt_client_id", "agent1"),
)
def mqtt_publish(args: str) -> str:
    """Publish a single message to an MQTT topic.

    Args:
        args: Command text of the form ``<topic> | <message>``. Only the
            first ``|`` separates the two parts; whitespace around each part
            is stripped.

    Returns:
        A status string (in French): a confirmation echoing the topic and
        message, a format/empty-field error, or the text of any exception
        raised while loading the configuration or publishing. Exceptions are
        reported in the return value rather than raised.
    """
    pieces = args.split("|", 1)
    if len(pieces) != 2:
        # No separator at all: the command is malformed.
        return "Erreur : format attendu → MQTT_PUBLISH: <topic> | <message>"

    destination = pieces[0].strip()
    body = pieces[1].strip()
    if not (destination and body):
        return "Erreur : topic ou message vide."

    try:
        # The configured client id is not used for one-shot publishing.
        broker_host, broker_port, _unused_client_id = _get_cfg()
        publish.single(
            destination,
            payload=body,
            hostname=broker_host,
            port=broker_port,
        )
    except Exception as exc:
        return "Erreur MQTT_PUBLISH : {}".format(exc)
    else:
        return "Message publié sur «{}» : {}".format(destination, body)
def mqtt_subscribe(args: str) -> str:
    """Listen on an MQTT topic for a short window and report what arrived.

    Connects to the broker named in the agent configuration, subscribes to
    the requested topic, collects every payload delivered during a fixed
    2-second wait, then stops the network loop and disconnects.

    Args:
        args: The topic to subscribe to; surrounding whitespace is stripped.

    Returns:
        A status string (in French): the received payloads joined one per
        line, a notice that nothing arrived within the wait, or the text of
        any exception raised along the way. Exceptions are reported in the
        return value rather than raised.
    """
    import time  # imported locally, as in the original function

    topic = args.strip()
    if not topic:
        return "Erreur : topic vide."
    try:
        host, port, client_id = _get_cfg()
        received = []

        def on_message(client, userdata, msg):
            # Undecodable bytes are replaced rather than raising, so one bad
            # payload cannot abort the collection.
            received.append(msg.payload.decode(errors="replace"))

        # The "_sub" suffix gives this connection a client id distinct from
        # the configured one.
        client = mqtt.Client(
            mqtt.CallbackAPIVersion.VERSION2,
            client_id=client_id + "_sub",
        )
        client.on_message = on_message
        client.connect(host, port, keepalive=5)
        try:
            client.subscribe(topic)
            client.loop_start()
            time.sleep(2)
        finally:
            # Always stop the network loop and close the connection, even if
            # subscribing or waiting failed; otherwise the client would leak.
            client.loop_stop()
            client.disconnect()

        if received:
            return "Message reçu sur «{}» :\n{}".format(topic, "\n".join(received))
        return "Aucun message reçu sur «{}» (2s d'attente).".format(topic)
    except Exception as e:
        return "Erreur MQTT_SUBSCRIBE : {}".format(e)