""" Skill : MQTT_PUBLISH / MQTT_SUBSCRIBE Permet à l'agent de communiquer avec d'autres agents via MQTT. Commandes : MQTT_PUBLISH: | → Publie un message sur un topic MQTT MQTT_SUBSCRIBE: → Lit le dernier message reçu sur un topic (via fichier cache) """ import json import paho.mqtt.publish as publish import paho.mqtt.client as mqtt from pathlib import Path SKILL_NAME = "mqtt" TRIGGER = None TRIGGERS = { "MQTT_PUBLISH:": "mqtt_publish", "MQTT_SUBSCRIBE:": "mqtt_subscribe", } CONFIG_FILE = Path("/opt/agent/config/config.json") CACHE_DIR = Path("/opt/agent/mqtt_cache") def _get_cfg(): with open(CONFIG_FILE) as f: cfg = json.load(f) return ( cfg.get("mqtt_host", "localhost"), int(cfg.get("mqtt_port", 1883)), cfg.get("mqtt_client_id", "agent1"), ) def mqtt_publish(args: str) -> str: if "|" not in args: return "Erreur : format attendu → MQTT_PUBLISH: | " topic, _, message = args.partition("|") topic, message = topic.strip(), message.strip() if not topic or not message: return "Erreur : topic ou message vide." try: host, port, _ = _get_cfg() publish.single(topic, payload=message, hostname=host, port=port) return "Message publié sur «{}» : {}".format(topic, message) except Exception as e: return "Erreur MQTT_PUBLISH : {}".format(e) def mqtt_subscribe(args: str) -> str: topic = args.strip() if not topic: return "Erreur : topic vide." try: host, port, client_id = _get_cfg() received = [] def on_message(client, userdata, msg): received.append(msg.payload.decode(errors="replace")) client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2, client_id=client_id + "_sub") client.on_message = on_message client.connect(host, port, keepalive=5) client.subscribe(topic) client.loop_start() import time time.sleep(2) client.loop_stop() client.disconnect() if received: return "Message reçu sur «{}» :\n{}".format(topic, "\n".join(received)) return "Aucun message reçu sur «{}» (2s d'attente).".format(topic) except Exception as e: return "Erreur MQTT_SUBSCRIBE : {}".format(e)