#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Bootstrap of the agent1 process: imports, configuration and shared state."""

import asyncio
import json
import sys
import threading
import time
from datetime import datetime
from pathlib import Path

import paho.mqtt.client as mqtt
import requests
from apscheduler.schedulers.background import BackgroundScheduler
from slixmpp import ClientXMPP

# The skills package lives under /opt/agent, so that directory must be on
# sys.path before the import below can resolve.
sys.path.insert(0, "/opt/agent")
from skills.loader import load_skills, run_skills

# ── CONFIG ───────────────────────────────────────────────────────────────
CONFIG_DIR = Path("/opt/agent/config")
CONFIG_FILE = CONFIG_DIR / "config.json"
PROMPT_FILE = CONFIG_DIR / "system_prompt.txt"


def load_config():
    """Parse CONFIG_FILE (UTF-8 JSON) and return the decoded value."""
    raw_text = CONFIG_FILE.read_text(encoding="utf-8")
    return json.loads(raw_text)


def load_system_prompt():
    """Return the full text of PROMPT_FILE, decoded as UTF-8."""
    return PROMPT_FILE.read_text(encoding="utf-8")


cfg = load_config()

# Mandatory settings: a missing key raises KeyError at import time.
OLLAMA_URL = cfg["ollama_url"]
MODEL = cfg["model"]
XMPP_JID = cfg["xmpp_jid"]
XMPP_PASS = cfg["xmpp_pass"]
ADMIN_JID = cfg["admin_jid"]

# Optional MQTT settings with their fallbacks.
MQTT_HOST = cfg.get("mqtt_host", "localhost")
MQTT_PORT = int(cfg.get("mqtt_port", 1883))
MQTT_INBOX = "agents/agent1/inbox"

SYSTEM_PROMPT = load_system_prompt()
load_skills()

# Shared mutable state used by the rest of the module.
conversation_history = []
xmpp_bot = None
AGENTS_ONLINE = {}

REGISTRY_FILE = CONFIG_DIR / "agents_registry.json"
AGENTS_ONLINE_FILE = CONFIG_DIR / "agents_online.json"
REPORTS_FILE = CONFIG_DIR / "reports_schedule.json"
TASKS_FILE = CONFIG_DIR / "tasks_schedule.json"
BLACKOUT_FILE = CONFIG_DIR / "blackout_hours.json"

# ── SLEEP MODE ───────────────────────────────────────────────────────────
SLEEP_MODE = False

# ── PENDING CONFIG CONFIRMATION ──────────────────────────────────────────
# {"file": "reports_schedule" | "tasks_schedule", "content": dict, "description": str}
PENDING_CONFIG = None

# ── DAILY REPORTS (received from agents) ─────────────────────────────────
daily_reports = {}  # {agent_name: payload_dict}

# ── SCHEDULER ────────────────────────────────────────────────────────────
scheduler = BackgroundScheduler()

# ── AGENTS CONTEXT ───────────────────────────────────────────────────────
def _load_registry() -> dict:
    """Return the agent registry stored in REGISTRY_FILE.

    An unreadable file, invalid JSON, or a JSON value that is not an object
    all yield an empty dict, so callers can always iterate the result.
    """
    try:
        registry = json.loads(REGISTRY_FILE.read_text(encoding="utf-8"))
    except (OSError, ValueError):
        return {}
    return registry if isinstance(registry, dict) else {}


def _get_agents_context() -> str:
    """Build the text listing registered agents and their online status.

    The result is appended to the system prompt by ask_llm().
    """
    registry = _load_registry()
    if not registry:
        return "Aucun agent enregistré."
    lines = ["Agents disponibles :"]
    for name, info in registry.items():
        online = AGENTS_ONLINE.get(name, {}).get("status") == "online"
        status = "[EN LIGNE]" if online else "[hors ligne]"
        lines.append("- {} {} : {}".format(name, status, info.get("speciality", "")))
    return "\n".join(lines)


