#!/usr/bin/env python3 # -*- coding: utf-8 -*- import asyncio import sys import threading import requests import json from pathlib import Path from slixmpp import ClientXMPP import paho.mqtt.client as mqtt sys.path.insert(0, "/opt/agent") from skills.loader import load_skills, run_skills # ── CONFIG ─────────────────────────────────────────────────────────────── CONFIG_DIR = Path("/opt/agent/config") CONFIG_FILE = CONFIG_DIR / "config.json" PROMPT_FILE = CONFIG_DIR / "system_prompt.txt" def load_config(): with open(CONFIG_FILE, "r", encoding="utf-8") as f: return json.load(f) def load_system_prompt(): with open(PROMPT_FILE, "r", encoding="utf-8") as f: return f.read() cfg = load_config() OLLAMA_URL = cfg["ollama_url"] MODEL = cfg["model"] XMPP_JID = cfg["xmpp_jid"] XMPP_PASS = cfg["xmpp_pass"] ADMIN_JID = cfg["admin_jid"] MQTT_HOST = cfg.get("mqtt_host", "localhost") MQTT_PORT = int(cfg.get("mqtt_port", 1883)) MQTT_INBOX = "agents/agent1/inbox" SYSTEM_PROMPT = load_system_prompt() load_skills() conversation_history = [] xmpp_bot = None # référence globale pour répondre via XMPP depuis MQTT AGENTS_ONLINE = {} # {agent_name: {status, jid, mqtt_inbox, last_seen}} REGISTRY_FILE = CONFIG_DIR / "agents_registry.json" AGENTS_ONLINE_FILE = CONFIG_DIR / "agents_online.json" def _get_agents_context() -> str: """Construit dynamiquement la liste des agents (registre + statut en ligne).""" try: registry = json.loads(REGISTRY_FILE.read_text(encoding="utf-8")) except Exception: registry = {} if not registry: return "Aucun agent enregistré." lines = ["Agents disponibles :"] for name, info in registry.items(): online = name in AGENTS_ONLINE and AGENTS_ONLINE[name].get("status") == "online" status = "[EN LIGNE]" if online else "[hors ligne]" lines.append("- {} {} : {}".format(name, status, info.get("speciality", ""))) return "\n".join(lines) # ── LLM ────────────────────────────────────────────────────────────────── def call_ollama(messages: list) -> str: payload = { "model" : MODEL, "messages": messages, "stream" : False, "options" : {"temperature": 0.3} } response = requests.post(OLLAMA_URL, json=payload, timeout=180) return response.json()["message"]["content"] def ask_llm(user_message: str, history: list = None) -> str: if history is None: history = conversation_history history.append({"role": "user", "content": user_message}) full_prompt = SYSTEM_PROMPT + "\n\n" + _get_agents_context() messages = [{"role": "system", "content": full_prompt}] + history try: MAX_STEPS = 10 for _ in range(MAX_STEPS): reply = call_ollama(messages) skill_triggered, result = run_skills(reply) if not skill_triggered: history.append({"role": "assistant", "content": reply}) return reply messages.append({"role": "assistant", "content": reply}) messages.append({"role": "user", "content": "[Résultat skill]\n" + result}) reply = call_ollama(messages) history.append({"role": "assistant", "content": reply}) return reply except Exception as e: err = "Erreur : " + str(e) history.append({"role": "assistant", "content": err}) return err # ── MQTT LISTENER (pour CLI) ────────────────────────────────────────────── mqtt_pub_client = None def mqtt_publish(topic: str, message: str): if mqtt_pub_client: mqtt_pub_client.publish(topic, message) def on_mqtt_message(client, userdata, msg): raw = msg.payload.decode(errors="replace") # Support JSON avec reply_to optionnel reply_to = "agents/cli/outbox" task = raw try: data = json.loads(raw) task = data.get("task", raw) reply_to = data.get("reply_to", reply_to) except json.JSONDecodeError: pass print("[MQTT] Message CLI reçu : {}".format(task[:80])) mqtt_history = [] reply = ask_llm(task, history=mqtt_history) mqtt_publish(reply_to, reply) print("[MQTT] Réponse envoyée sur {}".format(reply_to)) def on_mqtt_error(client, userdata, msg): """Reçoit les erreurs des agents et notifie l'utilisateur via XMPP.""" try: data = json.loads(msg.payload.decode(errors="replace")) agent = data.get("agent", "?") task = data.get("task", "?") error = data.get("error", "?") source = data.get("source", "?") notif = "[ERREUR][{}] Agent : {}\nTâche : {}\nErreur : {}".format( source.upper(), agent, task[:100], error[:300]) print(notif) if xmpp_bot: xmpp_bot.send_message(mto=ADMIN_JID, mbody=notif, mtype='chat') except Exception as e: print("[MQTT] Erreur parsing notification : {}".format(e)) def on_mqtt_notification(client, userdata, msg): """Reçoit les notifications du scheduler.""" try: data = json.loads(msg.payload.decode(errors="replace")) status = data.get("status", "?") agent = data.get("agent", "?") task = data.get("task", "?")[:80] ts = data.get("timestamp", "?") # Notifier XMPP seulement en cas d'erreur ou de succès important if status == "error" and xmpp_bot: notif = "[PLANIF ERREUR] {} | {} → {}\nStatut : {}".format(ts, agent, task, status) xmpp_bot.send_message(mto=ADMIN_JID, mbody=notif, mtype='chat') except Exception as e: print("[MQTT] Erreur parsing notification scheduler : {}".format(e)) def on_mqtt_status(client, userdata, msg): """Suit le statut en ligne/hors-ligne des agents (LWT + retain).""" import time try: data = json.loads(msg.payload.decode(errors="replace")) agent = data.get("agent", "?") status = data.get("status", "?") was_online = AGENTS_ONLINE.get(agent, {}).get("status") == "online" AGENTS_ONLINE[agent] = {**data, "last_seen": time.time()} # Sauvegarder pour la skill agents_online AGENTS_ONLINE_FILE.write_text( json.dumps(AGENTS_ONLINE, indent=2, ensure_ascii=False), encoding="utf-8") # Notifier sylvain uniquement si le statut change is_online = status == "online" if is_online == was_online: return emoji = "[EN LIGNE]" if is_online else "[HORS LIGNE]" print("[STATUS] {} → {}".format(agent, status)) if xmpp_bot: xmpp_bot.send_message(mto=ADMIN_JID, mbody="{} {}".format(emoji, agent), mtype='chat') except Exception as e: print("[MQTT] Erreur parsing status : {}".format(e)) def on_mqtt_register(client, userdata, msg): """Reçoit les déclarations de mise en ligne des agents et met à jour le registre.""" try: data = json.loads(msg.payload.decode(errors="replace")) agent = data.get("agent", "?") jid = data.get("jid", "?") mqtt_inbox = data.get("mqtt_inbox", "?") speciality = data.get("speciality", "") print("[REGISTER] {} en ligne (JID: {}, inbox: {})".format(agent, jid, mqtt_inbox)) # Mettre à jour agents_registry.json registry_file = CONFIG_DIR / "agents_registry.json" try: registry = json.loads(registry_file.read_text(encoding="utf-8")) except Exception: registry = {} is_new = agent not in registry registry[agent] = { "jid" : jid, "mqtt_inbox" : mqtt_inbox, "mqtt_outbox": "agents/agent1/inbox", "speciality" : speciality, } registry_file.write_text(json.dumps(registry, indent=2, ensure_ascii=False), encoding="utf-8") if is_new: print("[REGISTER] {} ajouté au registre.".format(agent)) # Notifier sylvain via XMPP if xmpp_bot: status = "NOUVEAU" if is_new else "EN LIGNE" notif = "[{}] {}\n JID : {}\n MQTT : {}".format(status, agent, jid, mqtt_inbox) if speciality: notif += "\n Rôle : {}".format(speciality) xmpp_bot.send_message(mto=ADMIN_JID, mbody=notif, mtype='chat') except Exception as e: print("[MQTT] Erreur parsing register : {}".format(e)) def start_mqtt_listener(): global mqtt_pub_client mqtt_pub_client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2, client_id="agent1_pub") mqtt_pub_client.connect(MQTT_HOST, MQTT_PORT) mqtt_pub_client.loop_start() sub = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2, client_id="agent1_sub") sub.message_callback_add("agents/agent1/inbox", on_mqtt_message) sub.message_callback_add("agents/errors", on_mqtt_error) sub.message_callback_add("agents/scheduler/notifications", on_mqtt_notification) sub.message_callback_add("agents/register", on_mqtt_register) sub.message_callback_add("agents/status/+", on_mqtt_status) sub.on_message = on_mqtt_message # fallback sub.connect(MQTT_HOST, MQTT_PORT) sub.subscribe([ (MQTT_INBOX, 0), ("agents/errors", 0), ("agents/scheduler/notifications", 0), ("agents/register", 0), ("agents/status/+", 0), ]) print("[MQTT] Agent1 écoute sur {}, agents/errors, agents/status/+, agents/register".format(MQTT_INBOX)) sub.loop_forever() # ── BOT XMPP ───────────────────────────────────────────────────────────── class AgentBot(ClientXMPP): def __init__(self): ClientXMPP.__init__(self, XMPP_JID, XMPP_PASS) self.add_event_handler("session_start", self.session_start) self.add_event_handler("message", self.message) self.register_plugin('xep_0030') self.register_plugin('xep_0199') async def session_start(self, event): self.send_presence() await self.get_roster() self.send_message(mto=ADMIN_JID, mbody="Agent1 (orchestrateur) en ligne !", mtype='chat') async def message(self, msg): if msg['type'] not in ('chat', 'normal'): return if str(msg['from']).split('/')[0] != ADMIN_JID: return user_input = msg['body'].strip() if user_input == "!reset": conversation_history.clear() self.send_message(mto=ADMIN_JID, mbody="Conversation reinitialisee.", mtype='chat') return loop = asyncio.get_event_loop() reply = await loop.run_in_executor(None, ask_llm, user_input) self.send_message(mto=ADMIN_JID, mbody=reply, mtype='chat') # ── MAIN ───────────────────────────────────────────────────────────────── if __name__ == "__main__": mqtt_thread = threading.Thread(target=start_mqtt_listener, daemon=True) mqtt_thread.start() xmpp_bot = AgentBot() xmpp_bot.connect() xmpp_bot.loop.run_forever()