From dc7f395aa8455d13644d67010640253b8a36e72f Mon Sep 17 00:00:00 2001 From: sylvain Date: Sun, 15 Mar 2026 19:48:59 +0000 Subject: [PATCH] feat: script scheduling, execution notifications, LLM coordinator - Add /script command (run/schedule/unschedule/schedules/list) - Add scheduler.add_script_job() with 'once HH:MM' one-shot support - Subscribe to agents/scripts/execution and notify admins via XMPP - Integrate LLMCoordinator (local acquire/release for Nexus) - Update /help with script commands Co-Authored-By: Claude Sonnet 4.6 --- nexus.py | 195 +++++++++++++++++++++++++++++++++++++++++++++++ scheduler.py | 45 ++++++++++- skills/script.py | 20 ++++- 3 files changed, 258 insertions(+), 2 deletions(-) diff --git a/nexus.py b/nexus.py index d2cd3a7..196596d 100644 --- a/nexus.py +++ b/nexus.py @@ -43,6 +43,7 @@ class Nexus(BaseAgent): self.scheduler = NexusScheduler( send_task_callback=self._schedule_send_task, request_report_callback=self._request_daily_report, + send_script_callback=self._schedule_send_script, ) # Résultats en attente de réponse XMPP @@ -98,6 +99,8 @@ class Nexus(BaseAgent): self.mqtt.subscribe("agents/nexus/inbox", self._on_agent_result) # Rapports quotidiens des agents self.mqtt.subscribe("agents/daily_report", self._on_daily_report) + # Notifications d'exécution de scripts + self.mqtt.subscribe("agents/scripts/execution", self._on_script_execution) # ────────────────────────────────────────────── # Réception des résultats agents (MQTT) @@ -276,6 +279,9 @@ class Nexus(BaseAgent): if cmd == "schedules": return self.scheduler.list_jobs() + if cmd == "script": + return self._handle_script_nexus_command(args) + if cmd == "queue": return self._handle_queue_command(args) @@ -341,6 +347,149 @@ class Nexus(BaseAgent): return f"Commande inconnue : /{cmd}. Tape /help." + def _handle_script_nexus_command(self, args: str) -> str: + """ + /script run [args] — exécuter maintenant + /script schedule [args] — planifier + /script unschedule — annuler une planification + /script list [agent] — scripts dispo ou planifications + /script schedules — voir les scripts planifiés + + Fréquences : daily HH:MM | once HH:MM | every Xh | every Xmin | weekly HH:MM + """ + parts = args.strip().split(None, 1) + action = parts[0].lower() if parts else "" + rest = parts[1] if len(parts) > 1 else "" + + # ── run ────────────────────────────────────────────────────────── + if action == "run": + p = rest.split(None, 1) + if len(p) < 2: + return "Usage : /script run [args]" + agent_id, script_rest = p[0], p[1] + # Envoi direct COMMAND + attente réponse 30s + import uuid as _uuid + corr_id = _uuid.uuid4().hex[:8] + reply_topic = f"agents/results/{corr_id}" + reply_box = [] + reply_evt = threading.Event() + + def _on_result(msg, topic): + body = msg.payload if hasattr(msg, 'payload') else str(msg) + reply_box.append(body) + reply_evt.set() + + self.mqtt.subscribe(reply_topic, _on_result) + try: + self.mqtt.send_to( + agent_id, + f"/script exec {script_rest}", + msg_type=MessageType.COMMAND, + correlation_id=corr_id, + reply_to=reply_topic, + ) + got = reply_evt.wait(timeout=30) + finally: + self.mqtt.unsubscribe(reply_topic) + + if got: + return reply_box[0] + return f"Pas de réponse de {agent_id} dans 30s." + + # ── schedule ───────────────────────────────────────────────────── + if action == "schedule": + # format : [args] + # la fréquence peut être "daily HH:MM", "once HH:MM", "every Xh", "weekly lun HH:MM" + # on détecte le début de l'agent_id (premier token qui n'appartient pas à la freq) + p = rest.split() + if len(p) < 3: + return "Usage : /script schedule [args]" + + # Reconstruit fréquence selon le type + if p[0] in ("daily", "once") and len(p) >= 3: + freq, agent_id, script_name = p[0] + " " + p[1], p[2], p[3] if len(p) > 3 else "" + script_args = " ".join(p[4:]) if len(p) > 4 else "" + elif p[0] == "every" and len(p) >= 4: + freq, agent_id, script_name = p[0] + " " + p[1], p[2], p[3] + script_args = " ".join(p[4:]) if len(p) > 4 else "" + elif p[0] == "weekly" and len(p) >= 5: + freq, agent_id, script_name = p[0] + " " + p[1] + " " + p[2], p[3], p[4] + script_args = " ".join(p[5:]) if len(p) > 5 else "" + else: + return "Format de fréquence non reconnu. Ex: daily 03:00 | once 14:30 | every 6h | weekly lun 08:00" + + if not script_name: + return "Précise le nom du script." + try: + job_id = self.scheduler.add_script_job( + frequency=freq, + agent_id=agent_id, + script_name=script_name, + script_args=script_args, + ) + return f"✓ Script '{script_name}' planifié sur @{agent_id} [{freq}] — id: {job_id}" + except Exception as e: + return f"Erreur : {e}" + + # ── unschedule ──────────────────────────────────────────────────── + if action == "unschedule": + job_id = rest.strip() + if not job_id: + return "Usage : /script unschedule " + ok = self.scheduler.cancel_job(job_id) + return f"Job '{job_id}' annulé." if ok else f"Job '{job_id}' introuvable." + + # ── schedules ───────────────────────────────────────────────────── + if action == "schedules": + jobs = {jid: j for jid, j in self.scheduler._jobs.items() + if j.get("type") == "script"} + if not jobs: + return "Aucun script planifié." + lines = ["── Scripts planifiés ────────────────"] + for j in jobs.values(): + lines.append(f" [{j['id']}] {j['frequency']} → @{j['agent']} : {j['task']}") + return "\n".join(lines) + + # ── list ────────────────────────────────────────────────────────── + if action == "list": + agent_id = rest.strip() + if not agent_id: + return "Usage : /script list " + # Demande la liste directement via COMMAND + import uuid as _uuid + corr_id = _uuid.uuid4().hex[:8] + reply_topic = f"agents/results/{corr_id}" + reply_box = [] + reply_evt = threading.Event() + + def _on_list(msg, topic): + body = msg.payload if hasattr(msg, 'payload') else str(msg) + reply_box.append(body) + reply_evt.set() + + self.mqtt.subscribe(reply_topic, _on_list) + try: + self.mqtt.send_to( + agent_id, "/script list", + msg_type=MessageType.COMMAND, + correlation_id=corr_id, + reply_to=reply_topic, + ) + got = reply_evt.wait(timeout=15) + finally: + self.mqtt.unsubscribe(reply_topic) + + return reply_box[0] if got else f"Pas de réponse de {agent_id}." + + return ( + "Usage :\n" + " /script run [args]\n" + " /script schedule [args]\n" + " /script unschedule \n" + " /script schedules\n" + " /script list " + ) + def _handle_queue_command(self, args: str) -> str: """ /queue — état du coordinateur LLM + files d'attente de chaque agent @@ -547,6 +696,11 @@ class Nexus(BaseAgent): /schedules — Voir les tâches planifiées /queue — État du coordinateur LLM + files d'attente /queue — File d'un agent spécifique + /script run — Exécuter un script sur un agent + /script schedule — Planifier un script (daily/once/every/weekly) + /script unschedule — Annuler une planification de script + /script schedules — Voir les scripts planifiés + /script list — Lister les scripts d'un agent /update — Mettre à jour un agent (git pull) /llm — Statut et gestion du LLM /llm local|cloud — Switch le modèle pour tous les agents @@ -584,6 +738,47 @@ Mode @agent : """Demande un rapport quotidien à un agent.""" self.mqtt.send_to(agent_id, "/report", msg_type=MessageType.COMMAND) + def _schedule_send_script(self, agent_id: str, script_args: str): + """Callback du scheduler pour exécuter un script planifié sur un agent.""" + with self._online_lock: + online = agent_id in self._online_agents + if not online: + logger.warning(f"[Scheduler] Agent {agent_id} hors ligne, script ignoré : {script_args}") + return + self.mqtt.send_to( + agent_id, + f"/script exec {script_args}", + msg_type=MessageType.COMMAND, + reply_to=self.mqtt.topic_inbox(), + ) + logger.info(f"[Scheduler] Script envoyé à {agent_id} : {script_args}") + + def _on_script_execution(self, msg, topic: str): + """ + Un agent vient d'exécuter un script — notifie l'utilisateur via XMPP. + Payload JSON : {agent_id, script, timestamp, result} + """ + try: + raw = msg if isinstance(msg, str) else (msg.payload if hasattr(msg, 'payload') else str(msg)) + if isinstance(raw, dict): + data = raw + else: + data = json.loads(raw) + agent_id = data.get("agent_id", "?") + script = data.get("script", "?") + timestamp = data.get("timestamp", "") + result = data.get("result", "") + notif = ( + f"📋 Script exécuté\n" + f" Agent : {agent_id}\n" + f" Script : {script}\n" + f" Heure : {timestamp}\n" + f" Résultat:\n{result}" + ) + self.xmpp.send_to_all_admins(notif) + except Exception as e: + logger.debug(f"[Script] Erreur notification : {e}") + # ────────────────────────────────────────────── # Broadcast handler # ────────────────────────────────────────────── diff --git a/scheduler.py b/scheduler.py index 787f964..0792103 100644 --- a/scheduler.py +++ b/scheduler.py @@ -4,10 +4,12 @@ Basé sur APScheduler. """ import logging import uuid +from datetime import datetime, timedelta from typing import Callable, Optional from apscheduler.schedulers.background import BackgroundScheduler from apscheduler.triggers.cron import CronTrigger +from apscheduler.triggers.date import DateTrigger from apscheduler.triggers.interval import IntervalTrigger logger = logging.getLogger(__name__) @@ -18,10 +20,12 @@ class NexusScheduler: self, send_task_callback: Callable[[str, str], None], request_report_callback: Callable[[str], None], + send_script_callback: Optional[Callable[[str, str], None]] = None, ): self._scheduler = BackgroundScheduler(timezone="Europe/Paris") - self._send_task = send_task_callback + self._send_task = send_task_callback self._request_report = request_report_callback + self._send_script = send_script_callback self._jobs: dict[str, dict] = {} # job_id → metadata def start(self, config: dict): @@ -101,6 +105,36 @@ class NexusScheduler: } return job_id + def add_script_job(self, frequency: str, agent_id: str, + script_name: str, script_args: str = "", + job_id: Optional[str] = None) -> str: + """ + Planifie l'exécution d'un script sur un agent. + Utilise la même syntaxe de fréquence que add_job + 'once HH:MM'. + """ + if self._send_script is None: + raise RuntimeError("send_script_callback non configuré") + job_id = job_id or f"script_{agent_id}_{script_name}_{str(uuid.uuid4())[:4]}" + trigger = self._parse_frequency(frequency) + args_str = script_args.strip() + + self._scheduler.add_job( + func=self._send_script, + trigger=trigger, + args=[agent_id, f"{script_name} {args_str}".strip()], + id=job_id, + replace_existing=True, + ) + self._jobs[job_id] = { + "id": job_id, + "frequency": frequency, + "agent": agent_id, + "task": f"[script] {script_name} {args_str}".strip(), + "type": "script", + } + logger.info(f"[Scheduler] Script job {job_id} : [{frequency}] @{agent_id} → {script_name}") + return job_id + def cancel_job(self, job_id: str) -> bool: try: self._scheduler.remove_job(job_id) @@ -146,4 +180,13 @@ class NexusScheduler: if val.endswith("m"): return IntervalTrigger(minutes=int(val[:-1])) + # once HH:MM → exécution unique à la prochaine occurrence de HH:MM + if parts[0] == "once" and len(parts) >= 2: + hour, minute = map(int, parts[1].split(":")) + now = datetime.now() + run_at = now.replace(hour=hour, minute=minute, second=0, microsecond=0) + if run_at <= now: + run_at += timedelta(days=1) + return DateTrigger(run_date=run_at) + raise ValueError(f"Format de fréquence non reconnu : '{frequency}'") diff --git a/skills/script.py b/skills/script.py index bd4e4f2..b4f828e 100644 --- a/skills/script.py +++ b/skills/script.py @@ -18,10 +18,12 @@ Usage LLM : SKILL:script ARGS:run | SKILL:script ARGS:delete """ +import json import os import stat import subprocess import tempfile +from datetime import datetime DESCRIPTION = "Bibliothèque de scripts bash : sauvegarder, lister, afficher, exécuter" USAGE = ( @@ -67,6 +69,19 @@ def _build_env(context, scripts_dir: str) -> dict: return env +def _notify(context, script_name: str, result: str): + """Publie un événement d'exécution sur MQTT pour que Nexus notifie l'utilisateur.""" + try: + context.mqtt.publish_raw("agents/scripts/execution", json.dumps({ + "agent_id": context.agent_id, + "script": script_name, + "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), + "result": result[:1000], + })) + except Exception: + pass + + def _run_script(cmd: str, env: dict, timeout: int = 120) -> str: try: result = subprocess.run( @@ -145,7 +160,9 @@ def run(args: str, context) -> str: if not os.path.exists(path): return f"Script '{name}' introuvable. Utilise 'list' pour voir les scripts disponibles." env = _build_env(context, d) - return _run_script(f'"{path}" {sargs}', env=env, timeout=120) + out = _run_script(f'"{path}" {sargs}', env=env, timeout=120) + _notify(context, name, out) + return out # ── run (inline) ────────────────────────────────────────────────────── if action == "run": @@ -162,6 +179,7 @@ def run(args: str, context) -> str: env = _build_env(context, d) out = _run_script(tmpfile, env=env, timeout=60) os.unlink(tmpfile) + _notify(context, "", out) return out # ── delete ────────────────────────────────────────────────────────────