# ── LLM ──────────────────────────────────────────────────────────────────
def call_ollama(messages: list) -> str:
    """POST a non-streaming chat request to OLLAMA_URL and return the reply text.

    Raises:
        requests.RequestException: on network failure or a non-2xx status.
        KeyError / ValueError: if the response body lacks message.content or
            is not JSON.
    """
    payload = {
        "model": MODEL,
        "messages": messages,
        "stream": False,
        "options": {"temperature": 0.3},
    }
    response = requests.post(OLLAMA_URL, json=payload, timeout=180)
    # Surface HTTP errors as such instead of a confusing KeyError on the
    # error body returned by the server.
    response.raise_for_status()
    return response.json()["message"]["content"]


def ask_llm(user_message: str, history: list | None = None) -> str:
    """Send a user message to the model, running skills until none triggers.

    Args:
        user_message: text appended to the history as the user turn.
        history: conversation to extend in place; defaults to the shared
            conversation_history.

    Returns:
        The final assistant reply, or "Erreur : ..." if anything raised.
        Either way the returned text is also appended to ``history``.
    """
    if history is None:
        history = conversation_history
    history.append({"role": "user", "content": user_message})
    full_prompt = SYSTEM_PROMPT + "\n\n" + _get_agents_context()
    # Intermediate skill exchanges go into this working copy only; the
    # persistent history receives just the user turn and the final reply.
    messages = [{"role": "system", "content": full_prompt}] + history
    max_steps = 10
    try:
        for _ in range(max_steps):
            reply = call_ollama(messages)
            skill_triggered, result = run_skills(reply)
            if not skill_triggered:
                history.append({"role": "assistant", "content": reply})
                return reply
            messages.append({"role": "assistant", "content": reply})
            messages.append({"role": "user", "content": "[Résultat skill]\n" + result})
        # Step budget exhausted: ask once more and return that answer as is.
        reply = call_ollama(messages)
        history.append({"role": "assistant", "content": reply})
        return reply
    except Exception as e:
        err = "Erreur : " + str(e)
        history.append({"role": "assistant", "content": err})
        return err


# ── XMPP NOTIFICATIONS FROM OTHER THREADS ────────────────────────────────
def _notify_admin(body: str) -> bool:
    """Queue a chat message to ADMIN_JID on the XMPP bot's event loop.

    MQTT callbacks and scheduler jobs run outside the bot's asyncio loop, so
    the send is handed over with call_soon_threadsafe instead of being
    executed in the calling thread.

    Returns:
        True if the send was queued, False if no bot exists yet.
    """
    bot = xmpp_bot
    if bot is None:
        return False
    bot.loop.call_soon_threadsafe(
        lambda: bot.send_message(mto=ADMIN_JID, mbody=body, mtype='chat'))
    return True


# ── MQTT ─────────────────────────────────────────────────────────────────
mqtt_pub_client = None


def mqtt_publish(topic: str, message: str):
    """Publish ``message`` on ``topic``; a no-op while the publisher is unset."""
    if mqtt_pub_client:
        mqtt_pub_client.publish(topic, message)


def _send_control(agent_name: str, command: str):
    """Send a control command to an agent over MQTT."""
    payload = json.dumps({"command": command})
    mqtt_publish("agents/{}/control".format(agent_name), payload)
    print("[CONTROL] {} → {}".format(agent_name, command))


def _get_all_agents() -> list:
    """Return the names of all registered agents except agent1 itself."""
    return [name for name in _load_registry() if name != "agent1"]


def _is_valid_agent_name(name: str) -> bool:
    """Tell whether ``name`` can be embedded in an MQTT publish topic.

    Wildcards (+, #) are not allowed in publish topics and a slash would
    change the topic level, so all three are refused.
    """
    return bool(name) and not any(ch in name for ch in "+#/")


