Batch 3 : commandes !agentON/OFF, mode veille, rapports journaliers

agent1.py :
  - !agentOFF/ON <nom> : pause/resume d'un agent via MQTT control
  - !agentsOFF/ON : mode veille agent1 + pause/resume tous les agents
  - Confirmation en attente pour modif config (PENDING_CONFIG)
  - !reports / !tasks / !blackout : afficher les configs
  - APScheduler : sollicitation rapports + rapport journalier automatique
  - Souscription agents/daily_report : stockage des rapports reçus
  - on_mqtt_register : préserve work_hours lors des mises à jour registre

skills/daily_report.py :
  - DAILY_REPORT: [agent] : compile les rapports journaliers reçus
  - Formatage uptime, stats, taux de succès

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-03-08 15:41:54 +00:00
parent 9e3aa29d74
commit 60a216d565
2 changed files with 404 additions and 35 deletions
+291 -24
View File
@@ -6,9 +6,12 @@ import sys
import threading
import requests
import json
import time
from pathlib import Path
from datetime import datetime
from slixmpp import ClientXMPP
import paho.mqtt.client as mqtt
from apscheduler.schedulers.background import BackgroundScheduler
sys.path.insert(0, "/opt/agent")
from skills.loader import load_skills, run_skills
@@ -40,14 +43,30 @@ SYSTEM_PROMPT = load_system_prompt()
load_skills()
conversation_history = []
xmpp_bot = None # référence globale pour répondre via XMPP depuis MQTT
AGENTS_ONLINE = {} # {agent_name: {status, jid, mqtt_inbox, last_seen}}
xmpp_bot = None
AGENTS_ONLINE = {}
REGISTRY_FILE = CONFIG_DIR / "agents_registry.json"
AGENTS_ONLINE_FILE = CONFIG_DIR / "agents_online.json"
REPORTS_FILE = CONFIG_DIR / "reports_schedule.json"
TASKS_FILE = CONFIG_DIR / "tasks_schedule.json"
BLACKOUT_FILE = CONFIG_DIR / "blackout_hours.json"
# ── MODE VEILLE ───────────────────────────────────────────────────────────
SLEEP_MODE = False
# ── CONFIRMATION CONFIG EN ATTENTE ────────────────────────────────────────
# {"file": "reports_schedule" | "tasks_schedule", "content": dict, "description": str}
PENDING_CONFIG = None
# ── RAPPORTS JOURNALIERS (reçus des agents) ───────────────────────────────
daily_reports = {} # {agent_name: payload_dict}
# ── SCHEDULER ─────────────────────────────────────────────────────────────
scheduler = BackgroundScheduler()
# ── AGENTS CONTEXT ────────────────────────────────────────────────────────
def _get_agents_context() -> str:
"""Construit dynamiquement la liste des agents (registre + statut en ligne)."""
try:
registry = json.loads(REGISTRY_FILE.read_text(encoding="utf-8"))
except Exception:
@@ -96,17 +115,222 @@ def ask_llm(user_message: str, history: list = None) -> str:
history.append({"role": "assistant", "content": err})
return err
# ── MQTT LISTENER (pour CLI) ──────────────────────────────────────────────
# ── MQTT ──────────────────────────────────────────────────────────────────
mqtt_pub_client = None
def mqtt_publish(topic: str, message: str):
if mqtt_pub_client:
mqtt_pub_client.publish(topic, message)
def _send_control(agent_name: str, command: str):
"""Envoie une commande de contrôle à un agent via MQTT."""
payload = json.dumps({"command": command})
mqtt_publish("agents/{}/control".format(agent_name), payload)
print("[CONTROL] {}{}".format(agent_name, command))
def _get_all_agents() -> list:
"""Retourne la liste de tous les agents connus (registre)."""
try:
registry = json.loads(REGISTRY_FILE.read_text(encoding="utf-8"))
return [n for n in registry if n != "agent1"]
except Exception:
return []
# ── COMMANDES !agent ──────────────────────────────────────────────────────
def _handle_agent_command(text: str) -> str | None:
"""
Gère les commandes !agentON/OFF et !agentsON/OFF.
Retourne la réponse ou None si ce n'est pas une commande agent.
"""
global SLEEP_MODE
t = text.strip()
# !agentsOFF — veille agent1 + pause tous les agents
if t == "!agentsOFF":
agents = _get_all_agents()
for a in agents:
_send_control(a, "pause")
SLEEP_MODE = True
return "[VEILLE] Agent1 en veille. {} agent(s) mis en pause.\nEnvoyez !agentsON ou !agentON agent1 pour reprendre.".format(len(agents))
# !agentsON — sortie veille agent1 + resume tous les agents
if t == "!agentsON":
agents = _get_all_agents()
for a in agents:
_send_control(a, "resume")
SLEEP_MODE = False
return "[ACTIF] Agent1 actif. {} agent(s) relancés.".format(len(agents))
# !agentOFF <nom>
if t.startswith("!agentOFF "):
name = t[len("!agentOFF "):].strip()
if name == "agent1":
SLEEP_MODE = True
return "[VEILLE] Agent1 en veille. Envoyez !agentON agent1 pour reprendre."
_send_control(name, "pause")
return "[PAUSE] Commande pause envoyée à {}.".format(name)
# !agentON <nom>
if t.startswith("!agentON "):
name = t[len("!agentON "):].strip()
if name == "agent1":
SLEEP_MODE = False
return "[ACTIF] Agent1 actif."
_send_control(name, "resume")
return "[ACTIF] Commande resume envoyée à {}.".format(name)
return None
# ── GESTION CONFIGS AVEC CONFIRMATION ────────────────────────────────────
def _handle_config_command(text: str) -> str | None:
"""Affiche ou propose une modification des fichiers de config."""
global PENDING_CONFIG
t = text.strip()
if t == "!reports":
try:
data = json.loads(REPORTS_FILE.read_text())
return "=== reports_schedule.json ===\n" + json.dumps(data, indent=2, ensure_ascii=False)
except Exception as e:
return "Erreur lecture : {}".format(e)
if t == "!tasks":
try:
data = json.loads(TASKS_FILE.read_text())
return "=== tasks_schedule.json ===\n" + json.dumps(data, indent=2, ensure_ascii=False)
except Exception as e:
return "Erreur lecture : {}".format(e)
if t == "!blackout":
try:
data = json.loads(BLACKOUT_FILE.read_text())
return "=== blackout_hours.json ===\n" + json.dumps(data, indent=2, ensure_ascii=False)
except Exception as e:
return "Erreur lecture : {}".format(e)
return None
def _apply_pending_config() -> str:
global PENDING_CONFIG
if not PENDING_CONFIG:
return "Aucune modification en attente."
try:
file_key = PENDING_CONFIG["file"]
content = PENDING_CONFIG["content"]
desc = PENDING_CONFIG["description"]
if file_key == "reports_schedule":
REPORTS_FILE.write_text(json.dumps(content, indent=2, ensure_ascii=False))
_reload_report_scheduler()
elif file_key == "tasks_schedule":
TASKS_FILE.write_text(json.dumps(content, indent=2, ensure_ascii=False))
_reload_task_scheduler()
PENDING_CONFIG = None
return "Modification appliquée : {}".format(desc)
except Exception as e:
PENDING_CONFIG = None
return "Erreur lors de l'application : {}".format(e)
# ── RAPPORT JOURNALIER ────────────────────────────────────────────────────
def _solicit_report(agent_name: str):
"""Demande le rapport à un agent via MQTT control."""
print("[REPORT] Sollicitation rapport de {}".format(agent_name))
_send_control(agent_name, "report")
def _compile_and_send_daily_report():
"""Compile tous les rapports reçus et envoie à Sylvain."""
from skills.daily_report import compile_report
report_text = compile_report(daily_reports)
print("[REPORT] Rapport journalier compilé, envoi XMPP.")
if xmpp_bot:
xmpp_bot.send_message(mto=ADMIN_JID, mbody=report_text, mtype='chat')
def _reload_report_scheduler():
"""(Re)planifie les sollicitations de rapports selon reports_schedule.json."""
try:
data = json.loads(REPORTS_FILE.read_text())
agents = data.get("agents", {})
for name, conf in agents.items():
if not conf.get("enabled", True):
continue
t = conf["report_time"] # "HH:MM"
hour, minute = map(int, t.split(":"))
job_id = "report_{}".format(name)
try:
scheduler.remove_job(job_id)
except Exception:
pass
scheduler.add_job(_solicit_report, "cron", args=[name],
hour=hour, minute=minute, id=job_id,
replace_existing=True)
print("[SCHEDULER] Rapport {} planifié à {}".format(name, t))
# Rapport journalier compilé
daily_t = data.get("daily_report_time", "22:30")
if data.get("daily_report_enabled", True):
dh, dm = map(int, daily_t.split(":"))
try:
scheduler.remove_job("daily_report")
except Exception:
pass
scheduler.add_job(_compile_and_send_daily_report, "cron",
hour=dh, minute=dm, id="daily_report",
replace_existing=True)
print("[SCHEDULER] Rapport journalier planifié à {}".format(daily_t))
except Exception as e:
print("[SCHEDULER] Erreur reload report scheduler : {}".format(e))
def _reload_task_scheduler():
"""(Re)planifie les tâches selon tasks_schedule.json."""
try:
data = json.loads(TASKS_FILE.read_text())
tasks = data.get("tasks", [])
# Supprimer anciens jobs tasks_*
for job in scheduler.get_jobs():
if job.id.startswith("task_"):
scheduler.remove_job(job.id)
for i, t in enumerate(tasks):
if not t.get("enabled", True):
continue
agent = t["agent"]
task = t["task"]
cron = t["cron"] # ex: "daily 03:00" ou "every 6h"
job_id = "task_{}_{}".format(agent, i)
_schedule_task_job(job_id, agent, task, cron)
print("[SCHEDULER] Tâche planifiée [{} @ {}] → {}".format(agent, cron, task[:50]))
except Exception as e:
print("[SCHEDULER] Erreur reload task scheduler : {}".format(e))
def _schedule_task_job(job_id: str, agent: str, task: str, cron_expr: str):
"""Planifie une tâche via APScheduler selon l'expression cron."""
from skills.delegate import execute as delegate_exec
def run():
print("[SCHEDULER] Exécution tâche {}{}".format(agent, task[:60]))
delegate_exec("{} | {}".format(agent, task))
expr = cron_expr.strip()
if expr.startswith("daily "):
t = expr[6:].strip()
h, m = map(int, t.split(":"))
scheduler.add_job(run, "cron", hour=h, minute=m, id=job_id, replace_existing=True)
elif expr.startswith("every ") and expr.endswith("h"):
hours = int(expr[6:-1])
scheduler.add_job(run, "interval", hours=hours, id=job_id, replace_existing=True)
elif expr.startswith("every ") and expr.endswith("min"):
mins = int(expr[6:-3])
scheduler.add_job(run, "interval", minutes=mins, id=job_id, replace_existing=True)
elif expr.startswith("weekly "):
parts = expr[7:].split()
day_map = {"lun":"mon","mar":"tue","mer":"wed","jeu":"thu","ven":"fri","sam":"sat","dim":"sun"}
day = day_map.get(parts[0], parts[0])
h, m = map(int, parts[1].split(":"))
scheduler.add_job(run, "cron", day_of_week=day, hour=h, minute=m,
id=job_id, replace_existing=True)
# ── MQTT HANDLERS ─────────────────────────────────────────────────────────
def on_mqtt_message(client, userdata, msg):
raw = msg.payload.decode(errors="replace")
# Support JSON avec reply_to optionnel
reply_to = "agents/cli/outbox"
task = raw
try:
@@ -120,10 +344,8 @@ def on_mqtt_message(client, userdata, msg):
mqtt_history = []
reply = ask_llm(task, history=mqtt_history)
mqtt_publish(reply_to, reply)
print("[MQTT] Réponse envoyée sur {}".format(reply_to))
def on_mqtt_error(client, userdata, msg):
"""Reçoit les erreurs des agents et notifie l'utilisateur via XMPP."""
try:
data = json.loads(msg.payload.decode(errors="replace"))
agent = data.get("agent", "?")
@@ -139,14 +361,12 @@ def on_mqtt_error(client, userdata, msg):
print("[MQTT] Erreur parsing notification : {}".format(e))
def on_mqtt_notification(client, userdata, msg):
"""Reçoit les notifications du scheduler."""
try:
data = json.loads(msg.payload.decode(errors="replace"))
status = data.get("status", "?")
agent = data.get("agent", "?")
task = data.get("task", "?")[:80]
ts = data.get("timestamp", "?")
# Notifier XMPP seulement en cas d'erreur ou de succès important
if status == "error" and xmpp_bot:
notif = "[PLANIF ERREUR] {} | {}{}\nStatut : {}".format(ts, agent, task, status)
xmpp_bot.send_message(mto=ADMIN_JID, mbody=notif, mtype='chat')
@@ -154,8 +374,6 @@ def on_mqtt_notification(client, userdata, msg):
print("[MQTT] Erreur parsing notification scheduler : {}".format(e))
def on_mqtt_status(client, userdata, msg):
"""Suit le statut en ligne/hors-ligne des agents (LWT + retain)."""
import time
try:
data = json.loads(msg.payload.decode(errors="replace"))
agent = data.get("agent", "?")
@@ -164,11 +382,9 @@ def on_mqtt_status(client, userdata, msg):
was_online = AGENTS_ONLINE.get(agent, {}).get("status") == "online"
AGENTS_ONLINE[agent] = {**data, "last_seen": time.time()}
# Sauvegarder pour la skill agents_online
AGENTS_ONLINE_FILE.write_text(
json.dumps(AGENTS_ONLINE, indent=2, ensure_ascii=False), encoding="utf-8")
# Notifier sylvain uniquement si le statut change
is_online = status == "online"
if is_online == was_online:
return
@@ -182,7 +398,6 @@ def on_mqtt_status(client, userdata, msg):
print("[MQTT] Erreur parsing status : {}".format(e))
def on_mqtt_register(client, userdata, msg):
"""Reçoit les déclarations de mise en ligne des agents et met à jour le registre."""
try:
data = json.loads(msg.payload.decode(errors="replace"))
agent = data.get("agent", "?")
@@ -191,24 +406,24 @@ def on_mqtt_register(client, userdata, msg):
speciality = data.get("speciality", "")
print("[REGISTER] {} en ligne (JID: {}, inbox: {})".format(agent, jid, mqtt_inbox))
# Mettre à jour agents_registry.json
registry_file = CONFIG_DIR / "agents_registry.json"
try:
registry = json.loads(registry_file.read_text(encoding="utf-8"))
except Exception:
registry = {}
is_new = agent not in registry
registry[agent] = {
# Préserver les champs existants (work_hours etc.) lors d'une mise à jour
existing = registry.get(agent, {})
existing.update({
"jid" : jid,
"mqtt_inbox" : mqtt_inbox,
"mqtt_outbox": "agents/agent1/inbox",
"speciality" : speciality,
}
})
registry[agent] = existing
registry_file.write_text(json.dumps(registry, indent=2, ensure_ascii=False), encoding="utf-8")
if is_new:
print("[REGISTER] {} ajouté au registre.".format(agent))
# Notifier sylvain via XMPP
if xmpp_bot:
status = "NOUVEAU" if is_new else "EN LIGNE"
notif = "[{}] {}\n JID : {}\n MQTT : {}".format(status, agent, jid, mqtt_inbox)
@@ -218,11 +433,21 @@ def on_mqtt_register(client, userdata, msg):
except Exception as e:
print("[MQTT] Erreur parsing register : {}".format(e))
def on_mqtt_daily_report(client, userdata, msg):
"""Reçoit et stocke les rapports journaliers des agents."""
try:
data = json.loads(msg.payload.decode(errors="replace"))
agent = data.get("agent", "?")
daily_reports[agent] = data
print("[REPORT] Rapport reçu de {} : {} tâches, {} erreurs".format(
agent, data.get("tasks_today", 0), data.get("errors", 0)))
except Exception as e:
print("[MQTT] Erreur parsing daily_report : {}".format(e))
def start_mqtt_listener():
global mqtt_pub_client
mqtt_pub_client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2,
client_id="agent1_pub")
mqtt_pub_client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2, client_id="agent1_pub")
mqtt_pub_client.connect(MQTT_HOST, MQTT_PORT)
mqtt_pub_client.loop_start()
@@ -232,7 +457,8 @@ def start_mqtt_listener():
sub.message_callback_add("agents/scheduler/notifications", on_mqtt_notification)
sub.message_callback_add("agents/register", on_mqtt_register)
sub.message_callback_add("agents/status/+", on_mqtt_status)
sub.on_message = on_mqtt_message # fallback
sub.message_callback_add("agents/daily_report", on_mqtt_daily_report)
sub.on_message = on_mqtt_message
sub.connect(MQTT_HOST, MQTT_PORT)
sub.subscribe([
(MQTT_INBOX, 0),
@@ -240,8 +466,9 @@ def start_mqtt_listener():
("agents/scheduler/notifications", 0),
("agents/register", 0),
("agents/status/+", 0),
("agents/daily_report", 0),
])
print("[MQTT] Agent1 écoute sur {}, agents/errors, agents/status/+, agents/register".format(MQTT_INBOX))
print("[MQTT] Agent1 écoute sur {}, errors, status/+, register, daily_report".format(MQTT_INBOX))
sub.loop_forever()
# ── BOT XMPP ─────────────────────────────────────────────────────────────
@@ -259,6 +486,8 @@ class AgentBot(ClientXMPP):
self.send_message(mto=ADMIN_JID, mbody="Agent1 (orchestrateur) en ligne !", mtype='chat')
async def message(self, msg):
global SLEEP_MODE, PENDING_CONFIG
if msg['type'] not in ('chat', 'normal'):
return
if str(msg['from']).split('/')[0] != ADMIN_JID:
@@ -266,17 +495,55 @@ class AgentBot(ClientXMPP):
user_input = msg['body'].strip()
# ── Commandes !agentON/OFF (prioritaires, toujours traitées) ──────
agent_reply = _handle_agent_command(user_input)
if agent_reply is not None:
self.send_message(mto=ADMIN_JID, mbody=agent_reply, mtype='chat')
return
# ── Mode veille : ignorer tout sauf commandes agent ───────────────
if SLEEP_MODE:
self.send_message(
mto=ADMIN_JID,
mbody="[VEILLE] Agent1 en veille. Envoyez !agentsON ou !agentON agent1 pour reprendre.",
mtype='chat')
return
# ── Commandes config ──────────────────────────────────────────────
config_reply = _handle_config_command(user_input)
if config_reply is not None:
self.send_message(mto=ADMIN_JID, mbody=config_reply, mtype='chat')
return
# ── Confirmation modification config en attente ───────────────────
if PENDING_CONFIG is not None:
if user_input.lower() in ("oui", "yes", "o", "confirme", "ok"):
reply = _apply_pending_config()
else:
PENDING_CONFIG = None
reply = "Modification annulée."
self.send_message(mto=ADMIN_JID, mbody=reply, mtype='chat')
return
# ── Reset conversation ─────────────────────────────────────────────
if user_input == "!reset":
conversation_history.clear()
self.send_message(mto=ADMIN_JID, mbody="Conversation reinitialisee.", mtype='chat')
return
# ── Traitement LLM normal ─────────────────────────────────────────
loop = asyncio.get_event_loop()
reply = await loop.run_in_executor(None, ask_llm, user_input)
self.send_message(mto=ADMIN_JID, mbody=reply, mtype='chat')
# ── MAIN ─────────────────────────────────────────────────────────────────
if __name__ == "__main__":
# Démarrer le scheduler
scheduler.start()
_reload_report_scheduler()
_reload_task_scheduler()
# Démarrer MQTT dans un thread séparé
mqtt_thread = threading.Thread(target=start_mqtt_listener, daemon=True)
mqtt_thread.start()
+102
View File
@@ -0,0 +1,102 @@
"""
Skill : DAILY_REPORT
Compile et formate le rapport journalier de tous les agents.
Commande :
DAILY_REPORT: → rapport du jour
DAILY_REPORT: <agent> → rapport d'un agent spécifique
"""
from datetime import datetime, timedelta
SKILL_NAME = "daily_report"
TRIGGER = "DAILY_REPORT:"
def _fmt_uptime(seconds: int) -> str:
if seconds < 60:
return "{}s".format(seconds)
elif seconds < 3600:
return "{}m{}s".format(seconds // 60, seconds % 60)
else:
h = seconds // 3600
m = (seconds % 3600) // 60
return "{}h{}m".format(h, m)
def compile_report(daily_reports: dict, agent_filter: str = None) -> str:
"""
Compile les rapports reçus des agents en un texte formaté.
daily_reports : {agent_name: {tasks_today, success, errors, avg_duration_s,
last_error, pending, uptime_s, paused, timestamp}}
"""
now = datetime.now().strftime("%Y-%m-%d %H:%M")
lines = ["=== RAPPORT JOURNALIER — {} ===".format(now), ""]
if not daily_reports:
lines.append("Aucun rapport reçu des agents.")
return "\n".join(lines)
agents = {k: v for k, v in daily_reports.items()
if agent_filter is None or k == agent_filter}
if not agents:
return "Aucun rapport pour l'agent '{}'.".format(agent_filter)
total_tasks = 0
total_errors = 0
for agent_name, data in sorted(agents.items()):
tasks = data.get("tasks_today", 0)
success = data.get("success", 0)
errors = data.get("errors", 0)
avg_dur = data.get("avg_duration_s", 0)
pending = data.get("pending", 0)
uptime = data.get("uptime_s", 0)
paused = data.get("paused", False)
last_err= data.get("last_error")
ts = data.get("timestamp", "?")
total_tasks += tasks
total_errors += errors
state = "[EN PAUSE]" if paused else "[ACTIF]"
lines.append("── {} {} ──".format(agent_name, state))
lines.append(" Rapport : {}".format(ts))
lines.append(" Uptime : {}".format(_fmt_uptime(uptime)))
lines.append(" Tâches : {} total, {} succès, {} erreurs".format(tasks, success, errors))
if pending:
lines.append(" En attente : {}".format(pending))
if avg_dur:
lines.append(" Durée moy : {:.1f}s".format(avg_dur))
if last_err:
lines.append(" Dernière erreur : {}".format(str(last_err)[:150]))
lines.append("")
if len(agents) > 1:
lines.append("── TOTAL ──")
lines.append(" Tâches : {} | Erreurs : {}".format(total_tasks, total_errors))
if total_tasks > 0:
rate = round((total_tasks - total_errors) / total_tasks * 100, 1)
lines.append(" Taux de succès : {}%".format(rate))
return "\n".join(lines)
def execute(args: str) -> str:
"""
Appelé par le LLM via DAILY_REPORT:.
Lit les rapports stockés dans agent1.daily_reports.
"""
import sys
# Accéder aux rapports stockés dans agent1 (module principal)
agent_filter = args.strip() or None
daily_reports = {}
try:
# agent1.py est le __main__, on récupère ses daily_reports via sys.modules
main_mod = sys.modules.get("__main__")
if main_mod and hasattr(main_mod, "daily_reports"):
daily_reports = main_mod.daily_reports
except Exception:
pass
return compile_report(daily_reports, agent_filter)