0fe1ece68d
- agent_update.py : _run_ssh() via sshpass, dispatche local ou SSH selon ssh_host - agent1.py : _get_agent_git_info() transmet ssh_host/ssh_user depuis le registre - agents_registry.json : agent2_test → ssh_host: 192.168.7.13, ssh_user: root Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
673 lines
28 KiB
Python
673 lines
28 KiB
Python
#!/usr/bin/env python3
|
|
# -*- coding: utf-8 -*-
|
|
|
|
import asyncio
|
|
import sys
|
|
import threading
|
|
import requests
|
|
import json
|
|
import time
|
|
from pathlib import Path
|
|
from datetime import datetime
|
|
from slixmpp import ClientXMPP
|
|
import paho.mqtt.client as mqtt
|
|
from apscheduler.schedulers.background import BackgroundScheduler
|
|
|
|
sys.path.insert(0, "/opt/agent")
|
|
from skills.loader import load_skills, run_skills
|
|
|
|
# ── CONFIG ───────────────────────────────────────────────────────────────
|
|
# All configuration files live in one directory under the agent install path.
CONFIG_DIR = Path("/opt/agent/config")
CONFIG_FILE = CONFIG_DIR.joinpath("config.json")          # JSON settings
PROMPT_FILE = CONFIG_DIR.joinpath("system_prompt.txt")    # LLM system prompt
|
|
|
|
def load_config():
    """Return the decoded content of CONFIG_FILE (read as UTF-8 JSON).

    Raises whatever the file read or the JSON decoder raises; nothing is
    caught here.
    """
    raw_text = CONFIG_FILE.read_text(encoding="utf-8")
    return json.loads(raw_text)
|
|
|
|
def load_system_prompt():
    """Return the full text of PROMPT_FILE, read as UTF-8."""
    return PROMPT_FILE.read_text(encoding="utf-8")
|
|
|
|
# Settings are read once, at import time. The first five keys are mandatory
# (a missing one raises KeyError); the MQTT broker address has defaults.
cfg = load_config()
OLLAMA_URL = cfg["ollama_url"]
MODEL = cfg["model"]
XMPP_JID = cfg["xmpp_jid"]
XMPP_PASS = cfg["xmpp_pass"]
ADMIN_JID = cfg["admin_jid"]
MQTT_HOST = cfg.get("mqtt_host", "localhost")
MQTT_PORT = int(cfg.get("mqtt_port", 1883))
MQTT_INBOX = "agents/agent1/inbox"
SYSTEM_PROMPT = load_system_prompt()

load_skills()

# Mutable module state shared by the XMPP and MQTT handlers.
conversation_history = []   # chat turns of the admin conversation
xmpp_bot = None             # set to the AgentBot instance in the __main__ block
AGENTS_ONLINE = {}          # {agent_name: last status payload + "last_seen"}

# Other JSON files kept next to the main configuration.
REGISTRY_FILE = CONFIG_DIR.joinpath("agents_registry.json")
AGENTS_ONLINE_FILE = CONFIG_DIR.joinpath("agents_online.json")
REPORTS_FILE = CONFIG_DIR.joinpath("reports_schedule.json")
TASKS_FILE = CONFIG_DIR.joinpath("tasks_schedule.json")
BLACKOUT_FILE = CONFIG_DIR.joinpath("blackout_hours.json")

# ── SLEEP MODE ────────────────────────────────────────────────────────────
# While True, the XMPP handler only honours the !agent* commands.
SLEEP_MODE = False

# ── CONFIG CHANGE AWAITING CONFIRMATION ───────────────────────────────────
# {"file": "reports_schedule" | "tasks_schedule", "content": dict, "description": str}
PENDING_CONFIG = None

# ── DAILY REPORTS (received from the agents) ──────────────────────────────
daily_reports = {}  # {agent_name: payload_dict}

# ── SCHEDULER ─────────────────────────────────────────────────────────────
scheduler = BackgroundScheduler()
|
|
|
|
# ── AGENTS CONTEXT ────────────────────────────────────────────────────────
|
|
def _get_agents_context() -> str:
    """Describe the registered agents as text for the LLM system prompt.

    The registry file is read on every call. If it cannot be read or parsed,
    or is empty, a fixed "no agent" sentence is returned. Otherwise the result
    is a header line followed by one line per agent giving its name, an
    online/offline tag derived from AGENTS_ONLINE, and its "speciality".
    """
    try:
        known_agents = json.loads(REGISTRY_FILE.read_text(encoding="utf-8"))
    except Exception:
        # Unreadable registry is treated exactly like an empty one.
        known_agents = {}

    if not known_agents:
        return "Aucun agent enregistré."

    roster = ["Agents disponibles :"]
    for agent_name, details in known_agents.items():
        is_up = (
            agent_name in AGENTS_ONLINE
            and AGENTS_ONLINE[agent_name].get("status") == "online"
        )
        tag = "[EN LIGNE]" if is_up else "[hors ligne]"
        speciality = details.get("speciality", "")
        roster.append("- {} {} : {}".format(agent_name, tag, speciality))
    return "\n".join(roster)
