feat: start LLMCoordinator in Nexus, add /queue command
- on_start: instantiate LLMCoordinator from llm_coordinator config - Override _llm_slot_acquire/release to use local coordinator (no MQTT loop) - /queue: show coordinator status + agent task queue summary - Import time (missing) Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -18,6 +18,7 @@ from typing import Optional
|
||||
|
||||
from agents_core import BaseAgent, AgentContext, Message, MessageType
|
||||
from agents_core.command_parser import ParsedCommand, CommandType, help_text
|
||||
from agents_core.llm_coordinator import LLMCoordinator
|
||||
|
||||
from scheduler import NexusScheduler
|
||||
from daily_report import DailyReportManager
|
||||
@@ -52,6 +53,9 @@ class Nexus(BaseAgent):
|
||||
# Mode veille global
|
||||
self._sleep_mode = False
|
||||
|
||||
# Coordinateur LLM — initialisé dans on_start() après connexion MQTT
|
||||
self._llm_coordinator: Optional[LLMCoordinator] = None
|
||||
|
||||
# ──────────────────────────────────────────────
|
||||
# Démarrage
|
||||
# ──────────────────────────────────────────────
|
||||
@@ -61,7 +65,32 @@ class Nexus(BaseAgent):
|
||||
|
||||
def on_start(self):
|
||||
self.scheduler.start(self.config.get("schedules", {}))
|
||||
logger.info("Nexus prêt — scheduler démarré")
|
||||
|
||||
# Démarrage du coordinateur LLM
|
||||
coord_cfg = self.config.get("llm_coordinator", {})
|
||||
max_concurrent = coord_cfg.get("max_concurrent", 1)
|
||||
self._llm_coordinator = LLMCoordinator(self.mqtt, max_concurrent=max_concurrent)
|
||||
self._llm_coordinator.setup()
|
||||
|
||||
logger.info("Nexus prêt — scheduler + coordinateur LLM démarrés")
|
||||
|
||||
# ──────────────────────────────────────────────
|
||||
# Override slots LLM (Nexus passe par le coordinateur local, pas MQTT)
|
||||
# ──────────────────────────────────────────────
|
||||
|
||||
def _llm_slot_acquire(self) -> Optional[str]:
|
||||
"""Nexus acquiert un slot via le coordinateur local (sans MQTT)."""
|
||||
if self._llm_coordinator:
|
||||
granted = self._llm_coordinator.local_acquire(timeout=120)
|
||||
if not granted:
|
||||
logger.warning("[Nexus] Timeout slot LLM — appel direct")
|
||||
return "__local__" if granted else None
|
||||
return None
|
||||
|
||||
def _llm_slot_release(self, slot_id: Optional[str]):
|
||||
"""Nexus libère son slot."""
|
||||
if slot_id == "__local__" and self._llm_coordinator:
|
||||
self._llm_coordinator.local_release()
|
||||
|
||||
def setup_extra_subscriptions(self):
|
||||
"""Souscriptions MQTT supplémentaires de Nexus."""
|
||||
@@ -247,6 +276,9 @@ class Nexus(BaseAgent):
|
||||
if cmd == "schedules":
|
||||
return self.scheduler.list_jobs()
|
||||
|
||||
if cmd == "queue":
|
||||
return self._handle_queue_command(args)
|
||||
|
||||
if cmd == "update":
|
||||
target = args.strip()
|
||||
if not target:
|
||||
@@ -309,6 +341,50 @@ class Nexus(BaseAgent):
|
||||
|
||||
return f"Commande inconnue : /{cmd}. Tape /help."
|
||||
|
||||
def _handle_queue_command(self, args: str) -> str:
|
||||
"""
|
||||
/queue — état du coordinateur LLM + files d'attente de chaque agent
|
||||
/queue <agent> — file d'attente d'un agent spécifique
|
||||
"""
|
||||
lines = []
|
||||
|
||||
# État du coordinateur LLM
|
||||
if self._llm_coordinator:
|
||||
lines.append(f"── {self._llm_coordinator.status()}")
|
||||
else:
|
||||
lines.append("── Coordinateur LLM : inactif")
|
||||
|
||||
target = args.strip()
|
||||
agents_to_show = []
|
||||
if target:
|
||||
caps = self.registry.get(target)
|
||||
if caps:
|
||||
agents_to_show = [caps]
|
||||
else:
|
||||
return f"Agent '{target}' inconnu."
|
||||
else:
|
||||
agents_to_show = self.registry.all_agents()
|
||||
|
||||
lines.append("")
|
||||
with self._online_lock:
|
||||
online = set(self._online_agents)
|
||||
|
||||
for caps in agents_to_show:
|
||||
aid = caps.agent_id
|
||||
status = "🟢" if aid in online else "🔴"
|
||||
# Demande les stats de la file via MQTT (non bloquant — on affiche ce qu'on sait)
|
||||
lines.append(f"{status} {aid} [{caps.agent_type}]")
|
||||
|
||||
# Propre file nexus
|
||||
nexus_stats = self.queue.daily_stats()
|
||||
lines.append("")
|
||||
lines.append(
|
||||
f"Nexus today : {nexus_stats['total']} tâches "
|
||||
f"(✓{nexus_stats['completed']} ✗{nexus_stats['failed']} ⏳{nexus_stats['pending']})"
|
||||
)
|
||||
|
||||
return "\n".join(lines)
|
||||
|
||||
def _handle_llm_command(self, args: str) -> str:
|
||||
"""
|
||||
/llm → statut actuel
|
||||
@@ -469,6 +545,8 @@ class Nexus(BaseAgent):
|
||||
/report [agent] — Rapport quotidien
|
||||
/schedule <freq> @a tâche — Planifier une tâche
|
||||
/schedules — Voir les tâches planifiées
|
||||
/queue — État du coordinateur LLM + files d'attente
|
||||
/queue <agent> — File d'un agent spécifique
|
||||
/update <agent> — Mettre à jour un agent (git pull)
|
||||
/llm — Statut et gestion du LLM
|
||||
/llm local|cloud — Switch le modèle pour tous les agents
|
||||
|
||||
Reference in New Issue
Block a user