Fix real-time agent status tracking and add status hooks

- Fix root cause: Message.from_json was parsing status/capabilities
  payloads (plain JSON dicts) as Message objects with empty payload,
  causing _on_status_update to silently fail. Now plain JSON payloads
  (no 'type'/'sender' keys) are passed as raw strings to callbacks.
- _on_status_update: ignore own status, call on_agent_status_change
  hook only when status actually changes (was_online != is_online)
- Add on_agent_status_change(agent_id, status) hook in BaseAgent
- Add on_xmpp_connected() hook in BaseAgent
- XMPPClient.connect_async() accepts on_ready callback

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-03-09 13:03:30 +00:00
parent 7ab395037d
commit d390d8a07a
13 changed files with 1735 additions and 13 deletions
+26 -2
View File
@@ -195,7 +195,7 @@ class BaseAgent(ABC):
# Connexion XMPP # Connexion XMPP
if self.xmpp: if self.xmpp:
self.xmpp.set_message_callback(self._on_xmpp_message) self.xmpp.set_message_callback(self._on_xmpp_message)
self.xmpp.connect_async() self.xmpp.connect_async(on_ready=self.on_xmpp_connected)
# Démarrage du worker de tâches # Démarrage du worker de tâches
self.queue.start_worker(self._execute_task) self.queue.start_worker(self._execute_task)
@@ -297,15 +297,35 @@ class BaseAgent(ABC):
return return
agent_id = data.get("agent_id") agent_id = data.get("agent_id")
status = data.get("status") status = data.get("status")
if agent_id and status: if not agent_id or not status:
return
# Ignorer nos propres messages de statut
if agent_id == self.agent_id:
return
with self._online_lock: with self._online_lock:
was_online = agent_id in self._online_agents
if status == "online": if status == "online":
self._online_agents.add(agent_id) self._online_agents.add(agent_id)
else: else:
self._online_agents.discard(agent_id) self._online_agents.discard(agent_id)
is_online = agent_id in self._online_agents
# Appel du hook seulement si le statut a changé
if was_online != is_online:
try:
self.on_agent_status_change(agent_id, status)
except Exception as e:
logger.debug(f"on_agent_status_change error: {e}")
except Exception: except Exception:
pass pass
def on_agent_status_change(self, agent_id: str, status: str):
"""
Hook appelé en temps réel quand un agent change de statut.
status = "online" | "offline"
À surcharger dans les sous-classes pour réagir aux changements.
"""
pass
# ────────────────────────────────────────────── # ──────────────────────────────────────────────
# Traitement des tâches # Traitement des tâches
# ────────────────────────────────────────────── # ──────────────────────────────────────────────
@@ -487,6 +507,10 @@ class BaseAgent(ABC):
"""Hook appelé après le démarrage complet. Surcharger si besoin.""" """Hook appelé après le démarrage complet. Surcharger si besoin."""
pass pass
def on_xmpp_connected(self):
"""Hook appelé une fois la connexion XMPP établie. Surcharger si besoin."""
pass
def on_broadcast(self, msg: Message): def on_broadcast(self, msg: Message):
"""Hook appelé à la réception d'un broadcast. Surcharger si besoin.""" """Hook appelé à la réception d'un broadcast. Surcharger si besoin."""
pass pass
+12 -3
View File
@@ -131,15 +131,24 @@ class MQTTClient:
return return
try: try:
msg = Message.from_json(mqtt_msg.payload) raw_str = mqtt_msg.payload.decode()
# Tente de parser comme Message structuré seulement si
# le JSON contient les champs attendus (type ou sender)
parsed = json.loads(raw_str)
if isinstance(parsed, dict) and ("type" in parsed or "sender" in parsed):
msg = Message.from_json(raw_str)
callback(msg, topic) callback(msg, topic)
except Exception as e: else:
# Payload JSON simple (ex: statut, capacités) → passe la string brute
callback(raw_str, topic)
except (json.JSONDecodeError, UnicodeDecodeError):
# Payload non-JSON (ex: commande shell brute) # Payload non-JSON (ex: commande shell brute)
logger.debug(f"[{self.agent_id}] Payload non-JSON sur {topic}: {e}")
try: try:
callback(mqtt_msg.payload.decode(), topic) callback(mqtt_msg.payload.decode(), topic)
except Exception as e2: except Exception as e2:
logger.error(f"[{self.agent_id}] Erreur callback sur {topic}: {e2}") logger.error(f"[{self.agent_id}] Erreur callback sur {topic}: {e2}")
except Exception as e:
logger.error(f"[{self.agent_id}] Erreur callback sur {topic}: {e}")
@staticmethod @staticmethod
def _topic_matches(pattern: str, topic: str) -> bool: def _topic_matches(pattern: str, topic: str) -> bool:
+10 -2
View File
@@ -70,8 +70,11 @@ class XMPPClient:
bare = jid.lower().split("/")[0] bare = jid.lower().split("/")[0]
return bare in self.admin_jids return bare in self.admin_jids
def connect_async(self): def connect_async(self, on_ready: Optional[Callable] = None):
"""Connexion XMPP dans un thread dédié.""" """Connexion XMPP dans un thread dédié.
on_ready() est appelé une fois la connexion établie.
"""
self._on_ready_cb = on_ready
self._client = _SlixClient( self._client = _SlixClient(
jid=self.jid, jid=self.jid,
password=self.password, password=self.password,
@@ -96,6 +99,11 @@ class XMPPClient:
logger.info(f"[XMPP] Groupe rejoint : {self.muc_room}") logger.info(f"[XMPP] Groupe rejoint : {self.muc_room}")
if self.admin_jids: if self.admin_jids:
logger.info(f"[XMPP] Admins autorisés : {', '.join(sorted(self.admin_jids))}") logger.info(f"[XMPP] Admins autorisés : {', '.join(sorted(self.admin_jids))}")
if self._on_ready_cb:
try:
self._on_ready_cb()
except Exception as e:
logger.error(f"[XMPP] Erreur on_ready callback : {e}")
else: else:
logger.warning("[XMPP] Timeout connexion") logger.warning("[XMPP] Timeout connexion")
+25
View File
@@ -0,0 +1,25 @@
"""
agents_core — Librairie commune pour tous les agents du système.
"""
from .base_agent import BaseAgent, AgentContext
from .message_bus import Message, MessageType
from .mqtt_client import MQTTClient
from .xmpp_client import XMPPClient
from .llm_client import LLMClient
from .task_queue import TaskQueue, Task, TaskStatus
from .skill_loader import SkillLoader, Skill
from .capabilities import AgentCapabilities, CapabilitiesRegistry
from .command_parser import parse as parse_command, ParsedCommand, CommandType
__version__ = "2.0.0"
__all__ = [
"BaseAgent", "AgentContext",
"Message", "MessageType",
"MQTTClient",
"XMPPClient",
"LLMClient",
"TaskQueue", "Task", "TaskStatus",
"SkillLoader", "Skill",
"AgentCapabilities", "CapabilitiesRegistry",
"parse_command", "ParsedCommand", "CommandType",
]
+501
View File
@@ -0,0 +1,501 @@
"""
Classe de base pour tous les agents du système.
Hériter de BaseAgent et implémenter les méthodes abstraites.
Usage minimal :
class MyAgent(BaseAgent):
AGENT_TYPE = "debian"
DESCRIPTION = "Administration système Debian"
def get_skills_dir(self) -> str:
return os.path.join(os.path.dirname(__file__), "skills")
if __name__ == "__main__":
MyAgent().run()
"""
import json
import logging
import os
import signal
import sys
import threading
import time
from abc import ABC, abstractmethod
from typing import Optional
from .capabilities import AgentCapabilities, CapabilitiesRegistry
from .command_parser import ParsedCommand, parse as parse_command, CommandType, help_text
from .llm_client import LLMClient
from .message_bus import Message, MessageType
from .mqtt_client import MQTTClient
from .skill_loader import SkillLoader
from .task_queue import TaskQueue, Task
from .xmpp_client import XMPPClient
logger = logging.getLogger(__name__)
class AgentContext:
"""Contexte passé aux skills lors de leur exécution."""
def __init__(self, agent: "BaseAgent", current_task: Optional[Task] = None,
current_message: Optional[Message] = None):
self.agent = agent
self.task = current_task
self.message = current_message
@property
def mqtt(self) -> MQTTClient:
return self.agent.mqtt
@property
def xmpp(self) -> Optional[XMPPClient]:
return self.agent.xmpp
@property
def llm(self) -> LLMClient:
return self.agent.llm
@property
def registry(self) -> CapabilitiesRegistry:
return self.agent.registry
@property
def config(self) -> dict:
return self.agent.config
@property
def agent_id(self) -> str:
return self.agent.agent_id
class BaseAgent(ABC):
"""
Classe mère de tous les agents.
Fournit : MQTT, XMPP, LLM, TaskQueue, SkillLoader, CapabilitiesRegistry.
"""
# À surcharger dans chaque agent
AGENT_TYPE: str = "generic"
DESCRIPTION: str = "Agent générique"
DEFAULT_CONFIG_PATH: str = "config/config.json"
def __init__(self, config_path: Optional[str] = None):
self.config = self._load_config(config_path or self.DEFAULT_CONFIG_PATH)
self.agent_id: str = self.config["agent_id"]
logging.basicConfig(
level=logging.INFO,
format=f"%(asctime)s [{self.agent_id}] %(levelname)s %(message)s",
)
# Composants principaux
self.mqtt = self._setup_mqtt()
self.xmpp: Optional[XMPPClient] = self._setup_xmpp()
self.llm = self._setup_llm()
self.skills = SkillLoader()
self.queue = TaskQueue(self.config.get("queue_db", "queue.db"))
self.registry = CapabilitiesRegistry()
# Agents en ligne (mis à jour via MQTT)
self._online_agents: set[str] = set()
self._online_lock = threading.Lock()
self._running = False
# ──────────────────────────────────────────────
# Setup
# ──────────────────────────────────────────────
def _load_config(self, path: str) -> dict:
if not os.path.exists(path):
raise FileNotFoundError(f"Config introuvable : {path}")
with open(path) as f:
return json.load(f)
def _setup_mqtt(self) -> MQTTClient:
mc = self.config.get("mqtt", {})
return MQTTClient(
agent_id=self.config["agent_id"],
broker_host=mc.get("host", "localhost"),
broker_port=mc.get("port", 1883),
username=mc.get("username"),
password=mc.get("password"),
tls=mc.get("tls", False),
)
def _setup_xmpp(self) -> Optional[XMPPClient]:
xc = self.config.get("xmpp")
if not xc:
return None
# Supporte admin_jids (list) et admin_jid (str) pour compatibilité
admin_jids = xc.get("admin_jids") or []
if not admin_jids and xc.get("admin_jid"):
admin_jids = [xc["admin_jid"]]
return XMPPClient(
jid=xc["jid"],
password=xc["password"],
admin_jids=admin_jids,
muc_room=xc.get("muc_room"),
muc_nick=self.config["agent_id"],
use_omemo=xc.get("use_omemo", False),
)
def _setup_llm(self) -> LLMClient:
lc = self.config.get("llm", {})
return LLMClient(
base_url=lc.get("base_url", "http://localhost:11434"),
model=lc.get("model", "mistral"),
temperature=lc.get("temperature", 0.3),
system_prompt=self._load_system_prompt(),
)
def _load_system_prompt(self) -> str:
path = self.config.get("system_prompt", "config/system_prompt.txt")
if os.path.exists(path):
with open(path) as f:
return f.read()
return self._default_system_prompt()
def _default_system_prompt(self) -> str:
return (
f"Tu es {self.agent_id}, un agent IA de type '{self.AGENT_TYPE}'.\n"
f"Description : {self.DESCRIPTION}\n"
f"Tu communiques via XMPP et MQTT. "
f"Tu dois répondre de façon concise et précise.\n"
f"Pour exécuter un skill, utilise : SKILL:<nom> ARGS:<arguments>\n"
)
# ──────────────────────────────────────────────
# Démarrage
# ──────────────────────────────────────────────
def run(self):
"""Point d'entrée principal de l'agent."""
logger.info(f"Démarrage de {self.agent_id} ({self.AGENT_TYPE})")
# Chargement des skills
skills_dir = self.get_skills_dir()
if skills_dir:
self.skills.load_directory(skills_dir)
# Mise à jour du system prompt avec les skills
if self.skills.list_names():
extra = self.skills.system_prompt_section()
self.llm.system_prompt += f"\n\n{extra}"
# Connexion MQTT
self.mqtt.connect()
self._setup_mqtt_subscriptions()
# Publication des capacités
self._publish_capabilities()
# Connexion XMPP
if self.xmpp:
self.xmpp.set_message_callback(self._on_xmpp_message)
self.xmpp.connect_async()
# Démarrage du worker de tâches
self.queue.start_worker(self._execute_task)
# Hook de démarrage custom
self.on_start()
# Gestion des signaux
self._running = True
signal.signal(signal.SIGINT, self._shutdown)
signal.signal(signal.SIGTERM, self._shutdown)
logger.info(f"{self.agent_id} prêt.")
try:
while self._running:
time.sleep(1)
except KeyboardInterrupt:
self._shutdown()
def _shutdown(self, *args):
logger.info(f"Arrêt de {self.agent_id}...")
self._running = False
self.queue.stop_worker()
self.mqtt.disconnect()
if self.xmpp:
self.xmpp.disconnect()
sys.exit(0)
# ──────────────────────────────────────────────
# MQTT — souscriptions
# ──────────────────────────────────────────────
def _setup_mqtt_subscriptions(self):
self.mqtt.subscribe_inbox(self._on_mqtt_task)
self.mqtt.subscribe_broadcast(self._on_mqtt_broadcast)
self.mqtt.subscribe_all_capabilities(self._on_capabilities_update)
self.mqtt.subscribe_all_status(self._on_status_update)
# Souscriptions custom de l'agent
self.setup_extra_subscriptions()
def _on_mqtt_task(self, msg: Message | str, topic: str):
"""Réception d'une tâche via MQTT inbox."""
if isinstance(msg, str):
# Message brut non structuré
task_id = self.queue.enqueue(
payload=msg,
correlation_id="raw",
sender="unknown",
)
return
if msg.type == MessageType.COMMAND:
self._handle_system_command(msg.payload, source_msg=msg)
return
task_id = self.queue.enqueue(
payload=msg.payload,
correlation_id=msg.correlation_id,
sender=msg.sender,
reply_to=msg.reply_to,
)
logger.info(f"Tâche #{task_id} reçue de {msg.sender}")
def _on_mqtt_broadcast(self, msg: Message | str, topic: str):
"""Réception d'un broadcast — comportement par défaut : ignorer ou logger."""
if isinstance(msg, Message):
logger.info(f"Broadcast reçu de {msg.sender}: {str(msg.payload)[:80]}")
self.on_broadcast(msg)
def _on_capabilities_update(self, msg: Message | str, topic: str):
"""Mise à jour du registre de capacités d'un autre agent."""
try:
raw = msg if isinstance(msg, (str, bytes)) else msg.payload
if isinstance(raw, str):
caps = self.registry.update_from_json(raw)
elif isinstance(raw, dict):
from .capabilities import AgentCapabilities
import json as _json
caps = self.registry.update_from_json(_json.dumps(raw))
else:
caps = None
if caps and caps.agent_id != self.agent_id:
logger.debug(f"Capacités reçues : {caps.agent_id}")
# Mise à jour du context LLM
self._refresh_llm_context()
except Exception as e:
logger.debug(f"Erreur parsing capabilities: {e}")
def _on_status_update(self, msg: Message | str, topic: str):
"""Mise à jour du statut d'un agent."""
try:
raw = msg if isinstance(msg, (str, bytes)) else msg.payload
if isinstance(raw, str):
import json as _json
data = _json.loads(raw)
elif isinstance(raw, dict):
data = raw
else:
return
agent_id = data.get("agent_id")
status = data.get("status")
if agent_id and status:
with self._online_lock:
if status == "online":
self._online_agents.add(agent_id)
else:
self._online_agents.discard(agent_id)
except Exception:
pass
# ──────────────────────────────────────────────
# Traitement des tâches
# ──────────────────────────────────────────────
def _execute_task(self, task: Task) -> tuple[str, bool]:
"""Exécute une tâche via le LLM + skills. Retourne (résultat, succès)."""
context = AgentContext(self, current_task=task)
# Enrichir le LLM avec le contexte actuel
extra_ctx = self.registry.summary_for_llm(self._online_agents)
result = self._llm_loop(task.payload, context, extra_ctx)
# Renvoyer le résultat à l'expéditeur
if task.sender and task.sender != "unknown":
import uuid
dummy_msg = Message(
msg_type=MessageType.TASK,
payload=task.payload,
sender=task.sender,
correlation_id=task.correlation_id,
reply_to=task.reply_to,
)
dummy_msg.id = task.correlation_id
self.mqtt.reply(dummy_msg, result)
return result, True
def _llm_loop(self, prompt: str, context: AgentContext,
extra_ctx: Optional[str] = None, max_steps: int = 10) -> str:
"""Boucle LLM avec exécution de skills."""
response = self.llm.chat(prompt, extra_context=extra_ctx)
for _ in range(max_steps):
skill_call = self.llm.extract_skill_call(response)
if not skill_call:
break
skill_name, args = skill_call
skill_result = self.skills.run(skill_name, args, context)
logger.info(f"[{self.agent_id}] Skill {skill_name}{str(skill_result)[:80]}")
response = self.llm.chat(
f"Résultat du skill '{skill_name}':\n{skill_result}\n\nContinue ou donne ta réponse finale."
)
return response
# ──────────────────────────────────────────────
# XMPP
# ──────────────────────────────────────────────
def _on_xmpp_message(self, sender: str, body: str, is_muc: bool = False):
"""Traitement des messages XMPP entrants."""
cmd = parse_command(body)
context = AgentContext(self)
if cmd.type == CommandType.SYSTEM:
reply = self._handle_system_command(f"/{cmd.command} {cmd.args}", raw_cmd=cmd)
if reply and self.xmpp:
self.xmpp.send_message(sender, reply)
return
if cmd.type == CommandType.DIRECT:
# @agent_name message → router via MQTT
reply = self._route_direct_command(cmd)
if reply and self.xmpp:
self.xmpp.send_message(sender, reply)
return
if cmd.type == CommandType.BROADCAST:
msg = self.mqtt.broadcast(cmd.args or "")
if self.xmpp:
self.xmpp.send_message(sender, f"Broadcast envoyé à tous les agents.")
return
# Mode naturel → LLM
extra_ctx = self.registry.summary_for_llm(self._online_agents)
response = self._llm_loop(body, context, extra_ctx)
if self.xmpp:
self.xmpp.send_message(sender, response)
def _route_direct_command(self, cmd: ParsedCommand) -> str:
"""Route un @agent commande vers l'agent cible via MQTT."""
target = cmd.target
message = cmd.args or ""
caps = self.registry.get(target)
if caps is None:
return f"Agent '{target}' inconnu. Agents connus : {[a.agent_id for a in self.registry.all_agents()]}"
self.mqtt.send_to(target, message)
return f"Message envoyé à {target}."
# ──────────────────────────────────────────────
# Commandes système
# ──────────────────────────────────────────────
def _handle_system_command(self, text: str, source_msg: Optional[Message] = None,
raw_cmd: Optional[ParsedCommand] = None) -> Optional[str]:
"""Gère les commandes /xxx."""
if raw_cmd is None:
raw_cmd = parse_command(text)
cmd = raw_cmd.command
args = (raw_cmd.args or "").strip()
if cmd == "help":
return help_text()
if cmd == "pause":
self.queue.pause()
return f"[{self.agent_id}] En pause."
if cmd == "resume":
self.queue.resume()
return f"[{self.agent_id}] Reprise."
if cmd == "reset":
self.llm.reset_history()
return f"[{self.agent_id}] Historique effacé."
if cmd == "status":
stats = self.queue.daily_stats()
paused = "OUI" if self.queue.is_paused else "NON"
return (
f"[{self.agent_id}] Statut\n"
f" En pause : {paused}\n"
f" Tâches aujourd'hui : {stats['total']} "
f"(OK:{stats['completed']} ERR:{stats['failed']} ATT:{stats['pending']})\n"
f" Durée moyenne : {stats['avg_duration_s']}s"
)
if cmd == "agents":
with self._online_lock:
online = list(self._online_agents)
all_agents = [a.agent_id for a in self.registry.all_agents()]
return f"Agents en ligne : {online}\nAgents connus : {all_agents}"
# Commandes custom de l'agent
return self.handle_custom_command(cmd, args, source_msg)
# ──────────────────────────────────────────────
# Capacités
# ──────────────────────────────────────────────
def _publish_capabilities(self):
"""Publie les capacités de cet agent (retained)."""
xmpp_cfg = self.config.get("xmpp", {})
caps = AgentCapabilities(
agent_id=self.agent_id,
agent_type=self.AGENT_TYPE,
description=self.DESCRIPTION,
skills=self.skills.capabilities_summary(),
xmpp_jid=xmpp_cfg.get("jid"),
xmpp_muc=xmpp_cfg.get("muc_room"),
mqtt_inbox=self.mqtt.topic_inbox(),
can_send_xmpp=self.xmpp is not None,
can_send_mqtt=True,
work_hours=self.config.get("work_hours", "00:00-23:59"),
)
self.mqtt.publish_capabilities(caps.to_dict())
# S'enregistre aussi dans son propre registre
self.registry.update(caps)
logger.info(f"Capacités publiées : {len(caps.skills)} skill(s)")
def _refresh_llm_context(self):
"""Met à jour le context LLM avec les agents connus."""
# Injecté dynamiquement à chaque appel LLM via extra_context
pass
# ──────────────────────────────────────────────
# Méthodes à implémenter / surcharger
# ──────────────────────────────────────────────
@abstractmethod
def get_skills_dir(self) -> Optional[str]:
"""Retourne le chemin vers le dossier skills de cet agent."""
...
def on_start(self):
"""Hook appelé après le démarrage complet. Surcharger si besoin."""
pass
def on_broadcast(self, msg: Message):
"""Hook appelé à la réception d'un broadcast. Surcharger si besoin."""
pass
def setup_extra_subscriptions(self):
"""Souscriptions MQTT supplémentaires. Surcharger si besoin."""
pass
def handle_custom_command(self, cmd: str, args: str,
source_msg: Optional[Message] = None) -> Optional[str]:
"""Commandes /xxx non reconnues par la base. Surcharger si besoin."""
return f"Commande inconnue : /{cmd}. Tape /help pour l'aide."
+108
View File
@@ -0,0 +1,108 @@
"""
Gestion des capacités — chaque agent sait ce qu'il peut faire
et connaît les autres agents via les messages MQTT retained.
"""
import json
import logging
import threading
from dataclasses import dataclass, field, asdict
from typing import Optional
logger = logging.getLogger(__name__)
@dataclass
class AgentCapabilities:
agent_id: str
agent_type: str # nexus | debian | ansible | deploy | custom
description: str
skills: list[dict] # [{"name": ..., "description": ..., "usage": ...}]
xmpp_jid: Optional[str] = None
xmpp_muc: Optional[str] = None
mqtt_inbox: str = ""
can_send_xmpp: bool = False
can_send_mqtt: bool = True
work_hours: str = "00:00-23:59"
version: str = "2.0"
def to_dict(self) -> dict:
return asdict(self)
def to_json(self) -> str:
return json.dumps(self.to_dict(), ensure_ascii=False)
@classmethod
def from_json(cls, data: str | bytes) -> "AgentCapabilities":
d = json.loads(data)
return cls(**{k: v for k, v in d.items() if k in cls.__dataclass_fields__})
def summary_for_llm(self) -> str:
"""Résumé compact pour injection dans le system prompt."""
skill_names = [s["name"] for s in self.skills]
xmpp = f", XMPP: {self.xmpp_jid}" if self.xmpp_jid else ""
return (
f" [{self.agent_id}] ({self.agent_type}) — {self.description}\n"
f" Skills: {', '.join(skill_names) or 'aucun'}\n"
f" Inbox MQTT: {self.mqtt_inbox}{xmpp}\n"
f" Horaires: {self.work_hours}"
)
class CapabilitiesRegistry:
"""
Registre local des capacités de tous les agents connus.
Mis à jour en temps réel via les messages MQTT retained.
"""
def __init__(self):
self._agents: dict[str, AgentCapabilities] = {}
self._lock = threading.Lock()
def update(self, caps: AgentCapabilities):
with self._lock:
self._agents[caps.agent_id] = caps
logger.debug(f"[Registry] Capacités mises à jour pour {caps.agent_id}")
def update_from_json(self, data: str | bytes) -> Optional[AgentCapabilities]:
if not data or (isinstance(data, (str, bytes)) and not data.strip()):
return None # Empty retained message — ignore silently
try:
caps = AgentCapabilities.from_json(data)
self.update(caps)
return caps
except Exception as e:
logger.error(f"[Registry] Erreur parsing capacités: {e}")
return None
def get(self, agent_id: str) -> Optional[AgentCapabilities]:
with self._lock:
return self._agents.get(agent_id)
def all_agents(self) -> list[AgentCapabilities]:
with self._lock:
return list(self._agents.values())
def online_agents(self, online_ids: set[str]) -> list[AgentCapabilities]:
with self._lock:
return [a for a in self._agents.values() if a.agent_id in online_ids]
def summary_for_llm(self, online_ids: Optional[set[str]] = None) -> str:
"""Génère la section du system prompt listant les agents connus."""
with self._lock:
agents = self._agents.values()
if not agents:
return "Aucun agent enregistré."
lines = ["## Agents disponibles\n"]
for a in agents:
status = ""
if online_ids is not None:
status = " [EN LIGNE]" if a.agent_id in online_ids else " [HORS LIGNE]"
lines.append(f"{a.summary_for_llm()}{status}")
return "\n".join(lines)
def find_capable_agent(self, skill_name: str) -> Optional[AgentCapabilities]:
"""Trouve un agent capable d'exécuter un skill donné."""
with self._lock:
for agent in self._agents.values():
if any(s["name"] == skill_name for s in agent.skills):
return agent
return None
+108
View File
@@ -0,0 +1,108 @@
"""
Parseur de commandes — supporte deux modes :
- Mode commande rapide : @agent message ou /commande [args]
- Mode naturel : tout le reste → LLM
Syntaxe :
@agent_name <message> → message direct à l'agent nommé
@all <message> → broadcast à tous les agents
/pause [agent] → mettre en pause un agent (ou tous)
/resume [agent] → reprendre un agent (ou tous)
/status [agent] → statut d'un agent
/reset [agent] → effacer l'historique LLM
/schedule ... → gérer les tâches planifiées
/help → aide
"""
from dataclasses import dataclass
from typing import Optional
class CommandType:
NATURAL = "natural" # Traitement par le LLM
DIRECT = "direct" # @agent message
BROADCAST = "broadcast" # @all message
SYSTEM = "system" # /commande
@dataclass
class ParsedCommand:
type: str
target: Optional[str] # agent_id pour DIRECT, None pour BROADCAST/SYSTEM/NATURAL
command: Optional[str] # nom de la commande système
args: Optional[str] # arguments
raw: str # texte original
def parse(text: str) -> ParsedCommand:
"""Parse un message entrant et retourne sa nature."""
text = text.strip()
# ── Mode commande directe : @agent_name message
if text.startswith("@"):
parts = text[1:].split(None, 1)
target = parts[0].lower()
message = parts[1] if len(parts) > 1 else ""
if target == "all":
return ParsedCommand(
type=CommandType.BROADCAST,
target=None,
command=None,
args=message,
raw=text,
)
return ParsedCommand(
type=CommandType.DIRECT,
target=target,
command=None,
args=message,
raw=text,
)
# ── Mode commande système : /commande [args]
if text.startswith("/"):
parts = text[1:].split(None, 1)
command = parts[0].lower()
args = parts[1] if len(parts) > 1 else ""
return ParsedCommand(
type=CommandType.SYSTEM,
target=None,
command=command,
args=args,
raw=text,
)
# ── Mode naturel — tout le reste
return ParsedCommand(
type=CommandType.NATURAL,
target=None,
command=None,
args=text,
raw=text,
)
# Commandes système reconnues et leur aide
SYSTEM_COMMANDS_HELP = {
"pause": "/pause [agent] — Mettre en pause un agent ou tous",
"resume": "/resume [agent] — Reprendre un agent ou tous",
"status": "/status [agent] — Voir le statut des agents",
"reset": "/reset [agent] — Effacer l'historique LLM",
"schedule": "/schedule ... — Gérer les tâches planifiées",
"agents": "/agents — Lister les agents en ligne",
"help": "/help — Afficher cette aide",
}
def help_text() -> str:
lines = [
"── Aide commandes ──────────────────",
"Mode direct :",
" @<agent> <message> → Envoyer un message à un agent",
" @all <message> → Broadcast à tous les agents",
"",
"Mode système :",
]
lines.extend(f" {v}" for v in SYSTEM_COMMANDS_HELP.values())
lines.append("")
lines.append("Mode naturel : écris simplement ce que tu veux faire.")
return "\n".join(lines)
+95
View File
@@ -0,0 +1,95 @@
"""
Wrapper Ollama — interface unifiée pour le LLM local.
"""
import json
import logging
import requests
from typing import Optional
logger = logging.getLogger(__name__)
DEFAULT_TIMEOUT = 120
MAX_HISTORY = 20 # messages conservés dans le contexte
class LLMClient:
def __init__(self, base_url: str, model: str, temperature: float = 0.3,
system_prompt: str = ""):
self.base_url = base_url.rstrip("/")
self.model = model
self.temperature = temperature
self.system_prompt = system_prompt
self._history: list[dict] = []
def chat(self, user_message: str, extra_context: Optional[str] = None) -> str:
"""Envoie un message et retourne la réponse du LLM."""
messages = []
# System prompt enrichi avec contexte dynamique si fourni
system = self.system_prompt
if extra_context:
system = f"{system}\n\n[CONTEXTE ACTUEL]\n{extra_context}"
if system:
messages.append({"role": "system", "content": system})
# Historique tronqué
messages.extend(self._history[-MAX_HISTORY:])
messages.append({"role": "user", "content": user_message})
try:
resp = requests.post(
f"{self.base_url}/api/chat",
json={
"model": self.model,
"messages": messages,
"stream": False,
"options": {"temperature": self.temperature},
},
timeout=DEFAULT_TIMEOUT,
)
resp.raise_for_status()
assistant_msg = resp.json()["message"]["content"]
# Mise à jour de l'historique
self._history.append({"role": "user", "content": user_message})
self._history.append({"role": "assistant", "content": assistant_msg})
return assistant_msg
except requests.exceptions.Timeout:
logger.error("LLM timeout")
return "Erreur : le LLM n'a pas répondu dans les temps."
except Exception as e:
logger.error(f"Erreur LLM: {e}")
return f"Erreur LLM: {e}"
def reset_history(self):
"""Efface l'historique de conversation."""
self._history.clear()
logger.info("Historique LLM effacé")
def extract_skill_call(self, llm_response: str) -> Optional[tuple[str, str]]:
"""
Extrait un appel de skill depuis la réponse du LLM.
Format attendu : SKILL:nom_skill ARGS:arguments
Retourne (skill_name, args) ou None.
"""
for line in llm_response.splitlines():
line = line.strip()
if line.startswith("SKILL:"):
parts = line.split(" ARGS:", 1)
skill_name = parts[0].replace("SKILL:", "").strip()
args = parts[1].strip() if len(parts) > 1 else ""
return skill_name, args
return None
def extract_json_block(self, text: str) -> Optional[dict]:
"""Extrait un bloc JSON depuis la réponse du LLM."""
start = text.find("{")
end = text.rfind("}") + 1
if start >= 0 and end > start:
try:
return json.loads(text[start:end])
except json.JSONDecodeError:
pass
return None
+86
View File
@@ -0,0 +1,86 @@
"""
Message envelope standard pour tout le système.
Tous les messages MQTT et XMPP utilisent ce format.
"""
import uuid
import json
from datetime import datetime, timezone
from typing import Optional, Any
class MessageType:
TASK = "task" # Tâche à exécuter
RESULT = "result" # Résultat d'une tâche
STATUS = "status" # Statut d'un agent
BROADCAST = "broadcast" # Message à tous les agents
DIRECT = "direct_message" # Message direct agent→agent
CAPABILITIES = "capabilities" # Déclaration des capacités
ALERT = "alert" # Alerte proactive d'un agent
COMMAND = "command" # Commande système (/pause, /resume...)
class Message:
def __init__(
self,
msg_type: str,
payload: Any,
sender: str,
recipient: Optional[str] = None,
correlation_id: Optional[str] = None,
reply_to: Optional[str] = None,
metadata: Optional[dict] = None,
):
self.id = str(uuid.uuid4())
self.type = msg_type
self.payload = payload
self.sender = sender
self.recipient = recipient
self.correlation_id = correlation_id or self.id
self.reply_to = reply_to
self.timestamp = datetime.now(timezone.utc).isoformat()
self.metadata = metadata or {}
def to_dict(self) -> dict:
return {
"id": self.id,
"type": self.type,
"payload": self.payload,
"sender": self.sender,
"recipient": self.recipient,
"correlation_id": self.correlation_id,
"reply_to": self.reply_to,
"timestamp": self.timestamp,
"metadata": self.metadata,
}
def to_json(self) -> str:
return json.dumps(self.to_dict(), ensure_ascii=False)
@classmethod
def from_json(cls, data: str | bytes) -> "Message":
d = json.loads(data)
msg = cls.__new__(cls)
msg.id = d.get("id", str(uuid.uuid4()))
msg.type = d.get("type", MessageType.TASK)
msg.payload = d.get("payload", "")
msg.sender = d.get("sender", "unknown")
msg.recipient = d.get("recipient")
msg.correlation_id = d.get("correlation_id", msg.id)
msg.reply_to = d.get("reply_to")
msg.timestamp = d.get("timestamp", datetime.now(timezone.utc).isoformat())
msg.metadata = d.get("metadata", {})
return msg
def make_reply(self, sender: str, result: Any, msg_type: str = MessageType.RESULT) -> "Message":
"""Crée un message de réponse lié à ce message."""
return Message(
msg_type=msg_type,
payload=result,
sender=sender,
recipient=self.sender,
correlation_id=self.correlation_id,
reply_to=self.reply_to,
)
def __repr__(self):
return f"<Message type={self.type} from={self.sender} to={self.recipient}>"
+247
View File
@@ -0,0 +1,247 @@
"""
Wrapper MQTT enrichi — API simple pour publish/subscribe/reply.
Gère la reconnexion automatique, le LWT, et les enveloppes de messages.
"""
import json
import logging
import threading
import time
from typing import Callable, Optional
import paho.mqtt.client as mqtt
from .message_bus import Message, MessageType
logger = logging.getLogger(__name__)
class MQTTClient:
"""
Client MQTT avec API simplifiée.
Topics schema:
agents/{id}/inbox → tâches entrantes
agents/{id}/status → online/offline (retained + LWT)
agents/{id}/capabilities → skills déclarés (retained)
agents/broadcast → message à tous les agents
agents/results/{corr_id} → résultats routés
"""
def __init__(self, agent_id: str, broker_host: str, broker_port: int = 1883,
username: Optional[str] = None, password: Optional[str] = None,
tls: bool = False):
self.agent_id = agent_id
self.broker_host = broker_host
self.broker_port = broker_port
self._subscriptions: dict[str, Callable] = {}
self._connected = threading.Event()
self._lock = threading.Lock()
self.client = mqtt.Client(
client_id=agent_id,
clean_session=False,
protocol=mqtt.MQTTv311,
)
if username:
self.client.username_pw_set(username, password)
if tls:
self.client.tls_set()
# LWT — marque l'agent offline si la connexion est perdue
lwt_payload = json.dumps({"agent_id": agent_id, "status": "offline"})
self.client.will_set(
topic=self.topic_status(),
payload=lwt_payload,
qos=1,
retain=True,
)
self.client.on_connect = self._on_connect
self.client.on_disconnect = self._on_disconnect
self.client.on_message = self._on_message
# ──────────────────────────────────────────────
# Topics helpers
# ──────────────────────────────────────────────
def topic_inbox(self, agent_id: Optional[str] = None) -> str:
return f"agents/{agent_id or self.agent_id}/inbox"
def topic_status(self, agent_id: Optional[str] = None) -> str:
return f"agents/{agent_id or self.agent_id}/status"
def topic_capabilities(self, agent_id: Optional[str] = None) -> str:
return f"agents/{agent_id or self.agent_id}/capabilities"
def topic_broadcast(self) -> str:
return "agents/broadcast"
def topic_results(self, correlation_id: str) -> str:
return f"agents/results/{correlation_id}"
# ──────────────────────────────────────────────
# Connexion
# ──────────────────────────────────────────────
def connect(self):
"""Connexion au broker avec reconnexion automatique."""
self.client.connect_async(self.broker_host, self.broker_port, keepalive=60)
self.client.loop_start()
if not self._connected.wait(timeout=10):
raise ConnectionError(f"Impossible de se connecter au broker MQTT {self.broker_host}:{self.broker_port}")
logger.info(f"[{self.agent_id}] Connecté au broker MQTT {self.broker_host}")
def disconnect(self):
self.publish_status("offline")
self.client.loop_stop()
self.client.disconnect()
def _on_connect(self, client, userdata, flags, rc):
if rc == 0:
self._connected.set()
self.publish_status("online")
# Réabonnement automatique après reconnexion
with self._lock:
for topic in self._subscriptions:
client.subscribe(topic, qos=1)
logger.info(f"[{self.agent_id}] Reconnecté et réabonné à {len(self._subscriptions)} topics")
else:
logger.error(f"[{self.agent_id}] Échec connexion MQTT, code={rc}")
def _on_disconnect(self, client, userdata, rc):
self._connected.clear()
if rc != 0:
logger.warning(f"[{self.agent_id}] Déconnecté du broker MQTT (rc={rc}), reconnexion automatique...")
def _on_message(self, client, userdata, mqtt_msg):
topic = mqtt_msg.topic
with self._lock:
# Cherche le callback exact ou wildcard
callback = self._subscriptions.get(topic)
if callback is None:
for pattern, cb in self._subscriptions.items():
if self._topic_matches(pattern, topic):
callback = cb
break
if callback is None:
logger.debug(f"[{self.agent_id}] Message reçu sur {topic} sans callback")
return
try:
msg = Message.from_json(mqtt_msg.payload)
callback(msg, topic)
except Exception as e:
# Payload non-JSON (ex: commande shell brute)
logger.debug(f"[{self.agent_id}] Payload non-JSON sur {topic}: {e}")
try:
callback(mqtt_msg.payload.decode(), topic)
except Exception as e2:
logger.error(f"[{self.agent_id}] Erreur callback sur {topic}: {e2}")
@staticmethod
def _topic_matches(pattern: str, topic: str) -> bool:
"""Vérifie si un topic correspond à un pattern MQTT avec wildcards."""
p_parts = pattern.split("/")
t_parts = topic.split("/")
if "#" in p_parts:
idx = p_parts.index("#")
return p_parts[:idx] == t_parts[:idx]
if len(p_parts) != len(t_parts):
return False
return all(pp == tp or pp == "+" for pp, tp in zip(p_parts, t_parts))
# ──────────────────────────────────────────────
# API publique — Subscribe
# ──────────────────────────────────────────────
def subscribe(self, topic: str, callback: Callable):
"""S'abonner à un topic avec un callback(message, topic)."""
with self._lock:
self._subscriptions[topic] = callback
self.client.subscribe(topic, qos=1)
logger.debug(f"[{self.agent_id}] Abonné à {topic}")
def subscribe_inbox(self, callback: Callable):
"""S'abonner à sa propre inbox."""
self.subscribe(self.topic_inbox(), callback)
def subscribe_broadcast(self, callback: Callable):
"""S'abonner aux messages broadcast."""
self.subscribe(self.topic_broadcast(), callback)
def subscribe_all_capabilities(self, callback: Callable):
"""S'abonner aux déclarations de capacités de tous les agents."""
self.subscribe("agents/+/capabilities", callback)
def subscribe_all_status(self, callback: Callable):
"""S'abonner aux statuts de tous les agents."""
self.subscribe("agents/+/status", callback)
# ──────────────────────────────────────────────
# API publique — Publish
# ──────────────────────────────────────────────
def publish_raw(self, topic: str, payload: str | bytes | dict, retain: bool = False, qos: int = 1):
"""Publication brute sur un topic."""
if isinstance(payload, dict):
payload = json.dumps(payload, ensure_ascii=False)
self.client.publish(topic, payload, qos=qos, retain=retain)
def send_to(self, recipient_id: str, payload: str, msg_type: str = MessageType.TASK,
correlation_id: Optional[str] = None, reply_to: Optional[str] = None) -> Message:
"""Envoyer un message à un agent spécifique via son inbox."""
msg = Message(
msg_type=msg_type,
payload=payload,
sender=self.agent_id,
recipient=recipient_id,
correlation_id=correlation_id,
reply_to=reply_to or self.topic_inbox(),
)
self.publish_raw(self.topic_inbox(recipient_id), msg.to_json())
logger.info(f"[{self.agent_id}] → {recipient_id}: {str(payload)[:80]}")
return msg
def reply(self, original: Message, result: str, msg_type: str = MessageType.RESULT):
"""Répondre à un message reçu (utilise reply_to ou inbox du sender)."""
reply_msg = original.make_reply(self.agent_id, result, msg_type)
target_topic = original.reply_to or self.topic_inbox(original.sender)
self.publish_raw(target_topic, reply_msg.to_json())
logger.info(f"[{self.agent_id}] ↩ {original.sender}: {str(result)[:80]}")
return reply_msg
def broadcast(self, payload: str, msg_type: str = MessageType.BROADCAST) -> Message:
"""Envoyer un message à tous les agents."""
msg = Message(
msg_type=msg_type,
payload=payload,
sender=self.agent_id,
)
self.publish_raw(self.topic_broadcast(), msg.to_json())
return msg
def alert(self, message: str, severity: str = "warning"):
"""Envoyer une alerte proactive à l'orchestrateur."""
# Cherche le nexus dans les agents connus ou utilise un topic dédié
msg = Message(
msg_type=MessageType.ALERT,
payload=message,
sender=self.agent_id,
metadata={"severity": severity},
)
# On publie sur l'inbox de l'orchestrateur (nom configurable)
self.publish_raw("agents/nexus/inbox", msg.to_json())
def publish_status(self, status: str, extra: Optional[dict] = None):
"""Publier le statut de l'agent (retained)."""
payload = {"agent_id": self.agent_id, "status": status}
if extra:
payload.update(extra)
self.publish_raw(self.topic_status(), payload, retain=True)
def publish_capabilities(self, capabilities: dict):
"""Publier les capacités de l'agent (retained)."""
self.publish_raw(self.topic_capabilities(), capabilities, retain=True)
+99
View File
@@ -0,0 +1,99 @@
"""
Système de chargement de skills (plugins).
Chaque skill est un module Python avec une fonction run(args, context) → str.
"""
import importlib.util
import logging
import os
from typing import Optional
logger = logging.getLogger(__name__)
class Skill:
def __init__(self, name: str, description: str, usage: str, module):
self.name = name
self.description = description
self.usage = usage
self._module = module
def run(self, args: str, context: "AgentContext") -> str:
try:
return self._module.run(args, context)
except Exception as e:
logger.error(f"[Skill:{self.name}] Erreur: {e}", exc_info=True)
return f"Erreur dans le skill '{self.name}': {e}"
class SkillLoader:
def __init__(self):
self._skills: dict[str, Skill] = {}
def load_directory(self, skills_dir: str):
"""Charge tous les skills d'un dossier."""
if not os.path.isdir(skills_dir):
logger.warning(f"Dossier skills introuvable : {skills_dir}")
return
for filename in sorted(os.listdir(skills_dir)):
if filename.startswith("_") or not filename.endswith(".py"):
continue
skill_name = filename[:-3]
skill_path = os.path.join(skills_dir, filename)
self._load_skill(skill_name, skill_path)
logger.info(f"[SkillLoader] {len(self._skills)} skill(s) chargé(s) : {list(self._skills.keys())}")
def _load_skill(self, name: str, path: str):
try:
spec = importlib.util.spec_from_file_location(name, path)
module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(module)
# Vérifie les attributs requis
if not hasattr(module, "run"):
logger.warning(f"[SkillLoader] {name} ignoré : pas de fonction run()")
return
description = getattr(module, "DESCRIPTION", "Pas de description")
usage = getattr(module, "USAGE", f"SKILL:{name} ARGS:<arguments>")
self._skills[name] = Skill(name, description, usage, module)
logger.debug(f"[SkillLoader] Skill chargé : {name}")
except Exception as e:
logger.error(f"[SkillLoader] Erreur chargement skill {name}: {e}")
def get(self, name: str) -> Optional[Skill]:
return self._skills.get(name)
def run(self, name: str, args: str, context) -> str:
skill = self.get(name)
if skill is None:
return f"Skill inconnu : '{name}'. Skills disponibles : {self.list_names()}"
return skill.run(args, context)
def list_names(self) -> list[str]:
return list(self._skills.keys())
def capabilities_summary(self) -> list[dict]:
"""Retourne la liste des skills pour la déclaration de capacités."""
return [
{"name": s.name, "description": s.description, "usage": s.usage}
for s in self._skills.values()
]
def system_prompt_section(self) -> str:
"""Génère la section du system prompt décrivant les skills disponibles."""
if not self._skills:
return ""
lines = ["## Skills disponibles\n"]
for s in self._skills.values():
lines.append(f"- **{s.name}** : {s.description}")
lines.append(f" Usage : `{s.usage}`")
lines.append(
"\nPour utiliser un skill, réponds avec une ligne au format :\n"
"`SKILL:<nom> ARGS:<arguments>`\n"
"Tu peux enchaîner plusieurs skills. Explique brièvement ce que tu fais."
)
return "\n".join(lines)
+183
View File
@@ -0,0 +1,183 @@
"""
File d'attente de tâches persistante (SQLite).
Commune à tous les agents — FIFO avec pause/reprise.
"""
import sqlite3
import threading
import logging
import time
from datetime import datetime, timezone
from typing import Optional, Callable
logger = logging.getLogger(__name__)
class TaskStatus:
PENDING = "pending"
IN_PROGRESS = "in_progress"
COMPLETED = "completed"
FAILED = "failed"
SKIPPED = "skipped"
class Task:
def __init__(self, task_id: int, payload: str, correlation_id: str,
sender: str, reply_to: Optional[str], received_at: str):
self.id = task_id
self.payload = payload
self.correlation_id = correlation_id
self.sender = sender
self.reply_to = reply_to
self.received_at = received_at
class TaskQueue:
def __init__(self, db_path: str):
self.db_path = db_path
self._paused = False
self._running = False
self._worker_thread: Optional[threading.Thread] = None
self._lock = threading.Lock()
self._init_db()
def _init_db(self):
with self._connect() as conn:
conn.execute("""
CREATE TABLE IF NOT EXISTS tasks (
id INTEGER PRIMARY KEY AUTOINCREMENT,
received_at TEXT NOT NULL,
started_at TEXT,
completed_at TEXT,
payload TEXT NOT NULL,
correlation_id TEXT NOT NULL,
sender TEXT NOT NULL,
reply_to TEXT,
status TEXT NOT NULL DEFAULT 'pending',
result TEXT,
duration_s REAL
)
""")
def _connect(self) -> sqlite3.Connection:
conn = sqlite3.connect(self.db_path, check_same_thread=False)
conn.row_factory = sqlite3.Row
return conn
def enqueue(self, payload: str, correlation_id: str, sender: str,
reply_to: Optional[str] = None) -> int:
now = datetime.now(timezone.utc).isoformat()
with self._connect() as conn:
cur = conn.execute(
"INSERT INTO tasks (received_at, payload, correlation_id, sender, reply_to, status) "
"VALUES (?, ?, ?, ?, ?, ?)",
(now, payload, correlation_id, sender, reply_to, TaskStatus.PENDING)
)
task_id = cur.lastrowid
logger.info(f"[TaskQueue] Tâche #{task_id} en file (sender={sender})")
return task_id
def _next_task(self) -> Optional[Task]:
with self._connect() as conn:
row = conn.execute(
"SELECT * FROM tasks WHERE status = ? ORDER BY id LIMIT 1",
(TaskStatus.PENDING,)
).fetchone()
if row is None:
return None
conn.execute(
"UPDATE tasks SET status = ?, started_at = ? WHERE id = ?",
(TaskStatus.IN_PROGRESS, datetime.now(timezone.utc).isoformat(), row["id"])
)
return Task(
task_id=row["id"],
payload=row["payload"],
correlation_id=row["correlation_id"],
sender=row["sender"],
reply_to=row["reply_to"],
received_at=row["received_at"],
)
def complete(self, task_id: int, result: str, success: bool = True):
now = datetime.now(timezone.utc).isoformat()
status = TaskStatus.COMPLETED if success else TaskStatus.FAILED
with self._connect() as conn:
row = conn.execute("SELECT started_at FROM tasks WHERE id = ?", (task_id,)).fetchone()
duration = None
if row and row["started_at"]:
try:
started = datetime.fromisoformat(row["started_at"])
duration = (datetime.now(timezone.utc) - started).total_seconds()
except Exception:
pass
conn.execute(
"UPDATE tasks SET status = ?, completed_at = ?, result = ?, duration_s = ? WHERE id = ?",
(status, now, result[:4000], duration, task_id)
)
logger.info(f"[TaskQueue] Tâche #{task_id} {'terminée' if success else 'échouée'} en {duration:.1f}s" if duration else f"[TaskQueue] Tâche #{task_id} {status}")
def pause(self):
with self._lock:
self._paused = True
logger.info("[TaskQueue] En pause")
def resume(self):
with self._lock:
self._paused = False
logger.info("[TaskQueue] Reprise")
@property
def is_paused(self) -> bool:
with self._lock:
return self._paused
def start_worker(self, handler: Callable[[Task], tuple[str, bool]]):
"""
Lance le worker en arrière-plan.
handler(task) → (result_str, success_bool)
"""
self._running = True
self._worker_thread = threading.Thread(
target=self._worker_loop,
args=(handler,),
daemon=True,
name="task-queue-worker",
)
self._worker_thread.start()
def stop_worker(self):
self._running = False
def _worker_loop(self, handler: Callable):
while self._running:
if self.is_paused:
time.sleep(1)
continue
task = self._next_task()
if task is None:
time.sleep(0.5)
continue
try:
result, success = handler(task)
except Exception as e:
result, success = str(e), False
self.complete(task.id, result, success)
def daily_stats(self) -> dict:
today = datetime.now(timezone.utc).date().isoformat()
with self._connect() as conn:
rows = conn.execute(
"SELECT status, duration_s FROM tasks WHERE DATE(received_at) = ?", (today,)
).fetchall()
total = len(rows)
completed = sum(1 for r in rows if r["status"] == TaskStatus.COMPLETED)
failed = sum(1 for r in rows if r["status"] == TaskStatus.FAILED)
durations = [r["duration_s"] for r in rows if r["duration_s"] is not None]
avg_duration = sum(durations) / len(durations) if durations else 0
return {
"date": today,
"total": total,
"completed": completed,
"failed": failed,
"pending": total - completed - failed,
"avg_duration_s": round(avg_duration, 2),
}
+229
View File
@@ -0,0 +1,229 @@
"""
Client XMPP avec support OMEMO, MUC (groupe), messages directs, et multi-utilisateurs.
"""
import logging
import threading
from typing import Callable, Optional
from slixmpp import ClientXMPP
logger = logging.getLogger(__name__)
class XMPPClient:
"""
Client XMPP simplifié avec :
- Messages directs (1-to-1)
- Groupes MUC (Multi-User Chat)
- OMEMO (si slixmpp-omemo installé)
- Multi-utilisateurs : liste de JIDs autorisés à envoyer des commandes
- Callback unique pour les messages entrants
"""
def __init__(
self,
jid: str,
password: str,
admin_jids: Optional[list[str]] = None,
muc_room: Optional[str] = None,
muc_nick: Optional[str] = None,
use_omemo: bool = False,
):
self.jid = jid
self.password = password
self.muc_room = muc_room
self.muc_nick = muc_nick or jid.split("@")[0]
self.use_omemo = use_omemo
# Normalise : accepte str ou list, stocke toujours un set de bare JIDs
if admin_jids is None:
self.admin_jids: set[str] = set()
elif isinstance(admin_jids, str):
self.admin_jids = {admin_jids.lower().split("/")[0]} if admin_jids else set()
else:
self.admin_jids = {j.lower().split("/")[0] for j in admin_jids if j}
# Rétro-compat : admin_jid pointe vers le premier JID
self.admin_jid: Optional[str] = next(iter(self.admin_jids), None)
self._message_callback: Optional[Callable] = None
self._client: Optional[_SlixClient] = None
self._thread: Optional[threading.Thread] = None
self._connected = threading.Event()
# ──────────────────────────────────────────────
# API publique
# ──────────────────────────────────────────────
def set_message_callback(self, callback: Callable[[str, str, bool], None]):
"""
Définit le callback pour les messages reçus.
callback(sender_jid, body, is_muc)
Seuls les messages de admin_jids sont transmis (+ MUC).
"""
self._message_callback = callback
def is_authorized(self, jid: str) -> bool:
"""Vérifie si un JID est autorisé à envoyer des commandes."""
if not self.admin_jids:
return True # Aucun filtre configuré → tout le monde
bare = jid.lower().split("/")[0]
return bare in self.admin_jids
def connect_async(self):
"""Connexion XMPP dans un thread dédié."""
self._client = _SlixClient(
jid=self.jid,
password=self.password,
muc_room=self.muc_room,
muc_nick=self.muc_nick,
use_omemo=self.use_omemo,
on_message=self._on_message,
on_connected=self._connected.set,
)
self._thread = threading.Thread(
target=self._client.start,
daemon=True,
name="xmpp-client",
)
self._thread.start()
threading.Thread(target=self._wait_and_log, daemon=True).start()
def _wait_and_log(self):
if self._connected.wait(timeout=30):
logger.info(f"[XMPP] Connecté : {self.jid}")
if self.muc_room:
logger.info(f"[XMPP] Groupe rejoint : {self.muc_room}")
if self.admin_jids:
logger.info(f"[XMPP] Admins autorisés : {', '.join(sorted(self.admin_jids))}")
else:
logger.warning("[XMPP] Timeout connexion")
def _on_message(self, sender: str, body: str, is_muc: bool):
"""Filtre les messages : seuls les admins sont traités (sauf MUC)."""
if not is_muc and not self.is_authorized(sender):
logger.debug(f"[XMPP] Message ignoré (non autorisé) : {sender}")
return
if self._message_callback:
try:
self._message_callback(sender, body, is_muc)
except Exception as e:
logger.error(f"[XMPP] Erreur callback : {e}")
def send_message(self, to: str, body: str, is_muc: bool = False):
"""Envoie un message XMPP (direct ou MUC)."""
if self._client is None:
logger.warning("[XMPP] Client non initialisé")
return
self._client.send_xmpp_message(to, body, is_muc)
def send_to_all_admins(self, body: str):
"""Envoie un message à tous les admins."""
for jid in self.admin_jids:
self.send_message(jid, body)
def send_to_admin(self, body: str):
"""Envoie au premier admin (rétro-compat)."""
if self.admin_jid:
self.send_message(self.admin_jid, body)
def send_to_group(self, body: str):
"""Envoie dans le groupe MUC."""
if self.muc_room:
self.send_message(self.muc_room, body, is_muc=True)
def add_admin(self, jid: str):
"""Ajoute un JID autorisé à la volée."""
bare = jid.lower().split("/")[0]
self.admin_jids.add(bare)
if not self.admin_jid:
self.admin_jid = bare
logger.info(f"[XMPP] Admin ajouté : {bare}")
def remove_admin(self, jid: str):
"""Retire un JID autorisé."""
bare = jid.lower().split("/")[0]
self.admin_jids.discard(bare)
if self.admin_jid == bare:
self.admin_jid = next(iter(self.admin_jids), None)
logger.info(f"[XMPP] Admin retiré : {bare}")
def disconnect(self):
if self._client:
try:
self._client.disconnect()
except Exception:
pass
class _SlixClient(ClientXMPP):
"""Implémentation interne slixmpp."""
def __init__(self, jid, password, muc_room, muc_nick,
use_omemo, on_message, on_connected):
super().__init__(jid, password)
self._muc_room = muc_room
self._muc_nick = muc_nick
self._on_message_cb = on_message
self._on_connected_cb = on_connected
self.register_plugin("xep_0030") # Service Discovery
self.register_plugin("xep_0045") # MUC
self.register_plugin("xep_0085") # Chat state
self.register_plugin("xep_0199") # Ping keepalive
if use_omemo:
self._setup_omemo()
self.add_event_handler("session_start", self._on_session_start)
self.add_event_handler("message", self._on_message)
self.add_event_handler("groupchat_message", self._on_muc_message)
def _setup_omemo(self):
try:
self.register_plugin("xep_0384")
logger.info("[XMPP] OMEMO activé")
except Exception as e:
logger.warning(f"[XMPP] OMEMO non disponible : {e}")
def start(self):
import asyncio
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
self.loop = loop
self.connect()
loop.run_forever()
async def _on_session_start(self, event):
self.send_presence()
await self.get_roster()
if self._muc_room:
self.plugin["xep_0045"].join_muc(
room=self._muc_room,
nick=self._muc_nick,
)
if self._on_connected_cb:
self._on_connected_cb()
def _on_message(self, msg):
if msg["type"] in ("chat", "normal") and msg["body"]:
body = msg["body"].strip()
if body:
self._on_message_cb(str(msg["from"]), body, is_muc=False)
def _on_muc_message(self, msg):
if msg["mucnick"] == self._muc_nick:
return
if msg["body"]:
body = msg["body"].strip()
if body:
self._on_message_cb(str(msg["from"]), body, is_muc=True)
def send_xmpp_message(self, to: str, body: str, is_muc: bool = False):
import functools
msg_type = "groupchat" if is_muc else "chat"
fn = functools.partial(self.send_message, mto=to, mbody=body, mtype=msg_type)
if hasattr(self, 'loop') and self.loop and self.loop.is_running():
self.loop.call_soon_threadsafe(fn)
else:
fn()