diff --git a/agent1.py b/agent1.py index f482599..5d9348d 100644 --- a/agent1.py +++ b/agent1.py @@ -102,6 +102,37 @@ def on_mqtt_message(client, userdata, msg): mqtt_publish(reply_to, reply) print("[MQTT] Réponse envoyée sur {}".format(reply_to)) +def on_mqtt_error(client, userdata, msg): + """Reçoit les erreurs des agents et notifie l'utilisateur via XMPP.""" + try: + data = json.loads(msg.payload.decode(errors="replace")) + agent = data.get("agent", "?") + task = data.get("task", "?") + error = data.get("error", "?") + source = data.get("source", "?") + notif = "[ERREUR][{}] Agent : {}\nTâche : {}\nErreur : {}".format( + source.upper(), agent, task[:100], error[:300]) + print(notif) + if xmpp_bot: + xmpp_bot.send_message(mto=ADMIN_JID, mbody=notif, mtype='chat') + except Exception as e: + print("[MQTT] Erreur parsing notification : {}".format(e)) + +def on_mqtt_notification(client, userdata, msg): + """Reçoit les notifications du scheduler.""" + try: + data = json.loads(msg.payload.decode(errors="replace")) + status = data.get("status", "?") + agent = data.get("agent", "?") + task = data.get("task", "?")[:80] + ts = data.get("timestamp", "?") + # Notifier XMPP seulement en cas d'erreur ou de succès important + if status == "error" and xmpp_bot: + notif = "[PLANIF ERREUR] {} | {} → {}\nStatut : {}".format(ts, agent, task, status) + xmpp_bot.send_message(mto=ADMIN_JID, mbody=notif, mtype='chat') + except Exception as e: + print("[MQTT] Erreur parsing notification scheduler : {}".format(e)) + def start_mqtt_listener(): global mqtt_pub_client @@ -111,10 +142,17 @@ def start_mqtt_listener(): mqtt_pub_client.loop_start() sub = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2, client_id="agent1_sub") - sub.on_message = on_mqtt_message + sub.message_callback_add("agents/agent1/inbox", on_mqtt_message) + sub.message_callback_add("agents/errors", on_mqtt_error) + sub.message_callback_add("agents/scheduler/notifications", on_mqtt_notification) + sub.on_message = on_mqtt_message # fallback sub.connect(MQTT_HOST, MQTT_PORT) - sub.subscribe(MQTT_INBOX) - print("[MQTT] Agent1 écoute sur {}".format(MQTT_INBOX)) + sub.subscribe([ + (MQTT_INBOX, 0), + ("agents/errors", 0), + ("agents/scheduler/notifications", 0), + ]) + print("[MQTT] Agent1 écoute sur {}, agents/errors, agents/scheduler/notifications".format(MQTT_INBOX)) sub.loop_forever() # ── BOT XMPP ───────────────────────────────────────────────────────────── diff --git a/cli.py b/cli.py index 15689e1..4459073 100644 --- a/cli.py +++ b/cli.py @@ -115,7 +115,7 @@ def main_loop(agent: str): connect_mqtt() console.print(Rule("[bold blue]Agent CLI[/bold blue]")) - console.print("[dim]Agent cible : [bold]{}[/bold] | /reset | /plans | /quit[/dim]\n".format(agent)) + console.print("[dim]Agent cible : [bold]{}[/bold] | /reset | /plans | /report | /errors | /agent | /quit[/dim]\n".format(agent)) while True: try: @@ -142,6 +142,18 @@ def main_loop(agent: str): print_response("plans", result) continue + if user_input == "/report": + with console.status("[bold yellow]Génération rapport...[/bold yellow]"): + result = send_and_wait("agent1", "REPORT:", timeout=30) + print_response("rapport", result) + continue + + if user_input == "/errors": + with console.status("[bold yellow]Récupération erreurs...[/bold yellow]"): + result = send_and_wait("agent1", "REPORT_ERRORS:", timeout=30) + print_response("erreurs", result) + continue + if user_input.startswith("/agent "): agent = user_input.split(" ", 1)[1].strip() console.print("[dim]Agent changé : [bold]{}[/bold][/dim]".format(agent)) diff --git a/config/system_prompt.txt b/config/system_prompt.txt index 1d069a6..5349511 100644 --- a/config/system_prompt.txt +++ b/config/system_prompt.txt @@ -27,6 +27,18 @@ PLAN_LIST: PLAN_CANCEL: → Annuler une tâche planifiée +REPORT: + → Rapport complet des 20 dernières exécutions de tous les agents + +REPORT: + → Rapport filtré pour un agent spécifique + +REPORT_ERRORS: + → Afficher uniquement les erreurs récentes + +REPORT_ERRORS: + → Erreurs d'un agent spécifique + SEARCH: → Recherche web DuckDuckGo diff --git a/skills/delegate.py b/skills/delegate.py index 9c80aa4..e0741ce 100644 --- a/skills/delegate.py +++ b/skills/delegate.py @@ -1,12 +1,10 @@ """ Skill : DELEGATE Délègue une tâche à un agent spécialisé via MQTT et attend sa réponse. +Log le résultat et notifie agent1 en cas d'erreur. Commande : DELEGATE: | - -Exemple : - DELEGATE: agent2_debian13 | Comment installer Docker sur Debian 13 ? """ import json import time @@ -19,13 +17,39 @@ TRIGGER = "DELEGATE:" CONFIG_FILE = Path("/opt/agent/config/config.json") REGISTRY_FILE = Path("/opt/agent/config/agents_registry.json") -TIMEOUT = 120 # secondes max d'attente +TIMEOUT = 120 + +# Mots-clés indiquant une erreur dans la réponse d'un agent +ERROR_KEYWORDS = ["erreur", "error", "timeout", "échec", "failed", "cannot", "permission denied", + "command not found", "no such file", "connexion refusée"] def _load(): cfg = json.loads(CONFIG_FILE.read_text()) registry = json.loads(REGISTRY_FILE.read_text()) return cfg, registry +def _is_error(result: str) -> bool: + lower = result.lower() + if "[erreur" in lower or "exit code" in lower: + return True + return any(kw in lower for kw in ERROR_KEYWORDS) + +def _notify_error(host: str, port: int, agent: str, task: str, result: str): + """Publie l'erreur sur le topic d'erreurs pour que agent1 notifie l'utilisateur.""" + try: + payload = json.dumps({ + "agent" : agent, + "task" : task[:200], + "error" : result[:500], + "source" : "delegate" + }) + pub = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2, client_id="delegate_err_pub") + pub.connect(host, port) + pub.publish("agents/errors", payload) + pub.disconnect() + except Exception: + pass + def execute(args: str) -> str: if "|" not in args: return "Erreur : format attendu → DELEGATE: | " @@ -44,21 +68,19 @@ def execute(args: str) -> str: host = cfg.get("mqtt_host", "localhost") port = int(cfg.get("mqtt_port", 1883)) - response_received = threading.Event() + response_received = threading.Event() response_container = [] def on_message(client, userdata, msg): response_container.append(msg.payload.decode(errors="replace")) response_received.set() - # Souscription à la réponse sub = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2, client_id="agent1_delegate_sub") sub.on_message = on_message sub.connect(host, port) sub.subscribe(outbox) sub.loop_start() - # Envoi de la tâche pub = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2, client_id="agent1_delegate_pub") pub.connect(host, port) pub.publish(inbox, task) @@ -66,11 +88,28 @@ def execute(args: str) -> str: print("[DELEGATE] Tâche envoyée à {} : {}".format(agent_name, task[:80])) - # Attente de la réponse + start = time.time() received = response_received.wait(timeout=TIMEOUT) + duration = time.time() - start + sub.loop_stop() sub.disconnect() + from skills.reporting import log_execution + if received and response_container: - return "[{}] {}".format(agent_name, response_container[0]) - return "Timeout : {} n'a pas répondu dans les {}s.".format(agent_name, TIMEOUT) + result = response_container[0] + status = "error" if _is_error(result) else "success" + log_execution("delegate", agent_name, task, status, result, duration) + + if status == "error": + _notify_error(host, port, agent_name, task, result) + print("[DELEGATE] Erreur détectée dans la réponse de {}".format(agent_name)) + + return "[{}] {}".format(agent_name, result) + + # Timeout + timeout_msg = "Timeout : {} n'a pas répondu dans les {}s.".format(agent_name, TIMEOUT) + log_execution("delegate", agent_name, task, "timeout", timeout_msg, duration) + _notify_error(host, port, agent_name, task, timeout_msg) + return timeout_msg diff --git a/skills/reporting.py b/skills/reporting.py new file mode 100644 index 0000000..2ca8c34 --- /dev/null +++ b/skills/reporting.py @@ -0,0 +1,114 @@ +""" +Skill : REPORT / REPORT_ERRORS +Historique des exécutions et des erreurs de tous les agents. + +Commandes : + REPORT: → rapport complet des 20 dernières exécutions + REPORT: → rapport filtré par agent + REPORT_ERRORS: → uniquement les erreurs récentes + REPORT_ERRORS: → erreurs d'un agent spécifique +""" +import sqlite3 +from pathlib import Path +from datetime import datetime + +SKILL_NAME = "reporting" +TRIGGER = None +TRIGGERS = { + "REPORT:": "report", + "REPORT_ERRORS:": "report_errors", +} + +DB_PATH = Path("/opt/agent/executions.db") + +def _get_conn(): + conn = sqlite3.connect(DB_PATH) + conn.execute(""" + CREATE TABLE IF NOT EXISTS executions ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + timestamp TEXT NOT NULL, + source TEXT NOT NULL, + agent TEXT NOT NULL, + task TEXT NOT NULL, + status TEXT NOT NULL, + result TEXT, + duration_s REAL + ) + """) + conn.commit() + return conn + +def log_execution(source: str, agent: str, task: str, + status: str, result: str, duration_s: float = 0.0): + """Enregistre une exécution dans la base. Appelé par delegate et schedule.""" + try: + with _get_conn() as conn: + conn.execute(""" + INSERT INTO executions (timestamp, source, agent, task, status, result, duration_s) + VALUES (?, ?, ?, ?, ?, ?, ?) + """, ( + datetime.now().strftime("%Y-%m-%d %H:%M:%S"), + source, + agent, + task[:200], + status, + result[:1000] if result else "", + round(duration_s, 2) + )) + except Exception as e: + print("[Reporting] Erreur log : {}".format(e)) + +def _format_rows(rows) -> str: + if not rows: + return "Aucune exécution trouvée." + lines = [] + for r in rows: + id_, ts, source, agent, task, status, result, dur = r + icon = "✓" if status == "success" else "✗" + lines.append("{} [{}] {} | {} → {} ({:.1f}s)".format( + icon, ts, agent, task[:50], status, dur or 0)) + if status != "success" and result: + lines.append(" └ {}".format(result[:150])) + return "\n".join(lines) + +def report(args: str) -> str: + agent_filter = args.strip() or None + try: + with _get_conn() as conn: + if agent_filter: + rows = conn.execute(""" + SELECT id, timestamp, source, agent, task, status, result, duration_s + FROM executions WHERE agent = ? ORDER BY id DESC LIMIT 20 + """, (agent_filter,)).fetchall() + else: + rows = conn.execute(""" + SELECT id, timestamp, source, agent, task, status, result, duration_s + FROM executions ORDER BY id DESC LIMIT 20 + """).fetchall() + header = "Rapport d'exécution{} (20 dernières) :".format( + " [{}]".format(agent_filter) if agent_filter else "") + return header + "\n" + _format_rows(rows) + except Exception as e: + return "Erreur REPORT : {}".format(e) + +def report_errors(args: str) -> str: + agent_filter = args.strip() or None + try: + with _get_conn() as conn: + if agent_filter: + rows = conn.execute(""" + SELECT id, timestamp, source, agent, task, status, result, duration_s + FROM executions WHERE status != 'success' AND agent = ? + ORDER BY id DESC LIMIT 20 + """, (agent_filter,)).fetchall() + else: + rows = conn.execute(""" + SELECT id, timestamp, source, agent, task, status, result, duration_s + FROM executions WHERE status != 'success' + ORDER BY id DESC LIMIT 20 + """).fetchall() + header = "Erreurs{} (20 dernières) :".format( + " [{}]".format(agent_filter) if agent_filter else "") + return header + "\n" + _format_rows(rows) + except Exception as e: + return "Erreur REPORT_ERRORS : {}".format(e) diff --git a/skills/schedule_tasks.py b/skills/schedule_tasks.py index 10d4499..010a991 100644 --- a/skills/schedule_tasks.py +++ b/skills/schedule_tasks.py @@ -47,23 +47,30 @@ def _get_scheduler(): def _run_delegated_task(agent: str, task: str): """Exécutée par le scheduler : délègue la tâche à l'agent.""" + import time as _time from skills.delegate import execute as delegate_exec + from skills.reporting import log_execution import paho.mqtt.publish as publish import json as _json + start = _time.time() result = delegate_exec("{} | {}".format(agent, task)) - print("[SCHEDULE] Tâche exécutée [{} → {}] : {}".format( - datetime.now().strftime("%Y-%m-%d %H:%M"), agent, task[:60])) + duration = _time.time() - start + ts = datetime.now().strftime("%Y-%m-%d %H:%M") - # Notifier via MQTT sur le topic de notification + status = "error" if "erreur" in result.lower() or "timeout" in result.lower() else "success" + log_execution("schedule", agent, task, status, result, duration) + print("[SCHEDULE] Tâche exécutée [{} → {}] statut={} : {}".format(ts, agent, status, task[:60])) + + # Notifier via MQTT try: cfg = _json.loads(Path("/opt/agent/config/config.json").read_text()) - publish.single( - "agents/scheduler/notifications", - payload="[{}] {}\n{}".format(agent, task, result), - hostname=cfg.get("mqtt_host", "localhost"), - port=int(cfg.get("mqtt_port", 1883)) - ) + host = cfg.get("mqtt_host", "localhost") + port = int(cfg.get("mqtt_port", 1883)) + payload = _json.dumps({"agent": agent, "task": task, "status": status, + "result": result[:500], "timestamp": ts}) + publish.single("agents/scheduler/notifications", payload=payload, + hostname=host, port=port) except Exception: pass