Rapports et gestion des erreurs
- skills/reporting.py : REPORT: / REPORT_ERRORS: avec historique SQLite
- skills/delegate.py : log des exécutions + détection des erreurs + notification MQTT
- skills/schedule_tasks.py : log des tâches planifiées
- agent1.py : abonnement à agents/errors + agents/scheduler/notifications → alerte XMPP
- cli.py : commandes /report et /errors
- system_prompt.txt : REPORT: et REPORT_ERRORS: ajoutés

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -102,6 +102,37 @@ def on_mqtt_message(client, userdata, msg):
|
|||||||
mqtt_publish(reply_to, reply)
|
mqtt_publish(reply_to, reply)
|
||||||
print("[MQTT] Réponse envoyée sur {}".format(reply_to))
|
print("[MQTT] Réponse envoyée sur {}".format(reply_to))
|
||||||
|
|
||||||
|
def on_mqtt_error(client, userdata, msg):
    """Forward an agent error received over MQTT to the administrator via XMPP.

    The payload is expected to be a JSON object with the optional keys
    ``source``, ``agent``, ``task`` and ``error``; missing or null values are
    rendered as ``?``. The alert is always printed, and is sent over XMPP only
    when ``xmpp_bot`` is set.

    Args:
        client: MQTT client that received the message (unused).
        userdata: User data attached to the client (unused).
        msg: Received message; only ``msg.payload`` (bytes) is read.
    """
    try:
        data = json.loads(msg.payload.decode(errors="replace"))

        def field(key):
            # Publishers may send null or non-string JSON values; coerce them
            # so the slicing and upper() below cannot raise and drop the alert.
            value = data.get(key)
            return "?" if value is None else str(value)

        notif = "[ERREUR][{}] Agent : {}\nTâche : {}\nErreur : {}".format(
            field("source").upper(), field("agent"),
            field("task")[:100], field("error")[:300])
        print(notif)
        if xmpp_bot:
            xmpp_bot.send_message(mto=ADMIN_JID, mbody=notif, mtype='chat')
    except Exception as e:
        # Best effort: report the failure instead of raising out of the callback.
        print("[MQTT] Erreur parsing notification : {}".format(e))
|
||||||
|
|
||||||
|
def on_mqtt_notification(client, userdata, msg):
    """Relay failed scheduled tasks reported over MQTT to the administrator.

    The payload is expected to be a JSON object with the keys ``status``,
    ``agent``, ``task`` and ``timestamp``. Only notifications whose status is
    ``"error"`` are forwarded over XMPP, and only when ``xmpp_bot`` is set;
    everything else is ignored.

    Args:
        client: MQTT client that received the message (unused).
        userdata: User data attached to the client (unused).
        msg: Received message; only ``msg.payload`` (bytes) is read.
    """
    try:
        data = json.loads(msg.payload.decode(errors="replace"))
        status = data.get("status", "?")
        if status != "error" or not xmpp_bot:
            return

        # A null or non-string task must not prevent the alert from going out.
        task = data.get("task")
        task = "?" if task is None else str(task)[:80]

        notif = "[PLANIF ERREUR] {} | {} → {}\nStatut : {}".format(
            data.get("timestamp", "?"), data.get("agent", "?"), task, status)
        xmpp_bot.send_message(mto=ADMIN_JID, mbody=notif, mtype='chat')
    except Exception as e:
        # Best effort: report the failure instead of raising out of the callback.
        print("[MQTT] Erreur parsing notification scheduler : {}".format(e))