# ── !agent COMMANDS ──────────────────────────────────────────────────────
def _handle_agent_command(text: str) -> str | None:
    """Handle the !agentON/OFF and !agentsON/OFF commands.

    Returns the reply to send, or None if ``text`` is not an agent command.
    """
    global SLEEP_MODE
    t = text.strip()

    # !agentsOFF: put agent1 to sleep and pause every other agent.
    if t == "!agentsOFF":
        agents = _get_all_agents()
        for a in agents:
            _send_control(a, "pause")
        SLEEP_MODE = True
        return "[VEILLE] Agent1 en veille. {} agent(s) mis en pause.\nEnvoyez !agentsON ou !agentON agent1 pour reprendre.".format(len(agents))

    # !agentsON: wake agent1 and resume every other agent.
    if t == "!agentsON":
        agents = _get_all_agents()
        for a in agents:
            _send_control(a, "resume")
        SLEEP_MODE = False
        return "[ACTIF] Agent1 actif. {} agent(s) relancés.".format(len(agents))

    # !agentOFF <name>
    if t.startswith("!agentOFF "):
        name = t[len("!agentOFF "):].strip()
        if name == "agent1":
            SLEEP_MODE = True
            return "[VEILLE] Agent1 en veille. Envoyez !agentON agent1 pour reprendre."
        if not _is_valid_agent_name(name):
            return "[ERREUR] Nom d'agent invalide : {}".format(name)
        _send_control(name, "pause")
        return "[PAUSE] Commande pause envoyée à {}.".format(name)

    # !agentON <name>
    if t.startswith("!agentON "):
        name = t[len("!agentON "):].strip()
        if name == "agent1":
            SLEEP_MODE = False
            return "[ACTIF] Agent1 actif."
        if not _is_valid_agent_name(name):
            return "[ERREUR] Nom d'agent invalide : {}".format(name)
        _send_control(name, "resume")
        return "[ACTIF] Commande resume envoyée à {}.".format(name)

    return None


# ── CONFIG MANAGEMENT WITH CONFIRMATION ──────────────────────────────────
def _handle_config_command(text: str) -> str | None:
    """Display one of the schedule/blackout config files.

    Returns the formatted file content (or a read-error message) for
    !reports, !tasks and !blackout, and None for any other input.
    """
    views = {
        "!reports": ("reports_schedule.json", REPORTS_FILE),
        "!tasks": ("tasks_schedule.json", TASKS_FILE),
        "!blackout": ("blackout_hours.json", BLACKOUT_FILE),
    }
    entry = views.get(text.strip())
    if entry is None:
        return None
    label, path = entry
    try:
        # Files are written as UTF-8 with ensure_ascii=False, so they must
        # be read back as UTF-8 regardless of the process locale.
        data = json.loads(path.read_text(encoding="utf-8"))
        return "=== {} ===\n".format(label) + json.dumps(data, indent=2, ensure_ascii=False)
    except Exception as e:
        return "Erreur lecture : {}".format(e)


def _apply_pending_config() -> str:
    """Write the pending config change to disk and reload its scheduler.

    PENDING_CONFIG is cleared whether the application succeeds or fails.
    An unknown "file" key is reported as an error instead of being
    silently acknowledged.
    """
    global PENDING_CONFIG
    if not PENDING_CONFIG:
        return "Aucune modification en attente."
    pending, PENDING_CONFIG = PENDING_CONFIG, None
    targets = {
        "reports_schedule": (REPORTS_FILE, _reload_report_scheduler),
        "tasks_schedule": (TASKS_FILE, _reload_task_scheduler),
    }
    try:
        file_key = pending["file"]
        content = pending["content"]
        desc = pending["description"]
        if file_key not in targets:
            raise ValueError("fichier de configuration inconnu : {}".format(file_key))
        path, reload_scheduler = targets[file_key]
        path.write_text(json.dumps(content, indent=2, ensure_ascii=False),
                        encoding="utf-8")
        reload_scheduler()
        return "Modification appliquée : {}".format(desc)
    except Exception as e:
        return "Erreur lors de l'application : {}".format(e)


