From 4b5960ada62152ef84d0b814c561c9411c15bcfd Mon Sep 17 00:00:00 2001 From: sylvain Date: Sun, 15 Mar 2026 19:32:27 +0000 Subject: [PATCH] feat: global LLM coordinator to prevent Ollama overload - llm_coordinator.py: LLMCoordinator server (runs in Nexus) FIFO queue + semaphore, max_concurrent configurable local_acquire/release for Nexus, MQTT request/release for sub-agents - base_agent.py: _llm_slot_acquire/_llm_slot_release around _llm_loop MQTT round-trip to agents/llm/request, 90s timeout degraded fallback Enabled via use_llm_coordinator: true in config Co-Authored-By: Claude Sonnet 4.6 --- agents_core/base_agent.py | 70 +++++++++++++--- agents_core/llm_coordinator.py | 147 +++++++++++++++++++++++++++++++++ 2 files changed, 206 insertions(+), 11 deletions(-) create mode 100644 agents_core/llm_coordinator.py diff --git a/agents_core/base_agent.py b/agents_core/base_agent.py index 7d0f357..ce1c3f7 100644 --- a/agents_core/base_agent.py +++ b/agents_core/base_agent.py @@ -395,21 +395,69 @@ class BaseAgent(ABC): def _llm_loop(self, prompt: str, context: AgentContext, extra_ctx: Optional[str] = None, max_steps: int = 10) -> str: """Boucle LLM avec exécution de skills.""" - response = self.llm.chat(prompt, extra_context=extra_ctx) + slot_id = self._llm_slot_acquire() + try: + response = self.llm.chat(prompt, extra_context=extra_ctx) - for _ in range(max_steps): - skill_call = self.llm.extract_skill_call(response) - if not skill_call: - break - skill_name, args = skill_call - skill_result = self.skills.run(skill_name, args, context) - logger.info(f"[{self.agent_id}] Skill {skill_name} → {str(skill_result)[:80]}") - response = self.llm.chat( - f"Résultat du skill '{skill_name}':\n{skill_result}\n\nContinue ou donne ta réponse finale." - ) + for _ in range(max_steps): + skill_call = self.llm.extract_skill_call(response) + if not skill_call: + break + skill_name, args = skill_call + skill_result = self.skills.run(skill_name, args, context) + logger.info(f"[{self.agent_id}] Skill {skill_name} → {str(skill_result)[:80]}") + response = self.llm.chat( + f"Résultat du skill '{skill_name}':\n{skill_result}\n\nContinue ou donne ta réponse finale." + ) + finally: + self._llm_slot_release(slot_id) return response + def _llm_slot_acquire(self) -> Optional[str]: + """ + Demande un slot LLM au coordinateur Nexus via MQTT. + Retourne un slot_id si accordé (à passer à _llm_slot_release). + Retourne None si coordination désactivée ou timeout (appel direct). + """ + if not self.config.get("use_llm_coordinator", False): + return None + + import uuid as _uuid + slot_id = _uuid.uuid4().hex[:8] + grant_topic = f"agents/{self.agent_id}/llm/grant/{slot_id}" + granted_evt = threading.Event() + + def _on_grant(msg, topic): + granted_evt.set() + + self.mqtt.subscribe(grant_topic, _on_grant) + self.mqtt.publish_raw("agents/llm/request", json.dumps({ + "agent_id": self.agent_id, + "grant_topic": grant_topic, + })) + logger.debug(f"[LLM] Slot demandé au coordinateur (id={slot_id})") + + granted = granted_evt.wait(timeout=90) + self.mqtt.unsubscribe(grant_topic) + + if not granted: + logger.warning("[LLM] Pas de slot accordé en 90s — appel direct (mode dégradé)") + return None + + logger.debug(f"[LLM] Slot accordé (id={slot_id})") + return slot_id + + def _llm_slot_release(self, slot_id: Optional[str]): + """Libère le slot LLM auprès du coordinateur.""" + if slot_id is None: + return + self.mqtt.publish_raw("agents/llm/release", json.dumps({ + "agent_id": self.agent_id, + "slot_id": slot_id, + })) + logger.debug(f"[LLM] Slot libéré (id={slot_id})") + # ────────────────────────────────────────────── # XMPP # ────────────────────────────────────────────── diff --git a/agents_core/llm_coordinator.py b/agents_core/llm_coordinator.py new file mode 100644 index 0000000..5fd7013 --- /dev/null +++ b/agents_core/llm_coordinator.py @@ -0,0 +1,147 @@ +""" +Coordinateur LLM centralisé — évite de surcharger Ollama. + +Nexus instancie LLMCoordinator (serveur). +Les autres agents utilisent le mécanisme MQTT de BaseAgent (client). + +Topics MQTT : + agents/llm/request ← agents publient ici pour demander un slot + payload : {"agent_id": "...", "grant_topic": "agents//llm/grant/"} + agents/llm/release ← agents publient ici quand ils libèrent + payload : {"agent_id": "..."} + agents//llm/grant/ ← nexus publie ici pour accorder le slot +""" +import json +import logging +import threading +from typing import Optional + +logger = logging.getLogger(__name__) + + +class LLMCoordinator: + """ + Gestionnaire de slots LLM — s'exécute dans Nexus. + + Garantit qu'au plus `max_concurrent` appels LLM (Nexus compris) + sont en cours simultanément sur Ollama. + """ + + def __init__(self, mqtt_client, max_concurrent: int = 1): + self._mqtt = mqtt_client + self._max = max_concurrent + self._active = 0 + self._queue = [] # grant_topics en attente (agents distants) + self._local_wait = [] # threading.Event en attente (Nexus local) + self._lock = threading.Lock() + + def setup(self): + """Souscrit aux topics MQTT. À appeler après connexion MQTT.""" + self._mqtt.subscribe("agents/llm/request", self._on_request) + self._mqtt.subscribe("agents/llm/release", self._on_release) + logger.info(f"[LLMCoordinator] Démarré — max {self._max} concurrent(s)") + + # ────────────────────────────────────────────── + # API locale (Nexus lui-même) + # ────────────────────────────────────────────── + + def local_acquire(self, timeout: float = 120) -> bool: + """ + Nexus demande un slot directement (sans MQTT). + Retourne True si accordé, False si timeout. + """ + evt = threading.Event() + with self._lock: + if self._active < self._max: + self._active += 1 + logger.debug(f"[LLMCoordinator] Slot local accordé immédiatement ({self._active}/{self._max})") + return True + # Slot plein → mise en attente + self._local_wait.append(evt) + logger.debug(f"[LLMCoordinator] Nexus en attente de slot ({len(self._local_wait)} waiters)") + + granted = evt.wait(timeout=timeout) + if not granted: + # Timeout : retire de la liste d'attente si encore présent + with self._lock: + try: + self._local_wait.remove(evt) + except ValueError: + pass + logger.warning("[LLMCoordinator] Nexus n'a pas obtenu de slot dans le délai imparti") + return granted + + def local_release(self): + """Nexus libère son slot.""" + with self._lock: + self._active -= 1 + logger.debug(f"[LLMCoordinator] Slot local libéré ({self._active}/{self._max})") + self._try_grant() + + # ────────────────────────────────────────────── + # Handlers MQTT (agents distants) + # ────────────────────────────────────────────── + + def _on_request(self, msg, topic: str): + """Un agent demande un slot.""" + try: + raw = msg if isinstance(msg, str) else (msg.payload if hasattr(msg, 'payload') else str(msg)) + data = json.loads(raw) + grant_topic = data.get("grant_topic") + agent_id = data.get("agent_id", "?") + if not grant_topic: + return + except Exception as e: + logger.debug(f"[LLMCoordinator] Requête invalide : {e}") + return + + with self._lock: + self._queue.append((grant_topic, agent_id)) + logger.debug(f"[LLMCoordinator] Requête de {agent_id} — file : {len(self._queue)}") + self._try_grant() + + def _on_release(self, msg, topic: str): + """Un agent libère son slot.""" + try: + raw = msg if isinstance(msg, str) else (msg.payload if hasattr(msg, 'payload') else str(msg)) + data = json.loads(raw) + agent_id = data.get("agent_id", "?") + except Exception: + agent_id = "?" + + with self._lock: + self._active -= 1 + logger.debug(f"[LLMCoordinator] Slot libéré par {agent_id} ({self._active}/{self._max})") + self._try_grant() + + def _try_grant(self): + """ + Accorde des slots aux suivants dans la file. + DOIT être appelé avec self._lock tenu. + """ + while self._active < self._max: + # Priorité aux waiters locaux (Nexus) + if self._local_wait: + evt = self._local_wait.pop(0) + self._active += 1 + evt.set() + logger.debug(f"[LLMCoordinator] Slot accordé à Nexus (local) ({self._active}/{self._max})") + elif self._queue: + grant_topic, agent_id = self._queue.pop(0) + self._active += 1 + try: + self._mqtt.publish_raw(grant_topic, '{"granted": true}') + logger.debug(f"[LLMCoordinator] Slot accordé à {agent_id} ({self._active}/{self._max})") + except Exception as e: + logger.error(f"[LLMCoordinator] Impossible d'accorder le slot à {agent_id} : {e}") + self._active -= 1 + else: + break + + def status(self) -> str: + """Résumé pour /status ou /llm.""" + with self._lock: + return ( + f"LLM coordinator : {self._active}/{self._max} slot(s) actif(s), " + f"{len(self._queue)} en attente" + )