|
||||||
|
|
||||||
def start_mqtt_listener():
|
def start_mqtt_listener():
|
||||||
global mqtt_pub_client
|
global mqtt_pub_client
|
||||||
|
|
||||||
@@ -111,10 +142,17 @@ def start_mqtt_listener():
|
|||||||
mqtt_pub_client.loop_start()
|
mqtt_pub_client.loop_start()
|
||||||
|
|
||||||
sub = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2, client_id="agent1_sub")
|
sub = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2, client_id="agent1_sub")
|
||||||
sub.on_message = on_mqtt_message
|
sub.message_callback_add("agents/agent1/inbox", on_mqtt_message)
|
||||||
|
sub.message_callback_add("agents/errors", on_mqtt_error)
|
||||||
|
sub.message_callback_add("agents/scheduler/notifications", on_mqtt_notification)
|
||||||
|
sub.on_message = on_mqtt_message # fallback
|
||||||
sub.connect(MQTT_HOST, MQTT_PORT)
|
sub.connect(MQTT_HOST, MQTT_PORT)
|
||||||
sub.subscribe(MQTT_INBOX)
|
sub.subscribe([
|
||||||
print("[MQTT] Agent1 écoute sur {}".format(MQTT_INBOX))
|
(MQTT_INBOX, 0),
|
||||||
|
("agents/errors", 0),
|
||||||
|
("agents/scheduler/notifications", 0),
|
||||||
|
])
|
||||||
|
print("[MQTT] Agent1 écoute sur {}, agents/errors, agents/scheduler/notifications".format(MQTT_INBOX))
|
||||||
sub.loop_forever()
|
sub.loop_forever()
|
||||||
|
|
||||||
# ── BOT XMPP ─────────────────────────────────────────────────────────────
|
# ── BOT XMPP ─────────────────────────────────────────────────────────────
|
||||||
|
|||||||
@@ -115,7 +115,7 @@ def main_loop(agent: str):
|
|||||||
connect_mqtt()
|
connect_mqtt()
|
||||||
|
|
||||||
console.print(Rule("[bold blue]Agent CLI[/bold blue]"))
|
console.print(Rule("[bold blue]Agent CLI[/bold blue]"))
|
||||||
console.print("[dim]Agent cible : [bold]{}[/bold] | /reset | /plans | /quit[/dim]\n".format(agent))
|
console.print("[dim]Agent cible : [bold]{}[/bold] | /reset | /plans | /report | /errors | /agent <nom> | /quit[/dim]\n".format(agent))
|
||||||
|
|
||||||
while True:
|
while True:
|
||||||
try:
|
try:
|
||||||
@@ -142,6 +142,18 @@ def main_loop(agent: str):
|
|||||||
print_response("plans", result)
|
print_response("plans", result)
|
||||||
continue
|
continue
|
||||||
|
|
||||||
|
if user_input == "/report":
|
||||||
|
with console.status("[bold yellow]Génération rapport...[/bold yellow]"):
|
||||||
|
result = send_and_wait("agent1", "REPORT:", timeout=30)
|
||||||
|
print_response("rapport", result)
|
||||||
|
continue
|
||||||
|
|
||||||
|
if user_input == "/errors":
|
||||||
|
with console.status("[bold yellow]Récupération erreurs...[/bold yellow]"):
|
||||||
|
result = send_and_wait("agent1", "REPORT_ERRORS:", timeout=30)
|
||||||
|
print_response("erreurs", result)
|
||||||
|
continue
|
||||||
|
|
||||||
if user_input.startswith("/agent "):
|
if user_input.startswith("/agent "):
|
||||||
agent = user_input.split(" ", 1)[1].strip()
|
agent = user_input.split(" ", 1)[1].strip()
|
||||||
console.print("[dim]Agent changé : [bold]{}[/bold][/dim]".format(agent))
|
console.print("[dim]Agent changé : [bold]{}[/bold][/dim]".format(agent))
|
||||||
|
|||||||
@@ -27,6 +27,18 @@ PLAN_LIST:
|
|||||||
PLAN_CANCEL: <job_id>
|
PLAN_CANCEL: <job_id>
|
||||||
→ Annuler une tâche planifiée
|
→ Annuler une tâche planifiée
|
||||||
|
|
||||||
|
REPORT:
|
||||||
|
→ Rapport complet des 20 dernières exécutions de tous les agents
|
||||||
|
|
||||||
|
REPORT: <agent>
|
||||||
|
→ Rapport filtré pour un agent spécifique
|
||||||
|
|
||||||
|
REPORT_ERRORS:
|
||||||
|
→ Afficher uniquement les erreurs récentes
|
||||||
|
|
||||||
|
REPORT_ERRORS: <agent>
|
||||||
|
→ Erreurs d'un agent spécifique
|
||||||
|
|
||||||
SEARCH: <requête>
|
SEARCH: <requête>
|
||||||
→ Recherche web DuckDuckGo
|
→ Recherche web DuckDuckGo
|
||||||
|
|
||||||
|
|||||||
+48
-9
@@ -1,12 +1,10 @@
|
|||||||
"""
|
"""
|
||||||
Skill : DELEGATE
|
Skill : DELEGATE
|
||||||
Délègue une tâche à un agent spécialisé via MQTT et attend sa réponse.
|
Délègue une tâche à un agent spécialisé via MQTT et attend sa réponse.
|
||||||
|
Log le résultat et notifie agent1 en cas d'erreur.
|
||||||
|
|
||||||
Commande :
|
Commande :
|
||||||
DELEGATE: <agent_name> | <tâche>
|
DELEGATE: <agent_name> | <tâche>
|
||||||
|
|
||||||
Exemple :
|
|
||||||
DELEGATE: agent2_debian13 | Comment installer Docker sur Debian 13 ?
|
|
||||||
"""
|
"""
|
||||||
import json
|
import json
|
||||||
import time
|
import time
|
||||||
@@ -19,13 +17,39 @@ TRIGGER = "DELEGATE:"
|
|||||||
|
|
||||||
CONFIG_FILE = Path("/opt/agent/config/config.json")
|
CONFIG_FILE = Path("/opt/agent/config/config.json")
|
||||||
REGISTRY_FILE = Path("/opt/agent/config/agents_registry.json")
|
REGISTRY_FILE = Path("/opt/agent/config/agents_registry.json")
|
||||||
TIMEOUT = 120 # secondes max d'attente
|
TIMEOUT = 120
|
||||||
|
|
||||||
|
# Mots-clés indiquant une erreur dans la réponse d'un agent
|
||||||
|
ERROR_KEYWORDS = ["erreur", "error", "timeout", "échec", "failed", "cannot", "permission denied",
|
||||||
|
"command not found", "no such file", "connexion refusée"]
|
||||||
|
|
||||||
def _load():
|
def _load():
|
||||||
cfg = json.loads(CONFIG_FILE.read_text())
|
cfg = json.loads(CONFIG_FILE.read_text())
|
||||||
registry = json.loads(REGISTRY_FILE.read_text())
|
registry = json.loads(REGISTRY_FILE.read_text())
|
||||||
return cfg, registry
|
return cfg, registry
|
||||||
|
|
||||||
|
def _is_error(result: str) -> bool:
    """Tell whether an agent reply looks like a failure report.

    The check is a case-insensitive substring search for the fixed markers
    ``[erreur`` and ``exit code``, then for every entry of ``ERROR_KEYWORDS``.

    Args:
        result: Raw text returned by the agent.

    Returns:
        True when at least one marker or keyword occurs in the reply.
    """
    text = result.lower()
    for marker in ("[erreur", "exit code"):
        if marker in text:
            return True
    for keyword in ERROR_KEYWORDS:
        if keyword in text:
            return True
    return False