# ── DAILY REPORT ─────────────────────────────────────────────────────────
def _parse_hhmm(text: str) -> tuple:
    """Parse "HH:MM" into (hour, minute).

    Raises:
        ValueError: if the text is not two integers separated by a colon or
            is outside 00:00-23:59.
    """
    hour_text, minute_text = text.strip().split(":")
    hour, minute = int(hour_text), int(minute_text)
    if not (0 <= hour <= 23 and 0 <= minute <= 59):
        raise ValueError("heure invalide : {!r}".format(text))
    return hour, minute


def _solicit_report(agent_name: str):
    """Ask an agent for its report through the MQTT control topic."""
    print("[REPORT] Sollicitation rapport de {}".format(agent_name))
    _send_control(agent_name, "report")


def _compile_and_send_daily_report():
    """Compile all received reports and send the result to the admin JID."""
    from skills.daily_report import compile_report
    # Work on a snapshot: the MQTT thread may add reports while compiling.
    report_text = compile_report(dict(daily_reports))
    print("[REPORT] Rapport journalier compilé, envoi XMPP.")
    _notify_admin(report_text)


def _reload_report_scheduler():
    """(Re)schedule report solicitations according to reports_schedule.json.

    The whole file is parsed before touching the scheduler, then every
    existing report job is dropped and the enabled ones are re-added. This
    way agents that were disabled or removed, and a disabled daily report,
    do not keep a stale job.
    """
    try:
        data = json.loads(REPORTS_FILE.read_text(encoding="utf-8"))

        planned = []  # (job_id, func, args, hour, minute, log line)
        for name, conf in data.get("agents", {}).items():
            if not conf.get("enabled", True):
                continue
            t = conf["report_time"]  # "HH:MM"
            hour, minute = _parse_hhmm(t)
            planned.append((
                "report_{}".format(name), _solicit_report, [name], hour, minute,
                "[SCHEDULER] Rapport {} planifié à {}".format(name, t)))

        # Compiled daily report.
        daily_t = data.get("daily_report_time", "22:30")
        if data.get("daily_report_enabled", True):
            dh, dm = _parse_hhmm(daily_t)
            planned.append((
                "daily_report", _compile_and_send_daily_report, [], dh, dm,
                "[SCHEDULER] Rapport journalier planifié à {}".format(daily_t)))

        for job in scheduler.get_jobs():
            if job.id == "daily_report" or job.id.startswith("report_"):
                scheduler.remove_job(job.id)

        for job_id, func, args, hour, minute, log_line in planned:
            scheduler.add_job(func, "cron", args=args, hour=hour, minute=minute,
                              id=job_id, replace_existing=True)
            print(log_line)
    except Exception as e:
        print("[SCHEDULER] Erreur reload report scheduler : {}".format(e))


def _reload_task_scheduler():
    """(Re)schedule tasks according to tasks_schedule.json.

    A task entry that is malformed or uses an unsupported schedule
    expression is reported and skipped; the remaining tasks are still
    scheduled.
    """
    try:
        data = json.loads(TASKS_FILE.read_text(encoding="utf-8"))
        tasks = data.get("tasks", [])

        # Remove the previous task_* jobs.
        for job in scheduler.get_jobs():
            if job.id.startswith("task_"):
                scheduler.remove_job(job.id)

        for i, t in enumerate(tasks):
            try:
                if not t.get("enabled", True):
                    continue
                agent = t["agent"]
                task = t["task"]
                cron = t["cron"]  # e.g. "daily 03:00" or "every 6h"
                job_id = "task_{}_{}".format(agent, i)
                if _schedule_task_job(job_id, agent, task, cron):
                    print("[SCHEDULER] Tâche planifiée [{} @ {}] → {}".format(
                        agent, cron, task[:50]))
                else:
                    print("[SCHEDULER] Expression non reconnue, tâche ignorée : {}".format(cron))
            except Exception as e:
                print("[SCHEDULER] Tâche #{} ignorée : {}".format(i, e))
    except Exception as e:
        print("[SCHEDULER] Erreur reload task scheduler : {}".format(e))


