feat: script scheduling, execution notifications, LLM coordinator
- Add /script command (run/schedule/unschedule/schedules/list) - Add scheduler.add_script_job() with 'once HH:MM' one-shot support - Subscribe to agents/scripts/execution and notify admins via XMPP - Integrate LLMCoordinator (local acquire/release for Nexus) - Update /help with script commands Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -43,6 +43,7 @@ class Nexus(BaseAgent):
|
||||
self.scheduler = NexusScheduler(
|
||||
send_task_callback=self._schedule_send_task,
|
||||
request_report_callback=self._request_daily_report,
|
||||
send_script_callback=self._schedule_send_script,
|
||||
)
|
||||
|
||||
# Résultats en attente de réponse XMPP
|
||||
@@ -98,6 +99,8 @@ class Nexus(BaseAgent):
|
||||
self.mqtt.subscribe("agents/nexus/inbox", self._on_agent_result)
|
||||
# Rapports quotidiens des agents
|
||||
self.mqtt.subscribe("agents/daily_report", self._on_daily_report)
|
||||
# Notifications d'exécution de scripts
|
||||
self.mqtt.subscribe("agents/scripts/execution", self._on_script_execution)
|
||||
|
||||
# ──────────────────────────────────────────────
|
||||
# Réception des résultats agents (MQTT)
|
||||
@@ -276,6 +279,9 @@ class Nexus(BaseAgent):
|
||||
if cmd == "schedules":
|
||||
return self.scheduler.list_jobs()
|
||||
|
||||
if cmd == "script":
|
||||
return self._handle_script_nexus_command(args)
|
||||
|
||||
if cmd == "queue":
|
||||
return self._handle_queue_command(args)
|
||||
|
||||
@@ -341,6 +347,149 @@ class Nexus(BaseAgent):
|
||||
|
||||
return f"Commande inconnue : /{cmd}. Tape /help."
|
||||
|
||||
def _handle_script_nexus_command(self, args: str) -> str:
|
||||
"""
|
||||
/script run <agent> <nom> [args] — exécuter maintenant
|
||||
/script schedule <freq> <agent> <nom> [args] — planifier
|
||||
/script unschedule <job_id> — annuler une planification
|
||||
/script list [agent] — scripts dispo ou planifications
|
||||
/script schedules — voir les scripts planifiés
|
||||
|
||||
Fréquences : daily HH:MM | once HH:MM | every Xh | every Xmin | weekly <jour> HH:MM
|
||||
"""
|
||||
parts = args.strip().split(None, 1)
|
||||
action = parts[0].lower() if parts else ""
|
||||
rest = parts[1] if len(parts) > 1 else ""
|
||||
|
||||
# ── run ──────────────────────────────────────────────────────────
|
||||
if action == "run":
|
||||
p = rest.split(None, 1)
|
||||
if len(p) < 2:
|
||||
return "Usage : /script run <agent> <nom> [args]"
|
||||
agent_id, script_rest = p[0], p[1]
|
||||
# Envoi direct COMMAND + attente réponse 30s
|
||||
import uuid as _uuid
|
||||
corr_id = _uuid.uuid4().hex[:8]
|
||||
reply_topic = f"agents/results/{corr_id}"
|
||||
reply_box = []
|
||||
reply_evt = threading.Event()
|
||||
|
||||
def _on_result(msg, topic):
|
||||
body = msg.payload if hasattr(msg, 'payload') else str(msg)
|
||||
reply_box.append(body)
|
||||
reply_evt.set()
|
||||
|
||||
self.mqtt.subscribe(reply_topic, _on_result)
|
||||
try:
|
||||
self.mqtt.send_to(
|
||||
agent_id,
|
||||
f"/script exec {script_rest}",
|
||||
msg_type=MessageType.COMMAND,
|
||||
correlation_id=corr_id,
|
||||
reply_to=reply_topic,
|
||||
)
|
||||
got = reply_evt.wait(timeout=30)
|
||||
finally:
|
||||
self.mqtt.unsubscribe(reply_topic)
|
||||
|
||||
if got:
|
||||
return reply_box[0]
|
||||
return f"Pas de réponse de {agent_id} dans 30s."
|
||||
|
||||
# ── schedule ─────────────────────────────────────────────────────
|
||||
if action == "schedule":
|
||||
# format : <fréquence> <agent> <nom> [args]
|
||||
# la fréquence peut être "daily HH:MM", "once HH:MM", "every Xh", "weekly lun HH:MM"
|
||||
# on détecte le début de l'agent_id (premier token qui n'appartient pas à la freq)
|
||||
p = rest.split()
|
||||
if len(p) < 3:
|
||||
return "Usage : /script schedule <fréquence> <agent> <nom> [args]"
|
||||
|
||||
# Reconstruit fréquence selon le type
|
||||
if p[0] in ("daily", "once") and len(p) >= 3:
|
||||
freq, agent_id, script_name = p[0] + " " + p[1], p[2], p[3] if len(p) > 3 else ""
|
||||
script_args = " ".join(p[4:]) if len(p) > 4 else ""
|
||||
elif p[0] == "every" and len(p) >= 4:
|
||||
freq, agent_id, script_name = p[0] + " " + p[1], p[2], p[3]
|
||||
script_args = " ".join(p[4:]) if len(p) > 4 else ""
|
||||
elif p[0] == "weekly" and len(p) >= 5:
|
||||
freq, agent_id, script_name = p[0] + " " + p[1] + " " + p[2], p[3], p[4]
|
||||
script_args = " ".join(p[5:]) if len(p) > 5 else ""
|
||||
else:
|
||||
return "Format de fréquence non reconnu. Ex: daily 03:00 | once 14:30 | every 6h | weekly lun 08:00"
|
||||
|
||||
if not script_name:
|
||||
return "Précise le nom du script."
|
||||
try:
|
||||
job_id = self.scheduler.add_script_job(
|
||||
frequency=freq,
|
||||
agent_id=agent_id,
|
||||
script_name=script_name,
|
||||
script_args=script_args,
|
||||
)
|
||||
return f"✓ Script '{script_name}' planifié sur @{agent_id} [{freq}] — id: {job_id}"
|
||||
except Exception as e:
|
||||
return f"Erreur : {e}"
|
||||
|
||||
# ── unschedule ────────────────────────────────────────────────────
|
||||
if action == "unschedule":
|
||||
job_id = rest.strip()
|
||||
if not job_id:
|
||||
return "Usage : /script unschedule <job_id>"
|
||||
ok = self.scheduler.cancel_job(job_id)
|
||||
return f"Job '{job_id}' annulé." if ok else f"Job '{job_id}' introuvable."
|
||||
|
||||
# ── schedules ─────────────────────────────────────────────────────
|
||||
if action == "schedules":
|
||||
jobs = {jid: j for jid, j in self.scheduler._jobs.items()
|
||||
if j.get("type") == "script"}
|
||||
if not jobs:
|
||||
return "Aucun script planifié."
|
||||
lines = ["── Scripts planifiés ────────────────"]
|
||||
for j in jobs.values():
|
||||
lines.append(f" [{j['id']}] {j['frequency']} → @{j['agent']} : {j['task']}")
|
||||
return "\n".join(lines)
|
||||
|
||||
# ── list ──────────────────────────────────────────────────────────
|
||||
if action == "list":
|
||||
agent_id = rest.strip()
|
||||
if not agent_id:
|
||||
return "Usage : /script list <agent>"
|
||||
# Demande la liste directement via COMMAND
|
||||
import uuid as _uuid
|
||||
corr_id = _uuid.uuid4().hex[:8]
|
||||
reply_topic = f"agents/results/{corr_id}"
|
||||
reply_box = []
|
||||
reply_evt = threading.Event()
|
||||
|
||||
def _on_list(msg, topic):
|
||||
body = msg.payload if hasattr(msg, 'payload') else str(msg)
|
||||
reply_box.append(body)
|
||||
reply_evt.set()
|
||||
|
||||
self.mqtt.subscribe(reply_topic, _on_list)
|
||||
try:
|
||||
self.mqtt.send_to(
|
||||
agent_id, "/script list",
|
||||
msg_type=MessageType.COMMAND,
|
||||
correlation_id=corr_id,
|
||||
reply_to=reply_topic,
|
||||
)
|
||||
got = reply_evt.wait(timeout=15)
|
||||
finally:
|
||||
self.mqtt.unsubscribe(reply_topic)
|
||||
|
||||
return reply_box[0] if got else f"Pas de réponse de {agent_id}."
|
||||
|
||||
return (
|
||||
"Usage :\n"
|
||||
" /script run <agent> <nom> [args]\n"
|
||||
" /script schedule <fréquence> <agent> <nom> [args]\n"
|
||||
" /script unschedule <job_id>\n"
|
||||
" /script schedules\n"
|
||||
" /script list <agent>"
|
||||
)
|
||||
|
||||
def _handle_queue_command(self, args: str) -> str:
|
||||
"""
|
||||
/queue — état du coordinateur LLM + files d'attente de chaque agent
|
||||
@@ -547,6 +696,11 @@ class Nexus(BaseAgent):
|
||||
/schedules — Voir les tâches planifiées
|
||||
/queue — État du coordinateur LLM + files d'attente
|
||||
/queue <agent> — File d'un agent spécifique
|
||||
/script run <a> <nom> — Exécuter un script sur un agent
|
||||
/script schedule <f> <a> <nom> — Planifier un script (daily/once/every/weekly)
|
||||
/script unschedule <id> — Annuler une planification de script
|
||||
/script schedules — Voir les scripts planifiés
|
||||
/script list <agent> — Lister les scripts d'un agent
|
||||
/update <agent> — Mettre à jour un agent (git pull)
|
||||
/llm — Statut et gestion du LLM
|
||||
/llm local|cloud — Switch le modèle pour tous les agents
|
||||
@@ -584,6 +738,47 @@ Mode @agent :
|
||||
"""Demande un rapport quotidien à un agent."""
|
||||
self.mqtt.send_to(agent_id, "/report", msg_type=MessageType.COMMAND)
|
||||
|
||||
def _schedule_send_script(self, agent_id: str, script_args: str):
|
||||
"""Callback du scheduler pour exécuter un script planifié sur un agent."""
|
||||
with self._online_lock:
|
||||
online = agent_id in self._online_agents
|
||||
if not online:
|
||||
logger.warning(f"[Scheduler] Agent {agent_id} hors ligne, script ignoré : {script_args}")
|
||||
return
|
||||
self.mqtt.send_to(
|
||||
agent_id,
|
||||
f"/script exec {script_args}",
|
||||
msg_type=MessageType.COMMAND,
|
||||
reply_to=self.mqtt.topic_inbox(),
|
||||
)
|
||||
logger.info(f"[Scheduler] Script envoyé à {agent_id} : {script_args}")
|
||||
|
||||
def _on_script_execution(self, msg, topic: str):
|
||||
"""
|
||||
Un agent vient d'exécuter un script — notifie l'utilisateur via XMPP.
|
||||
Payload JSON : {agent_id, script, timestamp, result}
|
||||
"""
|
||||
try:
|
||||
raw = msg if isinstance(msg, str) else (msg.payload if hasattr(msg, 'payload') else str(msg))
|
||||
if isinstance(raw, dict):
|
||||
data = raw
|
||||
else:
|
||||
data = json.loads(raw)
|
||||
agent_id = data.get("agent_id", "?")
|
||||
script = data.get("script", "?")
|
||||
timestamp = data.get("timestamp", "")
|
||||
result = data.get("result", "")
|
||||
notif = (
|
||||
f"📋 Script exécuté\n"
|
||||
f" Agent : {agent_id}\n"
|
||||
f" Script : {script}\n"
|
||||
f" Heure : {timestamp}\n"
|
||||
f" Résultat:\n{result}"
|
||||
)
|
||||
self.xmpp.send_to_all_admins(notif)
|
||||
except Exception as e:
|
||||
logger.debug(f"[Script] Erreur notification : {e}")
|
||||
|
||||
# ──────────────────────────────────────────────
|
||||
# Broadcast handler
|
||||
# ──────────────────────────────────────────────
|
||||
|
||||
Reference in New Issue
Block a user