|
||||||
|
|
||||||
|
def _notify_error(host: str, port: int, agent: str, task: str, result: str):
    """Publish a delegation failure on the ``agents/errors`` MQTT topic.

    The payload is a JSON object with the keys ``agent``, ``task`` (first 200
    characters), ``error`` (first 500 characters) and ``source`` (always
    ``"delegate"``). Notification is best effort: any failure is printed and
    never propagated to the caller.

    Args:
        host: MQTT broker host name.
        port: MQTT broker port.
        agent: Name of the agent the task was delegated to.
        task: Text of the delegated task.
        result: Error text or agent reply describing the failure.
    """
    try:
        payload = json.dumps({
            "agent": agent,
            "task": task[:200],
            "error": result[:500],
            "source": "delegate",
        })
        pub = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2, client_id="delegate_err_pub")
        pub.connect(host, port)
        # Run the network loop and wait for the message to leave the client;
        # disconnecting right after publish() can drop it before it is sent.
        pub.loop_start()
        try:
            pub.publish("agents/errors", payload).wait_for_publish(timeout=5)
        finally:
            pub.loop_stop()
            pub.disconnect()
    except Exception as e:
        # Keep the delegation result usable even when the broker is unreachable,
        # but leave a trace instead of failing silently.
        print("[DELEGATE] Notification d'erreur non envoyée : {}".format(e))
