#!/usr/bin/env python3 # -*- coding: utf-8 -*- import asyncio import sys import threading import requests import json from pathlib import Path from slixmpp import ClientXMPP import paho.mqtt.client as mqtt sys.path.insert(0, "/opt/agent") from skills.loader import load_skills, run_skills # ── CONFIG ─────────────────────────────────────────────────────────────── CONFIG_DIR = Path("/opt/agent/config") CONFIG_FILE = CONFIG_DIR / "config.json" PROMPT_FILE = CONFIG_DIR / "system_prompt.txt" def load_config(): with open(CONFIG_FILE, "r", encoding="utf-8") as f: return json.load(f) def load_system_prompt(): with open(PROMPT_FILE, "r", encoding="utf-8") as f: return f.read() cfg = load_config() OLLAMA_URL = cfg["ollama_url"] MODEL = cfg["model"] XMPP_JID = cfg["xmpp_jid"] XMPP_PASS = cfg["xmpp_pass"] ADMIN_JID = cfg["admin_jid"] MQTT_HOST = cfg.get("mqtt_host", "localhost") MQTT_PORT = int(cfg.get("mqtt_port", 1883)) MQTT_INBOX = "agents/agent1/inbox" SYSTEM_PROMPT = load_system_prompt() load_skills() conversation_history = [] xmpp_bot = None # référence globale pour répondre via XMPP depuis MQTT # ── LLM ────────────────────────────────────────────────────────────────── def call_ollama(messages: list) -> str: payload = { "model" : MODEL, "messages": messages, "stream" : False, "options" : {"temperature": 0.3} } response = requests.post(OLLAMA_URL, json=payload, timeout=180) return response.json()["message"]["content"] def ask_llm(user_message: str, history: list = None) -> str: if history is None: history = conversation_history history.append({"role": "user", "content": user_message}) messages = [{"role": "system", "content": SYSTEM_PROMPT}] + history try: MAX_STEPS = 10 for _ in range(MAX_STEPS): reply = call_ollama(messages) skill_triggered, result = run_skills(reply) if not skill_triggered: history.append({"role": "assistant", "content": reply}) return reply messages.append({"role": "assistant", "content": reply}) messages.append({"role": "user", "content": "[Résultat skill]\n" + result}) reply = call_ollama(messages) history.append({"role": "assistant", "content": reply}) return reply except Exception as e: err = "Erreur : " + str(e) history.append({"role": "assistant", "content": err}) return err # ── MQTT LISTENER (pour CLI) ────────────────────────────────────────────── mqtt_pub_client = None def mqtt_publish(topic: str, message: str): if mqtt_pub_client: mqtt_pub_client.publish(topic, message) def on_mqtt_message(client, userdata, msg): raw = msg.payload.decode(errors="replace") # Support JSON avec reply_to optionnel reply_to = "agents/cli/outbox" task = raw try: data = json.loads(raw) task = data.get("task", raw) reply_to = data.get("reply_to", reply_to) except json.JSONDecodeError: pass print("[MQTT] Message CLI reçu : {}".format(task[:80])) mqtt_history = [] reply = ask_llm(task, history=mqtt_history) mqtt_publish(reply_to, reply) print("[MQTT] Réponse envoyée sur {}".format(reply_to)) def on_mqtt_error(client, userdata, msg): """Reçoit les erreurs des agents et notifie l'utilisateur via XMPP.""" try: data = json.loads(msg.payload.decode(errors="replace")) agent = data.get("agent", "?") task = data.get("task", "?") error = data.get("error", "?") source = data.get("source", "?") notif = "[ERREUR][{}] Agent : {}\nTâche : {}\nErreur : {}".format( source.upper(), agent, task[:100], error[:300]) print(notif) if xmpp_bot: xmpp_bot.send_message(mto=ADMIN_JID, mbody=notif, mtype='chat') except Exception as e: print("[MQTT] Erreur parsing notification : {}".format(e)) def on_mqtt_notification(client, userdata, msg): """Reçoit les notifications du scheduler.""" try: data = json.loads(msg.payload.decode(errors="replace")) status = data.get("status", "?") agent = data.get("agent", "?") task = data.get("task", "?")[:80] ts = data.get("timestamp", "?") # Notifier XMPP seulement en cas d'erreur ou de succès important if status == "error" and xmpp_bot: notif = "[PLANIF ERREUR] {} | {} → {}\nStatut : {}".format(ts, agent, task, status) xmpp_bot.send_message(mto=ADMIN_JID, mbody=notif, mtype='chat') except Exception as e: print("[MQTT] Erreur parsing notification scheduler : {}".format(e)) def on_mqtt_register(client, userdata, msg): """Reçoit les déclarations de mise en ligne des agents.""" try: data = json.loads(msg.payload.decode(errors="replace")) agent = data.get("agent", "?") jid = data.get("jid", "?") mqtt_inbox = data.get("mqtt_inbox", "?") speciality = data.get("speciality", "") print("[REGISTER] {} en ligne (JID: {}, inbox: {})".format(agent, jid, mqtt_inbox)) # Notifier sylvain via XMPP if xmpp_bot: notif = "[EN LIGNE] {}\n JID : {}\n MQTT : {}".format(agent, jid, mqtt_inbox) if speciality: notif += "\n Rôle : {}".format(speciality) xmpp_bot.send_message(mto=ADMIN_JID, mbody=notif, mtype='chat') except Exception as e: print("[MQTT] Erreur parsing register : {}".format(e)) def start_mqtt_listener(): global mqtt_pub_client mqtt_pub_client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2, client_id="agent1_pub") mqtt_pub_client.connect(MQTT_HOST, MQTT_PORT) mqtt_pub_client.loop_start() sub = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2, client_id="agent1_sub") sub.message_callback_add("agents/agent1/inbox", on_mqtt_message) sub.message_callback_add("agents/errors", on_mqtt_error) sub.message_callback_add("agents/scheduler/notifications", on_mqtt_notification) sub.message_callback_add("agents/register", on_mqtt_register) sub.on_message = on_mqtt_message # fallback sub.connect(MQTT_HOST, MQTT_PORT) sub.subscribe([ (MQTT_INBOX, 0), ("agents/errors", 0), ("agents/scheduler/notifications", 0), ("agents/register", 0), ]) print("[MQTT] Agent1 écoute sur {}, agents/errors, agents/scheduler/notifications, agents/register".format(MQTT_INBOX)) sub.loop_forever() # ── BOT XMPP ───────────────────────────────────────────────────────────── class AgentBot(ClientXMPP): def __init__(self): ClientXMPP.__init__(self, XMPP_JID, XMPP_PASS) self.add_event_handler("session_start", self.session_start) self.add_event_handler("message", self.message) self.register_plugin('xep_0030') self.register_plugin('xep_0199') async def session_start(self, event): self.send_presence() await self.get_roster() self.send_message(mto=ADMIN_JID, mbody="Agent1 (orchestrateur) en ligne !", mtype='chat') async def message(self, msg): if msg['type'] not in ('chat', 'normal'): return if str(msg['from']).split('/')[0] != ADMIN_JID: return user_input = msg['body'].strip() if user_input == "!reset": conversation_history.clear() self.send_message(mto=ADMIN_JID, mbody="Conversation reinitialisee.", mtype='chat') return loop = asyncio.get_event_loop() reply = await loop.run_in_executor(None, ask_llm, user_input) self.send_message(mto=ADMIN_JID, mbody=reply, mtype='chat') # ── MAIN ───────────────────────────────────────────────────────────────── if __name__ == "__main__": mqtt_thread = threading.Thread(target=start_mqtt_listener, daemon=True) mqtt_thread.start() xmpp_bot = AgentBot() xmpp_bot.connect() xmpp_bot.loop.run_forever()