|
|
|
|
# ── LLM ──────────────────────────────────────────────────────────────────
|
|
def call_ollama(messages: list) -> str:
    """Send a non-streaming chat request to Ollama and return the reply text.

    Args:
        messages: chat messages (dicts with "role" and "content") forwarded
            as-is in the request body.

    Returns:
        The ``["message"]["content"]`` field of the JSON response.

    Raises:
        requests.RequestException: on network failure, timeout (180 s) or an
            HTTP error status.
        KeyError / ValueError: if the response body does not have the
            expected JSON shape.
    """
    payload = {
        "model": MODEL,
        "messages": messages,
        "stream": False,
        "options": {"temperature": 0.3},
    }
    response = requests.post(OLLAMA_URL, json=payload, timeout=180)
    # Fail with an explicit HTTP error instead of an opaque KeyError/JSON
    # error when the server answers 4xx/5xx.
    response.raise_for_status()
    return response.json()["message"]["content"]
|
|
|
|
def ask_llm(user_message: str, history: list = None) -> str:
    """Ask the model a question, running skills it triggers along the way.

    Args:
        user_message: text appended to the history as a "user" turn.
        history: conversation to extend in place; when None the module-level
            ``conversation_history`` is used.

    Returns:
        The final assistant reply, or ``"Erreur : <exception>"`` if anything
        raised during the exchange. Either way the returned text is also
        appended to ``history`` as an "assistant" turn.

    Skill results are fed back to the model in a working transcript only;
    they are not stored in ``history``.
    """
    if history is None:
        history = conversation_history
    history.append({"role": "user", "content": user_message})

    system_text = SYSTEM_PROMPT + "\n\n" + _get_agents_context()
    # Working copy: intermediate skill exchanges are added here, not to history.
    transcript = [{"role": "system", "content": system_text}, *history]

    try:
        MAX_STEPS = 10
        remaining = MAX_STEPS
        while remaining > 0:
            remaining -= 1
            answer = call_ollama(transcript)
            triggered, skill_output = run_skills(answer)
            if not triggered:
                break
            transcript.append({"role": "assistant", "content": answer})
            transcript.append(
                {"role": "user", "content": "[Résultat skill]\n" + skill_output})
        else:
            # Step budget exhausted: one last call whose reply is returned
            # without being passed to run_skills.
            answer = call_ollama(transcript)

        history.append({"role": "assistant", "content": answer})
        return answer
    except Exception as exc:
        failure = "Erreur : " + str(exc)
        history.append({"role": "assistant", "content": failure})
        return failure
|
|
|
|
# ── MQTT ──────────────────────────────────────────────────────────────────
|
|
# Publisher client shared by mqtt_publish(); stays None until
# start_mqtt_listener() assigns it.
mqtt_pub_client = None
|
|
|
|
def mqtt_publish(topic: str, message: str):
    """Publish ``message`` on ``topic`` through the shared publisher client.

    Does nothing when the publisher client has not been created yet.
    """
    if not mqtt_pub_client:
        return
    mqtt_pub_client.publish(topic, message)
|
|
|
|
def _send_control(agent_name: str, command: str):
    """Publish a control command for one agent over MQTT and log it.

    The payload is the JSON object ``{"command": <command>}`` sent on the
    topic ``agents/<agent_name>/control``.
    """
    body = json.dumps({"command": command})
    topic = "agents/{}/control".format(agent_name)
    mqtt_publish(topic, body)
    print("[CONTROL] {} → {}".format(agent_name, command))
|
|
|
|
def _get_all_agents() -> list:
    """Return the names found in the registry file, excluding "agent1".

    Any error while reading, parsing or iterating the registry yields an
    empty list.
    """
    try:
        known = json.loads(REGISTRY_FILE.read_text(encoding="utf-8"))
        others = []
        for agent_name in known:
            if agent_name == "agent1":
                continue
            others.append(agent_name)
    except Exception:
        others = []
    return others