|
||||||
|
|
||||||
def execute(args: str) -> str:
|
def execute(args: str) -> str:
|
||||||
if "|" not in args:
|
if "|" not in args:
|
||||||
return "Erreur : format attendu → DELEGATE: <agent> | <tâche>"
|
return "Erreur : format attendu → DELEGATE: <agent> | <tâche>"
|
||||||
@@ -51,14 +75,12 @@ def execute(args: str) -> str:
|
|||||||
response_container.append(msg.payload.decode(errors="replace"))
|
response_container.append(msg.payload.decode(errors="replace"))
|
||||||
response_received.set()
|
response_received.set()
|
||||||
|
|
||||||
# Souscription à la réponse
|
|
||||||
sub = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2, client_id="agent1_delegate_sub")
|
sub = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2, client_id="agent1_delegate_sub")
|
||||||
sub.on_message = on_message
|
sub.on_message = on_message
|
||||||
sub.connect(host, port)
|
sub.connect(host, port)
|
||||||
sub.subscribe(outbox)
|
sub.subscribe(outbox)
|
||||||
sub.loop_start()
|
sub.loop_start()
|
||||||
|
|
||||||
# Envoi de la tâche
|
|
||||||
pub = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2, client_id="agent1_delegate_pub")
|
pub = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2, client_id="agent1_delegate_pub")
|
||||||
pub.connect(host, port)
|
pub.connect(host, port)
|
||||||
pub.publish(inbox, task)
|
pub.publish(inbox, task)
|
||||||
@@ -66,11 +88,28 @@ def execute(args: str) -> str:
|
|||||||
|
|
||||||
print("[DELEGATE] Tâche envoyée à {} : {}".format(agent_name, task[:80]))
|
print("[DELEGATE] Tâche envoyée à {} : {}".format(agent_name, task[:80]))
|
||||||
|
|
||||||
# Attente de la réponse
|
start = time.time()
|
||||||
received = response_received.wait(timeout=TIMEOUT)
|
received = response_received.wait(timeout=TIMEOUT)
|
||||||
|
duration = time.time() - start
|
||||||
|
|
||||||
sub.loop_stop()
|
sub.loop_stop()
|
||||||
sub.disconnect()
|
sub.disconnect()
|
||||||
|
|
||||||
|
from skills.reporting import log_execution
|
||||||
|
|
||||||
if received and response_container:
|
if received and response_container:
|
||||||
return "[{}] {}".format(agent_name, response_container[0])
|
result = response_container[0]
|
||||||
return "Timeout : {} n'a pas répondu dans les {}s.".format(agent_name, TIMEOUT)
|
status = "error" if _is_error(result) else "success"
|
||||||
|
log_execution("delegate", agent_name, task, status, result, duration)
|
||||||
|
|
||||||
|
if status == "error":
|
||||||
|
_notify_error(host, port, agent_name, task, result)
|
||||||
|
print("[DELEGATE] Erreur détectée dans la réponse de {}".format(agent_name))
|
||||||
|
|
||||||
|
return "[{}] {}".format(agent_name, result)
|
||||||
|
|
||||||
|
# Timeout
|
||||||
|
timeout_msg = "Timeout : {} n'a pas répondu dans les {}s.".format(agent_name, TIMEOUT)
|
||||||
|
log_execution("delegate", agent_name, task, "timeout", timeout_msg, duration)
|
||||||
|
_notify_error(host, port, agent_name, task, timeout_msg)
|
||||||
|
return timeout_msg
|
||||||
|
|||||||
@@ -0,0 +1,114 @@
|
|||||||
|
"""
|
||||||
|
Skill : REPORT / REPORT_ERRORS
|
||||||
|
Historique des exécutions et des erreurs de tous les agents.
|
||||||
|
|
||||||
|
Commandes :
|
||||||
|
REPORT: → rapport complet des 20 dernières exécutions
|
||||||
|
REPORT: <agent> → rapport filtré par agent
|
||||||
|
REPORT_ERRORS: → uniquement les erreurs récentes
|
||||||
|
REPORT_ERRORS: <agent> → erreurs d'un agent spécifique
|
||||||
|
"""
|
||||||
|
import sqlite3
|
||||||
|
from pathlib import Path
|
||||||
|
from datetime import datetime
|
||||||
|
|
||||||
|
SKILL_NAME = "reporting"
|
||||||
|
TRIGGER = None
|
||||||
|
TRIGGERS = {
|
||||||
|
"REPORT:": "report",
|
||||||
|
"REPORT_ERRORS:": "report_errors",
|
||||||
|
}
|
||||||
|
|
||||||
|
DB_PATH = Path("/opt/agent/executions.db")
|
||||||
|
|
||||||
|
def _get_conn():
    """Open the executions database, creating the ``executions`` table if needed.

    Returns:
        An open ``sqlite3.Connection`` on ``DB_PATH``. The caller owns the
        connection and is responsible for closing it.

    Raises:
        Exception: whatever ``sqlite3`` raises while creating the schema; the
            connection is closed before the error is re-raised.
    """
    conn = sqlite3.connect(DB_PATH)
    try:
        conn.execute("""
            CREATE TABLE IF NOT EXISTS executions (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                timestamp TEXT NOT NULL,
                source TEXT NOT NULL,
                agent TEXT NOT NULL,
                task TEXT NOT NULL,
                status TEXT NOT NULL,
                result TEXT,
                duration_s REAL
            )
        """)
        conn.commit()
    except Exception:
        # Do not hand back (or leak) a connection whose schema setup failed.
        conn.close()
        raise
    return conn