def _schedule_task_job(job_id: str, agent: str, task: str, cron_expr: str):
    """Schedule a delegated task in APScheduler from a schedule expression.

    Supported forms: "daily HH:MM", "every <N>h", "every <N>min" and
    "weekly <day> HH:MM" (French three-letter day names are mapped to the
    English ones; anything else is passed through unchanged).

    Returns:
        True if a job was added, False if the expression matches no
        supported form.

    Raises:
        ValueError / IndexError: if a recognised form carries invalid values.
    """
    from skills.delegate import execute as delegate_exec

    def run():
        print("[SCHEDULER] Exécution tâche {} → {}".format(agent, task[:60]))
        delegate_exec("{} | {}".format(agent, task))

    expr = cron_expr.strip()
    if expr.startswith("daily "):
        h, m = _parse_hhmm(expr[6:])
        scheduler.add_job(run, "cron", hour=h, minute=m,
                          id=job_id, replace_existing=True)
        return True
    if expr.startswith("every ") and expr.endswith("h"):
        hours = int(expr[6:-1])
        scheduler.add_job(run, "interval", hours=hours,
                          id=job_id, replace_existing=True)
        return True
    if expr.startswith("every ") and expr.endswith("min"):
        mins = int(expr[6:-3])
        scheduler.add_job(run, "interval", minutes=mins,
                          id=job_id, replace_existing=True)
        return True
    if expr.startswith("weekly "):
        parts = expr[7:].split()
        day_map = {"lun": "mon", "mar": "tue", "mer": "wed", "jeu": "thu",
                   "ven": "fri", "sam": "sat", "dim": "sun"}
        day = day_map.get(parts[0], parts[0])
        h, m = _parse_hhmm(parts[1])
        scheduler.add_job(run, "cron", day_of_week=day, hour=h, minute=m,
                          id=job_id, replace_existing=True)
        return True
    return False


# ── MQTT HANDLERS ────────────────────────────────────────────────────────
def on_mqtt_message(client, userdata, msg):
    """Answer a task received on the inbox topic with the LLM.

    The payload is either plain text or a JSON object with optional "task"
    and "reply_to" keys. Any other JSON value is treated as plain text.
    Each message gets a fresh, private history.
    """
    try:
        raw = msg.payload.decode(errors="replace")
        reply_to = "agents/cli/outbox"
        task = raw
        try:
            data = json.loads(raw)
        except json.JSONDecodeError:
            data = None
        if isinstance(data, dict):
            task = data.get("task", raw)
            reply_to = data.get("reply_to", reply_to)
        if not isinstance(task, str):
            task = str(task)
        print("[MQTT] Message CLI reçu : {}".format(task[:80]))
        mqtt_history = []
        # NOTE(review): this runs in the MQTT network thread, so other
        # incoming messages wait until the LLM call returns.
        reply = ask_llm(task, history=mqtt_history)
        mqtt_publish(reply_to, reply)
    except Exception as e:
        # Never let an error escape into the paho network loop.
        print("[MQTT] Erreur traitement message : {}".format(e))


def on_mqtt_error(client, userdata, msg):
    """Log an agent error notification and forward it to the admin."""
    try:
        data = json.loads(msg.payload.decode(errors="replace"))
        agent = data.get("agent", "?")
        task = str(data.get("task", "?"))
        error = str(data.get("error", "?"))
        source = str(data.get("source", "?"))
        notif = "[ERREUR][{}] Agent : {}\nTâche : {}\nErreur : {}".format(
            source.upper(), agent, task[:100], error[:300])
        print(notif)
        _notify_admin(notif)
    except Exception as e:
        print("[MQTT] Erreur parsing notification : {}".format(e))