|
|
|
|
# ── COMMANDES !agent ──────────────────────────────────────────────────────
|
|
def _handle_agent_command(text: str) -> tuple:
    """Recognise and execute the ``!agent*`` administration commands.

    Supported commands (matched on the stripped text):
      * ``!agentsOFF`` / ``!agentsON``: pause/resume every registered agent
        and set/clear agent1's sleep mode.
      * ``!agentOFF <name>`` / ``!agentON <name>``: same for one agent;
        ``agent1`` only toggles the local sleep mode.
      * ``!agentsUPDATE`` / ``!agentUPDATE <name>``: check for updates.
      * ``!agentsUPGRADE`` / ``!agentUPGRADE <name>``: apply updates.

    Returns:
        ``(handled, reply)``. ``handled`` is False when the text is not one of
        these commands. ``reply`` is None when the handler has already sent
        the XMPP answer itself (e.g. self-upgrade).
    """
    global SLEEP_MODE

    command = text.strip()

    # Fleet-wide sleep / wake-up.
    if command == "!agentsOFF" or command == "!agentsON":
        sleeping = command == "!agentsOFF"
        order = "pause" if sleeping else "resume"
        fleet = _get_all_agents()
        for member in fleet:
            _send_control(member, order)
        SLEEP_MODE = sleeping
        if sleeping:
            return True, "[VEILLE] Agent1 en veille. {} agent(s) mis en pause.\nEnvoyez !agentsON ou !agentON agent1 pour reprendre.".format(len(fleet))
        return True, "[ACTIF] Agent1 actif. {} agent(s) relancés.".format(len(fleet))

    # Pause a single agent (agent1 = local sleep mode only).
    if command.startswith("!agentOFF "):
        target = command.removeprefix("!agentOFF ").strip()
        if target != "agent1":
            _send_control(target, "pause")
            return True, "[PAUSE] Commande pause envoyée à {}.".format(target)
        SLEEP_MODE = True
        return True, "[VEILLE] Agent1 en veille. Envoyez !agentON agent1 pour reprendre."

    # Resume a single agent (agent1 = leave local sleep mode).
    if command.startswith("!agentON "):
        target = command.removeprefix("!agentON ").strip()
        if target != "agent1":
            _send_control(target, "resume")
            return True, "[ACTIF] Commande resume envoyée à {}.".format(target)
        SLEEP_MODE = False
        return True, "[ACTIF] Agent1 actif."

    # Update check / upgrade for the whole fleet.
    if command == "!agentsUPDATE":
        return True, _handle_update_all()
    if command == "!agentsUPGRADE":
        return True, _handle_upgrade_all()

    # Update check / upgrade for one named agent.
    if command.startswith("!agentUPDATE "):
        return True, _handle_update_one(command.removeprefix("!agentUPDATE ").strip())
    if command.startswith("!agentUPGRADE "):
        return True, _handle_upgrade_one(command.removeprefix("!agentUPGRADE ").strip())

    return False, None
|
|
|
|
# ── MISE À JOUR DEPUIS GIT ───────────────────────────────────────────────
|
|
def _get_agent_git_info(name: str) -> dict:
    """Return the deployment settings used to update agent ``name``.

    Values come from the agent's registry entry when present, otherwise from
    defaults: install path ``/opt/<name>`` and service ``<name>`` (agent1 is
    the exception: ``/opt/agent`` and service ``agent``), branch ``main``,
    no SSH host, SSH user ``root``. An unreadable registry is treated as
    having no entry.

    Returns:
        dict with keys ``install_path``, ``service_name``, ``git_branch``,
        ``ssh_host`` (None when absent) and ``ssh_user``.
    """
    try:
        entry = json.loads(REGISTRY_FILE.read_text(encoding="utf-8")).get(name, {})
    except Exception:
        entry = {}

    # agent1 does not follow the /opt/<name> convention.
    is_orchestrator = name == "agent1"
    fallback_path = "/opt/agent" if is_orchestrator else "/opt/{}".format(name)
    fallback_service = "agent" if is_orchestrator else name

    settings = {}
    settings["install_path"] = entry.get("install_path", fallback_path)
    settings["service_name"] = entry.get("service_name", fallback_service)
    settings["git_branch"] = entry.get("git_branch", "main")
    settings["ssh_host"] = entry.get("ssh_host")
    settings["ssh_user"] = entry.get("ssh_user", "root")
    return settings
|
|
|
|
def _handle_update_one(name: str) -> str:
    """Run ``check_update`` for agent ``name`` and return its result.

    The install path, branch and SSH coordinates passed to ``check_update``
    come from ``_get_agent_git_info``.
    """
    from skills.agent_update import check_update

    settings = _get_agent_git_info(name)
    wanted = ("install_path", "git_branch", "ssh_host", "ssh_user")
    return check_update(name, *[settings[key] for key in wanted])
|
|
|
|
def _handle_update_all() -> str:
    """Run ``check_update`` for every agent in the registry.

    Returns:
        The individual results joined by blank lines, a fixed sentence when
        the registry is empty, or an error sentence when it cannot be read.
    """
    from skills.agent_update import check_update

    try:
        registry = json.loads(REGISTRY_FILE.read_text(encoding="utf-8"))
    except Exception:
        return "Erreur lecture registre."

    reports = []
    for agent_name in registry:
        settings = _get_agent_git_info(agent_name)
        reports.append(
            check_update(
                agent_name,
                settings["install_path"],
                settings["git_branch"],
                settings["ssh_host"],
                settings["ssh_user"],
            )
        )

    if not reports:
        return "Aucun agent dans le registre."
    return "\n\n".join(reports)