|
||||||
|
|
||||||
|
def log_execution(source: str, agent: str, task: str,
                  status: str, result: str, duration_s: float = 0.0):
    """Record one task execution in the executions database.

    Logging is best effort: any failure is printed and never raised.

    Args:
        source: Origin of the execution (callers pass e.g. ``"delegate"`` or
            ``"schedule"``).
        agent: Name of the agent that ran the task.
        task: Task text; only the first 200 characters are stored.
        status: Outcome label stored as-is.
        result: Agent reply or error text; only the first 1000 characters are
            stored, and a falsy value is stored as an empty string.
        duration_s: Elapsed time in seconds, rounded to 2 decimals.
    """
    try:
        conn = _get_conn()
        try:
            # ``with conn`` only commits or rolls back the transaction; the
            # connection itself has to be closed explicitly.
            with conn:
                conn.execute("""
                    INSERT INTO executions (timestamp, source, agent, task, status, result, duration_s)
                    VALUES (?, ?, ?, ?, ?, ?, ?)
                """, (
                    datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
                    source,
                    agent,
                    task[:200],
                    status,
                    result[:1000] if result else "",
                    round(duration_s, 2),
                ))
        finally:
            conn.close()
    except Exception as e:
        print("[Reporting] Erreur log : {}".format(e))
|
||||||
|
|
||||||
|
def _format_rows(rows) -> str:
    """Render execution rows as a plain-text listing.

    Args:
        rows: Sequence of 8-tuples ``(id, timestamp, source, agent, task,
            status, result, duration_s)``.

    Returns:
        One line per row (status icon, timestamp, agent, first 50 characters
        of the task, status, duration), followed by a detail line holding the
        first 150 characters of the result for rows that did not succeed.
        A fixed message is returned when ``rows`` is empty.
    """
    if not rows:
        return "Aucune exécution trouvée."

    def render(row):
        _id, stamp, _source, agent, task, status, result, elapsed = row
        succeeded = status == "success"
        entry = "{} [{}] {} | {} → {} ({:.1f}s)".format(
            "✓" if succeeded else "✗", stamp, agent, task[:50], status, elapsed or 0)
        if not succeeded and result:
            return [entry, " └ {}".format(result[:150])]
        return [entry]

    return "\n".join(line for row in rows for line in render(row))
|
||||||
|
|
||||||
|
def report(args: str) -> str:
    """Build the report of the 20 most recent executions.

    Args:
        args: Optional agent name; when non-blank, only that agent's
            executions are listed.

    Returns:
        A header line followed by the formatted rows, or an
        ``Erreur REPORT : ...`` message if the database cannot be read.
    """
    agent_filter = (args or "").strip() or None

    query = ("SELECT id, timestamp, source, agent, task, status, result, duration_s "
             "FROM executions")
    params = ()
    if agent_filter:
        query += " WHERE agent = ?"
        params = (agent_filter,)
    query += " ORDER BY id DESC LIMIT 20"

    try:
        conn = _get_conn()
        try:
            rows = conn.execute(query, params).fetchall()
        finally:
            # sqlite3 connections are not closed by ``with``; release it here.
            conn.close()
        header = "Rapport d'exécution{} (20 dernières) :".format(
            " [{}]".format(agent_filter) if agent_filter else "")
        return header + "\n" + _format_rows(rows)
    except Exception as e:
        return "Erreur REPORT : {}".format(e)
