From d390d8a07ab84c2feba028319662f95c27b21d26 Mon Sep 17 00:00:00 2001 From: sylvain Date: Mon, 9 Mar 2026 13:03:30 +0000 Subject: [PATCH] Fix real-time agent status tracking and add status hooks - Fix root cause: Message.from_json was parsing status/capabilities payloads (plain JSON dicts) as Message objects with empty payload, causing _on_status_update to silently fail. Now plain JSON payloads (no 'type'/'sender' keys) are passed as raw strings to callbacks. - _on_status_update: ignore own status, call on_agent_status_change hook only when status actually changes (was_online != is_online) - Add on_agent_status_change(agent_id, status) hook in BaseAgent - Add on_xmpp_connected() hook in BaseAgent - XMPPClient.connect_async() accepts on_ready callback Co-Authored-By: Claude Sonnet 4.6 --- agents_core/base_agent.py | 38 +- agents_core/mqtt_client.py | 17 +- agents_core/xmpp_client.py | 12 +- build/lib/agents_core/__init__.py | 25 ++ build/lib/agents_core/base_agent.py | 501 ++++++++++++++++++++++++ build/lib/agents_core/capabilities.py | 108 +++++ build/lib/agents_core/command_parser.py | 108 +++++ build/lib/agents_core/llm_client.py | 95 +++++ build/lib/agents_core/message_bus.py | 86 ++++ build/lib/agents_core/mqtt_client.py | 247 ++++++++++++ build/lib/agents_core/skill_loader.py | 99 +++++ build/lib/agents_core/task_queue.py | 183 +++++++++ build/lib/agents_core/xmpp_client.py | 229 +++++++++++ 13 files changed, 1735 insertions(+), 13 deletions(-) create mode 100644 build/lib/agents_core/__init__.py create mode 100644 build/lib/agents_core/base_agent.py create mode 100644 build/lib/agents_core/capabilities.py create mode 100644 build/lib/agents_core/command_parser.py create mode 100644 build/lib/agents_core/llm_client.py create mode 100644 build/lib/agents_core/message_bus.py create mode 100644 build/lib/agents_core/mqtt_client.py create mode 100644 build/lib/agents_core/skill_loader.py create mode 100644 build/lib/agents_core/task_queue.py create mode 100644 build/lib/agents_core/xmpp_client.py diff --git a/agents_core/base_agent.py b/agents_core/base_agent.py index b73ed1d..6598974 100644 --- a/agents_core/base_agent.py +++ b/agents_core/base_agent.py @@ -195,7 +195,7 @@ class BaseAgent(ABC): # Connexion XMPP if self.xmpp: self.xmpp.set_message_callback(self._on_xmpp_message) - self.xmpp.connect_async() + self.xmpp.connect_async(on_ready=self.on_xmpp_connected) # Démarrage du worker de tâches self.queue.start_worker(self._execute_task) @@ -297,15 +297,35 @@ class BaseAgent(ABC): return agent_id = data.get("agent_id") status = data.get("status") - if agent_id and status: - with self._online_lock: - if status == "online": - self._online_agents.add(agent_id) - else: - self._online_agents.discard(agent_id) + if not agent_id or not status: + return + # Ignorer nos propres messages de statut + if agent_id == self.agent_id: + return + with self._online_lock: + was_online = agent_id in self._online_agents + if status == "online": + self._online_agents.add(agent_id) + else: + self._online_agents.discard(agent_id) + is_online = agent_id in self._online_agents + # Appel du hook seulement si le statut a changé + if was_online != is_online: + try: + self.on_agent_status_change(agent_id, status) + except Exception as e: + logger.debug(f"on_agent_status_change error: {e}") except Exception: pass + def on_agent_status_change(self, agent_id: str, status: str): + """ + Hook appelé en temps réel quand un agent change de statut. + status = "online" | "offline" + À surcharger dans les sous-classes pour réagir aux changements. + """ + pass + # ────────────────────────────────────────────── # Traitement des tâches # ────────────────────────────────────────────── @@ -487,6 +507,10 @@ class BaseAgent(ABC): """Hook appelé après le démarrage complet. Surcharger si besoin.""" pass + def on_xmpp_connected(self): + """Hook appelé une fois la connexion XMPP établie. Surcharger si besoin.""" + pass + def on_broadcast(self, msg: Message): """Hook appelé à la réception d'un broadcast. Surcharger si besoin.""" pass diff --git a/agents_core/mqtt_client.py b/agents_core/mqtt_client.py index 0d3bc38..929332e 100644 --- a/agents_core/mqtt_client.py +++ b/agents_core/mqtt_client.py @@ -131,15 +131,24 @@ class MQTTClient: return try: - msg = Message.from_json(mqtt_msg.payload) - callback(msg, topic) - except Exception as e: + raw_str = mqtt_msg.payload.decode() + # Tente de parser comme Message structuré seulement si + # le JSON contient les champs attendus (type ou sender) + parsed = json.loads(raw_str) + if isinstance(parsed, dict) and ("type" in parsed or "sender" in parsed): + msg = Message.from_json(raw_str) + callback(msg, topic) + else: + # Payload JSON simple (ex: statut, capacités) → passe la string brute + callback(raw_str, topic) + except (json.JSONDecodeError, UnicodeDecodeError): # Payload non-JSON (ex: commande shell brute) - logger.debug(f"[{self.agent_id}] Payload non-JSON sur {topic}: {e}") try: callback(mqtt_msg.payload.decode(), topic) except Exception as e2: logger.error(f"[{self.agent_id}] Erreur callback sur {topic}: {e2}") + except Exception as e: + logger.error(f"[{self.agent_id}] Erreur callback sur {topic}: {e}") @staticmethod def _topic_matches(pattern: str, topic: str) -> bool: diff --git a/agents_core/xmpp_client.py b/agents_core/xmpp_client.py index 3a7605b..c0e0674 100644 --- a/agents_core/xmpp_client.py +++ b/agents_core/xmpp_client.py @@ -70,8 +70,11 @@ class XMPPClient: bare = jid.lower().split("/")[0] return bare in self.admin_jids - def connect_async(self): - """Connexion XMPP dans un thread dédié.""" + def connect_async(self, on_ready: Optional[Callable] = None): + """Connexion XMPP dans un thread dédié. + on_ready() est appelé une fois la connexion établie. + """ + self._on_ready_cb = on_ready self._client = _SlixClient( jid=self.jid, password=self.password, @@ -96,6 +99,11 @@ class XMPPClient: logger.info(f"[XMPP] Groupe rejoint : {self.muc_room}") if self.admin_jids: logger.info(f"[XMPP] Admins autorisés : {', '.join(sorted(self.admin_jids))}") + if self._on_ready_cb: + try: + self._on_ready_cb() + except Exception as e: + logger.error(f"[XMPP] Erreur on_ready callback : {e}") else: logger.warning("[XMPP] Timeout connexion") diff --git a/build/lib/agents_core/__init__.py b/build/lib/agents_core/__init__.py new file mode 100644 index 0000000..cbc9110 --- /dev/null +++ b/build/lib/agents_core/__init__.py @@ -0,0 +1,25 @@ +""" +agents_core — Librairie commune pour tous les agents du système. +""" +from .base_agent import BaseAgent, AgentContext +from .message_bus import Message, MessageType +from .mqtt_client import MQTTClient +from .xmpp_client import XMPPClient +from .llm_client import LLMClient +from .task_queue import TaskQueue, Task, TaskStatus +from .skill_loader import SkillLoader, Skill +from .capabilities import AgentCapabilities, CapabilitiesRegistry +from .command_parser import parse as parse_command, ParsedCommand, CommandType + +__version__ = "2.0.0" +__all__ = [ + "BaseAgent", "AgentContext", + "Message", "MessageType", + "MQTTClient", + "XMPPClient", + "LLMClient", + "TaskQueue", "Task", "TaskStatus", + "SkillLoader", "Skill", + "AgentCapabilities", "CapabilitiesRegistry", + "parse_command", "ParsedCommand", "CommandType", +] diff --git a/build/lib/agents_core/base_agent.py b/build/lib/agents_core/base_agent.py new file mode 100644 index 0000000..b73ed1d --- /dev/null +++ b/build/lib/agents_core/base_agent.py @@ -0,0 +1,501 @@ +""" +Classe de base pour tous les agents du système. +Hériter de BaseAgent et implémenter les méthodes abstraites. + +Usage minimal : + class MyAgent(BaseAgent): + AGENT_TYPE = "debian" + DESCRIPTION = "Administration système Debian" + + def get_skills_dir(self) -> str: + return os.path.join(os.path.dirname(__file__), "skills") + + if __name__ == "__main__": + MyAgent().run() +""" +import json +import logging +import os +import signal +import sys +import threading +import time +from abc import ABC, abstractmethod +from typing import Optional + +from .capabilities import AgentCapabilities, CapabilitiesRegistry +from .command_parser import ParsedCommand, parse as parse_command, CommandType, help_text +from .llm_client import LLMClient +from .message_bus import Message, MessageType +from .mqtt_client import MQTTClient +from .skill_loader import SkillLoader +from .task_queue import TaskQueue, Task +from .xmpp_client import XMPPClient + +logger = logging.getLogger(__name__) + + +class AgentContext: + """Contexte passé aux skills lors de leur exécution.""" + def __init__(self, agent: "BaseAgent", current_task: Optional[Task] = None, + current_message: Optional[Message] = None): + self.agent = agent + self.task = current_task + self.message = current_message + + @property + def mqtt(self) -> MQTTClient: + return self.agent.mqtt + + @property + def xmpp(self) -> Optional[XMPPClient]: + return self.agent.xmpp + + @property + def llm(self) -> LLMClient: + return self.agent.llm + + @property + def registry(self) -> CapabilitiesRegistry: + return self.agent.registry + + @property + def config(self) -> dict: + return self.agent.config + + @property + def agent_id(self) -> str: + return self.agent.agent_id + + +class BaseAgent(ABC): + """ + Classe mère de tous les agents. + Fournit : MQTT, XMPP, LLM, TaskQueue, SkillLoader, CapabilitiesRegistry. + """ + + # À surcharger dans chaque agent + AGENT_TYPE: str = "generic" + DESCRIPTION: str = "Agent générique" + DEFAULT_CONFIG_PATH: str = "config/config.json" + + def __init__(self, config_path: Optional[str] = None): + self.config = self._load_config(config_path or self.DEFAULT_CONFIG_PATH) + self.agent_id: str = self.config["agent_id"] + + logging.basicConfig( + level=logging.INFO, + format=f"%(asctime)s [{self.agent_id}] %(levelname)s %(message)s", + ) + + # Composants principaux + self.mqtt = self._setup_mqtt() + self.xmpp: Optional[XMPPClient] = self._setup_xmpp() + self.llm = self._setup_llm() + self.skills = SkillLoader() + self.queue = TaskQueue(self.config.get("queue_db", "queue.db")) + self.registry = CapabilitiesRegistry() + + # Agents en ligne (mis à jour via MQTT) + self._online_agents: set[str] = set() + self._online_lock = threading.Lock() + + self._running = False + + # ────────────────────────────────────────────── + # Setup + # ────────────────────────────────────────────── + + def _load_config(self, path: str) -> dict: + if not os.path.exists(path): + raise FileNotFoundError(f"Config introuvable : {path}") + with open(path) as f: + return json.load(f) + + def _setup_mqtt(self) -> MQTTClient: + mc = self.config.get("mqtt", {}) + return MQTTClient( + agent_id=self.config["agent_id"], + broker_host=mc.get("host", "localhost"), + broker_port=mc.get("port", 1883), + username=mc.get("username"), + password=mc.get("password"), + tls=mc.get("tls", False), + ) + + def _setup_xmpp(self) -> Optional[XMPPClient]: + xc = self.config.get("xmpp") + if not xc: + return None + + # Supporte admin_jids (list) et admin_jid (str) pour compatibilité + admin_jids = xc.get("admin_jids") or [] + if not admin_jids and xc.get("admin_jid"): + admin_jids = [xc["admin_jid"]] + + return XMPPClient( + jid=xc["jid"], + password=xc["password"], + admin_jids=admin_jids, + muc_room=xc.get("muc_room"), + muc_nick=self.config["agent_id"], + use_omemo=xc.get("use_omemo", False), + ) + + def _setup_llm(self) -> LLMClient: + lc = self.config.get("llm", {}) + return LLMClient( + base_url=lc.get("base_url", "http://localhost:11434"), + model=lc.get("model", "mistral"), + temperature=lc.get("temperature", 0.3), + system_prompt=self._load_system_prompt(), + ) + + def _load_system_prompt(self) -> str: + path = self.config.get("system_prompt", "config/system_prompt.txt") + if os.path.exists(path): + with open(path) as f: + return f.read() + return self._default_system_prompt() + + def _default_system_prompt(self) -> str: + return ( + f"Tu es {self.agent_id}, un agent IA de type '{self.AGENT_TYPE}'.\n" + f"Description : {self.DESCRIPTION}\n" + f"Tu communiques via XMPP et MQTT. " + f"Tu dois répondre de façon concise et précise.\n" + f"Pour exécuter un skill, utilise : SKILL: ARGS:\n" + ) + + # ────────────────────────────────────────────── + # Démarrage + # ────────────────────────────────────────────── + + def run(self): + """Point d'entrée principal de l'agent.""" + logger.info(f"Démarrage de {self.agent_id} ({self.AGENT_TYPE})") + + # Chargement des skills + skills_dir = self.get_skills_dir() + if skills_dir: + self.skills.load_directory(skills_dir) + + # Mise à jour du system prompt avec les skills + if self.skills.list_names(): + extra = self.skills.system_prompt_section() + self.llm.system_prompt += f"\n\n{extra}" + + # Connexion MQTT + self.mqtt.connect() + self._setup_mqtt_subscriptions() + + # Publication des capacités + self._publish_capabilities() + + # Connexion XMPP + if self.xmpp: + self.xmpp.set_message_callback(self._on_xmpp_message) + self.xmpp.connect_async() + + # Démarrage du worker de tâches + self.queue.start_worker(self._execute_task) + + # Hook de démarrage custom + self.on_start() + + # Gestion des signaux + self._running = True + signal.signal(signal.SIGINT, self._shutdown) + signal.signal(signal.SIGTERM, self._shutdown) + + logger.info(f"{self.agent_id} prêt.") + try: + while self._running: + time.sleep(1) + except KeyboardInterrupt: + self._shutdown() + + def _shutdown(self, *args): + logger.info(f"Arrêt de {self.agent_id}...") + self._running = False + self.queue.stop_worker() + self.mqtt.disconnect() + if self.xmpp: + self.xmpp.disconnect() + sys.exit(0) + + # ────────────────────────────────────────────── + # MQTT — souscriptions + # ────────────────────────────────────────────── + + def _setup_mqtt_subscriptions(self): + self.mqtt.subscribe_inbox(self._on_mqtt_task) + self.mqtt.subscribe_broadcast(self._on_mqtt_broadcast) + self.mqtt.subscribe_all_capabilities(self._on_capabilities_update) + self.mqtt.subscribe_all_status(self._on_status_update) + # Souscriptions custom de l'agent + self.setup_extra_subscriptions() + + def _on_mqtt_task(self, msg: Message | str, topic: str): + """Réception d'une tâche via MQTT inbox.""" + if isinstance(msg, str): + # Message brut non structuré + task_id = self.queue.enqueue( + payload=msg, + correlation_id="raw", + sender="unknown", + ) + return + + if msg.type == MessageType.COMMAND: + self._handle_system_command(msg.payload, source_msg=msg) + return + + task_id = self.queue.enqueue( + payload=msg.payload, + correlation_id=msg.correlation_id, + sender=msg.sender, + reply_to=msg.reply_to, + ) + logger.info(f"Tâche #{task_id} reçue de {msg.sender}") + + def _on_mqtt_broadcast(self, msg: Message | str, topic: str): + """Réception d'un broadcast — comportement par défaut : ignorer ou logger.""" + if isinstance(msg, Message): + logger.info(f"Broadcast reçu de {msg.sender}: {str(msg.payload)[:80]}") + self.on_broadcast(msg) + + def _on_capabilities_update(self, msg: Message | str, topic: str): + """Mise à jour du registre de capacités d'un autre agent.""" + try: + raw = msg if isinstance(msg, (str, bytes)) else msg.payload + if isinstance(raw, str): + caps = self.registry.update_from_json(raw) + elif isinstance(raw, dict): + from .capabilities import AgentCapabilities + import json as _json + caps = self.registry.update_from_json(_json.dumps(raw)) + else: + caps = None + if caps and caps.agent_id != self.agent_id: + logger.debug(f"Capacités reçues : {caps.agent_id}") + # Mise à jour du context LLM + self._refresh_llm_context() + except Exception as e: + logger.debug(f"Erreur parsing capabilities: {e}") + + def _on_status_update(self, msg: Message | str, topic: str): + """Mise à jour du statut d'un agent.""" + try: + raw = msg if isinstance(msg, (str, bytes)) else msg.payload + if isinstance(raw, str): + import json as _json + data = _json.loads(raw) + elif isinstance(raw, dict): + data = raw + else: + return + agent_id = data.get("agent_id") + status = data.get("status") + if agent_id and status: + with self._online_lock: + if status == "online": + self._online_agents.add(agent_id) + else: + self._online_agents.discard(agent_id) + except Exception: + pass + + # ────────────────────────────────────────────── + # Traitement des tâches + # ────────────────────────────────────────────── + + def _execute_task(self, task: Task) -> tuple[str, bool]: + """Exécute une tâche via le LLM + skills. Retourne (résultat, succès).""" + context = AgentContext(self, current_task=task) + + # Enrichir le LLM avec le contexte actuel + extra_ctx = self.registry.summary_for_llm(self._online_agents) + + result = self._llm_loop(task.payload, context, extra_ctx) + + # Renvoyer le résultat à l'expéditeur + if task.sender and task.sender != "unknown": + import uuid + dummy_msg = Message( + msg_type=MessageType.TASK, + payload=task.payload, + sender=task.sender, + correlation_id=task.correlation_id, + reply_to=task.reply_to, + ) + dummy_msg.id = task.correlation_id + self.mqtt.reply(dummy_msg, result) + + return result, True + + def _llm_loop(self, prompt: str, context: AgentContext, + extra_ctx: Optional[str] = None, max_steps: int = 10) -> str: + """Boucle LLM avec exécution de skills.""" + response = self.llm.chat(prompt, extra_context=extra_ctx) + + for _ in range(max_steps): + skill_call = self.llm.extract_skill_call(response) + if not skill_call: + break + skill_name, args = skill_call + skill_result = self.skills.run(skill_name, args, context) + logger.info(f"[{self.agent_id}] Skill {skill_name} → {str(skill_result)[:80]}") + response = self.llm.chat( + f"Résultat du skill '{skill_name}':\n{skill_result}\n\nContinue ou donne ta réponse finale." + ) + + return response + + # ────────────────────────────────────────────── + # XMPP + # ────────────────────────────────────────────── + + def _on_xmpp_message(self, sender: str, body: str, is_muc: bool = False): + """Traitement des messages XMPP entrants.""" + cmd = parse_command(body) + context = AgentContext(self) + + if cmd.type == CommandType.SYSTEM: + reply = self._handle_system_command(f"/{cmd.command} {cmd.args}", raw_cmd=cmd) + if reply and self.xmpp: + self.xmpp.send_message(sender, reply) + return + + if cmd.type == CommandType.DIRECT: + # @agent_name message → router via MQTT + reply = self._route_direct_command(cmd) + if reply and self.xmpp: + self.xmpp.send_message(sender, reply) + return + + if cmd.type == CommandType.BROADCAST: + msg = self.mqtt.broadcast(cmd.args or "") + if self.xmpp: + self.xmpp.send_message(sender, f"Broadcast envoyé à tous les agents.") + return + + # Mode naturel → LLM + extra_ctx = self.registry.summary_for_llm(self._online_agents) + response = self._llm_loop(body, context, extra_ctx) + if self.xmpp: + self.xmpp.send_message(sender, response) + + def _route_direct_command(self, cmd: ParsedCommand) -> str: + """Route un @agent commande vers l'agent cible via MQTT.""" + target = cmd.target + message = cmd.args or "" + + caps = self.registry.get(target) + if caps is None: + return f"Agent '{target}' inconnu. Agents connus : {[a.agent_id for a in self.registry.all_agents()]}" + + self.mqtt.send_to(target, message) + return f"Message envoyé à {target}." + + # ────────────────────────────────────────────── + # Commandes système + # ────────────────────────────────────────────── + + def _handle_system_command(self, text: str, source_msg: Optional[Message] = None, + raw_cmd: Optional[ParsedCommand] = None) -> Optional[str]: + """Gère les commandes /xxx.""" + if raw_cmd is None: + raw_cmd = parse_command(text) + cmd = raw_cmd.command + args = (raw_cmd.args or "").strip() + + if cmd == "help": + return help_text() + + if cmd == "pause": + self.queue.pause() + return f"[{self.agent_id}] En pause." + + if cmd == "resume": + self.queue.resume() + return f"[{self.agent_id}] Reprise." + + if cmd == "reset": + self.llm.reset_history() + return f"[{self.agent_id}] Historique effacé." + + if cmd == "status": + stats = self.queue.daily_stats() + paused = "OUI" if self.queue.is_paused else "NON" + return ( + f"[{self.agent_id}] Statut\n" + f" En pause : {paused}\n" + f" Tâches aujourd'hui : {stats['total']} " + f"(OK:{stats['completed']} ERR:{stats['failed']} ATT:{stats['pending']})\n" + f" Durée moyenne : {stats['avg_duration_s']}s" + ) + + if cmd == "agents": + with self._online_lock: + online = list(self._online_agents) + all_agents = [a.agent_id for a in self.registry.all_agents()] + return f"Agents en ligne : {online}\nAgents connus : {all_agents}" + + # Commandes custom de l'agent + return self.handle_custom_command(cmd, args, source_msg) + + # ────────────────────────────────────────────── + # Capacités + # ────────────────────────────────────────────── + + def _publish_capabilities(self): + """Publie les capacités de cet agent (retained).""" + xmpp_cfg = self.config.get("xmpp", {}) + caps = AgentCapabilities( + agent_id=self.agent_id, + agent_type=self.AGENT_TYPE, + description=self.DESCRIPTION, + skills=self.skills.capabilities_summary(), + xmpp_jid=xmpp_cfg.get("jid"), + xmpp_muc=xmpp_cfg.get("muc_room"), + mqtt_inbox=self.mqtt.topic_inbox(), + can_send_xmpp=self.xmpp is not None, + can_send_mqtt=True, + work_hours=self.config.get("work_hours", "00:00-23:59"), + ) + self.mqtt.publish_capabilities(caps.to_dict()) + # S'enregistre aussi dans son propre registre + self.registry.update(caps) + logger.info(f"Capacités publiées : {len(caps.skills)} skill(s)") + + def _refresh_llm_context(self): + """Met à jour le context LLM avec les agents connus.""" + # Injecté dynamiquement à chaque appel LLM via extra_context + pass + + # ────────────────────────────────────────────── + # Méthodes à implémenter / surcharger + # ────────────────────────────────────────────── + + @abstractmethod + def get_skills_dir(self) -> Optional[str]: + """Retourne le chemin vers le dossier skills de cet agent.""" + ... + + def on_start(self): + """Hook appelé après le démarrage complet. Surcharger si besoin.""" + pass + + def on_broadcast(self, msg: Message): + """Hook appelé à la réception d'un broadcast. Surcharger si besoin.""" + pass + + def setup_extra_subscriptions(self): + """Souscriptions MQTT supplémentaires. Surcharger si besoin.""" + pass + + def handle_custom_command(self, cmd: str, args: str, + source_msg: Optional[Message] = None) -> Optional[str]: + """Commandes /xxx non reconnues par la base. Surcharger si besoin.""" + return f"Commande inconnue : /{cmd}. Tape /help pour l'aide." diff --git a/build/lib/agents_core/capabilities.py b/build/lib/agents_core/capabilities.py new file mode 100644 index 0000000..9b60077 --- /dev/null +++ b/build/lib/agents_core/capabilities.py @@ -0,0 +1,108 @@ +""" +Gestion des capacités — chaque agent sait ce qu'il peut faire +et connaît les autres agents via les messages MQTT retained. +""" +import json +import logging +import threading +from dataclasses import dataclass, field, asdict +from typing import Optional + +logger = logging.getLogger(__name__) + + +@dataclass +class AgentCapabilities: + agent_id: str + agent_type: str # nexus | debian | ansible | deploy | custom + description: str + skills: list[dict] # [{"name": ..., "description": ..., "usage": ...}] + xmpp_jid: Optional[str] = None + xmpp_muc: Optional[str] = None + mqtt_inbox: str = "" + can_send_xmpp: bool = False + can_send_mqtt: bool = True + work_hours: str = "00:00-23:59" + version: str = "2.0" + + def to_dict(self) -> dict: + return asdict(self) + + def to_json(self) -> str: + return json.dumps(self.to_dict(), ensure_ascii=False) + + @classmethod + def from_json(cls, data: str | bytes) -> "AgentCapabilities": + d = json.loads(data) + return cls(**{k: v for k, v in d.items() if k in cls.__dataclass_fields__}) + + def summary_for_llm(self) -> str: + """Résumé compact pour injection dans le system prompt.""" + skill_names = [s["name"] for s in self.skills] + xmpp = f", XMPP: {self.xmpp_jid}" if self.xmpp_jid else "" + return ( + f" [{self.agent_id}] ({self.agent_type}) — {self.description}\n" + f" Skills: {', '.join(skill_names) or 'aucun'}\n" + f" Inbox MQTT: {self.mqtt_inbox}{xmpp}\n" + f" Horaires: {self.work_hours}" + ) + + +class CapabilitiesRegistry: + """ + Registre local des capacités de tous les agents connus. + Mis à jour en temps réel via les messages MQTT retained. + """ + def __init__(self): + self._agents: dict[str, AgentCapabilities] = {} + self._lock = threading.Lock() + + def update(self, caps: AgentCapabilities): + with self._lock: + self._agents[caps.agent_id] = caps + logger.debug(f"[Registry] Capacités mises à jour pour {caps.agent_id}") + + def update_from_json(self, data: str | bytes) -> Optional[AgentCapabilities]: + if not data or (isinstance(data, (str, bytes)) and not data.strip()): + return None # Empty retained message — ignore silently + try: + caps = AgentCapabilities.from_json(data) + self.update(caps) + return caps + except Exception as e: + logger.error(f"[Registry] Erreur parsing capacités: {e}") + return None + + def get(self, agent_id: str) -> Optional[AgentCapabilities]: + with self._lock: + return self._agents.get(agent_id) + + def all_agents(self) -> list[AgentCapabilities]: + with self._lock: + return list(self._agents.values()) + + def online_agents(self, online_ids: set[str]) -> list[AgentCapabilities]: + with self._lock: + return [a for a in self._agents.values() if a.agent_id in online_ids] + + def summary_for_llm(self, online_ids: Optional[set[str]] = None) -> str: + """Génère la section du system prompt listant les agents connus.""" + with self._lock: + agents = self._agents.values() + if not agents: + return "Aucun agent enregistré." + lines = ["## Agents disponibles\n"] + for a in agents: + status = "" + if online_ids is not None: + status = " [EN LIGNE]" if a.agent_id in online_ids else " [HORS LIGNE]" + lines.append(f"{a.summary_for_llm()}{status}") + return "\n".join(lines) + + def find_capable_agent(self, skill_name: str) -> Optional[AgentCapabilities]: + """Trouve un agent capable d'exécuter un skill donné.""" + with self._lock: + for agent in self._agents.values(): + if any(s["name"] == skill_name for s in agent.skills): + return agent + return None diff --git a/build/lib/agents_core/command_parser.py b/build/lib/agents_core/command_parser.py new file mode 100644 index 0000000..2a3973c --- /dev/null +++ b/build/lib/agents_core/command_parser.py @@ -0,0 +1,108 @@ +""" +Parseur de commandes — supporte deux modes : + - Mode commande rapide : @agent message ou /commande [args] + - Mode naturel : tout le reste → LLM + +Syntaxe : + @agent_name → message direct à l'agent nommé + @all → broadcast à tous les agents + /pause [agent] → mettre en pause un agent (ou tous) + /resume [agent] → reprendre un agent (ou tous) + /status [agent] → statut d'un agent + /reset [agent] → effacer l'historique LLM + /schedule ... → gérer les tâches planifiées + /help → aide +""" +from dataclasses import dataclass +from typing import Optional + + +class CommandType: + NATURAL = "natural" # Traitement par le LLM + DIRECT = "direct" # @agent message + BROADCAST = "broadcast" # @all message + SYSTEM = "system" # /commande + + +@dataclass +class ParsedCommand: + type: str + target: Optional[str] # agent_id pour DIRECT, None pour BROADCAST/SYSTEM/NATURAL + command: Optional[str] # nom de la commande système + args: Optional[str] # arguments + raw: str # texte original + + +def parse(text: str) -> ParsedCommand: + """Parse un message entrant et retourne sa nature.""" + text = text.strip() + + # ── Mode commande directe : @agent_name message + if text.startswith("@"): + parts = text[1:].split(None, 1) + target = parts[0].lower() + message = parts[1] if len(parts) > 1 else "" + if target == "all": + return ParsedCommand( + type=CommandType.BROADCAST, + target=None, + command=None, + args=message, + raw=text, + ) + return ParsedCommand( + type=CommandType.DIRECT, + target=target, + command=None, + args=message, + raw=text, + ) + + # ── Mode commande système : /commande [args] + if text.startswith("/"): + parts = text[1:].split(None, 1) + command = parts[0].lower() + args = parts[1] if len(parts) > 1 else "" + return ParsedCommand( + type=CommandType.SYSTEM, + target=None, + command=command, + args=args, + raw=text, + ) + + # ── Mode naturel — tout le reste + return ParsedCommand( + type=CommandType.NATURAL, + target=None, + command=None, + args=text, + raw=text, + ) + + +# Commandes système reconnues et leur aide +SYSTEM_COMMANDS_HELP = { + "pause": "/pause [agent] — Mettre en pause un agent ou tous", + "resume": "/resume [agent] — Reprendre un agent ou tous", + "status": "/status [agent] — Voir le statut des agents", + "reset": "/reset [agent] — Effacer l'historique LLM", + "schedule": "/schedule ... — Gérer les tâches planifiées", + "agents": "/agents — Lister les agents en ligne", + "help": "/help — Afficher cette aide", +} + + +def help_text() -> str: + lines = [ + "── Aide commandes ──────────────────", + "Mode direct :", + " @ → Envoyer un message à un agent", + " @all → Broadcast à tous les agents", + "", + "Mode système :", + ] + lines.extend(f" {v}" for v in SYSTEM_COMMANDS_HELP.values()) + lines.append("") + lines.append("Mode naturel : écris simplement ce que tu veux faire.") + return "\n".join(lines) diff --git a/build/lib/agents_core/llm_client.py b/build/lib/agents_core/llm_client.py new file mode 100644 index 0000000..3fe64c2 --- /dev/null +++ b/build/lib/agents_core/llm_client.py @@ -0,0 +1,95 @@ +""" +Wrapper Ollama — interface unifiée pour le LLM local. +""" +import json +import logging +import requests +from typing import Optional + +logger = logging.getLogger(__name__) + +DEFAULT_TIMEOUT = 120 +MAX_HISTORY = 20 # messages conservés dans le contexte + + +class LLMClient: + def __init__(self, base_url: str, model: str, temperature: float = 0.3, + system_prompt: str = ""): + self.base_url = base_url.rstrip("/") + self.model = model + self.temperature = temperature + self.system_prompt = system_prompt + self._history: list[dict] = [] + + def chat(self, user_message: str, extra_context: Optional[str] = None) -> str: + """Envoie un message et retourne la réponse du LLM.""" + messages = [] + + # System prompt enrichi avec contexte dynamique si fourni + system = self.system_prompt + if extra_context: + system = f"{system}\n\n[CONTEXTE ACTUEL]\n{extra_context}" + if system: + messages.append({"role": "system", "content": system}) + + # Historique tronqué + messages.extend(self._history[-MAX_HISTORY:]) + messages.append({"role": "user", "content": user_message}) + + try: + resp = requests.post( + f"{self.base_url}/api/chat", + json={ + "model": self.model, + "messages": messages, + "stream": False, + "options": {"temperature": self.temperature}, + }, + timeout=DEFAULT_TIMEOUT, + ) + resp.raise_for_status() + assistant_msg = resp.json()["message"]["content"] + + # Mise à jour de l'historique + self._history.append({"role": "user", "content": user_message}) + self._history.append({"role": "assistant", "content": assistant_msg}) + + return assistant_msg + + except requests.exceptions.Timeout: + logger.error("LLM timeout") + return "Erreur : le LLM n'a pas répondu dans les temps." + except Exception as e: + logger.error(f"Erreur LLM: {e}") + return f"Erreur LLM: {e}" + + def reset_history(self): + """Efface l'historique de conversation.""" + self._history.clear() + logger.info("Historique LLM effacé") + + def extract_skill_call(self, llm_response: str) -> Optional[tuple[str, str]]: + """ + Extrait un appel de skill depuis la réponse du LLM. + Format attendu : SKILL:nom_skill ARGS:arguments + Retourne (skill_name, args) ou None. + """ + for line in llm_response.splitlines(): + line = line.strip() + if line.startswith("SKILL:"): + parts = line.split(" ARGS:", 1) + skill_name = parts[0].replace("SKILL:", "").strip() + args = parts[1].strip() if len(parts) > 1 else "" + return skill_name, args + return None + + def extract_json_block(self, text: str) -> Optional[dict]: + """Extrait un bloc JSON depuis la réponse du LLM.""" + start = text.find("{") + end = text.rfind("}") + 1 + if start >= 0 and end > start: + try: + return json.loads(text[start:end]) + except json.JSONDecodeError: + pass + return None diff --git a/build/lib/agents_core/message_bus.py b/build/lib/agents_core/message_bus.py new file mode 100644 index 0000000..5517db0 --- /dev/null +++ b/build/lib/agents_core/message_bus.py @@ -0,0 +1,86 @@ +""" +Message envelope standard pour tout le système. +Tous les messages MQTT et XMPP utilisent ce format. +""" +import uuid +import json +from datetime import datetime, timezone +from typing import Optional, Any + + +class MessageType: + TASK = "task" # Tâche à exécuter + RESULT = "result" # Résultat d'une tâche + STATUS = "status" # Statut d'un agent + BROADCAST = "broadcast" # Message à tous les agents + DIRECT = "direct_message" # Message direct agent→agent + CAPABILITIES = "capabilities" # Déclaration des capacités + ALERT = "alert" # Alerte proactive d'un agent + COMMAND = "command" # Commande système (/pause, /resume...) + + +class Message: + def __init__( + self, + msg_type: str, + payload: Any, + sender: str, + recipient: Optional[str] = None, + correlation_id: Optional[str] = None, + reply_to: Optional[str] = None, + metadata: Optional[dict] = None, + ): + self.id = str(uuid.uuid4()) + self.type = msg_type + self.payload = payload + self.sender = sender + self.recipient = recipient + self.correlation_id = correlation_id or self.id + self.reply_to = reply_to + self.timestamp = datetime.now(timezone.utc).isoformat() + self.metadata = metadata or {} + + def to_dict(self) -> dict: + return { + "id": self.id, + "type": self.type, + "payload": self.payload, + "sender": self.sender, + "recipient": self.recipient, + "correlation_id": self.correlation_id, + "reply_to": self.reply_to, + "timestamp": self.timestamp, + "metadata": self.metadata, + } + + def to_json(self) -> str: + return json.dumps(self.to_dict(), ensure_ascii=False) + + @classmethod + def from_json(cls, data: str | bytes) -> "Message": + d = json.loads(data) + msg = cls.__new__(cls) + msg.id = d.get("id", str(uuid.uuid4())) + msg.type = d.get("type", MessageType.TASK) + msg.payload = d.get("payload", "") + msg.sender = d.get("sender", "unknown") + msg.recipient = d.get("recipient") + msg.correlation_id = d.get("correlation_id", msg.id) + msg.reply_to = d.get("reply_to") + msg.timestamp = d.get("timestamp", datetime.now(timezone.utc).isoformat()) + msg.metadata = d.get("metadata", {}) + return msg + + def make_reply(self, sender: str, result: Any, msg_type: str = MessageType.RESULT) -> "Message": + """Crée un message de réponse lié à ce message.""" + return Message( + msg_type=msg_type, + payload=result, + sender=sender, + recipient=self.sender, + correlation_id=self.correlation_id, + reply_to=self.reply_to, + ) + + def __repr__(self): + return f"" diff --git a/build/lib/agents_core/mqtt_client.py b/build/lib/agents_core/mqtt_client.py new file mode 100644 index 0000000..0d3bc38 --- /dev/null +++ b/build/lib/agents_core/mqtt_client.py @@ -0,0 +1,247 @@ +""" +Wrapper MQTT enrichi — API simple pour publish/subscribe/reply. +Gère la reconnexion automatique, le LWT, et les enveloppes de messages. +""" +import json +import logging +import threading +import time +from typing import Callable, Optional + +import paho.mqtt.client as mqtt + +from .message_bus import Message, MessageType + +logger = logging.getLogger(__name__) + + +class MQTTClient: + """ + Client MQTT avec API simplifiée. + + Topics schema: + agents/{id}/inbox → tâches entrantes + agents/{id}/status → online/offline (retained + LWT) + agents/{id}/capabilities → skills déclarés (retained) + agents/broadcast → message à tous les agents + agents/results/{corr_id} → résultats routés + """ + + def __init__(self, agent_id: str, broker_host: str, broker_port: int = 1883, + username: Optional[str] = None, password: Optional[str] = None, + tls: bool = False): + self.agent_id = agent_id + self.broker_host = broker_host + self.broker_port = broker_port + self._subscriptions: dict[str, Callable] = {} + self._connected = threading.Event() + self._lock = threading.Lock() + + self.client = mqtt.Client( + client_id=agent_id, + clean_session=False, + protocol=mqtt.MQTTv311, + ) + + if username: + self.client.username_pw_set(username, password) + + if tls: + self.client.tls_set() + + # LWT — marque l'agent offline si la connexion est perdue + lwt_payload = json.dumps({"agent_id": agent_id, "status": "offline"}) + self.client.will_set( + topic=self.topic_status(), + payload=lwt_payload, + qos=1, + retain=True, + ) + + self.client.on_connect = self._on_connect + self.client.on_disconnect = self._on_disconnect + self.client.on_message = self._on_message + + # ────────────────────────────────────────────── + # Topics helpers + # ────────────────────────────────────────────── + + def topic_inbox(self, agent_id: Optional[str] = None) -> str: + return f"agents/{agent_id or self.agent_id}/inbox" + + def topic_status(self, agent_id: Optional[str] = None) -> str: + return f"agents/{agent_id or self.agent_id}/status" + + def topic_capabilities(self, agent_id: Optional[str] = None) -> str: + return f"agents/{agent_id or self.agent_id}/capabilities" + + def topic_broadcast(self) -> str: + return "agents/broadcast" + + def topic_results(self, correlation_id: str) -> str: + return f"agents/results/{correlation_id}" + + # ────────────────────────────────────────────── + # Connexion + # ────────────────────────────────────────────── + + def connect(self): + """Connexion au broker avec reconnexion automatique.""" + self.client.connect_async(self.broker_host, self.broker_port, keepalive=60) + self.client.loop_start() + if not self._connected.wait(timeout=10): + raise ConnectionError(f"Impossible de se connecter au broker MQTT {self.broker_host}:{self.broker_port}") + logger.info(f"[{self.agent_id}] Connecté au broker MQTT {self.broker_host}") + + def disconnect(self): + self.publish_status("offline") + self.client.loop_stop() + self.client.disconnect() + + def _on_connect(self, client, userdata, flags, rc): + if rc == 0: + self._connected.set() + self.publish_status("online") + # Réabonnement automatique après reconnexion + with self._lock: + for topic in self._subscriptions: + client.subscribe(topic, qos=1) + logger.info(f"[{self.agent_id}] Reconnecté et réabonné à {len(self._subscriptions)} topics") + else: + logger.error(f"[{self.agent_id}] Échec connexion MQTT, code={rc}") + + def _on_disconnect(self, client, userdata, rc): + self._connected.clear() + if rc != 0: + logger.warning(f"[{self.agent_id}] Déconnecté du broker MQTT (rc={rc}), reconnexion automatique...") + + def _on_message(self, client, userdata, mqtt_msg): + topic = mqtt_msg.topic + with self._lock: + # Cherche le callback exact ou wildcard + callback = self._subscriptions.get(topic) + if callback is None: + for pattern, cb in self._subscriptions.items(): + if self._topic_matches(pattern, topic): + callback = cb + break + + if callback is None: + logger.debug(f"[{self.agent_id}] Message reçu sur {topic} sans callback") + return + + try: + msg = Message.from_json(mqtt_msg.payload) + callback(msg, topic) + except Exception as e: + # Payload non-JSON (ex: commande shell brute) + logger.debug(f"[{self.agent_id}] Payload non-JSON sur {topic}: {e}") + try: + callback(mqtt_msg.payload.decode(), topic) + except Exception as e2: + logger.error(f"[{self.agent_id}] Erreur callback sur {topic}: {e2}") + + @staticmethod + def _topic_matches(pattern: str, topic: str) -> bool: + """Vérifie si un topic correspond à un pattern MQTT avec wildcards.""" + p_parts = pattern.split("/") + t_parts = topic.split("/") + if "#" in p_parts: + idx = p_parts.index("#") + return p_parts[:idx] == t_parts[:idx] + if len(p_parts) != len(t_parts): + return False + return all(pp == tp or pp == "+" for pp, tp in zip(p_parts, t_parts)) + + # ────────────────────────────────────────────── + # API publique — Subscribe + # ────────────────────────────────────────────── + + def subscribe(self, topic: str, callback: Callable): + """S'abonner à un topic avec un callback(message, topic).""" + with self._lock: + self._subscriptions[topic] = callback + self.client.subscribe(topic, qos=1) + logger.debug(f"[{self.agent_id}] Abonné à {topic}") + + def subscribe_inbox(self, callback: Callable): + """S'abonner à sa propre inbox.""" + self.subscribe(self.topic_inbox(), callback) + + def subscribe_broadcast(self, callback: Callable): + """S'abonner aux messages broadcast.""" + self.subscribe(self.topic_broadcast(), callback) + + def subscribe_all_capabilities(self, callback: Callable): + """S'abonner aux déclarations de capacités de tous les agents.""" + self.subscribe("agents/+/capabilities", callback) + + def subscribe_all_status(self, callback: Callable): + """S'abonner aux statuts de tous les agents.""" + self.subscribe("agents/+/status", callback) + + # ────────────────────────────────────────────── + # API publique — Publish + # ────────────────────────────────────────────── + + def publish_raw(self, topic: str, payload: str | bytes | dict, retain: bool = False, qos: int = 1): + """Publication brute sur un topic.""" + if isinstance(payload, dict): + payload = json.dumps(payload, ensure_ascii=False) + self.client.publish(topic, payload, qos=qos, retain=retain) + + def send_to(self, recipient_id: str, payload: str, msg_type: str = MessageType.TASK, + correlation_id: Optional[str] = None, reply_to: Optional[str] = None) -> Message: + """Envoyer un message à un agent spécifique via son inbox.""" + msg = Message( + msg_type=msg_type, + payload=payload, + sender=self.agent_id, + recipient=recipient_id, + correlation_id=correlation_id, + reply_to=reply_to or self.topic_inbox(), + ) + self.publish_raw(self.topic_inbox(recipient_id), msg.to_json()) + logger.info(f"[{self.agent_id}] → {recipient_id}: {str(payload)[:80]}") + return msg + + def reply(self, original: Message, result: str, msg_type: str = MessageType.RESULT): + """Répondre à un message reçu (utilise reply_to ou inbox du sender).""" + reply_msg = original.make_reply(self.agent_id, result, msg_type) + target_topic = original.reply_to or self.topic_inbox(original.sender) + self.publish_raw(target_topic, reply_msg.to_json()) + logger.info(f"[{self.agent_id}] ↩ {original.sender}: {str(result)[:80]}") + return reply_msg + + def broadcast(self, payload: str, msg_type: str = MessageType.BROADCAST) -> Message: + """Envoyer un message à tous les agents.""" + msg = Message( + msg_type=msg_type, + payload=payload, + sender=self.agent_id, + ) + self.publish_raw(self.topic_broadcast(), msg.to_json()) + return msg + + def alert(self, message: str, severity: str = "warning"): + """Envoyer une alerte proactive à l'orchestrateur.""" + # Cherche le nexus dans les agents connus ou utilise un topic dédié + msg = Message( + msg_type=MessageType.ALERT, + payload=message, + sender=self.agent_id, + metadata={"severity": severity}, + ) + # On publie sur l'inbox de l'orchestrateur (nom configurable) + self.publish_raw("agents/nexus/inbox", msg.to_json()) + + def publish_status(self, status: str, extra: Optional[dict] = None): + """Publier le statut de l'agent (retained).""" + payload = {"agent_id": self.agent_id, "status": status} + if extra: + payload.update(extra) + self.publish_raw(self.topic_status(), payload, retain=True) + + def publish_capabilities(self, capabilities: dict): + """Publier les capacités de l'agent (retained).""" + self.publish_raw(self.topic_capabilities(), capabilities, retain=True) diff --git a/build/lib/agents_core/skill_loader.py b/build/lib/agents_core/skill_loader.py new file mode 100644 index 0000000..560781b --- /dev/null +++ b/build/lib/agents_core/skill_loader.py @@ -0,0 +1,99 @@ +""" +Système de chargement de skills (plugins). +Chaque skill est un module Python avec une fonction run(args, context) → str. +""" +import importlib.util +import logging +import os +from typing import Optional + +logger = logging.getLogger(__name__) + + +class Skill: + def __init__(self, name: str, description: str, usage: str, module): + self.name = name + self.description = description + self.usage = usage + self._module = module + + def run(self, args: str, context: "AgentContext") -> str: + try: + return self._module.run(args, context) + except Exception as e: + logger.error(f"[Skill:{self.name}] Erreur: {e}", exc_info=True) + return f"Erreur dans le skill '{self.name}': {e}" + + +class SkillLoader: + def __init__(self): + self._skills: dict[str, Skill] = {} + + def load_directory(self, skills_dir: str): + """Charge tous les skills d'un dossier.""" + if not os.path.isdir(skills_dir): + logger.warning(f"Dossier skills introuvable : {skills_dir}") + return + + for filename in sorted(os.listdir(skills_dir)): + if filename.startswith("_") or not filename.endswith(".py"): + continue + skill_name = filename[:-3] + skill_path = os.path.join(skills_dir, filename) + self._load_skill(skill_name, skill_path) + + logger.info(f"[SkillLoader] {len(self._skills)} skill(s) chargé(s) : {list(self._skills.keys())}") + + def _load_skill(self, name: str, path: str): + try: + spec = importlib.util.spec_from_file_location(name, path) + module = importlib.util.module_from_spec(spec) + spec.loader.exec_module(module) + + # Vérifie les attributs requis + if not hasattr(module, "run"): + logger.warning(f"[SkillLoader] {name} ignoré : pas de fonction run()") + return + + description = getattr(module, "DESCRIPTION", "Pas de description") + usage = getattr(module, "USAGE", f"SKILL:{name} ARGS:") + + self._skills[name] = Skill(name, description, usage, module) + logger.debug(f"[SkillLoader] Skill chargé : {name}") + + except Exception as e: + logger.error(f"[SkillLoader] Erreur chargement skill {name}: {e}") + + def get(self, name: str) -> Optional[Skill]: + return self._skills.get(name) + + def run(self, name: str, args: str, context) -> str: + skill = self.get(name) + if skill is None: + return f"Skill inconnu : '{name}'. Skills disponibles : {self.list_names()}" + return skill.run(args, context) + + def list_names(self) -> list[str]: + return list(self._skills.keys()) + + def capabilities_summary(self) -> list[dict]: + """Retourne la liste des skills pour la déclaration de capacités.""" + return [ + {"name": s.name, "description": s.description, "usage": s.usage} + for s in self._skills.values() + ] + + def system_prompt_section(self) -> str: + """Génère la section du system prompt décrivant les skills disponibles.""" + if not self._skills: + return "" + lines = ["## Skills disponibles\n"] + for s in self._skills.values(): + lines.append(f"- **{s.name}** : {s.description}") + lines.append(f" Usage : `{s.usage}`") + lines.append( + "\nPour utiliser un skill, réponds avec une ligne au format :\n" + "`SKILL: ARGS:`\n" + "Tu peux enchaîner plusieurs skills. Explique brièvement ce que tu fais." + ) + return "\n".join(lines) diff --git a/build/lib/agents_core/task_queue.py b/build/lib/agents_core/task_queue.py new file mode 100644 index 0000000..a141eff --- /dev/null +++ b/build/lib/agents_core/task_queue.py @@ -0,0 +1,183 @@ +""" +File d'attente de tâches persistante (SQLite). +Commune à tous les agents — FIFO avec pause/reprise. +""" +import sqlite3 +import threading +import logging +import time +from datetime import datetime, timezone +from typing import Optional, Callable + +logger = logging.getLogger(__name__) + + +class TaskStatus: + PENDING = "pending" + IN_PROGRESS = "in_progress" + COMPLETED = "completed" + FAILED = "failed" + SKIPPED = "skipped" + + +class Task: + def __init__(self, task_id: int, payload: str, correlation_id: str, + sender: str, reply_to: Optional[str], received_at: str): + self.id = task_id + self.payload = payload + self.correlation_id = correlation_id + self.sender = sender + self.reply_to = reply_to + self.received_at = received_at + + +class TaskQueue: + def __init__(self, db_path: str): + self.db_path = db_path + self._paused = False + self._running = False + self._worker_thread: Optional[threading.Thread] = None + self._lock = threading.Lock() + self._init_db() + + def _init_db(self): + with self._connect() as conn: + conn.execute(""" + CREATE TABLE IF NOT EXISTS tasks ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + received_at TEXT NOT NULL, + started_at TEXT, + completed_at TEXT, + payload TEXT NOT NULL, + correlation_id TEXT NOT NULL, + sender TEXT NOT NULL, + reply_to TEXT, + status TEXT NOT NULL DEFAULT 'pending', + result TEXT, + duration_s REAL + ) + """) + + def _connect(self) -> sqlite3.Connection: + conn = sqlite3.connect(self.db_path, check_same_thread=False) + conn.row_factory = sqlite3.Row + return conn + + def enqueue(self, payload: str, correlation_id: str, sender: str, + reply_to: Optional[str] = None) -> int: + now = datetime.now(timezone.utc).isoformat() + with self._connect() as conn: + cur = conn.execute( + "INSERT INTO tasks (received_at, payload, correlation_id, sender, reply_to, status) " + "VALUES (?, ?, ?, ?, ?, ?)", + (now, payload, correlation_id, sender, reply_to, TaskStatus.PENDING) + ) + task_id = cur.lastrowid + logger.info(f"[TaskQueue] Tâche #{task_id} en file (sender={sender})") + return task_id + + def _next_task(self) -> Optional[Task]: + with self._connect() as conn: + row = conn.execute( + "SELECT * FROM tasks WHERE status = ? ORDER BY id LIMIT 1", + (TaskStatus.PENDING,) + ).fetchone() + if row is None: + return None + conn.execute( + "UPDATE tasks SET status = ?, started_at = ? WHERE id = ?", + (TaskStatus.IN_PROGRESS, datetime.now(timezone.utc).isoformat(), row["id"]) + ) + return Task( + task_id=row["id"], + payload=row["payload"], + correlation_id=row["correlation_id"], + sender=row["sender"], + reply_to=row["reply_to"], + received_at=row["received_at"], + ) + + def complete(self, task_id: int, result: str, success: bool = True): + now = datetime.now(timezone.utc).isoformat() + status = TaskStatus.COMPLETED if success else TaskStatus.FAILED + with self._connect() as conn: + row = conn.execute("SELECT started_at FROM tasks WHERE id = ?", (task_id,)).fetchone() + duration = None + if row and row["started_at"]: + try: + started = datetime.fromisoformat(row["started_at"]) + duration = (datetime.now(timezone.utc) - started).total_seconds() + except Exception: + pass + conn.execute( + "UPDATE tasks SET status = ?, completed_at = ?, result = ?, duration_s = ? WHERE id = ?", + (status, now, result[:4000], duration, task_id) + ) + logger.info(f"[TaskQueue] Tâche #{task_id} {'terminée' if success else 'échouée'} en {duration:.1f}s" if duration else f"[TaskQueue] Tâche #{task_id} {status}") + + def pause(self): + with self._lock: + self._paused = True + logger.info("[TaskQueue] En pause") + + def resume(self): + with self._lock: + self._paused = False + logger.info("[TaskQueue] Reprise") + + @property + def is_paused(self) -> bool: + with self._lock: + return self._paused + + def start_worker(self, handler: Callable[[Task], tuple[str, bool]]): + """ + Lance le worker en arrière-plan. + handler(task) → (result_str, success_bool) + """ + self._running = True + self._worker_thread = threading.Thread( + target=self._worker_loop, + args=(handler,), + daemon=True, + name="task-queue-worker", + ) + self._worker_thread.start() + + def stop_worker(self): + self._running = False + + def _worker_loop(self, handler: Callable): + while self._running: + if self.is_paused: + time.sleep(1) + continue + task = self._next_task() + if task is None: + time.sleep(0.5) + continue + try: + result, success = handler(task) + except Exception as e: + result, success = str(e), False + self.complete(task.id, result, success) + + def daily_stats(self) -> dict: + today = datetime.now(timezone.utc).date().isoformat() + with self._connect() as conn: + rows = conn.execute( + "SELECT status, duration_s FROM tasks WHERE DATE(received_at) = ?", (today,) + ).fetchall() + total = len(rows) + completed = sum(1 for r in rows if r["status"] == TaskStatus.COMPLETED) + failed = sum(1 for r in rows if r["status"] == TaskStatus.FAILED) + durations = [r["duration_s"] for r in rows if r["duration_s"] is not None] + avg_duration = sum(durations) / len(durations) if durations else 0 + return { + "date": today, + "total": total, + "completed": completed, + "failed": failed, + "pending": total - completed - failed, + "avg_duration_s": round(avg_duration, 2), + } diff --git a/build/lib/agents_core/xmpp_client.py b/build/lib/agents_core/xmpp_client.py new file mode 100644 index 0000000..3a7605b --- /dev/null +++ b/build/lib/agents_core/xmpp_client.py @@ -0,0 +1,229 @@ +""" +Client XMPP avec support OMEMO, MUC (groupe), messages directs, et multi-utilisateurs. +""" +import logging +import threading +from typing import Callable, Optional + +from slixmpp import ClientXMPP + +logger = logging.getLogger(__name__) + + +class XMPPClient: + """ + Client XMPP simplifié avec : + - Messages directs (1-to-1) + - Groupes MUC (Multi-User Chat) + - OMEMO (si slixmpp-omemo installé) + - Multi-utilisateurs : liste de JIDs autorisés à envoyer des commandes + - Callback unique pour les messages entrants + """ + + def __init__( + self, + jid: str, + password: str, + admin_jids: Optional[list[str]] = None, + muc_room: Optional[str] = None, + muc_nick: Optional[str] = None, + use_omemo: bool = False, + ): + self.jid = jid + self.password = password + self.muc_room = muc_room + self.muc_nick = muc_nick or jid.split("@")[0] + self.use_omemo = use_omemo + + # Normalise : accepte str ou list, stocke toujours un set de bare JIDs + if admin_jids is None: + self.admin_jids: set[str] = set() + elif isinstance(admin_jids, str): + self.admin_jids = {admin_jids.lower().split("/")[0]} if admin_jids else set() + else: + self.admin_jids = {j.lower().split("/")[0] for j in admin_jids if j} + + # Rétro-compat : admin_jid pointe vers le premier JID + self.admin_jid: Optional[str] = next(iter(self.admin_jids), None) + + self._message_callback: Optional[Callable] = None + self._client: Optional[_SlixClient] = None + self._thread: Optional[threading.Thread] = None + self._connected = threading.Event() + + # ────────────────────────────────────────────── + # API publique + # ────────────────────────────────────────────── + + def set_message_callback(self, callback: Callable[[str, str, bool], None]): + """ + Définit le callback pour les messages reçus. + callback(sender_jid, body, is_muc) + Seuls les messages de admin_jids sont transmis (+ MUC). + """ + self._message_callback = callback + + def is_authorized(self, jid: str) -> bool: + """Vérifie si un JID est autorisé à envoyer des commandes.""" + if not self.admin_jids: + return True # Aucun filtre configuré → tout le monde + bare = jid.lower().split("/")[0] + return bare in self.admin_jids + + def connect_async(self): + """Connexion XMPP dans un thread dédié.""" + self._client = _SlixClient( + jid=self.jid, + password=self.password, + muc_room=self.muc_room, + muc_nick=self.muc_nick, + use_omemo=self.use_omemo, + on_message=self._on_message, + on_connected=self._connected.set, + ) + self._thread = threading.Thread( + target=self._client.start, + daemon=True, + name="xmpp-client", + ) + self._thread.start() + threading.Thread(target=self._wait_and_log, daemon=True).start() + + def _wait_and_log(self): + if self._connected.wait(timeout=30): + logger.info(f"[XMPP] Connecté : {self.jid}") + if self.muc_room: + logger.info(f"[XMPP] Groupe rejoint : {self.muc_room}") + if self.admin_jids: + logger.info(f"[XMPP] Admins autorisés : {', '.join(sorted(self.admin_jids))}") + else: + logger.warning("[XMPP] Timeout connexion") + + def _on_message(self, sender: str, body: str, is_muc: bool): + """Filtre les messages : seuls les admins sont traités (sauf MUC).""" + if not is_muc and not self.is_authorized(sender): + logger.debug(f"[XMPP] Message ignoré (non autorisé) : {sender}") + return + if self._message_callback: + try: + self._message_callback(sender, body, is_muc) + except Exception as e: + logger.error(f"[XMPP] Erreur callback : {e}") + + def send_message(self, to: str, body: str, is_muc: bool = False): + """Envoie un message XMPP (direct ou MUC).""" + if self._client is None: + logger.warning("[XMPP] Client non initialisé") + return + self._client.send_xmpp_message(to, body, is_muc) + + def send_to_all_admins(self, body: str): + """Envoie un message à tous les admins.""" + for jid in self.admin_jids: + self.send_message(jid, body) + + def send_to_admin(self, body: str): + """Envoie au premier admin (rétro-compat).""" + if self.admin_jid: + self.send_message(self.admin_jid, body) + + def send_to_group(self, body: str): + """Envoie dans le groupe MUC.""" + if self.muc_room: + self.send_message(self.muc_room, body, is_muc=True) + + def add_admin(self, jid: str): + """Ajoute un JID autorisé à la volée.""" + bare = jid.lower().split("/")[0] + self.admin_jids.add(bare) + if not self.admin_jid: + self.admin_jid = bare + logger.info(f"[XMPP] Admin ajouté : {bare}") + + def remove_admin(self, jid: str): + """Retire un JID autorisé.""" + bare = jid.lower().split("/")[0] + self.admin_jids.discard(bare) + if self.admin_jid == bare: + self.admin_jid = next(iter(self.admin_jids), None) + logger.info(f"[XMPP] Admin retiré : {bare}") + + def disconnect(self): + if self._client: + try: + self._client.disconnect() + except Exception: + pass + + +class _SlixClient(ClientXMPP): + """Implémentation interne slixmpp.""" + + def __init__(self, jid, password, muc_room, muc_nick, + use_omemo, on_message, on_connected): + super().__init__(jid, password) + self._muc_room = muc_room + self._muc_nick = muc_nick + self._on_message_cb = on_message + self._on_connected_cb = on_connected + + self.register_plugin("xep_0030") # Service Discovery + self.register_plugin("xep_0045") # MUC + self.register_plugin("xep_0085") # Chat state + self.register_plugin("xep_0199") # Ping keepalive + + if use_omemo: + self._setup_omemo() + + self.add_event_handler("session_start", self._on_session_start) + self.add_event_handler("message", self._on_message) + self.add_event_handler("groupchat_message", self._on_muc_message) + + def _setup_omemo(self): + try: + self.register_plugin("xep_0384") + logger.info("[XMPP] OMEMO activé") + except Exception as e: + logger.warning(f"[XMPP] OMEMO non disponible : {e}") + + def start(self): + import asyncio + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + self.loop = loop + self.connect() + loop.run_forever() + + async def _on_session_start(self, event): + self.send_presence() + await self.get_roster() + if self._muc_room: + self.plugin["xep_0045"].join_muc( + room=self._muc_room, + nick=self._muc_nick, + ) + if self._on_connected_cb: + self._on_connected_cb() + + def _on_message(self, msg): + if msg["type"] in ("chat", "normal") and msg["body"]: + body = msg["body"].strip() + if body: + self._on_message_cb(str(msg["from"]), body, is_muc=False) + + def _on_muc_message(self, msg): + if msg["mucnick"] == self._muc_nick: + return + if msg["body"]: + body = msg["body"].strip() + if body: + self._on_message_cb(str(msg["from"]), body, is_muc=True) + + def send_xmpp_message(self, to: str, body: str, is_muc: bool = False): + import functools + msg_type = "groupchat" if is_muc else "chat" + fn = functools.partial(self.send_message, mto=to, mbody=body, mtype=msg_type) + if hasattr(self, 'loop') and self.loop and self.loop.is_running(): + self.loop.call_soon_threadsafe(fn) + else: + fn()