|
|
|
|
def _handle_upgrade_one(name: str) -> str:
    """Run ``do_upgrade`` for agent ``name``.

    For agent1 the upgrade is requested with ``self_upgrade=True``. If the
    resulting message contains "Redémarrage en cours", the message is sent to
    the admin over XMPP (when the bot exists), the agent's own systemd service
    is restarted, and None is returned to signal the reply was already sent.

    Returns:
        The message from ``do_upgrade``, or None in the self-restart case.
    """
    from skills.agent_update import do_upgrade

    settings = _get_agent_git_info(name)
    is_self = name == "agent1"
    outcome = do_upgrade(
        name,
        settings["install_path"],
        settings["service_name"],
        settings["git_branch"],
        self_upgrade=is_self,
        ssh_host=settings["ssh_host"],
        ssh_user=settings["ssh_user"],
    )

    if not (is_self and "Redémarrage en cours" in outcome):
        return outcome

    # Tell the admin before the restart is launched.
    if xmpp_bot:
        xmpp_bot.send_message(mto=ADMIN_JID, mbody=outcome, mtype='chat')
    import subprocess
    subprocess.Popen(["systemctl", "restart", settings["service_name"]])
    return None  # reply already sent manually
|
|
|
|
def _handle_upgrade_all() -> str | None:
    """Upgrade every registered agent, handling agent1 (this process) last.

    Other agents are upgraded first through ``do_upgrade``. agent1 is then
    upgraded with ``self_upgrade=True``. Its service is restarted only when
    ``do_upgrade`` reports "Redémarrage en cours", the same marker
    ``_handle_upgrade_one`` relies on, so a failed or no-op self-upgrade no
    longer triggers a needless restart.

    Returns:
        The combined summary, an error sentence if the registry cannot be
        read, or None when the summary was already sent over XMPP just before
        the self-restart was launched.
    """
    from skills.agent_update import do_upgrade

    try:
        registry = json.loads(REGISTRY_FILE.read_text(encoding="utf-8"))
    except Exception:
        return "Erreur lecture registre."

    results = []
    agent1_info = None

    for name in registry:
        info = _get_agent_git_info(name)
        if name == "agent1":
            agent1_info = info  # handled last, it may restart this process
            continue
        results.append(
            do_upgrade(name, info["install_path"],
                       info["service_name"], info["git_branch"],
                       ssh_host=info["ssh_host"], ssh_user=info["ssh_user"]))

    summary = "\n\n".join(results) if results else "Aucun agent mis à jour."

    if agent1_info is None:
        return summary

    pull_msg = do_upgrade("agent1", agent1_info["install_path"],
                          agent1_info["service_name"], agent1_info["git_branch"],
                          self_upgrade=True)
    summary += "\n\n" + pull_msg

    # Nothing to restart: let the caller deliver the summary normally.
    if "Redémarrage en cours" not in pull_msg:
        return summary

    # Send the summary before the restart is launched.
    if xmpp_bot:
        xmpp_bot.send_message(mto=ADMIN_JID, mbody=summary, mtype='chat')
    import subprocess
    subprocess.Popen(["systemctl", "restart", agent1_info.get("service_name", "agent")])
    return None  # reply already sent
|
|
|
|
# ── GESTION CONFIGS AVEC CONFIRMATION ────────────────────────────────────
|
|
def _handle_config_command(text: str) -> str | None:
    """Answer the read-only config commands ``!reports``, ``!tasks``, ``!blackout``.

    Args:
        text: raw message; surrounding whitespace is ignored.

    Returns:
        A banner followed by the pretty-printed JSON of the matching file,
        ``"Erreur lecture : <exception>"`` if it cannot be read or parsed, or
        None when ``text`` is not one of the three commands.
    """
    # command -> (file to display, name shown in the banner)
    viewers = {
        "!reports": (REPORTS_FILE, "reports_schedule.json"),
        "!tasks": (TASKS_FILE, "tasks_schedule.json"),
        "!blackout": (BLACKOUT_FILE, "blackout_hours.json"),
    }

    target = viewers.get(text.strip())
    if target is None:
        return None

    path, label = target
    try:
        # Explicit UTF-8, like the other JSON files handled by this module.
        data = json.loads(path.read_text(encoding="utf-8"))
        return "=== {} ===\n".format(label) + json.dumps(data, indent=2, ensure_ascii=False)
    except Exception as e:
        return "Erreur lecture : {}".format(e)
|
|
|
|
def _apply_pending_config() -> str:
    """Write the pending configuration change to disk and reload its scheduler.

    ``PENDING_CONFIG`` is always cleared, whether the change succeeds or not.
    An unknown ``file`` key is reported as an error instead of being
    acknowledged as applied while nothing was written.

    Returns:
        A confirmation sentence, an error sentence, or a fixed sentence when
        no change is pending.
    """
    global PENDING_CONFIG

    if not PENDING_CONFIG:
        return "Aucune modification en attente."

    pending, PENDING_CONFIG = PENDING_CONFIG, None

    # file key -> (destination file, scheduler reload to run afterwards)
    targets = {
        "reports_schedule": (REPORTS_FILE, _reload_report_scheduler),
        "tasks_schedule": (TASKS_FILE, _reload_task_scheduler),
    }

    try:
        file_key = pending["file"]
        content = pending["content"]
        desc = pending["description"]

        if file_key not in targets:
            raise ValueError("fichier de configuration inconnu : {}".format(file_key))

        path, reload_scheduler = targets[file_key]
        # ensure_ascii=False emits raw non-ASCII characters, so the encoding
        # must be explicit rather than platform-dependent.
        path.write_text(json.dumps(content, indent=2, ensure_ascii=False),
                        encoding="utf-8")
        reload_scheduler()
        return "Modification appliquée : {}".format(desc)
    except Exception as e:
        return "Erreur lors de l'application : {}".format(e)