def on_mqtt_notification(client, userdata, msg):
    """Forward scheduler notifications whose status is "error" to the admin."""
    try:
        data = json.loads(msg.payload.decode(errors="replace"))
        status = data.get("status", "?")
        agent = data.get("agent", "?")
        task = str(data.get("task", "?"))[:80]
        ts = data.get("timestamp", "?")
        if status == "error":
            notif = "[PLANIF ERREUR] {} | {} → {}\nStatut : {}".format(ts, agent, task, status)
            _notify_admin(notif)
    except Exception as e:
        print("[MQTT] Erreur parsing notification scheduler : {}".format(e))


def on_mqtt_status(client, userdata, msg):
    """Record an agent status update and notify the admin on a change.

    Every update refreshes AGENTS_ONLINE and its on-disk copy; a message is
    sent only when the online/offline state actually flips.
    """
    try:
        data = json.loads(msg.payload.decode(errors="replace"))
        agent = data.get("agent", "?")
        status = data.get("status", "?")
        was_online = AGENTS_ONLINE.get(agent, {}).get("status") == "online"
        AGENTS_ONLINE[agent] = {**data, "last_seen": time.time()}
        AGENTS_ONLINE_FILE.write_text(
            json.dumps(AGENTS_ONLINE, indent=2, ensure_ascii=False),
            encoding="utf-8")
        is_online = status == "online"
        if is_online == was_online:
            return
        emoji = "[EN LIGNE]" if is_online else "[HORS LIGNE]"
        print("[STATUS] {} → {}".format(agent, status))
        _notify_admin("{} {}".format(emoji, agent))
    except Exception as e:
        print("[MQTT] Erreur parsing status : {}".format(e))


def on_mqtt_register(client, userdata, msg):
    """Add or update an agent in the registry file and notify the admin."""
    try:
        data = json.loads(msg.payload.decode(errors="replace"))
        agent = data.get("agent", "?")
        jid = data.get("jid", "?")
        mqtt_inbox = data.get("mqtt_inbox", "?")
        speciality = data.get("speciality", "")
        print("[REGISTER] {} en ligne (JID: {}, inbox: {})".format(agent, jid, mqtt_inbox))

        registry = _load_registry()
        is_new = agent not in registry
        # Keep existing fields (work_hours etc.) when updating an entry.
        existing = registry.get(agent, {})
        existing.update({
            "jid": jid,
            "mqtt_inbox": mqtt_inbox,
            "mqtt_outbox": "agents/agent1/inbox",
            "speciality": speciality,
        })
        registry[agent] = existing
        REGISTRY_FILE.write_text(
            json.dumps(registry, indent=2, ensure_ascii=False), encoding="utf-8")

        status = "NOUVEAU" if is_new else "EN LIGNE"
        notif = "[{}] {}\n JID : {}\n MQTT : {}".format(status, agent, jid, mqtt_inbox)
        if speciality:
            notif += "\n Rôle : {}".format(speciality)
        _notify_admin(notif)
    except Exception as e:
        print("[MQTT] Erreur parsing register : {}".format(e))


def on_mqtt_daily_report(client, userdata, msg):
    """Receive and store the daily report sent by an agent."""
    try:
        data = json.loads(msg.payload.decode(errors="replace"))
        agent = data.get("agent", "?")
        daily_reports[agent] = data
        print("[REPORT] Rapport reçu de {} : {} tâches, {} erreurs".format(
            agent, data.get("tasks_today", 0), data.get("errors", 0)))
    except Exception as e:
        print("[MQTT] Erreur parsing daily_report : {}".format(e))


