diff --git a/agent1.py b/agent1.py index 0b1edeb..bcff40c 100644 --- a/agent1.py +++ b/agent1.py @@ -6,9 +6,12 @@ import sys import threading import requests import json +import time from pathlib import Path +from datetime import datetime from slixmpp import ClientXMPP import paho.mqtt.client as mqtt +from apscheduler.schedulers.background import BackgroundScheduler sys.path.insert(0, "/opt/agent") from skills.loader import load_skills, run_skills @@ -40,14 +43,30 @@ SYSTEM_PROMPT = load_system_prompt() load_skills() conversation_history = [] -xmpp_bot = None # référence globale pour répondre via XMPP depuis MQTT -AGENTS_ONLINE = {} # {agent_name: {status, jid, mqtt_inbox, last_seen}} +xmpp_bot = None +AGENTS_ONLINE = {} -REGISTRY_FILE = CONFIG_DIR / "agents_registry.json" +REGISTRY_FILE = CONFIG_DIR / "agents_registry.json" AGENTS_ONLINE_FILE = CONFIG_DIR / "agents_online.json" +REPORTS_FILE = CONFIG_DIR / "reports_schedule.json" +TASKS_FILE = CONFIG_DIR / "tasks_schedule.json" +BLACKOUT_FILE = CONFIG_DIR / "blackout_hours.json" +# ── MODE VEILLE ─────────────────────────────────────────────────────────── +SLEEP_MODE = False + +# ── CONFIRMATION CONFIG EN ATTENTE ──────────────────────────────────────── +# {"file": "reports_schedule" | "tasks_schedule", "content": dict, "description": str} +PENDING_CONFIG = None + +# ── RAPPORTS JOURNALIERS (reçus des agents) ─────────────────────────────── +daily_reports = {} # {agent_name: payload_dict} + +# ── SCHEDULER ───────────────────────────────────────────────────────────── +scheduler = BackgroundScheduler() + +# ── AGENTS CONTEXT ──────────────────────────────────────────────────────── def _get_agents_context() -> str: - """Construit dynamiquement la liste des agents (registre + statut en ligne).""" try: registry = json.loads(REGISTRY_FILE.read_text(encoding="utf-8")) except Exception: @@ -96,21 +115,226 @@ def ask_llm(user_message: str, history: list = None) -> str: history.append({"role": "assistant", "content": err}) return err -# ── MQTT LISTENER (pour CLI) ────────────────────────────────────────────── +# ── MQTT ────────────────────────────────────────────────────────────────── mqtt_pub_client = None def mqtt_publish(topic: str, message: str): if mqtt_pub_client: mqtt_pub_client.publish(topic, message) +def _send_control(agent_name: str, command: str): + """Envoie une commande de contrôle à un agent via MQTT.""" + payload = json.dumps({"command": command}) + mqtt_publish("agents/{}/control".format(agent_name), payload) + print("[CONTROL] {} → {}".format(agent_name, command)) + +def _get_all_agents() -> list: + """Retourne la liste de tous les agents connus (registre).""" + try: + registry = json.loads(REGISTRY_FILE.read_text(encoding="utf-8")) + return [n for n in registry if n != "agent1"] + except Exception: + return [] + +# ── COMMANDES !agent ────────────────────────────────────────────────────── +def _handle_agent_command(text: str) -> str | None: + """ + Gère les commandes !agentON/OFF et !agentsON/OFF. + Retourne la réponse ou None si ce n'est pas une commande agent. + """ + global SLEEP_MODE + + t = text.strip() + + # !agentsOFF — veille agent1 + pause tous les agents + if t == "!agentsOFF": + agents = _get_all_agents() + for a in agents: + _send_control(a, "pause") + SLEEP_MODE = True + return "[VEILLE] Agent1 en veille. {} agent(s) mis en pause.\nEnvoyez !agentsON ou !agentON agent1 pour reprendre.".format(len(agents)) + + # !agentsON — sortie veille agent1 + resume tous les agents + if t == "!agentsON": + agents = _get_all_agents() + for a in agents: + _send_control(a, "resume") + SLEEP_MODE = False + return "[ACTIF] Agent1 actif. {} agent(s) relancés.".format(len(agents)) + + # !agentOFF + if t.startswith("!agentOFF "): + name = t[len("!agentOFF "):].strip() + if name == "agent1": + SLEEP_MODE = True + return "[VEILLE] Agent1 en veille. Envoyez !agentON agent1 pour reprendre." + _send_control(name, "pause") + return "[PAUSE] Commande pause envoyée à {}.".format(name) + + # !agentON + if t.startswith("!agentON "): + name = t[len("!agentON "):].strip() + if name == "agent1": + SLEEP_MODE = False + return "[ACTIF] Agent1 actif." + _send_control(name, "resume") + return "[ACTIF] Commande resume envoyée à {}.".format(name) + + return None + +# ── GESTION CONFIGS AVEC CONFIRMATION ──────────────────────────────────── +def _handle_config_command(text: str) -> str | None: + """Affiche ou propose une modification des fichiers de config.""" + global PENDING_CONFIG + t = text.strip() + + if t == "!reports": + try: + data = json.loads(REPORTS_FILE.read_text()) + return "=== reports_schedule.json ===\n" + json.dumps(data, indent=2, ensure_ascii=False) + except Exception as e: + return "Erreur lecture : {}".format(e) + + if t == "!tasks": + try: + data = json.loads(TASKS_FILE.read_text()) + return "=== tasks_schedule.json ===\n" + json.dumps(data, indent=2, ensure_ascii=False) + except Exception as e: + return "Erreur lecture : {}".format(e) + + if t == "!blackout": + try: + data = json.loads(BLACKOUT_FILE.read_text()) + return "=== blackout_hours.json ===\n" + json.dumps(data, indent=2, ensure_ascii=False) + except Exception as e: + return "Erreur lecture : {}".format(e) + + return None + +def _apply_pending_config() -> str: + global PENDING_CONFIG + if not PENDING_CONFIG: + return "Aucune modification en attente." + try: + file_key = PENDING_CONFIG["file"] + content = PENDING_CONFIG["content"] + desc = PENDING_CONFIG["description"] + if file_key == "reports_schedule": + REPORTS_FILE.write_text(json.dumps(content, indent=2, ensure_ascii=False)) + _reload_report_scheduler() + elif file_key == "tasks_schedule": + TASKS_FILE.write_text(json.dumps(content, indent=2, ensure_ascii=False)) + _reload_task_scheduler() + PENDING_CONFIG = None + return "Modification appliquée : {}".format(desc) + except Exception as e: + PENDING_CONFIG = None + return "Erreur lors de l'application : {}".format(e) + +# ── RAPPORT JOURNALIER ──────────────────────────────────────────────────── +def _solicit_report(agent_name: str): + """Demande le rapport à un agent via MQTT control.""" + print("[REPORT] Sollicitation rapport de {}".format(agent_name)) + _send_control(agent_name, "report") + +def _compile_and_send_daily_report(): + """Compile tous les rapports reçus et envoie à Sylvain.""" + from skills.daily_report import compile_report + report_text = compile_report(daily_reports) + print("[REPORT] Rapport journalier compilé, envoi XMPP.") + if xmpp_bot: + xmpp_bot.send_message(mto=ADMIN_JID, mbody=report_text, mtype='chat') + +def _reload_report_scheduler(): + """(Re)planifie les sollicitations de rapports selon reports_schedule.json.""" + try: + data = json.loads(REPORTS_FILE.read_text()) + agents = data.get("agents", {}) + for name, conf in agents.items(): + if not conf.get("enabled", True): + continue + t = conf["report_time"] # "HH:MM" + hour, minute = map(int, t.split(":")) + job_id = "report_{}".format(name) + try: + scheduler.remove_job(job_id) + except Exception: + pass + scheduler.add_job(_solicit_report, "cron", args=[name], + hour=hour, minute=minute, id=job_id, + replace_existing=True) + print("[SCHEDULER] Rapport {} planifié à {}".format(name, t)) + + # Rapport journalier compilé + daily_t = data.get("daily_report_time", "22:30") + if data.get("daily_report_enabled", True): + dh, dm = map(int, daily_t.split(":")) + try: + scheduler.remove_job("daily_report") + except Exception: + pass + scheduler.add_job(_compile_and_send_daily_report, "cron", + hour=dh, minute=dm, id="daily_report", + replace_existing=True) + print("[SCHEDULER] Rapport journalier planifié à {}".format(daily_t)) + except Exception as e: + print("[SCHEDULER] Erreur reload report scheduler : {}".format(e)) + +def _reload_task_scheduler(): + """(Re)planifie les tâches selon tasks_schedule.json.""" + try: + data = json.loads(TASKS_FILE.read_text()) + tasks = data.get("tasks", []) + # Supprimer anciens jobs tasks_* + for job in scheduler.get_jobs(): + if job.id.startswith("task_"): + scheduler.remove_job(job.id) + for i, t in enumerate(tasks): + if not t.get("enabled", True): + continue + agent = t["agent"] + task = t["task"] + cron = t["cron"] # ex: "daily 03:00" ou "every 6h" + job_id = "task_{}_{}".format(agent, i) + _schedule_task_job(job_id, agent, task, cron) + print("[SCHEDULER] Tâche planifiée [{} @ {}] → {}".format(agent, cron, task[:50])) + except Exception as e: + print("[SCHEDULER] Erreur reload task scheduler : {}".format(e)) + +def _schedule_task_job(job_id: str, agent: str, task: str, cron_expr: str): + """Planifie une tâche via APScheduler selon l'expression cron.""" + from skills.delegate import execute as delegate_exec + def run(): + print("[SCHEDULER] Exécution tâche {} → {}".format(agent, task[:60])) + delegate_exec("{} | {}".format(agent, task)) + + expr = cron_expr.strip() + if expr.startswith("daily "): + t = expr[6:].strip() + h, m = map(int, t.split(":")) + scheduler.add_job(run, "cron", hour=h, minute=m, id=job_id, replace_existing=True) + elif expr.startswith("every ") and expr.endswith("h"): + hours = int(expr[6:-1]) + scheduler.add_job(run, "interval", hours=hours, id=job_id, replace_existing=True) + elif expr.startswith("every ") and expr.endswith("min"): + mins = int(expr[6:-3]) + scheduler.add_job(run, "interval", minutes=mins, id=job_id, replace_existing=True) + elif expr.startswith("weekly "): + parts = expr[7:].split() + day_map = {"lun":"mon","mar":"tue","mer":"wed","jeu":"thu","ven":"fri","sam":"sat","dim":"sun"} + day = day_map.get(parts[0], parts[0]) + h, m = map(int, parts[1].split(":")) + scheduler.add_job(run, "cron", day_of_week=day, hour=h, minute=m, + id=job_id, replace_existing=True) + +# ── MQTT HANDLERS ───────────────────────────────────────────────────────── def on_mqtt_message(client, userdata, msg): raw = msg.payload.decode(errors="replace") - # Support JSON avec reply_to optionnel reply_to = "agents/cli/outbox" task = raw try: - data = json.loads(raw) + data = json.loads(raw) task = data.get("task", raw) reply_to = data.get("reply_to", reply_to) except json.JSONDecodeError: @@ -120,17 +344,15 @@ def on_mqtt_message(client, userdata, msg): mqtt_history = [] reply = ask_llm(task, history=mqtt_history) mqtt_publish(reply_to, reply) - print("[MQTT] Réponse envoyée sur {}".format(reply_to)) def on_mqtt_error(client, userdata, msg): - """Reçoit les erreurs des agents et notifie l'utilisateur via XMPP.""" try: - data = json.loads(msg.payload.decode(errors="replace")) + data = json.loads(msg.payload.decode(errors="replace")) agent = data.get("agent", "?") task = data.get("task", "?") error = data.get("error", "?") source = data.get("source", "?") - notif = "[ERREUR][{}] Agent : {}\nTâche : {}\nErreur : {}".format( + notif = "[ERREUR][{}] Agent : {}\nTâche : {}\nErreur : {}".format( source.upper(), agent, task[:100], error[:300]) print(notif) if xmpp_bot: @@ -139,14 +361,12 @@ def on_mqtt_error(client, userdata, msg): print("[MQTT] Erreur parsing notification : {}".format(e)) def on_mqtt_notification(client, userdata, msg): - """Reçoit les notifications du scheduler.""" try: data = json.loads(msg.payload.decode(errors="replace")) status = data.get("status", "?") agent = data.get("agent", "?") task = data.get("task", "?")[:80] ts = data.get("timestamp", "?") - # Notifier XMPP seulement en cas d'erreur ou de succès important if status == "error" and xmpp_bot: notif = "[PLANIF ERREUR] {} | {} → {}\nStatut : {}".format(ts, agent, task, status) xmpp_bot.send_message(mto=ADMIN_JID, mbody=notif, mtype='chat') @@ -154,8 +374,6 @@ def on_mqtt_notification(client, userdata, msg): print("[MQTT] Erreur parsing notification scheduler : {}".format(e)) def on_mqtt_status(client, userdata, msg): - """Suit le statut en ligne/hors-ligne des agents (LWT + retain).""" - import time try: data = json.loads(msg.payload.decode(errors="replace")) agent = data.get("agent", "?") @@ -164,15 +382,13 @@ def on_mqtt_status(client, userdata, msg): was_online = AGENTS_ONLINE.get(agent, {}).get("status") == "online" AGENTS_ONLINE[agent] = {**data, "last_seen": time.time()} - # Sauvegarder pour la skill agents_online AGENTS_ONLINE_FILE.write_text( json.dumps(AGENTS_ONLINE, indent=2, ensure_ascii=False), encoding="utf-8") - # Notifier sylvain uniquement si le statut change is_online = status == "online" if is_online == was_online: return - emoji = "[EN LIGNE]" if is_online else "[HORS LIGNE]" + emoji = "[EN LIGNE]" if is_online else "[HORS LIGNE]" print("[STATUS] {} → {}".format(agent, status)) if xmpp_bot: xmpp_bot.send_message(mto=ADMIN_JID, @@ -182,7 +398,6 @@ def on_mqtt_status(client, userdata, msg): print("[MQTT] Erreur parsing status : {}".format(e)) def on_mqtt_register(client, userdata, msg): - """Reçoit les déclarations de mise en ligne des agents et met à jour le registre.""" try: data = json.loads(msg.payload.decode(errors="replace")) agent = data.get("agent", "?") @@ -191,38 +406,48 @@ def on_mqtt_register(client, userdata, msg): speciality = data.get("speciality", "") print("[REGISTER] {} en ligne (JID: {}, inbox: {})".format(agent, jid, mqtt_inbox)) - # Mettre à jour agents_registry.json registry_file = CONFIG_DIR / "agents_registry.json" try: registry = json.loads(registry_file.read_text(encoding="utf-8")) except Exception: registry = {} is_new = agent not in registry - registry[agent] = { + + # Préserver les champs existants (work_hours etc.) lors d'une mise à jour + existing = registry.get(agent, {}) + existing.update({ "jid" : jid, "mqtt_inbox" : mqtt_inbox, "mqtt_outbox": "agents/agent1/inbox", "speciality" : speciality, - } + }) + registry[agent] = existing registry_file.write_text(json.dumps(registry, indent=2, ensure_ascii=False), encoding="utf-8") - if is_new: - print("[REGISTER] {} ajouté au registre.".format(agent)) - # Notifier sylvain via XMPP if xmpp_bot: status = "NOUVEAU" if is_new else "EN LIGNE" - notif = "[{}] {}\n JID : {}\n MQTT : {}".format(status, agent, jid, mqtt_inbox) + notif = "[{}] {}\n JID : {}\n MQTT : {}".format(status, agent, jid, mqtt_inbox) if speciality: notif += "\n Rôle : {}".format(speciality) xmpp_bot.send_message(mto=ADMIN_JID, mbody=notif, mtype='chat') except Exception as e: print("[MQTT] Erreur parsing register : {}".format(e)) +def on_mqtt_daily_report(client, userdata, msg): + """Reçoit et stocke les rapports journaliers des agents.""" + try: + data = json.loads(msg.payload.decode(errors="replace")) + agent = data.get("agent", "?") + daily_reports[agent] = data + print("[REPORT] Rapport reçu de {} : {} tâches, {} erreurs".format( + agent, data.get("tasks_today", 0), data.get("errors", 0))) + except Exception as e: + print("[MQTT] Erreur parsing daily_report : {}".format(e)) + def start_mqtt_listener(): global mqtt_pub_client - mqtt_pub_client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2, - client_id="agent1_pub") + mqtt_pub_client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2, client_id="agent1_pub") mqtt_pub_client.connect(MQTT_HOST, MQTT_PORT) mqtt_pub_client.loop_start() @@ -232,16 +457,18 @@ def start_mqtt_listener(): sub.message_callback_add("agents/scheduler/notifications", on_mqtt_notification) sub.message_callback_add("agents/register", on_mqtt_register) sub.message_callback_add("agents/status/+", on_mqtt_status) - sub.on_message = on_mqtt_message # fallback + sub.message_callback_add("agents/daily_report", on_mqtt_daily_report) + sub.on_message = on_mqtt_message sub.connect(MQTT_HOST, MQTT_PORT) sub.subscribe([ - (MQTT_INBOX, 0), - ("agents/errors", 0), - ("agents/scheduler/notifications", 0), - ("agents/register", 0), - ("agents/status/+", 0), + (MQTT_INBOX, 0), + ("agents/errors", 0), + ("agents/scheduler/notifications", 0), + ("agents/register", 0), + ("agents/status/+", 0), + ("agents/daily_report", 0), ]) - print("[MQTT] Agent1 écoute sur {}, agents/errors, agents/status/+, agents/register".format(MQTT_INBOX)) + print("[MQTT] Agent1 écoute sur {}, errors, status/+, register, daily_report".format(MQTT_INBOX)) sub.loop_forever() # ── BOT XMPP ───────────────────────────────────────────────────────────── @@ -259,6 +486,8 @@ class AgentBot(ClientXMPP): self.send_message(mto=ADMIN_JID, mbody="Agent1 (orchestrateur) en ligne !", mtype='chat') async def message(self, msg): + global SLEEP_MODE, PENDING_CONFIG + if msg['type'] not in ('chat', 'normal'): return if str(msg['from']).split('/')[0] != ADMIN_JID: @@ -266,17 +495,55 @@ class AgentBot(ClientXMPP): user_input = msg['body'].strip() + # ── Commandes !agentON/OFF (prioritaires, toujours traitées) ────── + agent_reply = _handle_agent_command(user_input) + if agent_reply is not None: + self.send_message(mto=ADMIN_JID, mbody=agent_reply, mtype='chat') + return + + # ── Mode veille : ignorer tout sauf commandes agent ─────────────── + if SLEEP_MODE: + self.send_message( + mto=ADMIN_JID, + mbody="[VEILLE] Agent1 en veille. Envoyez !agentsON ou !agentON agent1 pour reprendre.", + mtype='chat') + return + + # ── Commandes config ────────────────────────────────────────────── + config_reply = _handle_config_command(user_input) + if config_reply is not None: + self.send_message(mto=ADMIN_JID, mbody=config_reply, mtype='chat') + return + + # ── Confirmation modification config en attente ─────────────────── + if PENDING_CONFIG is not None: + if user_input.lower() in ("oui", "yes", "o", "confirme", "ok"): + reply = _apply_pending_config() + else: + PENDING_CONFIG = None + reply = "Modification annulée." + self.send_message(mto=ADMIN_JID, mbody=reply, mtype='chat') + return + + # ── Reset conversation ───────────────────────────────────────────── if user_input == "!reset": conversation_history.clear() self.send_message(mto=ADMIN_JID, mbody="Conversation reinitialisee.", mtype='chat') return + # ── Traitement LLM normal ───────────────────────────────────────── loop = asyncio.get_event_loop() reply = await loop.run_in_executor(None, ask_llm, user_input) self.send_message(mto=ADMIN_JID, mbody=reply, mtype='chat') # ── MAIN ───────────────────────────────────────────────────────────────── if __name__ == "__main__": + # Démarrer le scheduler + scheduler.start() + _reload_report_scheduler() + _reload_task_scheduler() + + # Démarrer MQTT dans un thread séparé mqtt_thread = threading.Thread(target=start_mqtt_listener, daemon=True) mqtt_thread.start() diff --git a/skills/daily_report.py b/skills/daily_report.py new file mode 100644 index 0000000..3e6b275 --- /dev/null +++ b/skills/daily_report.py @@ -0,0 +1,102 @@ +""" +Skill : DAILY_REPORT +Compile et formate le rapport journalier de tous les agents. + +Commande : + DAILY_REPORT: → rapport du jour + DAILY_REPORT: → rapport d'un agent spécifique +""" +from datetime import datetime, timedelta + +SKILL_NAME = "daily_report" +TRIGGER = "DAILY_REPORT:" + + +def _fmt_uptime(seconds: int) -> str: + if seconds < 60: + return "{}s".format(seconds) + elif seconds < 3600: + return "{}m{}s".format(seconds // 60, seconds % 60) + else: + h = seconds // 3600 + m = (seconds % 3600) // 60 + return "{}h{}m".format(h, m) + + +def compile_report(daily_reports: dict, agent_filter: str = None) -> str: + """ + Compile les rapports reçus des agents en un texte formaté. + daily_reports : {agent_name: {tasks_today, success, errors, avg_duration_s, + last_error, pending, uptime_s, paused, timestamp}} + """ + now = datetime.now().strftime("%Y-%m-%d %H:%M") + lines = ["=== RAPPORT JOURNALIER — {} ===".format(now), ""] + + if not daily_reports: + lines.append("Aucun rapport reçu des agents.") + return "\n".join(lines) + + agents = {k: v for k, v in daily_reports.items() + if agent_filter is None or k == agent_filter} + + if not agents: + return "Aucun rapport pour l'agent '{}'.".format(agent_filter) + + total_tasks = 0 + total_errors = 0 + + for agent_name, data in sorted(agents.items()): + tasks = data.get("tasks_today", 0) + success = data.get("success", 0) + errors = data.get("errors", 0) + avg_dur = data.get("avg_duration_s", 0) + pending = data.get("pending", 0) + uptime = data.get("uptime_s", 0) + paused = data.get("paused", False) + last_err= data.get("last_error") + ts = data.get("timestamp", "?") + + total_tasks += tasks + total_errors += errors + + state = "[EN PAUSE]" if paused else "[ACTIF]" + lines.append("── {} {} ──".format(agent_name, state)) + lines.append(" Rapport : {}".format(ts)) + lines.append(" Uptime : {}".format(_fmt_uptime(uptime))) + lines.append(" Tâches : {} total, {} succès, {} erreurs".format(tasks, success, errors)) + if pending: + lines.append(" En attente : {}".format(pending)) + if avg_dur: + lines.append(" Durée moy : {:.1f}s".format(avg_dur)) + if last_err: + lines.append(" Dernière erreur : {}".format(str(last_err)[:150])) + lines.append("") + + if len(agents) > 1: + lines.append("── TOTAL ──") + lines.append(" Tâches : {} | Erreurs : {}".format(total_tasks, total_errors)) + if total_tasks > 0: + rate = round((total_tasks - total_errors) / total_tasks * 100, 1) + lines.append(" Taux de succès : {}%".format(rate)) + + return "\n".join(lines) + + +def execute(args: str) -> str: + """ + Appelé par le LLM via DAILY_REPORT:. + Lit les rapports stockés dans agent1.daily_reports. + """ + import sys + # Accéder aux rapports stockés dans agent1 (module principal) + agent_filter = args.strip() or None + daily_reports = {} + try: + # agent1.py est le __main__, on récupère ses daily_reports via sys.modules + main_mod = sys.modules.get("__main__") + if main_mod and hasattr(main_mod, "daily_reports"): + daily_reports = main_mod.daily_reports + except Exception: + pass + + return compile_report(daily_reports, agent_filter)