|
|
|
|
# ── RAPPORT JOURNALIER ────────────────────────────────────────────────────
|
|
def _solicit_report(agent_name: str):
    """Log the request, then send the "report" control command to ``agent_name``."""
    announcement = "[REPORT] Sollicitation rapport de {}".format(agent_name)
    print(announcement)
    _send_control(agent_name, "report")
|
|
|
|
def _compile_and_send_daily_report():
    """Compile the stored agent reports and send the result to the admin.

    The text is produced by ``skills.daily_report.compile_report`` from
    ``daily_reports``. Nothing is sent when the XMPP bot is not available.
    """
    from skills.daily_report import compile_report

    compiled = compile_report(daily_reports)
    print("[REPORT] Rapport journalier compilé, envoi XMPP.")
    if not xmpp_bot:
        return
    xmpp_bot.send_message(mto=ADMIN_JID, mbody=compiled, mtype='chat')
|
|
|
|
def _reload_report_scheduler():
    """(Re)schedule report jobs according to reports_schedule.json.

    Once the file is parsed, every existing ``report_<agent>`` job and the
    ``daily_report`` job are removed and the schedule is rebuilt. Agents that
    were disabled or deleted, and a disabled daily report, therefore stop
    firing after a reload. An agent entry with an invalid ``report_time`` is
    skipped with a log line instead of aborting the remaining entries.

    Errors reading or parsing the file are logged and leave the current jobs
    untouched.
    """
    try:
        data = json.loads(REPORTS_FILE.read_text(encoding="utf-8"))
        agents = data.get("agents", {})

        # Start from a clean slate so stale jobs do not survive a reload.
        # NOTE(review): assumes no other component registers job ids starting
        # with "report_"; confirm if jobs are added outside this file.
        for job in scheduler.get_jobs():
            if job.id == "daily_report" or job.id.startswith("report_"):
                scheduler.remove_job(job.id)

        for name, conf in agents.items():
            if not conf.get("enabled", True):
                continue
            try:
                t = conf["report_time"]  # "HH:MM"
                hour, minute = map(int, t.split(":"))
                scheduler.add_job(_solicit_report, "cron", args=[name],
                                  hour=hour, minute=minute,
                                  id="report_{}".format(name),
                                  replace_existing=True)
            except (KeyError, ValueError, TypeError, AttributeError) as e:
                print("[SCHEDULER] Rapport {} ignoré (planification invalide) : {}".format(name, e))
                continue
            print("[SCHEDULER] Rapport {} planifié à {}".format(name, t))

        # Compiled daily report.
        if data.get("daily_report_enabled", True):
            daily_t = data.get("daily_report_time", "22:30")
            dh, dm = map(int, daily_t.split(":"))
            scheduler.add_job(_compile_and_send_daily_report, "cron",
                              hour=dh, minute=dm, id="daily_report",
                              replace_existing=True)
            print("[SCHEDULER] Rapport journalier planifié à {}".format(daily_t))
    except Exception as e:
        print("[SCHEDULER] Erreur reload report scheduler : {}".format(e))
|
|
|
|
def _reload_task_scheduler():
    """(Re)schedule the delegated tasks listed in tasks_schedule.json.

    All existing jobs whose id starts with ``task_`` are removed, then one
    job per enabled entry is created through ``_schedule_task_job`` with the
    id ``task_<agent>_<index>``. Any exception is logged and ends the reload.
    """
    try:
        schedule = json.loads(TASKS_FILE.read_text())
        entries = schedule.get("tasks", [])

        # Remove the previous task_* jobs.
        stale_ids = [job.id for job in scheduler.get_jobs()
                     if job.id.startswith("task_")]
        for stale_id in stale_ids:
            scheduler.remove_job(stale_id)

        for index, entry in enumerate(entries):
            if entry.get("enabled", True):
                agent = entry["agent"]
                task = entry["task"]
                cron = entry["cron"]  # e.g. "daily 03:00" or "every 6h"
                _schedule_task_job("task_{}_{}".format(agent, index), agent, task, cron)
                print("[SCHEDULER] Tâche planifiée [{} @ {}] → {}".format(agent, cron, task[:50]))
    except Exception as exc:
        print("[SCHEDULER] Erreur reload task scheduler : {}".format(exc))