def start_mqtt_listener():
    """Connect the publisher and subscriber MQTT clients; block on the latter.

    Subscriptions are issued from the on_connect callback so that they are
    renewed each time loop_forever() re-establishes the connection.
    """
    global mqtt_pub_client
    mqtt_pub_client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2, client_id="agent1_pub")
    mqtt_pub_client.connect(MQTT_HOST, MQTT_PORT)
    mqtt_pub_client.loop_start()

    routes = [
        (MQTT_INBOX, on_mqtt_message),
        ("agents/errors", on_mqtt_error),
        ("agents/scheduler/notifications", on_mqtt_notification),
        ("agents/register", on_mqtt_register),
        ("agents/status/+", on_mqtt_status),
        ("agents/daily_report", on_mqtt_daily_report),
    ]

    sub = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2, client_id="agent1_sub")
    for topic, handler in routes:
        sub.message_callback_add(topic, handler)
    # Fallback for any message that matches no per-topic callback.
    sub.on_message = on_mqtt_message

    def on_connect(client, userdata, flags, reason_code, properties):
        if reason_code.is_failure:
            print("[MQTT] Connexion refusée : {}".format(reason_code))
            return
        client.subscribe([(topic, 0) for topic, _ in routes])
        print("[MQTT] Agent1 écoute sur {}, errors, status/+, register, daily_report".format(MQTT_INBOX))

    sub.on_connect = on_connect
    sub.connect(MQTT_HOST, MQTT_PORT)
    sub.loop_forever()


# ── XMPP BOT ─────────────────────────────────────────────────────────────
class AgentBot(ClientXMPP):
    """XMPP client that accepts commands and prompts from ADMIN_JID only."""

    def __init__(self):
        super().__init__(XMPP_JID, XMPP_PASS)
        self.add_event_handler("session_start", self.session_start)
        self.add_event_handler("message", self.message)
        self.register_plugin('xep_0030')
        self.register_plugin('xep_0199')

    def _send_admin(self, body: str):
        """Send a chat message to the admin (call from the bot's loop only)."""
        self.send_message(mto=ADMIN_JID, mbody=body, mtype='chat')

    async def session_start(self, event):
        """Announce presence, fetch the roster and greet the admin."""
        self.send_presence()
        await self.get_roster()
        self._send_admin("Agent1 (orchestrateur) en ligne !")

    async def message(self, msg):
        """Dispatch an incoming admin message.

        Order of handling: agent commands, sleep mode, config display
        commands, pending-config confirmation, !reset, then the LLM.
        """
        global SLEEP_MODE, PENDING_CONFIG
        if msg['type'] not in ('chat', 'normal'):
            return
        # Compare bare JIDs: drop the resource part of the sender.
        if str(msg['from']).split('/')[0] != ADMIN_JID:
            return
        user_input = msg['body'].strip()

        # Agent commands have priority and are handled even in sleep mode.
        agent_reply = _handle_agent_command(user_input)
        if agent_reply is not None:
            self._send_admin(agent_reply)
            return

        # Sleep mode: ignore everything except agent commands.
        if SLEEP_MODE:
            self._send_admin(
                "[VEILLE] Agent1 en veille. Envoyez !agentsON ou !agentON agent1 pour reprendre.")
            return

        # Config display commands.
        config_reply = _handle_config_command(user_input)
        if config_reply is not None:
            self._send_admin(config_reply)
            return

        # A config change is awaiting confirmation: any non-affirmative
        # answer cancels it.
        if PENDING_CONFIG is not None:
            if user_input.lower() in ("oui", "yes", "o", "confirme", "ok"):
                reply = _apply_pending_config()
            else:
                PENDING_CONFIG = None
                reply = "Modification annulée."
            self._send_admin(reply)
            return

        # Conversation reset.
        if user_input == "!reset":
            conversation_history.clear()
            self._send_admin("Conversation reinitialisee.")
            return

        # Regular LLM processing, off the event loop because it blocks.
        loop = asyncio.get_running_loop()
        reply = await loop.run_in_executor(None, ask_llm, user_input)
        self._send_admin(reply)


# ── MAIN ─────────────────────────────────────────────────────────────────
if __name__ == "__main__":
    # Start the scheduler and load both schedules.
    scheduler.start()
    _reload_report_scheduler()
    _reload_task_scheduler()

    # Run the MQTT listener in its own daemon thread.
    mqtt_thread = threading.Thread(target=start_mqtt_listener, daemon=True)
    mqtt_thread.start()

    xmpp_bot = AgentBot()
    xmpp_bot.connect()
    xmpp_bot.loop.run_forever()