From 6be209034c7bd5d63ad8f544209eed16b7dbca94 Mon Sep 17 00:00:00 2001 From: sylvain Date: Sun, 15 Mar 2026 19:32:43 +0000 Subject: [PATCH] feat: start LLMCoordinator in Nexus, add /queue command - on_start: instantiate LLMCoordinator from llm_coordinator config - Override _llm_slot_acquire/release to use local coordinator (no MQTT loop) - /queue: show coordinator status + agent task queue summary - Import time (missing) Co-Authored-By: Claude Sonnet 4.6 --- nexus.py | 80 +++++++++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 79 insertions(+), 1 deletion(-) diff --git a/nexus.py b/nexus.py index 2ecc82c..d2cd3a7 100644 --- a/nexus.py +++ b/nexus.py @@ -18,6 +18,7 @@ from typing import Optional from agents_core import BaseAgent, AgentContext, Message, MessageType from agents_core.command_parser import ParsedCommand, CommandType, help_text +from agents_core.llm_coordinator import LLMCoordinator from scheduler import NexusScheduler from daily_report import DailyReportManager @@ -52,6 +53,9 @@ class Nexus(BaseAgent): # Mode veille global self._sleep_mode = False + # Coordinateur LLM — initialisé dans on_start() après connexion MQTT + self._llm_coordinator: Optional[LLMCoordinator] = None + # ────────────────────────────────────────────── # Démarrage # ────────────────────────────────────────────── @@ -61,7 +65,32 @@ class Nexus(BaseAgent): def on_start(self): self.scheduler.start(self.config.get("schedules", {})) - logger.info("Nexus prêt — scheduler démarré") + + # Démarrage du coordinateur LLM + coord_cfg = self.config.get("llm_coordinator", {}) + max_concurrent = coord_cfg.get("max_concurrent", 1) + self._llm_coordinator = LLMCoordinator(self.mqtt, max_concurrent=max_concurrent) + self._llm_coordinator.setup() + + logger.info("Nexus prêt — scheduler + coordinateur LLM démarrés") + + # ────────────────────────────────────────────── + # Override slots LLM (Nexus passe par le coordinateur local, pas MQTT) + # ────────────────────────────────────────────── + + def _llm_slot_acquire(self) -> Optional[str]: + """Nexus acquiert un slot via le coordinateur local (sans MQTT).""" + if self._llm_coordinator: + granted = self._llm_coordinator.local_acquire(timeout=120) + if not granted: + logger.warning("[Nexus] Timeout slot LLM — appel direct") + return "__local__" if granted else None + return None + + def _llm_slot_release(self, slot_id: Optional[str]): + """Nexus libère son slot.""" + if slot_id == "__local__" and self._llm_coordinator: + self._llm_coordinator.local_release() def setup_extra_subscriptions(self): """Souscriptions MQTT supplémentaires de Nexus.""" @@ -247,6 +276,9 @@ class Nexus(BaseAgent): if cmd == "schedules": return self.scheduler.list_jobs() + if cmd == "queue": + return self._handle_queue_command(args) + if cmd == "update": target = args.strip() if not target: @@ -309,6 +341,50 @@ class Nexus(BaseAgent): return f"Commande inconnue : /{cmd}. Tape /help." + def _handle_queue_command(self, args: str) -> str: + """ + /queue — état du coordinateur LLM + files d'attente de chaque agent + /queue — file d'attente d'un agent spécifique + """ + lines = [] + + # État du coordinateur LLM + if self._llm_coordinator: + lines.append(f"── {self._llm_coordinator.status()}") + else: + lines.append("── Coordinateur LLM : inactif") + + target = args.strip() + agents_to_show = [] + if target: + caps = self.registry.get(target) + if caps: + agents_to_show = [caps] + else: + return f"Agent '{target}' inconnu." + else: + agents_to_show = self.registry.all_agents() + + lines.append("") + with self._online_lock: + online = set(self._online_agents) + + for caps in agents_to_show: + aid = caps.agent_id + status = "🟢" if aid in online else "🔴" + # Demande les stats de la file via MQTT (non bloquant — on affiche ce qu'on sait) + lines.append(f"{status} {aid} [{caps.agent_type}]") + + # Propre file nexus + nexus_stats = self.queue.daily_stats() + lines.append("") + lines.append( + f"Nexus today : {nexus_stats['total']} tâches " + f"(✓{nexus_stats['completed']} ✗{nexus_stats['failed']} ⏳{nexus_stats['pending']})" + ) + + return "\n".join(lines) + def _handle_llm_command(self, args: str) -> str: """ /llm → statut actuel @@ -469,6 +545,8 @@ class Nexus(BaseAgent): /report [agent] — Rapport quotidien /schedule @a tâche — Planifier une tâche /schedules — Voir les tâches planifiées + /queue — État du coordinateur LLM + files d'attente + /queue — File d'un agent spécifique /update — Mettre à jour un agent (git pull) /llm — Statut et gestion du LLM /llm local|cloud — Switch le modèle pour tous les agents