|
|
|
|
def _schedule_task_job(job_id: str, agent: str, task: str, cron_expr: str):
    """Register one delegated task with the scheduler.

    Supported ``cron_expr`` forms:
      * ``daily HH:MM``
      * ``every <n>h``
      * ``every <n>min``
      * ``weekly <day> HH:MM``: day is a French three-letter abbreviation
        (lun, mar, mer, jeu, ven, sam, dim), matched case-insensitively;
        any other value is passed to the scheduler lowercased.

    An expression matching none of these forms creates no job; a warning is
    logged so the omission is visible. Malformed numbers or times raise
    (ValueError/IndexError) to the caller.

    Args:
        job_id: scheduler job id (an existing job with this id is replaced).
        agent: agent the task is delegated to.
        task: task text sent to the agent.
        cron_expr: schedule expression, see above.
    """
    from skills.delegate import execute as delegate_exec

    def run():
        print("[SCHEDULER] Exécution tâche {} → {}".format(agent, task[:60]))
        delegate_exec("{} | {}".format(agent, task))

    expr = cron_expr.strip()

    if expr.startswith("daily "):
        h, m = map(int, expr[6:].strip().split(":"))
        scheduler.add_job(run, "cron", hour=h, minute=m, id=job_id, replace_existing=True)
    elif expr.startswith("every ") and expr.endswith("h"):
        hours = int(expr[6:-1])
        scheduler.add_job(run, "interval", hours=hours, id=job_id, replace_existing=True)
    elif expr.startswith("every ") and expr.endswith("min"):
        mins = int(expr[6:-3])
        scheduler.add_job(run, "interval", minutes=mins, id=job_id, replace_existing=True)
    elif expr.startswith("weekly "):
        parts = expr[7:].split()
        day_map = {"lun": "mon", "mar": "tue", "mer": "wed", "jeu": "thu",
                   "ven": "fri", "sam": "sat", "dim": "sun"}
        # Accept "Lun"/"LUN" as well as "lun".
        day_key = parts[0].lower()
        day = day_map.get(day_key, day_key)
        h, m = map(int, parts[1].split(":"))
        scheduler.add_job(run, "cron", day_of_week=day, hour=h, minute=m,
                          id=job_id, replace_existing=True)
    else:
        # Previously ignored silently: no job existed, with no trace of why.
        print("[SCHEDULER] Expression non supportée pour {} : {!r} (aucun job créé)".format(
            job_id, cron_expr))
|
|
|
|
# ── MQTT HANDLERS ─────────────────────────────────────────────────────────
|
|
def on_mqtt_message(client, userdata, msg):
    """Handle a task received on the inbox topic and publish the LLM reply.

    The payload is either plain text (the task itself) or a JSON object with
    optional ``task`` and ``reply_to`` keys. JSON that is valid but not an
    object (e.g. ``42``, ``[1]``, ``null``) is treated as plain text, so it no
    longer raises inside the MQTT callback. Each message gets a fresh,
    independent conversation history.
    """
    raw = msg.payload.decode(errors="replace")

    default_reply_to = "agents/cli/outbox"
    reply_to = default_reply_to
    task = raw
    try:
        data = json.loads(raw)
    except json.JSONDecodeError:
        data = None

    if isinstance(data, dict):
        task = data.get("task", raw)
        reply_to = data.get("reply_to", default_reply_to)

    # Both values are used as strings below (slicing, topic name).
    if not isinstance(task, str):
        task = str(task)
    if not isinstance(reply_to, str) or not reply_to:
        reply_to = default_reply_to

    print("[MQTT] Message CLI reçu : {}".format(task[:80]))
    mqtt_history = []
    reply = ask_llm(task, history=mqtt_history)
    mqtt_publish(reply_to, reply)
|
|
|
|
def on_mqtt_error(client, userdata, msg):
    """Relay an agent error notification to the console and to the admin.

    The payload is expected to be a JSON object with ``agent``, ``task``,
    ``error`` and ``source`` keys ("?" is used for missing ones). ``task``,
    ``error`` and ``source`` are converted to text first, so a non-string
    value (number, object, null) no longer makes the alert disappear in the
    except branch.
    """
    try:
        data = json.loads(msg.payload.decode(errors="replace"))
        agent = data.get("agent", "?")
        task = str(data.get("task", "?"))
        error = str(data.get("error", "?"))
        source = str(data.get("source", "?"))
        notif = "[ERREUR][{}] Agent : {}\nTâche : {}\nErreur : {}".format(
            source.upper(), agent, task[:100], error[:300])
        print(notif)
        if xmpp_bot:
            xmpp_bot.send_message(mto=ADMIN_JID, mbody=notif, mtype='chat')
    except Exception as e:
        print("[MQTT] Erreur parsing notification : {}".format(e))
|
|
|
|
def on_mqtt_notification(client, userdata, msg):
    """Forward scheduler notifications whose status is "error" to the admin.

    Other statuses are ignored, and nothing is sent when the XMPP bot is not
    available. ``task`` is converted to text before truncation so a
    non-string value cannot abort the notification.
    """
    try:
        data = json.loads(msg.payload.decode(errors="replace"))
        status = data.get("status", "?")
        agent = data.get("agent", "?")
        task = str(data.get("task", "?"))[:80]
        ts = data.get("timestamp", "?")
        if status == "error" and xmpp_bot:
            notif = "[PLANIF ERREUR] {} | {} → {}\nStatut : {}".format(ts, agent, task, status)
            xmpp_bot.send_message(mto=ADMIN_JID, mbody=notif, mtype='chat')
    except Exception as e:
        print("[MQTT] Erreur parsing notification scheduler : {}".format(e))