|
||||||
|
|
||||||
|
def report_errors(args: str) -> str:
    """Build the report of the 20 most recent executions that did not succeed.

    Any row whose status is not ``'success'`` is treated as an error.

    Args:
        args: Optional agent name; when non-blank, only that agent's
            failures are listed.

    Returns:
        A header line followed by the formatted rows, or an
        ``Erreur REPORT_ERRORS : ...`` message if the database cannot be read.
    """
    agent_filter = (args or "").strip() or None

    query = ("SELECT id, timestamp, source, agent, task, status, result, duration_s "
             "FROM executions WHERE status != 'success'")
    params = ()
    if agent_filter:
        query += " AND agent = ?"
        params = (agent_filter,)
    query += " ORDER BY id DESC LIMIT 20"

    try:
        conn = _get_conn()
        try:
            rows = conn.execute(query, params).fetchall()
        finally:
            # sqlite3 connections are not closed by ``with``; release it here.
            conn.close()
        header = "Erreurs{} (20 dernières) :".format(
            " [{}]".format(agent_filter) if agent_filter else "")
        return header + "\n" + _format_rows(rows)
    except Exception as e:
        return "Erreur REPORT_ERRORS : {}".format(e)
|
||||||
@@ -47,23 +47,30 @@ def _get_scheduler():
|
|||||||
|
|
||||||
def _run_delegated_task(agent: str, task: str):
|
def _run_delegated_task(agent: str, task: str):
|
||||||
"""Exécutée par le scheduler : délègue la tâche à l'agent."""
|
"""Exécutée par le scheduler : délègue la tâche à l'agent."""
|
||||||
|
import time as _time
|
||||||
from skills.delegate import execute as delegate_exec
|
from skills.delegate import execute as delegate_exec
|
||||||
|
from skills.reporting import log_execution
|
||||||
import paho.mqtt.publish as publish
|
import paho.mqtt.publish as publish
|
||||||
import json as _json
|
import json as _json
|
||||||
|
|
||||||
|
start = _time.time()
|
||||||
result = delegate_exec("{} | {}".format(agent, task))
|
result = delegate_exec("{} | {}".format(agent, task))
|
||||||
print("[SCHEDULE] Tâche exécutée [{} → {}] : {}".format(
|
duration = _time.time() - start
|
||||||
datetime.now().strftime("%Y-%m-%d %H:%M"), agent, task[:60]))
|
ts = datetime.now().strftime("%Y-%m-%d %H:%M")
|
||||||
|
|
||||||
# Notifier via MQTT sur le topic de notification
|
status = "error" if "erreur" in result.lower() or "timeout" in result.lower() else "success"
|
||||||
|
log_execution("schedule", agent, task, status, result, duration)
|
||||||
|
print("[SCHEDULE] Tâche exécutée [{} → {}] statut={} : {}".format(ts, agent, status, task[:60]))
|
||||||
|
|
||||||
|
# Notifier via MQTT
|
||||||
try:
|
try:
|
||||||
cfg = _json.loads(Path("/opt/agent/config/config.json").read_text())
|
cfg = _json.loads(Path("/opt/agent/config/config.json").read_text())
|
||||||
publish.single(
|
host = cfg.get("mqtt_host", "localhost")
|
||||||
"agents/scheduler/notifications",
|
|
||||||
payload="[{}] {}\n{}".format(agent, task, result),
|
|
||||||
hostname=cfg.get("mqtt_host", "localhost"),
|
|
||||||
port = int(cfg.get("mqtt_port", 1883))
|
port = int(cfg.get("mqtt_port", 1883))
|
||||||
)
|
payload = _json.dumps({"agent": agent, "task": task, "status": status,
|
||||||
|
"result": result[:500], "timestamp": ts})
|
||||||
|
publish.single("agents/scheduler/notifications", payload=payload,
|
||||||
|
hostname=host, port=port)
|
||||||
except Exception:
|
except Exception:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user