feat: global LLM coordinator to prevent Ollama overload
- llm_coordinator.py: LLMCoordinator server (runs in Nexus) FIFO queue + semaphore, max_concurrent configurable local_acquire/release for Nexus, MQTT request/release for sub-agents - base_agent.py: _llm_slot_acquire/_llm_slot_release around _llm_loop MQTT round-trip to agents/llm/request, 90s timeout degraded fallback Enabled via use_llm_coordinator: true in config Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
+59
-11
@@ -395,21 +395,69 @@ class BaseAgent(ABC):
|
||||
def _llm_loop(self, prompt: str, context: AgentContext,
|
||||
extra_ctx: Optional[str] = None, max_steps: int = 10) -> str:
|
||||
"""Boucle LLM avec exécution de skills."""
|
||||
response = self.llm.chat(prompt, extra_context=extra_ctx)
|
||||
slot_id = self._llm_slot_acquire()
|
||||
try:
|
||||
response = self.llm.chat(prompt, extra_context=extra_ctx)
|
||||
|
||||
for _ in range(max_steps):
|
||||
skill_call = self.llm.extract_skill_call(response)
|
||||
if not skill_call:
|
||||
break
|
||||
skill_name, args = skill_call
|
||||
skill_result = self.skills.run(skill_name, args, context)
|
||||
logger.info(f"[{self.agent_id}] Skill {skill_name} → {str(skill_result)[:80]}")
|
||||
response = self.llm.chat(
|
||||
f"Résultat du skill '{skill_name}':\n{skill_result}\n\nContinue ou donne ta réponse finale."
|
||||
)
|
||||
for _ in range(max_steps):
|
||||
skill_call = self.llm.extract_skill_call(response)
|
||||
if not skill_call:
|
||||
break
|
||||
skill_name, args = skill_call
|
||||
skill_result = self.skills.run(skill_name, args, context)
|
||||
logger.info(f"[{self.agent_id}] Skill {skill_name} → {str(skill_result)[:80]}")
|
||||
response = self.llm.chat(
|
||||
f"Résultat du skill '{skill_name}':\n{skill_result}\n\nContinue ou donne ta réponse finale."
|
||||
)
|
||||
finally:
|
||||
self._llm_slot_release(slot_id)
|
||||
|
||||
return response
|
||||
|
||||
def _llm_slot_acquire(self) -> Optional[str]:
|
||||
"""
|
||||
Demande un slot LLM au coordinateur Nexus via MQTT.
|
||||
Retourne un slot_id si accordé (à passer à _llm_slot_release).
|
||||
Retourne None si coordination désactivée ou timeout (appel direct).
|
||||
"""
|
||||
if not self.config.get("use_llm_coordinator", False):
|
||||
return None
|
||||
|
||||
import uuid as _uuid
|
||||
slot_id = _uuid.uuid4().hex[:8]
|
||||
grant_topic = f"agents/{self.agent_id}/llm/grant/{slot_id}"
|
||||
granted_evt = threading.Event()
|
||||
|
||||
def _on_grant(msg, topic):
|
||||
granted_evt.set()
|
||||
|
||||
self.mqtt.subscribe(grant_topic, _on_grant)
|
||||
self.mqtt.publish_raw("agents/llm/request", json.dumps({
|
||||
"agent_id": self.agent_id,
|
||||
"grant_topic": grant_topic,
|
||||
}))
|
||||
logger.debug(f"[LLM] Slot demandé au coordinateur (id={slot_id})")
|
||||
|
||||
granted = granted_evt.wait(timeout=90)
|
||||
self.mqtt.unsubscribe(grant_topic)
|
||||
|
||||
if not granted:
|
||||
logger.warning("[LLM] Pas de slot accordé en 90s — appel direct (mode dégradé)")
|
||||
return None
|
||||
|
||||
logger.debug(f"[LLM] Slot accordé (id={slot_id})")
|
||||
return slot_id
|
||||
|
||||
def _llm_slot_release(self, slot_id: Optional[str]):
|
||||
"""Libère le slot LLM auprès du coordinateur."""
|
||||
if slot_id is None:
|
||||
return
|
||||
self.mqtt.publish_raw("agents/llm/release", json.dumps({
|
||||
"agent_id": self.agent_id,
|
||||
"slot_id": slot_id,
|
||||
}))
|
||||
logger.debug(f"[LLM] Slot libéré (id={slot_id})")
|
||||
|
||||
# ──────────────────────────────────────────────
|
||||
# XMPP
|
||||
# ──────────────────────────────────────────────
|
||||
|
||||
@@ -0,0 +1,147 @@
|
||||
"""
|
||||
Coordinateur LLM centralisé — évite de surcharger Ollama.
|
||||
|
||||
Nexus instancie LLMCoordinator (serveur).
|
||||
Les autres agents utilisent le mécanisme MQTT de BaseAgent (client).
|
||||
|
||||
Topics MQTT :
|
||||
agents/llm/request ← agents publient ici pour demander un slot
|
||||
payload : {"agent_id": "...", "grant_topic": "agents/<id>/llm/grant/<uuid>"}
|
||||
agents/llm/release ← agents publient ici quand ils libèrent
|
||||
payload : {"agent_id": "..."}
|
||||
agents/<id>/llm/grant/<uuid> ← nexus publie ici pour accorder le slot
|
||||
"""
|
||||
import json
|
||||
import logging
|
||||
import threading
|
||||
from typing import Optional
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class LLMCoordinator:
|
||||
"""
|
||||
Gestionnaire de slots LLM — s'exécute dans Nexus.
|
||||
|
||||
Garantit qu'au plus `max_concurrent` appels LLM (Nexus compris)
|
||||
sont en cours simultanément sur Ollama.
|
||||
"""
|
||||
|
||||
def __init__(self, mqtt_client, max_concurrent: int = 1):
|
||||
self._mqtt = mqtt_client
|
||||
self._max = max_concurrent
|
||||
self._active = 0
|
||||
self._queue = [] # grant_topics en attente (agents distants)
|
||||
self._local_wait = [] # threading.Event en attente (Nexus local)
|
||||
self._lock = threading.Lock()
|
||||
|
||||
def setup(self):
|
||||
"""Souscrit aux topics MQTT. À appeler après connexion MQTT."""
|
||||
self._mqtt.subscribe("agents/llm/request", self._on_request)
|
||||
self._mqtt.subscribe("agents/llm/release", self._on_release)
|
||||
logger.info(f"[LLMCoordinator] Démarré — max {self._max} concurrent(s)")
|
||||
|
||||
# ──────────────────────────────────────────────
|
||||
# API locale (Nexus lui-même)
|
||||
# ──────────────────────────────────────────────
|
||||
|
||||
def local_acquire(self, timeout: float = 120) -> bool:
|
||||
"""
|
||||
Nexus demande un slot directement (sans MQTT).
|
||||
Retourne True si accordé, False si timeout.
|
||||
"""
|
||||
evt = threading.Event()
|
||||
with self._lock:
|
||||
if self._active < self._max:
|
||||
self._active += 1
|
||||
logger.debug(f"[LLMCoordinator] Slot local accordé immédiatement ({self._active}/{self._max})")
|
||||
return True
|
||||
# Slot plein → mise en attente
|
||||
self._local_wait.append(evt)
|
||||
logger.debug(f"[LLMCoordinator] Nexus en attente de slot ({len(self._local_wait)} waiters)")
|
||||
|
||||
granted = evt.wait(timeout=timeout)
|
||||
if not granted:
|
||||
# Timeout : retire de la liste d'attente si encore présent
|
||||
with self._lock:
|
||||
try:
|
||||
self._local_wait.remove(evt)
|
||||
except ValueError:
|
||||
pass
|
||||
logger.warning("[LLMCoordinator] Nexus n'a pas obtenu de slot dans le délai imparti")
|
||||
return granted
|
||||
|
||||
def local_release(self):
|
||||
"""Nexus libère son slot."""
|
||||
with self._lock:
|
||||
self._active -= 1
|
||||
logger.debug(f"[LLMCoordinator] Slot local libéré ({self._active}/{self._max})")
|
||||
self._try_grant()
|
||||
|
||||
# ──────────────────────────────────────────────
|
||||
# Handlers MQTT (agents distants)
|
||||
# ──────────────────────────────────────────────
|
||||
|
||||
def _on_request(self, msg, topic: str):
|
||||
"""Un agent demande un slot."""
|
||||
try:
|
||||
raw = msg if isinstance(msg, str) else (msg.payload if hasattr(msg, 'payload') else str(msg))
|
||||
data = json.loads(raw)
|
||||
grant_topic = data.get("grant_topic")
|
||||
agent_id = data.get("agent_id", "?")
|
||||
if not grant_topic:
|
||||
return
|
||||
except Exception as e:
|
||||
logger.debug(f"[LLMCoordinator] Requête invalide : {e}")
|
||||
return
|
||||
|
||||
with self._lock:
|
||||
self._queue.append((grant_topic, agent_id))
|
||||
logger.debug(f"[LLMCoordinator] Requête de {agent_id} — file : {len(self._queue)}")
|
||||
self._try_grant()
|
||||
|
||||
def _on_release(self, msg, topic: str):
|
||||
"""Un agent libère son slot."""
|
||||
try:
|
||||
raw = msg if isinstance(msg, str) else (msg.payload if hasattr(msg, 'payload') else str(msg))
|
||||
data = json.loads(raw)
|
||||
agent_id = data.get("agent_id", "?")
|
||||
except Exception:
|
||||
agent_id = "?"
|
||||
|
||||
with self._lock:
|
||||
self._active -= 1
|
||||
logger.debug(f"[LLMCoordinator] Slot libéré par {agent_id} ({self._active}/{self._max})")
|
||||
self._try_grant()
|
||||
|
||||
def _try_grant(self):
|
||||
"""
|
||||
Accorde des slots aux suivants dans la file.
|
||||
DOIT être appelé avec self._lock tenu.
|
||||
"""
|
||||
while self._active < self._max:
|
||||
# Priorité aux waiters locaux (Nexus)
|
||||
if self._local_wait:
|
||||
evt = self._local_wait.pop(0)
|
||||
self._active += 1
|
||||
evt.set()
|
||||
logger.debug(f"[LLMCoordinator] Slot accordé à Nexus (local) ({self._active}/{self._max})")
|
||||
elif self._queue:
|
||||
grant_topic, agent_id = self._queue.pop(0)
|
||||
self._active += 1
|
||||
try:
|
||||
self._mqtt.publish_raw(grant_topic, '{"granted": true}')
|
||||
logger.debug(f"[LLMCoordinator] Slot accordé à {agent_id} ({self._active}/{self._max})")
|
||||
except Exception as e:
|
||||
logger.error(f"[LLMCoordinator] Impossible d'accorder le slot à {agent_id} : {e}")
|
||||
self._active -= 1
|
||||
else:
|
||||
break
|
||||
|
||||
def status(self) -> str:
|
||||
"""Résumé pour /status ou /llm."""
|
||||
with self._lock:
|
||||
return (
|
||||
f"LLM coordinator : {self._active}/{self._max} slot(s) actif(s), "
|
||||
f"{len(self._queue)} en attente"
|
||||
)
|
||||
Reference in New Issue
Block a user