|
|
|
|
def on_mqtt_status(client, userdata, msg):
    """Record an agent status message and notify the admin on transitions.

    The payload (plus a ``last_seen`` timestamp) replaces the agent's entry
    in AGENTS_ONLINE, and the whole table is written to AGENTS_ONLINE_FILE on
    every message. An XMPP notice is sent only when the agent's online state
    differs from the previously stored one.
    """
    try:
        payload = json.loads(msg.payload.decode(errors="replace"))
        agent = payload.get("agent", "?")
        status = payload.get("status", "?")

        previous = AGENTS_ONLINE.get(agent, {})
        was_online = previous.get("status") == "online"
        AGENTS_ONLINE[agent] = dict(payload, last_seen=time.time())

        snapshot = json.dumps(AGENTS_ONLINE, indent=2, ensure_ascii=False)
        AGENTS_ONLINE_FILE.write_text(snapshot, encoding="utf-8")

        is_online = status == "online"
        if is_online != was_online:
            label = "[EN LIGNE]" if is_online else "[HORS LIGNE]"
            print("[STATUS] {} → {}".format(agent, status))
            if xmpp_bot:
                xmpp_bot.send_message(
                    mto=ADMIN_JID,
                    mbody="{} {}".format(label, agent),
                    mtype='chat',
                )
    except Exception as exc:
        print("[MQTT] Erreur parsing status : {}".format(exc))
|
|
|
|
def on_mqtt_register(client, userdata, msg):
    """Record an agent announcement in the registry and notify the admin.

    The payload is a JSON object with ``agent``, ``jid``, ``mqtt_inbox`` and
    ``speciality``. Fields already stored for the agent (e.g. work_hours) are
    preserved; only the announced ones are overwritten. An announcement
    without a usable ``agent`` name is rejected instead of being stored under
    the placeholder name "?".
    """
    try:
        data = json.loads(msg.payload.decode(errors="replace"))
        agent = data.get("agent")
        if not agent or not isinstance(agent, str):
            print("[REGISTER] Annonce ignorée : champ 'agent' manquant ou invalide")
            return
        jid = data.get("jid", "?")
        mqtt_inbox = data.get("mqtt_inbox", "?")
        speciality = data.get("speciality", "")
        print("[REGISTER] {} en ligne (JID: {}, inbox: {})".format(agent, jid, mqtt_inbox))

        try:
            registry = json.loads(REGISTRY_FILE.read_text(encoding="utf-8"))
        except Exception:
            # Missing or unreadable registry: start a new one.
            registry = {}
        is_new = agent not in registry

        # Keep existing fields (work_hours etc.) when updating an entry.
        existing = registry.get(agent, {})
        existing.update({
            "jid": jid,
            "mqtt_inbox": mqtt_inbox,
            "mqtt_outbox": "agents/agent1/inbox",
            "speciality": speciality,
        })
        registry[agent] = existing
        REGISTRY_FILE.write_text(
            json.dumps(registry, indent=2, ensure_ascii=False), encoding="utf-8")

        if xmpp_bot:
            status = "NOUVEAU" if is_new else "EN LIGNE"
            notif = "[{}] {}\n JID : {}\n MQTT : {}".format(status, agent, jid, mqtt_inbox)
            if speciality:
                notif += "\n Rôle : {}".format(speciality)
            xmpp_bot.send_message(mto=ADMIN_JID, mbody=notif, mtype='chat')
    except Exception as e:
        print("[MQTT] Erreur parsing register : {}".format(e))
|
|
|
|
def on_mqtt_daily_report(client, userdata, msg):
    """Store the daily report sent by an agent.

    The decoded JSON payload is kept in ``daily_reports`` under the agent's
    name ("?" when absent), replacing any previous report from that agent.
    """
    try:
        report = json.loads(msg.payload.decode(errors="replace"))
        sender = report.get("agent", "?")
        daily_reports[sender] = report
        task_count = report.get("tasks_today", 0)
        error_count = report.get("errors", 0)
        print("[REPORT] Rapport reçu de {} : {} tâches, {} erreurs".format(
            sender, task_count, error_count))
    except Exception as exc:
        print("[MQTT] Erreur parsing daily_report : {}".format(exc))
