add11ee6c1
- Add _handle_script_command() to bypass LLM and call script skill directly - Add 'script' to _handle_system_command routing - Add _llm_slot_acquire() / _llm_slot_release() MQTT semaphore for Ollama coordination Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
720 lines
28 KiB
Python
720 lines
28 KiB
Python
"""
|
|
Classe de base pour tous les agents du système.
|
|
Hériter de BaseAgent et implémenter les méthodes abstraites.
|
|
|
|
Usage minimal :
|
|
class MyAgent(BaseAgent):
|
|
AGENT_TYPE = "debian"
|
|
DESCRIPTION = "Administration système Debian"
|
|
|
|
def get_skills_dir(self) -> str:
|
|
return os.path.join(os.path.dirname(__file__), "skills")
|
|
|
|
if __name__ == "__main__":
|
|
MyAgent().run()
|
|
"""
|
|
import json
|
|
import logging
|
|
import os
|
|
import signal
|
|
import sys
|
|
import threading
|
|
import time
|
|
from abc import ABC, abstractmethod
|
|
from typing import Optional
|
|
|
|
from .capabilities import AgentCapabilities, CapabilitiesRegistry
|
|
from .command_parser import ParsedCommand, parse as parse_command, CommandType, help_text
|
|
from .llm_client import LLMClient
|
|
from .message_bus import Message, MessageType
|
|
from .mqtt_client import MQTTClient
|
|
from .skill_loader import SkillLoader
|
|
from .task_queue import TaskQueue, Task
|
|
from .xmpp_client import XMPPClient
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class AgentContext:
|
|
"""Contexte passé aux skills lors de leur exécution."""
|
|
def __init__(self, agent: "BaseAgent", current_task: Optional[Task] = None,
|
|
current_message: Optional[Message] = None):
|
|
self.agent = agent
|
|
self.task = current_task
|
|
self.message = current_message
|
|
|
|
@property
|
|
def mqtt(self) -> MQTTClient:
|
|
return self.agent.mqtt
|
|
|
|
@property
|
|
def xmpp(self) -> Optional[XMPPClient]:
|
|
return self.agent.xmpp
|
|
|
|
@property
|
|
def llm(self) -> LLMClient:
|
|
return self.agent.llm
|
|
|
|
@property
|
|
def registry(self) -> CapabilitiesRegistry:
|
|
return self.agent.registry
|
|
|
|
@property
|
|
def config(self) -> dict:
|
|
return self.agent.config
|
|
|
|
@property
|
|
def agent_id(self) -> str:
|
|
return self.agent.agent_id
|
|
|
|
|
|
class BaseAgent(ABC):
|
|
"""
|
|
Classe mère de tous les agents.
|
|
Fournit : MQTT, XMPP, LLM, TaskQueue, SkillLoader, CapabilitiesRegistry.
|
|
"""
|
|
|
|
# À surcharger dans chaque agent
|
|
AGENT_TYPE: str = "generic"
|
|
DESCRIPTION: str = "Agent générique"
|
|
DEFAULT_CONFIG_PATH: str = "config/config.json"
|
|
|
|
def __init__(self, config_path: Optional[str] = None):
|
|
self._config_path = config_path or self.DEFAULT_CONFIG_PATH
|
|
self.config = self._load_config(self._config_path)
|
|
self.agent_id: str = self.config["agent_id"]
|
|
|
|
logging.basicConfig(
|
|
level=logging.INFO,
|
|
format=f"%(asctime)s [{self.agent_id}] %(levelname)s %(message)s",
|
|
)
|
|
|
|
# Composants principaux
|
|
self.mqtt = self._setup_mqtt()
|
|
self.xmpp: Optional[XMPPClient] = self._setup_xmpp()
|
|
self.llm = self._setup_llm()
|
|
self.skills = SkillLoader()
|
|
self.queue = TaskQueue(self.config.get("queue_db", "queue.db"))
|
|
self.registry = CapabilitiesRegistry()
|
|
|
|
# Agents en ligne (mis à jour via MQTT)
|
|
self._online_agents: set[str] = set()
|
|
self._online_lock = threading.Lock()
|
|
self._llm_lock = threading.Lock() # Empêche les appels LLM concurrents
|
|
|
|
self._running = False
|
|
|
|
# ──────────────────────────────────────────────
|
|
# Setup
|
|
# ──────────────────────────────────────────────
|
|
|
|
def _load_config(self, path: str) -> dict:
|
|
if not os.path.exists(path):
|
|
raise FileNotFoundError(f"Config introuvable : {path}")
|
|
with open(path) as f:
|
|
return json.load(f)
|
|
|
|
def _setup_mqtt(self) -> MQTTClient:
|
|
mc = self.config.get("mqtt", {})
|
|
return MQTTClient(
|
|
agent_id=self.config["agent_id"],
|
|
broker_host=mc.get("host", "localhost"),
|
|
broker_port=mc.get("port", 1883),
|
|
username=mc.get("username"),
|
|
password=mc.get("password"),
|
|
tls=mc.get("tls", False),
|
|
)
|
|
|
|
def _setup_xmpp(self) -> Optional[XMPPClient]:
|
|
xc = self.config.get("xmpp")
|
|
if not xc:
|
|
return None
|
|
|
|
# Supporte admin_jids (list) et admin_jid (str) pour compatibilité
|
|
admin_jids = xc.get("admin_jids") or []
|
|
if not admin_jids and xc.get("admin_jid"):
|
|
admin_jids = [xc["admin_jid"]]
|
|
|
|
# Répertoire de données (pour le stockage OMEMO)
|
|
data_dir = os.path.dirname(self.config.get("queue_db", "data/queue.db"))
|
|
|
|
return XMPPClient(
|
|
jid=xc["jid"],
|
|
password=xc["password"],
|
|
admin_jids=admin_jids,
|
|
muc_room=xc.get("muc_room"),
|
|
muc_nick=self.config["agent_id"],
|
|
use_omemo=xc.get("use_omemo", False),
|
|
data_dir=data_dir,
|
|
)
|
|
|
|
def _setup_llm(self) -> LLMClient:
|
|
lc = self.config.get("llm", {})
|
|
return LLMClient(
|
|
base_url=lc.get("base_url", "http://localhost:11434"),
|
|
model=lc.get("model", "mistral"),
|
|
temperature=lc.get("temperature", 0.3),
|
|
system_prompt=self._load_system_prompt(),
|
|
)
|
|
|
|
def _load_system_prompt(self) -> str:
|
|
path = self.config.get("system_prompt", "config/system_prompt.txt")
|
|
if os.path.exists(path):
|
|
with open(path) as f:
|
|
return f.read()
|
|
return self._default_system_prompt()
|
|
|
|
def _default_system_prompt(self) -> str:
|
|
return (
|
|
f"Tu es {self.agent_id}, un agent IA de type '{self.AGENT_TYPE}'.\n"
|
|
f"Description : {self.DESCRIPTION}\n"
|
|
f"Tu communiques via XMPP et MQTT. "
|
|
f"Tu dois répondre de façon concise et précise.\n"
|
|
f"Pour exécuter un skill, utilise : SKILL:<nom> ARGS:<arguments>\n"
|
|
)
|
|
|
|
# ──────────────────────────────────────────────
|
|
# Démarrage
|
|
# ──────────────────────────────────────────────
|
|
|
|
def run(self):
|
|
"""Point d'entrée principal de l'agent."""
|
|
logger.info(f"Démarrage de {self.agent_id} ({self.AGENT_TYPE})")
|
|
|
|
# Chargement des skills
|
|
skills_dir = self.get_skills_dir()
|
|
if skills_dir:
|
|
self.skills.load_directory(skills_dir)
|
|
|
|
# Mise à jour du system prompt avec les skills
|
|
if self.skills.list_names():
|
|
extra = self.skills.system_prompt_section()
|
|
self.llm.system_prompt += f"\n\n{extra}"
|
|
|
|
# Connexion MQTT
|
|
self.mqtt.connect()
|
|
self._setup_mqtt_subscriptions()
|
|
|
|
# Publication des capacités
|
|
self._publish_capabilities()
|
|
|
|
# Connexion XMPP
|
|
if self.xmpp:
|
|
self.xmpp.set_message_callback(self._on_xmpp_message)
|
|
self.xmpp.connect_async(on_ready=self.on_xmpp_connected)
|
|
|
|
# Démarrage du worker de tâches
|
|
self.queue.start_worker(self._execute_task)
|
|
|
|
# Hook de démarrage custom
|
|
self.on_start()
|
|
|
|
# Gestion des signaux
|
|
self._running = True
|
|
signal.signal(signal.SIGINT, self._shutdown)
|
|
signal.signal(signal.SIGTERM, self._shutdown)
|
|
|
|
logger.info(f"{self.agent_id} prêt.")
|
|
try:
|
|
while self._running:
|
|
time.sleep(1)
|
|
except KeyboardInterrupt:
|
|
self._shutdown()
|
|
|
|
def _shutdown(self, *args):
|
|
logger.info(f"Arrêt de {self.agent_id}...")
|
|
self._running = False
|
|
self.queue.stop_worker()
|
|
self.mqtt.disconnect()
|
|
if self.xmpp:
|
|
self.xmpp.disconnect()
|
|
sys.exit(0)
|
|
|
|
# ──────────────────────────────────────────────
|
|
# MQTT — souscriptions
|
|
# ──────────────────────────────────────────────
|
|
|
|
def _setup_mqtt_subscriptions(self):
|
|
self.mqtt.subscribe_inbox(self._on_mqtt_task)
|
|
self.mqtt.subscribe_broadcast(self._on_mqtt_broadcast)
|
|
self.mqtt.subscribe_all_capabilities(self._on_capabilities_update)
|
|
self.mqtt.subscribe_all_status(self._on_status_update)
|
|
self.mqtt.subscribe("agents/llm/switch", self._on_llm_switch)
|
|
# Souscriptions custom de l'agent
|
|
self.setup_extra_subscriptions()
|
|
|
|
def _on_mqtt_task(self, msg: Message | str, topic: str):
|
|
"""Réception d'une tâche via MQTT inbox."""
|
|
if isinstance(msg, str):
|
|
# Message brut non structuré
|
|
task_id = self.queue.enqueue(
|
|
payload=msg,
|
|
correlation_id="raw",
|
|
sender="unknown",
|
|
)
|
|
return
|
|
|
|
if msg.type == MessageType.COMMAND:
|
|
self._handle_system_command(msg.payload, source_msg=msg)
|
|
return
|
|
|
|
task_id = self.queue.enqueue(
|
|
payload=msg.payload,
|
|
correlation_id=msg.correlation_id,
|
|
sender=msg.sender,
|
|
reply_to=msg.reply_to,
|
|
)
|
|
logger.info(f"Tâche #{task_id} reçue de {msg.sender}")
|
|
|
|
def _on_mqtt_broadcast(self, msg: Message | str, topic: str):
|
|
"""Réception d'un broadcast — comportement par défaut : ignorer ou logger."""
|
|
if isinstance(msg, Message):
|
|
logger.info(f"Broadcast reçu de {msg.sender}: {str(msg.payload)[:80]}")
|
|
self.on_broadcast(msg)
|
|
|
|
def _on_capabilities_update(self, msg: Message | str, topic: str):
|
|
"""Mise à jour du registre de capacités d'un autre agent."""
|
|
try:
|
|
raw = msg if isinstance(msg, (str, bytes)) else msg.payload
|
|
if isinstance(raw, str):
|
|
caps = self.registry.update_from_json(raw)
|
|
elif isinstance(raw, dict):
|
|
from .capabilities import AgentCapabilities
|
|
import json as _json
|
|
caps = self.registry.update_from_json(_json.dumps(raw))
|
|
else:
|
|
caps = None
|
|
if caps and caps.agent_id != self.agent_id:
|
|
logger.debug(f"Capacités reçues : {caps.agent_id}")
|
|
# Mise à jour du context LLM
|
|
self._refresh_llm_context()
|
|
except Exception as e:
|
|
logger.debug(f"Erreur parsing capabilities: {e}")
|
|
|
|
def _on_llm_switch(self, msg: Message | str, topic: str):
|
|
"""Reçoit un ordre de switch LLM depuis le topic agents/llm/switch."""
|
|
try:
|
|
import json as _json
|
|
raw = msg if isinstance(msg, str) else (msg.payload if isinstance(msg, Message) else str(msg))
|
|
data = _json.loads(raw)
|
|
model = data.get("model")
|
|
profile = data.get("profile", "")
|
|
if not model:
|
|
return
|
|
if self.llm.model == model:
|
|
return # Déjà sur ce modèle
|
|
self.llm.model = model
|
|
self.llm.reset_history()
|
|
self.config["llm"]["model"] = model
|
|
profiles = self.config.setdefault("llm_profiles", {})
|
|
if profile:
|
|
profiles[profile] = model
|
|
self._save_config()
|
|
logger.info(f"[LLM] Modèle changé → {model} (profil: {profile or 'direct'})")
|
|
except Exception as e:
|
|
logger.error(f"[LLM] Erreur switch: {e}")
|
|
|
|
def _save_config(self):
|
|
"""Persiste la config en mémoire dans le fichier JSON."""
|
|
try:
|
|
with open(self._config_path, "w") as f:
|
|
json.dump(self.config, f, indent=2, ensure_ascii=False)
|
|
except Exception as e:
|
|
logger.error(f"Erreur sauvegarde config ({self._config_path}): {e}")
|
|
|
|
def _on_status_update(self, msg: Message | str, topic: str):
|
|
"""Mise à jour du statut d'un agent."""
|
|
try:
|
|
raw = msg if isinstance(msg, (str, bytes)) else msg.payload
|
|
if isinstance(raw, str):
|
|
import json as _json
|
|
data = _json.loads(raw)
|
|
elif isinstance(raw, dict):
|
|
data = raw
|
|
else:
|
|
return
|
|
agent_id = data.get("agent_id")
|
|
status = data.get("status")
|
|
if not agent_id or not status:
|
|
return
|
|
# Ignorer nos propres messages de statut
|
|
if agent_id == self.agent_id:
|
|
return
|
|
with self._online_lock:
|
|
was_online = agent_id in self._online_agents
|
|
if status == "online":
|
|
self._online_agents.add(agent_id)
|
|
else:
|
|
self._online_agents.discard(agent_id)
|
|
is_online = agent_id in self._online_agents
|
|
# Appel du hook seulement si le statut a changé
|
|
if was_online != is_online:
|
|
try:
|
|
self.on_agent_status_change(agent_id, status)
|
|
except Exception as e:
|
|
logger.debug(f"on_agent_status_change error: {e}")
|
|
except Exception:
|
|
pass
|
|
|
|
def on_agent_status_change(self, agent_id: str, status: str):
|
|
"""
|
|
Hook appelé en temps réel quand un agent change de statut.
|
|
status = "online" | "offline"
|
|
À surcharger dans les sous-classes pour réagir aux changements.
|
|
"""
|
|
pass
|
|
|
|
# ──────────────────────────────────────────────
|
|
# Traitement des tâches
|
|
# ──────────────────────────────────────────────
|
|
|
|
def _execute_task(self, task: Task) -> tuple[str, bool]:
|
|
"""Exécute une tâche via le LLM + skills. Retourne (résultat, succès)."""
|
|
context = AgentContext(self, current_task=task)
|
|
|
|
# Enrichir le LLM avec le contexte actuel
|
|
extra_ctx = self.registry.summary_for_llm(self._online_agents)
|
|
|
|
result = self._llm_loop(task.payload, context, extra_ctx)
|
|
|
|
# Renvoyer le résultat à l'expéditeur
|
|
if task.sender and task.sender != "unknown":
|
|
import uuid
|
|
dummy_msg = Message(
|
|
msg_type=MessageType.TASK,
|
|
payload=task.payload,
|
|
sender=task.sender,
|
|
correlation_id=task.correlation_id,
|
|
reply_to=task.reply_to,
|
|
)
|
|
dummy_msg.id = task.correlation_id
|
|
self.mqtt.reply(dummy_msg, result)
|
|
|
|
return result, True
|
|
|
|
def _llm_loop(self, prompt: str, context: AgentContext,
|
|
extra_ctx: Optional[str] = None, max_steps: int = 10) -> str:
|
|
"""Boucle LLM avec exécution de skills."""
|
|
slot_id = self._llm_slot_acquire()
|
|
try:
|
|
response = self.llm.chat(prompt, extra_context=extra_ctx)
|
|
|
|
for _ in range(max_steps):
|
|
skill_call = self.llm.extract_skill_call(response)
|
|
if not skill_call:
|
|
break
|
|
skill_name, args = skill_call
|
|
skill_result = self.skills.run(skill_name, args, context)
|
|
logger.info(f"[{self.agent_id}] Skill {skill_name} → {str(skill_result)[:80]}")
|
|
response = self.llm.chat(
|
|
f"Résultat du skill '{skill_name}':\n{skill_result}\n\nContinue ou donne ta réponse finale."
|
|
)
|
|
finally:
|
|
self._llm_slot_release(slot_id)
|
|
|
|
return response
|
|
|
|
def _llm_slot_acquire(self) -> Optional[str]:
|
|
"""
|
|
Demande un slot LLM au coordinateur Nexus via MQTT.
|
|
Retourne un slot_id si accordé (à passer à _llm_slot_release).
|
|
Retourne None si coordination désactivée ou timeout (appel direct).
|
|
"""
|
|
if not self.config.get("use_llm_coordinator", False):
|
|
return None
|
|
|
|
import uuid as _uuid
|
|
slot_id = _uuid.uuid4().hex[:8]
|
|
grant_topic = f"agents/{self.agent_id}/llm/grant/{slot_id}"
|
|
granted_evt = threading.Event()
|
|
|
|
def _on_grant(msg, topic):
|
|
granted_evt.set()
|
|
|
|
self.mqtt.subscribe(grant_topic, _on_grant)
|
|
self.mqtt.publish_raw("agents/llm/request", json.dumps({
|
|
"agent_id": self.agent_id,
|
|
"grant_topic": grant_topic,
|
|
}))
|
|
logger.debug(f"[LLM] Slot demandé au coordinateur (id={slot_id})")
|
|
|
|
granted = granted_evt.wait(timeout=90)
|
|
self.mqtt.unsubscribe(grant_topic)
|
|
|
|
if not granted:
|
|
logger.warning("[LLM] Pas de slot accordé en 90s — appel direct (mode dégradé)")
|
|
return None
|
|
|
|
logger.debug(f"[LLM] Slot accordé (id={slot_id})")
|
|
return slot_id
|
|
|
|
def _llm_slot_release(self, slot_id: Optional[str]):
|
|
"""Libère le slot LLM auprès du coordinateur."""
|
|
if slot_id is None:
|
|
return
|
|
self.mqtt.publish_raw("agents/llm/release", json.dumps({
|
|
"agent_id": self.agent_id,
|
|
"slot_id": slot_id,
|
|
}))
|
|
logger.debug(f"[LLM] Slot libéré (id={slot_id})")
|
|
|
|
# ──────────────────────────────────────────────
|
|
# XMPP
|
|
# ──────────────────────────────────────────────
|
|
|
|
def _on_xmpp_message(self, sender: str, body: str, is_muc: bool = False):
|
|
"""Traitement des messages XMPP entrants."""
|
|
# Les sub-agents ne traitent pas les messages MUC pour éviter les boucles.
|
|
# Seul Nexus override cette méthode pour gérer le MUC.
|
|
if is_muc:
|
|
return
|
|
cmd = parse_command(body)
|
|
context = AgentContext(self)
|
|
|
|
if cmd.type == CommandType.SYSTEM:
|
|
reply = self._handle_system_command(f"/{cmd.command} {cmd.args}", raw_cmd=cmd)
|
|
if reply and self.xmpp:
|
|
self.xmpp.send_message(sender, reply)
|
|
return
|
|
|
|
if cmd.type == CommandType.DIRECT:
|
|
# @agent_name message → router via MQTT
|
|
reply = self._route_direct_command(cmd)
|
|
if reply and self.xmpp:
|
|
self.xmpp.send_message(sender, reply)
|
|
return
|
|
|
|
if cmd.type == CommandType.BROADCAST:
|
|
msg = self.mqtt.broadcast(cmd.args or "")
|
|
if self.xmpp:
|
|
self.xmpp.send_message(sender, f"Broadcast envoyé à tous les agents.")
|
|
return
|
|
|
|
# Mode naturel → LLM (un seul appel à la fois)
|
|
if not self._llm_lock.acquire(blocking=False):
|
|
if self.xmpp:
|
|
self.xmpp.send_message(sender, "⏳ Je traite déjà une demande, attends un instant.")
|
|
return
|
|
try:
|
|
extra_ctx = self.registry.summary_for_llm(self._online_agents)
|
|
response = self._llm_loop(body, context, extra_ctx)
|
|
if self.xmpp:
|
|
self.xmpp.send_message(sender, response)
|
|
finally:
|
|
self._llm_lock.release()
|
|
|
|
def _route_direct_command(self, cmd: ParsedCommand) -> str:
|
|
"""Route un @agent commande vers l'agent cible via MQTT."""
|
|
target = cmd.target
|
|
message = cmd.args or ""
|
|
|
|
caps = self.registry.get(target)
|
|
if caps is None:
|
|
return f"Agent '{target}' inconnu. Agents connus : {[a.agent_id for a in self.registry.all_agents()]}"
|
|
|
|
self.mqtt.send_to(target, message)
|
|
return f"Message envoyé à {target}."
|
|
|
|
# ──────────────────────────────────────────────
|
|
# Commandes système
|
|
# ──────────────────────────────────────────────
|
|
|
|
def _handle_system_command(self, text: str, source_msg: Optional[Message] = None,
|
|
raw_cmd: Optional[ParsedCommand] = None) -> Optional[str]:
|
|
"""Gère les commandes /xxx."""
|
|
if raw_cmd is None:
|
|
raw_cmd = parse_command(text)
|
|
cmd = raw_cmd.command
|
|
args = (raw_cmd.args or "").strip()
|
|
|
|
if cmd == "help":
|
|
return help_text()
|
|
|
|
if cmd == "pause":
|
|
self.queue.pause()
|
|
return f"[{self.agent_id}] En pause."
|
|
|
|
if cmd == "resume":
|
|
self.queue.resume()
|
|
return f"[{self.agent_id}] Reprise."
|
|
|
|
if cmd == "reset":
|
|
self.llm.reset_history()
|
|
return f"[{self.agent_id}] Historique effacé."
|
|
|
|
if cmd == "status":
|
|
stats = self.queue.daily_stats()
|
|
paused = "OUI" if self.queue.is_paused else "NON"
|
|
return (
|
|
f"[{self.agent_id}] Statut\n"
|
|
f" En pause : {paused}\n"
|
|
f" Tâches aujourd'hui : {stats['total']} "
|
|
f"(OK:{stats['completed']} ERR:{stats['failed']} ATT:{stats['pending']})\n"
|
|
f" Durée moyenne : {stats['avg_duration_s']}s"
|
|
)
|
|
|
|
if cmd == "agents":
|
|
with self._online_lock:
|
|
online = list(self._online_agents)
|
|
all_agents = [a.agent_id for a in self.registry.all_agents()]
|
|
return f"Agents en ligne : {online}\nAgents connus : {all_agents}"
|
|
|
|
if cmd == "script":
|
|
return self._handle_script_command(args, source_msg)
|
|
|
|
if cmd == "update":
|
|
return self._do_self_update(source_msg)
|
|
|
|
# Commandes custom de l'agent
|
|
return self.handle_custom_command(cmd, args, source_msg)
|
|
|
|
# ──────────────────────────────────────────────
|
|
# Commande script directe (bypass LLM)
|
|
# ──────────────────────────────────────────────
|
|
|
|
def _handle_script_command(self, args: str, source_msg: Optional[Message] = None) -> str:
|
|
"""
|
|
Exécute une commande script sans passer par le LLM.
|
|
args : 'exec <nom> [args]' | 'list' | 'show <nom>' | 'run | <contenu>'
|
|
"""
|
|
context = AgentContext(self)
|
|
result = self.skills.run("script", args, context)
|
|
if result is None:
|
|
result = f"[{self.agent_id}] Skill 'script' non disponible."
|
|
|
|
# Renvoi MQTT si la commande vient d'un autre agent
|
|
if source_msg and source_msg.reply_to:
|
|
try:
|
|
self.mqtt.reply(source_msg, result)
|
|
except Exception as e:
|
|
logger.debug(f"[Script] Impossible d'envoyer la réponse : {e}")
|
|
return result
|
|
|
|
# ──────────────────────────────────────────────
|
|
# Mise à jour self
|
|
# ──────────────────────────────────────────────
|
|
|
|
def _get_install_dir(self) -> str:
|
|
"""Répertoire d'installation : parent du dossier config."""
|
|
return os.path.dirname(os.path.dirname(os.path.abspath(self._config_path)))
|
|
|
|
def _get_service_name(self) -> str:
|
|
"""Nom du service systemd : depuis config ou nom du répertoire d'install."""
|
|
return self.config.get("service_name") or os.path.basename(self._get_install_dir())
|
|
|
|
def _do_self_update(self, source_msg: Optional[Message] = None) -> str:
|
|
"""git pull → systemctl restart si des changements sont détectés."""
|
|
import subprocess
|
|
|
|
install_dir = self._get_install_dir()
|
|
service_name = self._get_service_name()
|
|
logger.info(f"[Update] git pull dans {install_dir} (service : {service_name})")
|
|
|
|
try:
|
|
result = subprocess.run(
|
|
["git", "pull"],
|
|
cwd=install_dir,
|
|
capture_output=True, text=True, timeout=60,
|
|
)
|
|
except Exception as e:
|
|
msg = f"[{self.agent_id}] ❌ git pull impossible : {e}"
|
|
self._send_update_reply(source_msg, msg)
|
|
return msg
|
|
|
|
if result.returncode != 0:
|
|
msg = f"[{self.agent_id}] ❌ git pull échoué :\n{result.stderr.strip()}"
|
|
self._send_update_reply(source_msg, msg)
|
|
return msg
|
|
|
|
stdout = result.stdout.strip()
|
|
|
|
if "Already up to date" in stdout:
|
|
msg = f"[{self.agent_id}] ✓ Déjà à jour — pas de redémarrage."
|
|
self._send_update_reply(source_msg, msg)
|
|
return msg
|
|
|
|
msg = (
|
|
f"[{self.agent_id}] ✓ Mis à jour :\n{stdout}\n"
|
|
f"Redémarrage du service {service_name}..."
|
|
)
|
|
self._send_update_reply(source_msg, msg)
|
|
|
|
# Restart différé pour laisser le temps au message MQTT d'être envoyé
|
|
def _restart():
|
|
import time
|
|
time.sleep(2)
|
|
try:
|
|
subprocess.run(["systemctl", "restart", service_name], timeout=30, check=True)
|
|
except Exception as e:
|
|
logger.error(f"[Update] Erreur restart {service_name} : {e}")
|
|
|
|
threading.Thread(target=_restart, daemon=True, name="update-restart").start()
|
|
return msg
|
|
|
|
def _send_update_reply(self, source_msg: Optional[Message], body: str):
|
|
"""Renvoie la réponse d'update à Nexus via MQTT si possible."""
|
|
if source_msg and source_msg.reply_to:
|
|
try:
|
|
self.mqtt.reply(source_msg, body)
|
|
except Exception as e:
|
|
logger.debug(f"[Update] Impossible d'envoyer la réponse : {e}")
|
|
|
|
# ──────────────────────────────────────────────
|
|
# Capacités
|
|
# ──────────────────────────────────────────────
|
|
|
|
def _publish_capabilities(self):
|
|
"""Publie les capacités de cet agent (retained)."""
|
|
xmpp_cfg = self.config.get("xmpp", {})
|
|
caps = AgentCapabilities(
|
|
agent_id=self.agent_id,
|
|
agent_type=self.AGENT_TYPE,
|
|
description=self.DESCRIPTION,
|
|
skills=self.skills.capabilities_summary(),
|
|
xmpp_jid=xmpp_cfg.get("jid"),
|
|
xmpp_muc=xmpp_cfg.get("muc_room"),
|
|
mqtt_inbox=self.mqtt.topic_inbox(),
|
|
can_send_xmpp=self.xmpp is not None,
|
|
can_send_mqtt=True,
|
|
work_hours=self.config.get("work_hours", "00:00-23:59"),
|
|
)
|
|
self.mqtt.publish_capabilities(caps.to_dict())
|
|
# S'enregistre aussi dans son propre registre
|
|
self.registry.update(caps)
|
|
logger.info(f"Capacités publiées : {len(caps.skills)} skill(s)")
|
|
|
|
def _refresh_llm_context(self):
|
|
"""Met à jour le context LLM avec les agents connus."""
|
|
# Injecté dynamiquement à chaque appel LLM via extra_context
|
|
pass
|
|
|
|
# ──────────────────────────────────────────────
|
|
# Méthodes à implémenter / surcharger
|
|
# ──────────────────────────────────────────────
|
|
|
|
@abstractmethod
|
|
def get_skills_dir(self) -> Optional[str]:
|
|
"""Retourne le chemin vers le dossier skills de cet agent."""
|
|
...
|
|
|
|
def on_start(self):
|
|
"""Hook appelé après le démarrage complet. Surcharger si besoin."""
|
|
pass
|
|
|
|
def on_xmpp_connected(self):
|
|
"""Hook appelé une fois la connexion XMPP établie. Surcharger si besoin."""
|
|
pass
|
|
|
|
def on_broadcast(self, msg: Message):
|
|
"""Hook appelé à la réception d'un broadcast. Surcharger si besoin."""
|
|
pass
|
|
|
|
def setup_extra_subscriptions(self):
|
|
"""Souscriptions MQTT supplémentaires. Surcharger si besoin."""
|
|
pass
|
|
|
|
def handle_custom_command(self, cmd: str, args: str,
|
|
source_msg: Optional[Message] = None) -> Optional[str]:
|
|
"""Commandes /xxx non reconnues par la base. Surcharger si besoin."""
|
|
return f"Commande inconnue : /{cmd}. Tape /help pour l'aide."
|