|
|
|
|
def start_mqtt_listener():
    """Create the MQTT publisher and run the subscriber loop (blocking).

    The publisher client is stored in the module-level ``mqtt_pub_client``
    and runs its own background loop. The subscriber registers one callback
    per topic and subscribes from its ``on_connect`` callback, so the
    subscriptions are issued again after every reconnection instead of only
    once at start-up. This function does not return: it ends in
    ``loop_forever()``.
    """
    global mqtt_pub_client

    mqtt_pub_client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2, client_id="agent1_pub")
    mqtt_pub_client.connect(MQTT_HOST, MQTT_PORT)
    mqtt_pub_client.loop_start()

    # Single source of truth for both the callbacks and the subscriptions.
    topic_handlers = (
        (MQTT_INBOX, on_mqtt_message),
        ("agents/errors", on_mqtt_error),
        ("agents/scheduler/notifications", on_mqtt_notification),
        ("agents/register", on_mqtt_register),
        ("agents/status/+", on_mqtt_status),
        ("agents/daily_report", on_mqtt_daily_report),
    )

    sub = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2, client_id="agent1_sub")
    for topic, handler in topic_handlers:
        sub.message_callback_add(topic, handler)
    # Fallback for any message not matched by a topic-specific callback.
    sub.on_message = on_mqtt_message

    def _on_connect(client, userdata, flags, reason_code, properties):
        # Called on the initial connection and on every reconnection.
        if reason_code.is_failure:
            print("[MQTT] Connexion refusée : {}".format(reason_code))
            return
        client.subscribe([(topic, 0) for topic, _ in topic_handlers])
        print("[MQTT] Agent1 écoute sur {}, errors, status/+, register, daily_report".format(MQTT_INBOX))

    sub.on_connect = _on_connect
    sub.connect(MQTT_HOST, MQTT_PORT)
    sub.loop_forever()
|
|
|
|
# ── BOT XMPP ─────────────────────────────────────────────────────────────
|
|
class AgentBot(ClientXMPP):
    """XMPP client through which the administrator talks to agent1.

    Only 'chat'/'normal' messages whose bare sender JID equals ADMIN_JID are
    processed; every answer is sent back to ADMIN_JID.
    """

    def __init__(self):
        super().__init__(XMPP_JID, XMPP_PASS)
        self.add_event_handler("session_start", self.session_start)
        self.add_event_handler("message", self.message)
        for plugin in ('xep_0030', 'xep_0199'):
            self.register_plugin(plugin)

    async def session_start(self, event):
        """Announce presence, fetch the roster and greet the admin."""
        self.send_presence()
        await self.get_roster()
        self.send_message(mto=ADMIN_JID, mbody="Agent1 (orchestrateur) en ligne !", mtype='chat')

    async def message(self, msg):
        """Route one incoming message.

        Order of evaluation: !agent* commands (always honoured), sleep-mode
        notice, read-only config commands, confirmation of a pending config
        change, ``!reset``, and finally the LLM.
        """
        global SLEEP_MODE, PENDING_CONFIG

        def answer(body):
            self.send_message(mto=ADMIN_JID, mbody=body, mtype='chat')

        if msg['type'] not in ('chat', 'normal'):
            return
        sender_bare = str(msg['from']).split('/')[0]
        if sender_bare != ADMIN_JID:
            return

        user_input = msg['body'].strip()

        # ── !agent* commands (highest priority, always processed) ─────────
        handled, agent_reply = _handle_agent_command(user_input)
        if handled:
            # None means the handler already sent its own reply
            # (e.g. self-upgrade).
            if agent_reply is not None:
                answer(agent_reply)
            return

        # ── Sleep mode: everything except agent commands is refused ───────
        if SLEEP_MODE:
            answer("[VEILLE] Agent1 en veille. Envoyez !agentsON ou !agentON agent1 pour reprendre.")
            return

        # ── Config commands ───────────────────────────────────────────────
        config_reply = _handle_config_command(user_input)
        if config_reply is not None:
            answer(config_reply)
            return

        # ── Confirmation of a pending config change ───────────────────────
        if PENDING_CONFIG is not None:
            confirmed = user_input.lower() in ("oui", "yes", "o", "confirme", "ok")
            if confirmed:
                outcome = _apply_pending_config()
            else:
                # Any other message cancels the pending change.
                PENDING_CONFIG = None
                outcome = "Modification annulée."
            answer(outcome)
            return

        # ── Conversation reset ────────────────────────────────────────────
        if user_input == "!reset":
            conversation_history.clear()
            answer("Conversation reinitialisee.")
            return

        # ── Normal LLM processing, off the event loop thread ──────────────
        loop = asyncio.get_event_loop()
        llm_reply = await loop.run_in_executor(None, ask_llm, user_input)
        answer(llm_reply)
|
|
|
|
# ── MAIN ─────────────────────────────────────────────────────────────────
|
|
if __name__ == "__main__":
    # Start the scheduler, then load both schedules from their JSON files.
    scheduler.start()
    _reload_report_scheduler()
    _reload_task_scheduler()

    # The MQTT listener blocks forever, so it runs in its own daemon thread.
    mqtt_thread = threading.Thread(
        target=start_mqtt_listener,
        daemon=True,
    )
    mqtt_thread.start()

    # The XMPP bot runs in the main thread until the process is stopped.
    xmpp_bot = AgentBot()
    xmpp_bot.connect()
    xmpp_bot.loop.run_forever()
|