commit bdcfff9f8e6331a981bcdf62596c68fc43e958f7 Author: sylvain Date: Thu Apr 2 08:59:20 2026 +0000 Initial commit: agent_logwatch v1.0 - Réception logs MQTT depuis machines distantes (agents/logwatch/+/logs) - Pré-filtrage sans LLM (14 patterns: ERROR, FATAL, OOM, segfault, auth fail...) - Analyse LLM par créneau horaire configurable (APScheduler) - Gestion round-robin avec reprise sur interruption - Extension de créneau (+30 min) avec confirmation admin - Skills: machine (gestion machines) + logwatch (contrôle) - Script send_logs.sh pour machines distantes Co-Authored-By: Claude Sonnet 4.6 diff --git a/__pycache__/agent_logwatch.cpython-313.pyc b/__pycache__/agent_logwatch.cpython-313.pyc new file mode 100644 index 0000000..9882986 Binary files /dev/null and b/__pycache__/agent_logwatch.cpython-313.pyc differ diff --git a/agent_logwatch.py b/agent_logwatch.py new file mode 100644 index 0000000..0453c2d --- /dev/null +++ b/agent_logwatch.py @@ -0,0 +1,677 @@ +#!/usr/bin/env python3 +""" +Agent LogWatch — Analyse de logs multi-machines avec fenêtre horaire programmée. + +Les machines distantes envoient leurs logs via MQTT vers agents/logwatch//logs. +L'agent pré-filtre (sans LLM), stocke en SQLite, puis analyse avec le LLM +pendant les créneaux horaires configurés. +""" +import json +import logging +import os +import re +import sqlite3 +import threading +import time +from datetime import datetime, timedelta +from pathlib import Path + +from agents_core import BaseAgent, AgentContext, Message, MessageType + +logger = logging.getLogger(__name__) + +# ─── Pré-filtres sans LLM ──────────────────────────────────────────────────── + +FILTER_PATTERNS = [ + re.compile(r'\b(ERROR|CRITICAL|FATAL|PANIC|EMERG|ALERT|CRIT)\b'), + re.compile(r'\bException\b|\bTraceback\b|\bTraceback \(most recent'), + re.compile(r'\bsegfault\b|\bSegmentation fault\b', re.IGNORECASE), + re.compile(r'\bout of memory\b|\bOOM killer\b|\bOOM-killer\b', re.IGNORECASE), + re.compile(r'\b(failed|failure)\b', re.IGNORECASE), + re.compile(r'\bkilled\b', re.IGNORECASE), + re.compile(r'\b(BUG|Oops):\s'), + re.compile(r'<[0-3]>'), # syslog priorities 0=emerg, 1=alert, 2=crit, 3=err + re.compile(r'\bcore dumped\b', re.IGNORECASE), + re.compile(r'\bpanic\b', re.IGNORECASE), + re.compile(r'\bdenied\b.*\bpermission\b|\bpermission\b.*\bdenied\b', re.IGNORECASE), + re.compile(r'\bauthentication failure\b|\bfailed login\b|\bfailed password\b', re.IGNORECASE), + re.compile(r'\bdisk full\b|\bno space left\b', re.IGNORECASE), + re.compile(r'\bconnection refused\b|\bconnection timed out\b', re.IGNORECASE), + re.compile(r'\bssh.*invalid user\b|\binvalid user.*ssh\b', re.IGNORECASE), +] + +SEVERITY_RANK = { + 'EMERG': 0, 'ALERT': 1, 'CRIT': 2, 'CRITICAL': 2, 'FATAL': 2, 'PANIC': 2, + 'ERROR': 3, 'ERR': 3, + 'FAILED': 4, 'FAILURE': 4, 'DENIED': 4, + 'EXCEPTION': 5, 'TRACEBACK': 5, + 'KILLED': 6, 'OOM': 6, 'SEGFAULT': 6, 'CORE': 6, +} + +CHUNK_SIZE = 150 # lignes envoyées au LLM par appel + + +def _detect_severity(line: str) -> str: + line_up = line.upper() + for kw, _ in sorted(SEVERITY_RANK.items(), key=lambda x: x[1]): + if kw in line_up: + return kw + return 'ERROR' + + +class LogWatchAgent(BaseAgent): + AGENT_TYPE = "logwatch" + DESCRIPTION = ( + "Analyse de logs multi-machines. Reçoit les logs des machines distantes via MQTT, " + "pré-filtre les erreurs, les analyse avec le LLM pendant les créneaux programmés, " + "envoie des rapports par XMPP. Gestion de file de machines, round-robin, " + "reprise sur interruption et analyse à la demande." + ) + DEFAULT_CONFIG_PATH = "/opt/agent_logwatch/config/config.json" + + def get_skills_dir(self) -> str: + return os.path.join(os.path.dirname(__file__), "skills") + + # ─── Init ───────────────────────────────────────────────────────────────── + + def __init__(self, config_path=None): + super().__init__(config_path) + self.db_path = Path(self.config.get("db_path", "/opt/agent_logwatch/data/logwatch.db")) + self.db_path.parent.mkdir(parents=True, exist_ok=True) + self._db_lock = threading.Lock() + self._init_db() + + # Scheduler APScheduler + try: + from apscheduler.schedulers.background import BackgroundScheduler + self._scheduler = BackgroundScheduler(timezone="Europe/Paris") + except ImportError: + logger.error("apscheduler non installé — `pip install apscheduler`") + self._scheduler = None + + # État analyse + self._analysis_thread = None + self._analysis_stop = threading.Event() + self._slot_end_time = None + + # Extension demandée + self._pending_extension = None # dict: {machine_id, hostname} + self._extension_event = threading.Event() + self._extension_granted = False + + # ─── DB ─────────────────────────────────────────────────────────────────── + + def _get_db(self) -> sqlite3.Connection: + conn = sqlite3.connect(str(self.db_path), timeout=10) + conn.row_factory = sqlite3.Row + return conn + + def _init_db(self): + with self._get_db() as conn: + conn.executescript(""" + CREATE TABLE IF NOT EXISTS machines ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + hostname TEXT UNIQUE NOT NULL, + registered_at TEXT NOT NULL, + last_log_at TEXT, + last_analyzed_at TEXT, + queue_position INTEGER DEFAULT 0, + active INTEGER DEFAULT 1 + ); + + CREATE TABLE IF NOT EXISTS filtered_logs ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + machine_id INTEGER NOT NULL, + log_line TEXT NOT NULL, + severity TEXT, + received_at TEXT NOT NULL, + analyzed INTEGER DEFAULT 0, + FOREIGN KEY (machine_id) REFERENCES machines(id) + ); + + CREATE INDEX IF NOT EXISTS idx_fl_machine_analyzed + ON filtered_logs(machine_id, analyzed); + + CREATE TABLE IF NOT EXISTS analysis_sessions ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + machine_id INTEGER NOT NULL, + slot_date TEXT NOT NULL, + status TEXT DEFAULT 'pending', + started_at TEXT, + completed_at TEXT, + last_log_id INTEGER DEFAULT 0, + UNIQUE(machine_id, slot_date), + FOREIGN KEY (machine_id) REFERENCES machines(id) + ); + + CREATE TABLE IF NOT EXISTS agent_config ( + key TEXT PRIMARY KEY, + value TEXT NOT NULL + ); + + INSERT OR IGNORE INTO agent_config VALUES ('analysis_start', '02:00'); + INSERT OR IGNORE INTO agent_config VALUES ('analysis_end', '04:00'); + INSERT OR IGNORE INTO agent_config VALUES ('max_overage_minutes', '30'); + INSERT OR IGNORE INTO agent_config VALUES ('enabled', '1'); + INSERT OR IGNORE INTO agent_config VALUES ('log_retention_days', '7'); + """) + + def _cfg(self, key: str, default: str = '') -> str: + with self._get_db() as conn: + row = conn.execute("SELECT value FROM agent_config WHERE key=?", (key,)).fetchone() + return row['value'] if row else default + + def _set_cfg(self, key: str, value: str): + with self._get_db() as conn: + conn.execute("INSERT OR REPLACE INTO agent_config VALUES (?,?)", (key, value)) + + # ─── Démarrage ──────────────────────────────────────────────────────────── + + def on_start(self): + # Souscriptions MQTT pour recevoir les logs des machines distantes + self.mqtt.subscribe("agents/logwatch/+/logs", self._on_log_received) + self.mqtt.subscribe("agents/logwatch/register", self._on_machine_register) + + # Démarrage du scheduler + if self._scheduler: + self._reload_schedule() + self._scheduler.start() + logger.info("Scheduler APScheduler démarré.") + + # Nettoyage des vieux logs au démarrage + self._cleanup_old_logs() + + logger.info("Agent LogWatch démarré. En attente de logs sur agents/logwatch/+/logs") + + def setup_extra_subscriptions(self): + pass # tout est dans on_start + + # ─── Réception des logs ────────────────────────────────────────────────── + + def _on_machine_register(self, msg, topic: str): + """Enregistrement explicite d'une machine via MQTT.""" + payload = msg.payload if hasattr(msg, 'payload') else str(msg) + try: + data = json.loads(payload) if isinstance(payload, str) else payload + hostname = str(data.get('hostname', '')).strip() + if hostname: + self._register_machine(hostname) + except Exception as e: + logger.error(f"[register] {e}") + + def _on_log_received(self, msg, topic: str): + """ + Reçoit des logs bruts depuis une machine distante. + Topic : agents/logwatch//logs + Payload JSON : {"lines": [...]} ou {"log": "..."} ou texte brut multiligne + """ + payload = msg.payload if hasattr(msg, 'payload') else str(msg) + try: + parts = topic.split('/') + hostname = parts[2] if len(parts) >= 4 else 'unknown' + + # Parser le payload + if isinstance(payload, str): + try: + data = json.loads(payload) + if isinstance(data, dict): + lines = data.get('lines') or data.get('logs') or [] + if isinstance(lines, str): + lines = lines.splitlines() + if not lines and 'log' in data: + lines = str(data['log']).splitlines() + elif isinstance(data, list): + lines = data + else: + lines = payload.splitlines() + except json.JSONDecodeError: + lines = payload.splitlines() + elif isinstance(payload, bytes): + lines = payload.decode('utf-8', errors='replace').splitlines() + else: + lines = [] + + if not lines: + return + + machine_id = self._register_machine(hostname) + filtered = self._prefilter(lines) + + if filtered: + now = datetime.now().isoformat() + with self._get_db() as conn: + conn.executemany( + "INSERT INTO filtered_logs (machine_id, log_line, severity, received_at) VALUES (?,?,?,?)", + [(machine_id, line, sev, now) for line, sev in filtered] + ) + conn.execute( + "UPDATE machines SET last_log_at=? WHERE id=?", + (now, machine_id) + ) + logger.info(f"[{hostname}] {len(filtered)}/{len(lines)} lignes filtrées conservées") + + except Exception as e: + logger.error(f"[_on_log_received] {e}", exc_info=True) + + def _prefilter(self, lines: list) -> list: + """Filtre les lignes, retourne [(line, severity)].""" + result = [] + for line in lines: + line = str(line).strip() + if not line: + continue + for pat in FILTER_PATTERNS: + if pat.search(line): + result.append((line, _detect_severity(line))) + break + return result + + def _register_machine(self, hostname: str) -> int: + """Enregistre ou met à jour une machine, retourne son id.""" + with self._get_db() as conn: + row = conn.execute("SELECT id FROM machines WHERE hostname=?", (hostname,)).fetchone() + if row: + return row['id'] + max_pos = conn.execute( + "SELECT COALESCE(MAX(queue_position), 0) FROM machines" + ).fetchone()[0] + cur = conn.execute( + "INSERT INTO machines (hostname, registered_at, queue_position) VALUES (?,?,?)", + (hostname, datetime.now().isoformat(), max_pos + 1) + ) + logger.info(f"Nouvelle machine enregistrée: {hostname} (pos={max_pos+1})") + return cur.lastrowid + + # ─── Scheduler ──────────────────────────────────────────────────────────── + + def _reload_schedule(self): + """(Re)programme les jobs APScheduler selon la config DB.""" + if not self._scheduler: + return + for job_id in ('_slot_start', '_slot_end'): + try: + self._scheduler.remove_job(job_id) + except Exception: + pass + + if self._cfg('enabled') != '1': + logger.info("Analyse automatique désactivée.") + return + + start_str = self._cfg('analysis_start', '02:00') + end_str = self._cfg('analysis_end', '04:00') + try: + sh, sm = map(int, start_str.split(':')) + eh, em = map(int, end_str.split(':')) + except ValueError: + logger.error(f"Format horaire invalide: {start_str}/{end_str}") + return + + self._scheduler.add_job( + self._start_slot, 'cron', hour=sh, minute=sm, id='_slot_start' + ) + self._scheduler.add_job( + self._signal_slot_end, 'cron', hour=eh, minute=em, id='_slot_end' + ) + logger.info(f"Analyse programmée: {start_str} → {end_str}") + + def _start_slot(self): + """Démarre la fenêtre d'analyse (appelé par APScheduler).""" + end_str = self._cfg('analysis_end', '04:00') + eh, em = map(int, end_str.split(':')) + now = datetime.now() + self._slot_end_time = now.replace(hour=eh, minute=em, second=0, microsecond=0) + if self._slot_end_time <= now: + self._slot_end_time += timedelta(days=1) + + self._analysis_stop.clear() + self._analysis_thread = threading.Thread( + target=self._analysis_loop, daemon=True, name="logwatch-analysis" + ) + self._analysis_thread.start() + logger.info(f"Créneau d'analyse démarré → fin à {self._slot_end_time.strftime('%H:%M')}") + + def _signal_slot_end(self): + """Signale la fin du créneau (appelé par APScheduler).""" + logger.info("Fin de créneau signalée.") + self._analysis_stop.set() + + # ─── Boucle d'analyse ──────────────────────────────────────────────────── + + def _analysis_loop(self): + """Thread principal d'analyse, tourne pendant le créneau.""" + try: + machines = self._get_active_machines() + if not machines: + self._notify_admin("📭 LogWatch: aucune machine enregistrée à analyser.") + return + + start_idx = self._find_resume_index(machines) + total = len(machines) + + for i in range(total): + idx = (start_idx + i) % total + machine = machines[idx] + mid = machine['id'] + host = machine['hostname'] + + # Vérifier si le créneau est terminé avant de commencer une machine + if self._analysis_stop.is_set(): + overage_min = self._overage_minutes() + max_ov = int(self._cfg('max_overage_minutes', '30')) + + if overage_min > max_ov: + # Demander extension + if not self._ask_extension(mid, host, overage_min): + # Refusée ou timeout → pause + self._set_session_status(mid, 'paused') + self._notify_admin( + f"⏸️ LogWatch: analyse de **{host}** reportée au prochain créneau." + ) + break + + self._analyze_machine(mid, host) + + else: + # Boucle complète sans interruption + self._notify_admin( + f"✅ LogWatch: analyse complète de {total} machine(s) terminée." + ) + + except Exception as e: + logger.error(f"[analysis_loop] {e}", exc_info=True) + self._notify_admin(f"❌ LogWatch: erreur dans la boucle d'analyse: {e}") + + def _get_active_machines(self) -> list: + with self._get_db() as conn: + rows = conn.execute( + "SELECT id, hostname, queue_position FROM machines " + "WHERE active=1 ORDER BY queue_position ASC" + ).fetchall() + return [dict(r) for r in rows] + + def _find_resume_index(self, machines: list) -> int: + """Trouve l'index de la machine à reprendre (paused) ou commence à 0.""" + today = datetime.now().strftime('%Y-%m-%d') + with self._get_db() as conn: + row = conn.execute(""" + SELECT machine_id FROM analysis_sessions + WHERE slot_date=? AND status='paused' + ORDER BY id DESC LIMIT 1 + """, (today,)).fetchone() + if not row: + return 0 + paused_id = row['machine_id'] + for i, m in enumerate(machines): + if m['id'] == paused_id: + return i + return 0 + + def _overage_minutes(self) -> float: + """Retourne les minutes de dépassement (positif = dépassement).""" + if not self._slot_end_time: + return 0.0 + delta = (datetime.now() - self._slot_end_time).total_seconds() / 60 + return max(0.0, delta) + + def _ask_extension(self, machine_id: int, hostname: str, overage: float) -> bool: + """ + Demande à l'admin une extension du créneau. + Attend la réponse (max 10 min). + Retourne True si extension accordée. + """ + max_ov = int(self._cfg('max_overage_minutes', '30')) + self._pending_extension = {'machine_id': machine_id, 'hostname': hostname} + self._extension_event.clear() + self._extension_granted = False + + self._notify_admin( + f"⏰ LogWatch: créneau terminé (dépassement {overage:.0f} min > max {max_ov} min).\n" + f"Analyse en cours: **{hostname}** non terminée.\n" + f"Tapez `/extend` pour accorder +{max_ov} min supplémentaires, " + f"ou `/skip` pour reporter au prochain créneau." + ) + + # Attendre la réponse max 10 minutes + answered = self._extension_event.wait(timeout=600) + self._pending_extension = None + + if not answered: + self._notify_admin( + f"⏰ LogWatch: pas de réponse après 10 min → analyse de **{hostname}** reportée." + ) + return False + + return self._extension_granted + + # ─── Analyse d'une machine ─────────────────────────────────────────────── + + def _analyze_machine(self, machine_id: int, hostname: str): + """Analyse les logs filtrés d'une machine avec le LLM.""" + today = datetime.now().strftime('%Y-%m-%d') + + # Créer ou récupérer la session d'analyse + with self._get_db() as conn: + session = conn.execute( + "SELECT id, last_log_id FROM analysis_sessions " + "WHERE machine_id=? AND slot_date=? AND status IN ('pending','paused')", + (machine_id, today) + ).fetchone() + + if session: + session_id = session['id'] + last_log_id = session['last_log_id'] + conn.execute( + "UPDATE analysis_sessions SET status='in_progress', started_at=? WHERE id=?", + (datetime.now().isoformat(), session_id) + ) + else: + # Vérifier si déjà 'done' aujourd'hui + done = conn.execute( + "SELECT id FROM analysis_sessions WHERE machine_id=? AND slot_date=? AND status='done'", + (machine_id, today) + ).fetchone() + if done: + logger.info(f"[{hostname}] déjà analysée aujourd'hui.") + return + + conn.execute( + "INSERT INTO analysis_sessions (machine_id, slot_date, status, started_at) VALUES (?,?,?,?)", + (machine_id, today, 'in_progress', datetime.now().isoformat()) + ) + session_id = conn.execute("SELECT last_insert_rowid()").fetchone()[0] + last_log_id = 0 + + # Récupérer les logs filtrés non encore analysés + with self._get_db() as conn: + logs = conn.execute( + "SELECT id, log_line, severity, received_at FROM filtered_logs " + "WHERE machine_id=? AND id > ? AND analyzed=0 ORDER BY id ASC", + (machine_id, last_log_id) + ).fetchall() + + if not logs: + logger.info(f"[{hostname}] Aucun log filtré à analyser.") + self._set_session_status(machine_id, 'done', session_id=session_id) + return + + self._notify_admin( + f"🔍 LogWatch: analyse de **{hostname}** ({len(logs)} erreurs filtrées)…" + ) + + all_reports = [] + last_id = last_log_id + logs_list = [dict(r) for r in logs] + + for chunk_start in range(0, len(logs_list), CHUNK_SIZE): + # Vérifier dépassement dans la boucle de chunks + if self._analysis_stop.is_set(): + overage = self._overage_minutes() + max_ov = int(self._cfg('max_overage_minutes', '30')) + if overage > max_ov: + # Sauvegarder le point de reprise + with self._get_db() as conn: + conn.execute( + "UPDATE analysis_sessions SET status='paused', last_log_id=? WHERE id=?", + (last_id, session_id) + ) + self._notify_admin( + f"⏸️ LogWatch: pause mid-analyse de **{hostname}** " + f"(dépassement {overage:.0f} min). Reprise au prochain créneau." + ) + return + + chunk = logs_list[chunk_start:chunk_start + CHUNK_SIZE] + chunk_txt = '\n'.join( + f"[{r['received_at'][:19]}][{r['severity']}] {r['log_line']}" + for r in chunk + ) + + prompt = ( + f"Tu analyses des logs d'erreurs de la machine **{hostname}**.\n" + f"Synthétise les problèmes importants : type d'erreur, criticité (critique/haute/moyenne), " + f"fréquence, cause probable, action recommandée.\n" + f"Ne répète pas chaque ligne individuellement. Groupe les erreurs similaires.\n" + f"Format de réponse : 🔴/🟠/🟡 Problème → Cause → Action\n\n" + f"Logs ({chunk_start+1}–{min(chunk_start+CHUNK_SIZE, len(logs_list))}):\n{chunk_txt}" + ) + + report_chunk = self._call_llm(prompt) + if report_chunk: + all_reports.append(report_chunk) + + # Marquer comme analysés + mise à jour offset + ids = [r['id'] for r in chunk] + last_id = ids[-1] + with self._get_db() as conn: + conn.execute( + f"UPDATE filtered_logs SET analyzed=1 WHERE id IN ({','.join('?'*len(ids))})", + ids + ) + conn.execute( + "UPDATE analysis_sessions SET last_log_id=? WHERE id=?", + (last_id, session_id) + ) + + # Rapport final + if all_reports: + report = ( + f"📊 **Rapport LogWatch — {hostname}**\n" + f"📅 {datetime.now().strftime('%Y-%m-%d %H:%M')} | " + f"{len(logs_list)} erreurs analysées\n" + f"{'─'*40}\n\n" + ) + report += '\n\n'.join(all_reports) + self._notify_admin(report) + else: + self._notify_admin(f"ℹ️ LogWatch: **{hostname}** — LLM n'a pas retourné de rapport.") + + # Marquer la session comme terminée + with self._get_db() as conn: + conn.execute( + "UPDATE analysis_sessions SET status='done', completed_at=?, last_log_id=? WHERE id=?", + (datetime.now().isoformat(), last_id, session_id) + ) + conn.execute( + "UPDATE machines SET last_analyzed_at=? WHERE id=?", + (datetime.now().isoformat(), machine_id) + ) + + def _set_session_status(self, machine_id: int, status: str, session_id: int = None): + today = datetime.now().strftime('%Y-%m-%d') + with self._get_db() as conn: + if session_id: + conn.execute( + "UPDATE analysis_sessions SET status=? WHERE id=?", + (status, session_id) + ) + else: + conn.execute( + "UPDATE analysis_sessions SET status=? WHERE machine_id=? AND slot_date=?", + (status, machine_id, today) + ) + + # ─── LLM ───────────────────────────────────────────────────────────────── + + def _call_llm(self, prompt: str) -> str: + """Appelle le LLM en respectant le lock BaseAgent.""" + lock = getattr(self, '_llm_lock', None) + acquired = False + try: + if lock: + acquired = lock.acquire(timeout=300) + if not acquired: + return "(LLM indisponible après 5 min d'attente)" + self.llm.reset_history() + return self.llm.chat(prompt) + except Exception as e: + logger.error(f"[LLM] {e}") + return f"(Erreur LLM: {e})" + finally: + if acquired and lock: + lock.release() + + # ─── XMPP helpers ──────────────────────────────────────────────────────── + + def _notify_admin(self, message: str): + """Envoie un message à tous les admins XMPP.""" + try: + if self.xmpp: + self.xmpp.send_to_all_admins(message) + except Exception as e: + logger.error(f"[notify_admin] {e}") + + # ─── Commandes custom (/extend, /skip, /update) ────────────────────────── + + def handle_custom_command(self, cmd: str, args: str, source_msg=None): + cmd_lower = cmd.lower() + + # Réponse à une demande d'extension de créneau + if self._pending_extension: + if cmd_lower == 'extend': + self._extension_granted = True + self._extension_event.set() + max_ov = self._cfg('max_overage_minutes', '30') + return f"⏱️ Extension accordée (+{max_ov} min). L'analyse continue." + if cmd_lower == 'skip': + self._extension_granted = False + self._extension_event.set() + return "⏸️ Analyse reportée au prochain créneau." + + if cmd_lower == 'update': + return self._self_update() + + return f"Commande inconnue : /{cmd}" + + def on_broadcast(self, msg: Message): + pass + + def _self_update(self) -> str: + import subprocess + try: + out = subprocess.check_output( + "cd /opt/agent_logwatch && git pull", + shell=True, text=True, stderr=subprocess.STDOUT + ) + subprocess.Popen(["systemctl", "restart", "agent_logwatch"]) + return f"Mise à jour:\n{out}\nRedémarrage…" + except subprocess.CalledProcessError as e: + return f"Erreur mise à jour: {e.output}" + + # ─── Nettoyage ──────────────────────────────────────────────────────────── + + def _cleanup_old_logs(self): + """Supprime les logs filtrés plus vieux que log_retention_days.""" + days = int(self._cfg('log_retention_days', '7')) + cutoff = (datetime.now() - timedelta(days=days)).isoformat() + with self._get_db() as conn: + cur = conn.execute( + "DELETE FROM filtered_logs WHERE received_at < ? AND analyzed=1", + (cutoff,) + ) + if cur.rowcount: + logger.info(f"Nettoyage: {cur.rowcount} logs anciens supprimés.") + + +if __name__ == "__main__": + LogWatchAgent().run() diff --git a/config/config.json b/config/config.json new file mode 100644 index 0000000..cd9cca4 --- /dev/null +++ b/config/config.json @@ -0,0 +1,30 @@ +{ + "agent_id": "logwatch", + "xmpp": { + "jid": "logwatch@xmpp.ovh", + "password": "Matador3721", + "admin_jid": "sylvain@xmpp.ovh", + "muc_room": "agents@muc.xmpp.ovh", + "use_omemo": true + }, + "mqtt": { + "host": "localhost", + "port": 1883, + "username": null, + "password": null, + "tls": false + }, + "llm": { + "base_url": "http://192.168.7.119:11434", + "model": "gpt-oss:120b-cloud", + "temperature": 0.2 + }, + "work_hours": "00:00-23:59", + "queue_db": "/opt/agent_logwatch/data/queue.db", + "db_path": "/opt/agent_logwatch/data/logwatch.db", + "system_prompt": "/opt/agent_logwatch/config/system_prompt.txt", + "use_llm_coordinator": true, + "llm_profiles": { + "cloud": "gpt-oss:120b-cloud" + } +} \ No newline at end of file diff --git a/config/system_prompt.txt b/config/system_prompt.txt new file mode 100644 index 0000000..5eaa59c --- /dev/null +++ b/config/system_prompt.txt @@ -0,0 +1,64 @@ +Tu es LogWatch, un agent spécialisé dans l'analyse de logs de systèmes Linux. +Tu reçois des instructions via MQTT (depuis Nexus) ou XMPP (directement). + +## Tes skills disponibles + +### Gestion des machines +- **machine** : gestion des machines qui envoient leurs logs + - `list` : toutes les machines enregistrées avec leur statut + - `queue` : file d'analyse du jour avec statut de chaque machine + - `add ` : enregistrer manuellement une machine + - `remove ` : supprimer une machine + - `status ` : détail d'une machine + - `reorder ` : changer l'ordre d'analyse + - `activate/deactivate ` : activer/désactiver une machine + +### Contrôle de l'analyse +- **logwatch** : configuration et déclenchement de l'analyse + - `status` : état général (schedule, machines, logs en attente) + - `schedule show` : voir le créneau horaire configuré + - `schedule set HH:MM-HH:MM` : définir le créneau d'analyse automatique + - `schedule enable/disable` : activer/désactiver l'analyse automatique + - `overage ` : définir le dépassement maximum autorisé + - `retention ` : durée de conservation des logs filtrés + - `analyze ` : lancer l'analyse d'une machine spécifique maintenant + - `analyze_all` : lancer l'analyse complète de toutes les machines + - `logs [N]` : voir les N derniers logs filtrés d'une machine + - `reset ` : réinitialiser l'analyse d'une machine + +### Utilitaires +- **mqtt_send** : publier sur un topic MQTT +- **agents_status** : voir le statut des autres agents +- **muc_send** : envoyer dans le groupe XMPP +- **script** : bibliothèque de scripts bash + +## Flux de données + +Les machines distantes envoient leurs logs via MQTT : + Topic : agents/logwatch//logs + Payload : JSON {"lines": ["ligne1", "ligne2", ...]} + +L'agent pré-filtre automatiquement (sans LLM) les lignes contenant : + ERROR, CRITICAL, FATAL, Exception, Traceback, segfault, OOM, killed, failed, + permission denied, authentication failure, disk full, connection refused, etc. + +Puis, pendant le créneau configuré, le LLM analyse les erreurs filtrées machine +par machine et envoie un rapport XMPP pour chaque machine. + +## Commandes spéciales (hors LLM) + +Quand l'agent demande une extension de créneau, l'admin répond : + `/extend` → accorder du temps supplémentaire + `/skip` → reporter la machine au prochain créneau + +## Règles + +1. Pour lister les machines ou la file : utilise TOUJOURS le skill approprié +2. Pour analyser une machine à la demande : `logwatch analyze ` +3. Réponds toujours en français +4. Sois concis dans tes réponses + +## Écriture de scripts bash + +JAMAIS `filesystem write` pour créer un `.sh`. Toujours `SKILL:script ARGS:save`. +Variables disponibles dans les scripts : $MQTT_BROKER, $MQTT_REPLY_TOPIC, $AGENT_ID diff --git a/data/logwatch.db b/data/logwatch.db new file mode 100644 index 0000000..99064ed Binary files /dev/null and b/data/logwatch.db differ diff --git a/data/queue.db b/data/queue.db new file mode 100644 index 0000000..fb28805 Binary files /dev/null and b/data/queue.db differ diff --git a/skills/__pycache__/agents_status.cpython-313.pyc b/skills/__pycache__/agents_status.cpython-313.pyc new file mode 100644 index 0000000..28c8c23 Binary files /dev/null and b/skills/__pycache__/agents_status.cpython-313.pyc differ diff --git a/skills/__pycache__/logwatch.cpython-313.pyc b/skills/__pycache__/logwatch.cpython-313.pyc new file mode 100644 index 0000000..ee2b927 Binary files /dev/null and b/skills/__pycache__/logwatch.cpython-313.pyc differ diff --git a/skills/__pycache__/machine.cpython-313.pyc b/skills/__pycache__/machine.cpython-313.pyc new file mode 100644 index 0000000..8b5e946 Binary files /dev/null and b/skills/__pycache__/machine.cpython-313.pyc differ diff --git a/skills/__pycache__/mqtt_send.cpython-313.pyc b/skills/__pycache__/mqtt_send.cpython-313.pyc new file mode 100644 index 0000000..41ace6d Binary files /dev/null and b/skills/__pycache__/mqtt_send.cpython-313.pyc differ diff --git a/skills/__pycache__/mqtt_subscribe.cpython-313.pyc b/skills/__pycache__/mqtt_subscribe.cpython-313.pyc new file mode 100644 index 0000000..6e19186 Binary files /dev/null and b/skills/__pycache__/mqtt_subscribe.cpython-313.pyc differ diff --git a/skills/__pycache__/muc_send.cpython-313.pyc b/skills/__pycache__/muc_send.cpython-313.pyc new file mode 100644 index 0000000..0f2159c Binary files /dev/null and b/skills/__pycache__/muc_send.cpython-313.pyc differ diff --git a/skills/__pycache__/script.cpython-313.pyc b/skills/__pycache__/script.cpython-313.pyc new file mode 100644 index 0000000..558a4c1 Binary files /dev/null and b/skills/__pycache__/script.cpython-313.pyc differ diff --git a/skills/agents_status.py b/skills/agents_status.py new file mode 100644 index 0000000..1e7b2a9 --- /dev/null +++ b/skills/agents_status.py @@ -0,0 +1,28 @@ +""" +Skill AGENTS_STATUS — afficher le statut en temps réel de tous les agents. + +Usage LLM : SKILL:agents_status ARGS: +""" +DESCRIPTION = "Afficher le statut en temps réel de tous les agents (online/offline)" +USAGE = "SKILL:agents_status ARGS:(aucun argument)" + + +def run(args: str, context) -> str: + with context.agent._online_lock: + online = set(context.agent._online_agents) + + all_caps = context.registry.all_agents() + + if not all_caps: + return "Aucun agent connu dans le registre." + + lines = ["── Statut des agents ──────────────────"] + for caps in sorted(all_caps, key=lambda c: c.agent_id): + if caps.agent_id == context.agent_id: + continue # Ne pas s'afficher soi-même + icon = "🟢" if caps.agent_id in online else "🔴" + label = "en ligne" if caps.agent_id in online else "hors ligne" + lines.append(f" {icon} {caps.agent_id} [{caps.agent_type}] — {label}") + lines.append(f" {caps.description}") + + return "\n".join(lines) if len(lines) > 1 else "Aucun autre agent connu." diff --git a/skills/logwatch.py b/skills/logwatch.py new file mode 100644 index 0000000..058716e --- /dev/null +++ b/skills/logwatch.py @@ -0,0 +1,268 @@ +""" +Skill LOGWATCH — contrôle de l'agent : schedule, analyse à la demande, statut. + +Usage LLM : + SKILL:logwatch ARGS:status + SKILL:logwatch ARGS:schedule show + SKILL:logwatch ARGS:schedule set + SKILL:logwatch ARGS:schedule enable + SKILL:logwatch ARGS:schedule disable + SKILL:logwatch ARGS:overage + SKILL:logwatch ARGS:analyze + SKILL:logwatch ARGS:analyze_all + SKILL:logwatch ARGS:retention + SKILL:logwatch ARGS:logs [N] + SKILL:logwatch ARGS:reset +""" +import threading +from datetime import datetime, timedelta + +DESCRIPTION = "Contrôle LogWatch : schedule, analyse à la demande, statut, logs en attente" +USAGE = ( + "SKILL:logwatch ARGS:status\n" + "SKILL:logwatch ARGS:schedule show\n" + "SKILL:logwatch ARGS:schedule set \n" + "SKILL:logwatch ARGS:schedule enable|disable\n" + "SKILL:logwatch ARGS:overage \n" + "SKILL:logwatch ARGS:analyze \n" + "SKILL:logwatch ARGS:analyze_all\n" + "SKILL:logwatch ARGS:retention \n" + "SKILL:logwatch ARGS:logs [N]\n" + "SKILL:logwatch ARGS:reset " +) + + +def _db(context): + return context.agent._get_db() + + +def _cfg(context, key, default=''): + return context.agent._cfg(key, default) + + +def _set_cfg(context, key, value): + context.agent._set_cfg(key, value) + + +def run(args: str, context) -> str: + parts = args.strip().split(None, 1) + action = parts[0].lower() if parts else 'status' + rest = parts[1].strip() if len(parts) > 1 else '' + + # ── status ──────────────────────────────────────────────────────────────── + if action == 'status': + agent = context.agent + today = datetime.now().strftime('%Y-%m-%d') + + enabled = _cfg(context, 'enabled', '1') == '1' + start = _cfg(context, 'analysis_start', '02:00') + end = _cfg(context, 'analysis_end', '04:00') + max_ov = _cfg(context, 'max_overage_minutes', '30') + retention = _cfg(context, 'log_retention_days', '7') + + is_running = ( + agent._analysis_thread is not None and + agent._analysis_thread.is_alive() + ) + + with _db(context) as conn: + nb_machines = conn.execute( + "SELECT COUNT(*) FROM machines WHERE active=1" + ).fetchone()[0] + nb_pending = conn.execute( + "SELECT COUNT(*) FROM filtered_logs WHERE analyzed=0" + ).fetchone()[0] + today_sessions = conn.execute( + "SELECT COUNT(*) as cnt, status FROM analysis_sessions " + "WHERE slot_date=? GROUP BY status", + (today,) + ).fetchall() + + schedule_status = f"{'✅ activé' if enabled else '❌ désactivé'} ({start} → {end})" + analysis_status = "🔄 en cours" if is_running else "⏸️ idle" + + lines = [ + "── Statut LogWatch ────────────────────────────", + f" Analyse auto : {schedule_status}", + f" Analyse actuel: {analysis_status}", + f" Dépassement : max {max_ov} min", + f" Rétention logs: {retention} jours", + f" Machines activ: {nb_machines}", + f" Logs en attent: {nb_pending} erreurs filtrées", + f" Auj. ({today}):", + ] + for s in today_sessions: + lines.append(f" {s['status']}: {s['cnt']} machine(s)") + + if agent._pending_extension: + host = agent._pending_extension.get('hostname', '?') + lines.append(f" ⏰ Extension en attente pour: {host}") + + return "\n".join(lines) + + # ── schedule ────────────────────────────────────────────────────────────── + if action == 'schedule': + sub_parts = rest.split(None, 1) + sub = sub_parts[0].lower() if sub_parts else 'show' + sub_rest = sub_parts[1].strip() if len(sub_parts) > 1 else '' + + if sub == 'show': + start = _cfg(context, 'analysis_start', '02:00') + end = _cfg(context, 'analysis_end', '04:00') + enabled = _cfg(context, 'enabled', '1') == '1' + return ( + f"Créneau d'analyse : {start} → {end}\n" + f"État : {'activé ✅' if enabled else 'désactivé ❌'}" + ) + + if sub == 'set': + # Format : HH:MM-HH:MM + if '-' not in sub_rest: + return "Format: schedule set HH:MM-HH:MM (ex: 02:00-04:00)" + try: + start_s, end_s = sub_rest.split('-', 1) + # Validation + sh, sm = map(int, start_s.strip().split(':')) + eh, em = map(int, end_s.strip().split(':')) + if not (0 <= sh < 24 and 0 <= sm < 60 and 0 <= eh < 24 and 0 <= em < 60): + return "Heures invalides." + except ValueError: + return "Format: HH:MM-HH:MM" + _set_cfg(context, 'analysis_start', start_s.strip()) + _set_cfg(context, 'analysis_end', end_s.strip()) + context.agent._reload_schedule() + return f"✅ Créneau mis à jour : {start_s.strip()} → {end_s.strip()}" + + if sub in ('enable', 'disable'): + val = '1' if sub == 'enable' else '0' + _set_cfg(context, 'enabled', val) + context.agent._reload_schedule() + return f"✅ Analyse automatique {'activée' if val=='1' else 'désactivée'}." + + return "Sub-commande inconnue. Utilise : show, set , enable, disable" + + # ── overage ─────────────────────────────────────────────────────────────── + if action == 'overage': + try: + minutes = int(rest) + if minutes < 0: + return "La valeur doit être >= 0." + except ValueError: + return "Format: overage " + _set_cfg(context, 'max_overage_minutes', str(minutes)) + return f"✅ Dépassement max : {minutes} min." + + # ── retention ───────────────────────────────────────────────────────────── + if action == 'retention': + try: + days = int(rest) + if days < 1: + return "Minimum 1 jour." + except ValueError: + return "Format: retention " + _set_cfg(context, 'log_retention_days', str(days)) + return f"✅ Rétention logs : {days} jours." + + # ── analyze ──────────────────────────────────────────────────── + if action == 'analyze': + hostname = rest.strip() + if not hostname: + return "Format: analyze " + + with _db(context) as conn: + row = conn.execute( + "SELECT id FROM machines WHERE hostname=? AND active=1", (hostname,) + ).fetchone() + if not row: + return f"Machine '{hostname}' introuvable ou inactive." + + machine_id = row['id'] + + def _run_now(): + agent = context.agent + # Créneau fictif généreux pour l'analyse à la demande + agent._slot_end_time = datetime.now() + timedelta(hours=4) + agent._analysis_stop.clear() + agent._analyze_machine(machine_id, hostname) + + t = threading.Thread(target=_run_now, daemon=True, name=f"logwatch-demand-{hostname}") + t.start() + return f"🚀 Analyse de **{hostname}** lancée (arrière-plan)." + + # ── analyze_all ─────────────────────────────────────────────────────────── + if action == 'analyze_all': + agent = context.agent + if agent._analysis_thread and agent._analysis_thread.is_alive(): + return "⚠️ Une analyse est déjà en cours." + + def _run_all(): + agent._slot_end_time = datetime.now() + timedelta(hours=8) + agent._analysis_stop.clear() + agent._analysis_loop() + + t = threading.Thread(target=_run_all, daemon=True, name="logwatch-demand-all") + t.start() + return "🚀 Analyse complète de toutes les machines lancée (arrière-plan)." + + # ── logs [N] ─────────────────────────────────────────────────── + if action == 'logs': + p = rest.split(None, 1) + hostname = p[0].strip() if p else '' + try: + limit = int(p[1]) if len(p) > 1 else 20 + except ValueError: + limit = 20 + + if not hostname: + return "Format: logs [N]" + + with _db(context) as conn: + m = conn.execute( + "SELECT id FROM machines WHERE hostname=?", (hostname,) + ).fetchone() + if not m: + return f"Machine '{hostname}' introuvable." + rows = conn.execute( + "SELECT log_line, severity, received_at, analyzed " + "FROM filtered_logs WHERE machine_id=? ORDER BY id DESC LIMIT ?", + (m['id'], limit) + ).fetchall() + + if not rows: + return f"Aucun log filtré pour {hostname}." + + lines = [f"── {limit} derniers logs filtrés de {hostname} ──"] + for r in rows: + ana = "✓" if r['analyzed'] else "○" + lines.append( + f" {ana} [{r['received_at'][:16]}][{r['severity']:8s}] {r['log_line'][:120]}" + ) + return "\n".join(lines) + + # ── reset ────────────────────────────────────────────────────── + if action == 'reset': + hostname = rest.strip() + if not hostname: + return "Format: reset " + with _db(context) as conn: + m = conn.execute( + "SELECT id FROM machines WHERE hostname=?", (hostname,) + ).fetchone() + if not m: + return f"Machine '{hostname}' introuvable." + # Réinitialise les sessions et marque les logs comme non-analysés + conn.execute( + "DELETE FROM analysis_sessions WHERE machine_id=?", (m['id'],) + ) + conn.execute( + "UPDATE filtered_logs SET analyzed=0 WHERE machine_id=?", (m['id'],) + ) + conn.execute( + "UPDATE machines SET last_analyzed_at=NULL WHERE id=?", (m['id'],) + ) + return f"✅ {hostname} réinitialisée — tous les logs seront ré-analysés." + + return ( + "Action inconnue. Disponible : status, schedule, overage, retention, " + "analyze, analyze_all, logs, reset" + ) diff --git a/skills/machine.py b/skills/machine.py new file mode 100644 index 0000000..7ebd151 --- /dev/null +++ b/skills/machine.py @@ -0,0 +1,194 @@ +""" +Skill MACHINE — gestion des machines qui envoient leurs logs. + +Usage LLM : + SKILL:machine ARGS:list + SKILL:machine ARGS:queue + SKILL:machine ARGS:add + SKILL:machine ARGS:remove + SKILL:machine ARGS:status + SKILL:machine ARGS:reorder + SKILL:machine ARGS:activate + SKILL:machine ARGS:deactivate +""" +from datetime import datetime + +DESCRIPTION = "Gestion des machines enregistrées : liste, file d'attente, ajout, suppression, statut" +USAGE = ( + "SKILL:machine ARGS:list\n" + "SKILL:machine ARGS:queue\n" + "SKILL:machine ARGS:add \n" + "SKILL:machine ARGS:remove \n" + "SKILL:machine ARGS:status \n" + "SKILL:machine ARGS:reorder \n" + "SKILL:machine ARGS:activate \n" + "SKILL:machine ARGS:deactivate " +) + + +def _db(context): + return context.agent._get_db() + + +def run(args: str, context) -> str: + parts = args.strip().split(None, 1) + action = parts[0].lower() if parts else 'list' + rest = parts[1].strip() if len(parts) > 1 else '' + + # ── list ────────────────────────────────────────────────────────────────── + if action == 'list': + with _db(context) as conn: + rows = conn.execute( + "SELECT hostname, active, last_log_at, last_analyzed_at, queue_position " + "FROM machines ORDER BY queue_position ASC" + ).fetchall() + if not rows: + return "Aucune machine enregistrée." + lines = ["── Machines enregistrées ─────────────────────"] + for r in rows: + status = "🟢 actif" if r['active'] else "🔴 inactif" + last_log = r['last_log_at'][:16] if r['last_log_at'] else "jamais" + last_ana = r['last_analyzed_at'][:16] if r['last_analyzed_at'] else "jamais" + lines.append( + f" [{r['queue_position']:2d}] {r['hostname']:<30s} {status}\n" + f" Dernier log: {last_log} | Dernière analyse: {last_ana}" + ) + return "\n".join(lines) + + # ── queue ───────────────────────────────────────────────────────────────── + if action == 'queue': + today = datetime.now().strftime('%Y-%m-%d') + with _db(context) as conn: + rows = conn.execute( + "SELECT m.hostname, m.queue_position, m.active, " + " COALESCE(s.status, 'pending') as session_status " + "FROM machines m " + "LEFT JOIN analysis_sessions s " + " ON s.machine_id=m.id AND s.slot_date=? " + "ORDER BY m.queue_position ASC", + (today,) + ).fetchall() + if not rows: + return "Aucune machine dans la file." + icons = {'done': '✅', 'in_progress': '🔄', 'paused': '⏸️', 'pending': '⏳'} + lines = [f"── File d'analyse — {today} ─────────────────"] + for r in rows: + active = "" if r['active'] else " [inactif]" + icon = icons.get(r['session_status'], '⏳') + lines.append( + f" {r['queue_position']:2d}. {icon} {r['hostname']}{active} " + f"({r['session_status']})" + ) + return "\n".join(lines) + + # ── add ─────────────────────────────────────────────────────────────────── + if action == 'add': + hostname = rest.strip() + if not hostname: + return "Format: machine add " + with _db(context) as conn: + existing = conn.execute( + "SELECT id FROM machines WHERE hostname=?", (hostname,) + ).fetchone() + if existing: + return f"Machine '{hostname}' déjà enregistrée." + max_pos = conn.execute( + "SELECT COALESCE(MAX(queue_position), 0) FROM machines" + ).fetchone()[0] + conn.execute( + "INSERT INTO machines (hostname, registered_at, queue_position) VALUES (?,?,?)", + (hostname, datetime.now().isoformat(), max_pos + 1) + ) + return f"✅ Machine '{hostname}' enregistrée (position {max_pos + 1})." + + # ── remove ──────────────────────────────────────────────────────────────── + if action == 'remove': + hostname = rest.strip() + if not hostname: + return "Format: machine remove " + with _db(context) as conn: + cur = conn.execute("DELETE FROM machines WHERE hostname=?", (hostname,)) + if cur.rowcount == 0: + return f"Machine '{hostname}' introuvable." + return f"🗑️ Machine '{hostname}' supprimée." + + # ── status ──────────────────────────────────────────────────────────────── + if action == 'status': + hostname = rest.strip() + if not hostname: + return "Format: machine status " + with _db(context) as conn: + m = conn.execute( + "SELECT * FROM machines WHERE hostname=?", (hostname,) + ).fetchone() + if not m: + return f"Machine '{hostname}' introuvable." + # Logs filtrés en attente + pending_logs = conn.execute( + "SELECT COUNT(*) as cnt FROM filtered_logs WHERE machine_id=? AND analyzed=0", + (m['id'],) + ).fetchone()['cnt'] + # Sessions récentes + sessions = conn.execute( + "SELECT slot_date, status, started_at, completed_at, last_log_id " + "FROM analysis_sessions WHERE machine_id=? ORDER BY slot_date DESC LIMIT 5", + (m['id'],) + ).fetchall() + + active = "actif" if m['active'] else "inactif" + lines = [ + f"── Statut de {hostname} ──────────────────────", + f" Statut : {active}", + f" Position : {m['queue_position']}", + f" Enregistrée : {m['registered_at'][:16]}", + f" Dernier log : {m['last_log_at'][:16] if m['last_log_at'] else 'jamais'}", + f" Dernière ana: {m['last_analyzed_at'][:16] if m['last_analyzed_at'] else 'jamais'}", + f" Logs en att.: {pending_logs}", + ] + if sessions: + lines.append(" Sessions récentes:") + for s in sessions: + lines.append( + f" {s['slot_date']} : {s['status']} " + f"(offset log #{s['last_log_id']})" + ) + return "\n".join(lines) + + # ── reorder ─────────────────────────────────────────────────────────────── + if action == 'reorder': + p = rest.split(None, 1) + if len(p) < 2: + return "Format: machine reorder " + hostname = p[0].strip() + try: + new_pos = int(p[1].strip()) + except ValueError: + return "La position doit être un entier." + with _db(context) as conn: + cur = conn.execute( + "UPDATE machines SET queue_position=? WHERE hostname=?", + (new_pos, hostname) + ) + if cur.rowcount == 0: + return f"Machine '{hostname}' introuvable." + return f"✅ {hostname} déplacée en position {new_pos}." + + # ── activate / deactivate ───────────────────────────────────────────────── + if action in ('activate', 'deactivate'): + hostname = rest.strip() + if not hostname: + return f"Format: machine {action} " + val = 1 if action == 'activate' else 0 + with _db(context) as conn: + cur = conn.execute( + "UPDATE machines SET active=? WHERE hostname=?", (val, hostname) + ) + if cur.rowcount == 0: + return f"Machine '{hostname}' introuvable." + verb = "activée" if val else "désactivée" + return f"✅ Machine '{hostname}' {verb}." + + return ( + "Action inconnue. Disponible : list, queue, add, remove, status, " + "reorder, activate, deactivate" + ) diff --git a/skills/mqtt_send.py b/skills/mqtt_send.py new file mode 100644 index 0000000..1ed7dd9 --- /dev/null +++ b/skills/mqtt_send.py @@ -0,0 +1,23 @@ +""" +Skill MQTT_SEND — publier un message sur n'importe quel topic MQTT. +Permet à l'agent de communiquer proactivement avec d'autres agents. + +Usage LLM : SKILL:mqtt_send ARGS: | +""" +DESCRIPTION = "Publier un message sur un topic MQTT (communication inter-agents)" +USAGE = "SKILL:mqtt_send ARGS: | " + + +def run(args: str, context) -> str: + if "|" not in args: + return "Format : SKILL:mqtt_send ARGS: | " + + topic, message = args.split("|", 1) + topic = topic.strip() + message = message.strip() + + if not topic: + return "Topic vide." + + context.mqtt.publish_raw(topic, message) + return f"Message publié sur '{topic}'." diff --git a/skills/mqtt_subscribe.py b/skills/mqtt_subscribe.py new file mode 100644 index 0000000..0b0c148 --- /dev/null +++ b/skills/mqtt_subscribe.py @@ -0,0 +1,59 @@ +""" +Skill MQTT_SUBSCRIBE — s'abonner dynamiquement à un topic MQTT. + +Les messages reçus sont transmis via XMPP (admin) et loggés. + +Usage LLM : + SKILL:mqtt_subscribe ARGS:subscribe | + SKILL:mqtt_subscribe ARGS:unsubscribe | + SKILL:mqtt_subscribe ARGS:list +""" +import logging + +DESCRIPTION = "S'abonner / se désabonner dynamiquement d'un topic MQTT et recevoir les messages" +USAGE = "SKILL:mqtt_subscribe ARGS:subscribe| ou unsubscribe| ou list" + +logger = logging.getLogger(__name__) + +# Stockage des souscriptions dynamiques : {topic: callback} +_dynamic_subs: dict = {} + + +def run(args: str, context) -> str: + parts = [p.strip() for p in args.split("|", 1)] + action = parts[0].lower() + + if action == "list": + if not _dynamic_subs: + return "Aucun topic MQTT surveillé." + return "Topics surveillés :\n" + "\n".join(f" • {t}" for t in _dynamic_subs) + + if len(parts) < 2 or not parts[1]: + return "Format : subscribe| ou unsubscribe| ou list" + + topic = parts[1] + + if action == "unsubscribe": + if topic in _dynamic_subs: + del _dynamic_subs[topic] + return f"Désabonné du topic '{topic}'." + return f"Pas abonné à '{topic}'." + + if action == "subscribe": + if topic in _dynamic_subs: + return f"Déjà abonné à '{topic}'." + + agent_id = context.agent_id + + def _on_message(msg, t): + payload = msg.payload if hasattr(msg, "payload") else str(msg) + text = f"[MQTT:{t}] {payload}" + logger.info(f"[mqtt_subscribe] {text}") + if context.xmpp: + context.xmpp.send_to_all_admins(text) + + _dynamic_subs[topic] = _on_message + context.mqtt.subscribe(topic, _on_message) + return f"Abonné au topic '{topic}'. Les messages seront transmis via XMPP." + + return f"Action inconnue '{action}'. Utilise : subscribe, unsubscribe, list." diff --git a/skills/muc_send.py b/skills/muc_send.py new file mode 100644 index 0000000..799edbe --- /dev/null +++ b/skills/muc_send.py @@ -0,0 +1,24 @@ +""" +Skill MUC_SEND — envoyer un message dans le groupe XMPP des agents. + +Le groupe est agents@muc.xmpp.ovh (configuré dans config.json). + +Usage LLM : SKILL:muc_send ARGS: +""" +DESCRIPTION = "Envoyer un message dans le groupe XMPP des agents (MUC)" +USAGE = "SKILL:muc_send ARGS:" + + +def run(args: str, context) -> str: + message = args.strip() + if not message: + return "Message vide." + + if not context.xmpp: + return "XMPP non configuré sur cet agent." + + if not context.xmpp.muc_room: + return "Aucun groupe MUC configuré." + + context.xmpp.send_to_group(message) + return f"Message envoyé dans le groupe {context.xmpp.muc_room}." diff --git a/skills/script.py b/skills/script.py new file mode 100644 index 0000000..ce58ee8 --- /dev/null +++ b/skills/script.py @@ -0,0 +1,251 @@ +""" +Skill SCRIPT — bibliothèque de scripts bash par agent. + +Chaque agent dispose de son propre dossier scripts/ (configurable via +"scripts_dir" dans config.json, sinon /opt//scripts). + +L'environnement du script expose automatiquement : + MQTT_BROKER, MQTT_PORT, MQTT_REPLY_TOPIC, AGENT_ID, SCRIPTS_DIR + +Ainsi un script peut publier son résultat directement : + mosquitto_pub -h $MQTT_BROKER -t $MQTT_REPLY_TOPIC -m "mon résultat" + +Usage LLM : + SKILL:script ARGS:list + SKILL:script ARGS:show + SKILL:script ARGS:save | + SKILL:script ARGS:edit | + SKILL:script ARGS:exec [args...] + SKILL:script ARGS:run | + SKILL:script ARGS:delete +""" +import json +import os +import stat +import subprocess +import tempfile +from datetime import datetime + +DESCRIPTION = "Bibliothèque de scripts bash : sauvegarder, lister, afficher, éditer, exécuter" +USAGE = ( + "SKILL:script ARGS:list\n" + "SKILL:script ARGS:show \n" + "SKILL:script ARGS:save | \n" + "SKILL:script ARGS:edit | \n" + "SKILL:script ARGS:exec [args]\n" + "SKILL:script ARGS:run | \n" + "SKILL:script ARGS:delete " +) + + +def _scripts_dir(context) -> str: + """Détermine le répertoire scripts de cet agent.""" + if context.config.get("scripts_dir"): + return context.config["scripts_dir"] + queue_db = context.config.get("queue_db", "") + if queue_db: + install = os.path.dirname(os.path.dirname(queue_db)) + return os.path.join(install, "scripts") + return f"/opt/{context.agent_id}/scripts" + + +def _ensure_dir(context) -> str: + d = _scripts_dir(context) + os.makedirs(d, exist_ok=True) + return d + + +_FORBIDDEN_EXTENSIONS = {".service", ".timer", ".socket", ".target", ".mount", ".conf", ".py", ".js"} + + +def _safe_name(name: str) -> str: + """Empêche les traversées de répertoire et normalise le nom.""" + n = os.path.basename(name.strip().replace("/", "_")) + # Retire toute extension connue pour obtenir le nom brut + root, ext = os.path.splitext(n) + while ext: + n = root + root, ext = os.path.splitext(n) + return n + + +def _build_env(context, scripts_dir: str) -> dict: + env = os.environ.copy() + mc = context.config.get("mqtt", {}) + env["MQTT_BROKER"] = mc.get("host", "localhost") + env["MQTT_PORT"] = str(mc.get("port", 1883)) + env["MQTT_REPLY_TOPIC"] = "agents/nexus/inbox" + env["AGENT_ID"] = context.agent_id + env["SCRIPTS_DIR"] = scripts_dir + return env + + +def _notify(context, script_name: str, result: str): + """Publie un événement d'exécution sur MQTT pour que Nexus notifie l'utilisateur.""" + try: + context.mqtt.publish_raw("agents/scripts/execution", json.dumps({ + "agent_id": context.agent_id, + "script": script_name, + "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), + "result": result[:1000], + })) + except Exception: + pass + + +def _run_script(cmd: str, env: dict, timeout: int = 120) -> str: + try: + result = subprocess.run( + cmd, shell=True, text=True, + capture_output=True, timeout=timeout, + env=env, executable="/bin/bash", + ) + out = (result.stdout + result.stderr).strip() + if len(out) > 4000: + out = out[:4000] + "\n... [tronqué]" + return out or f"(code retour : {result.returncode})" + except subprocess.TimeoutExpired: + return f"Timeout ({timeout}s dépassé)" + except Exception as e: + return str(e) + + +def run(args: str, context) -> str: + parts = args.strip().split(None, 1) + action = parts[0].lower() if parts else "list" + rest = parts[1] if len(parts) > 1 else "" + + # ── list ────────────────────────────────────────────────────────────── + if action == "list": + d = _ensure_dir(context) + files = sorted(f for f in os.listdir(d) if f.endswith(".sh")) + if not files: + return f"Aucun script dans {d}" + lines = [f"Scripts disponibles ({d}) :"] + for f in files: + path = os.path.join(d, f) + size = os.path.getsize(path) + lines.append(f" {f[:-3]:30s} ({size} octets)") + return "\n".join(lines) + + # ── show ────────────────────────────────────────────────────────────── + if action == "show": + name = _safe_name(rest) + if not name: + return "Précise le nom du script." + d = _ensure_dir(context) + path = os.path.join(d, name + ".sh") + if not os.path.exists(path): + return f"Script '{name}' introuvable dans {d}" + with open(path) as f: + content = f.read() + return f"── {name}.sh ──\n{content}" + + # ── save ────────────────────────────────────────────────────────────── + if action == "save": + if "|" not in rest: + return "Format : save | " + name_raw, content = rest.split("|", 1) + name = _safe_name(name_raw) + content = content.strip().replace("\\n", "\n").replace('\\"', '"').replace("\\'", "'") + + if not name: + return "Nom de script invalide." + + # Vérifie extension interdite sur le nom brut + _, raw_ext = os.path.splitext(name_raw.strip()) + if raw_ext.lower() in _FORBIDDEN_EXTENSIONS: + return f"Extension '{raw_ext}' interdite. Utilise un nom sans extension (ex: mon_script)." + + # Vérifie que le contenu est substantiel (pas juste un shebang ou vide) + lines = [l.strip() for l in content.splitlines() if l.strip() and not l.strip().startswith("#")] + if len(lines) < 1: + return "Contenu du script vide ou incomplet. Fournis au moins une commande." + + d = _ensure_dir(context) + path = os.path.join(d, name + ".sh") + existed = os.path.exists(path) + with open(path, "w") as f: + if not content.startswith("#!"): + f.write("#!/bin/bash\n") + f.write(content + "\n") + os.chmod(path, stat.S_IRWXU | stat.S_IRGRP | stat.S_IROTH) + verb = "mis à jour" if existed else "créé" + return f"Script '{name}' {verb} : {path}" + + # ── edit ────────────────────────────────────────────────────────────── + if action == "edit": + # Format : edit | + if "|" not in rest: + return "Format : edit | \nEx: edit mon_script 3 | echo 'nouveau'" + head, new_line_content = rest.split("|", 1) + head_parts = head.strip().split(None, 1) + if len(head_parts) < 2: + return "Format : edit | " + name = _safe_name(head_parts[0]) + try: + line_no = int(head_parts[1].strip()) + except ValueError: + return "Le numéro de ligne doit être un entier." + if line_no < 1: + return "Le numéro de ligne doit être >= 1." + d = _ensure_dir(context) + path = os.path.join(d, name + ".sh") + if not os.path.exists(path): + return f"Script '{name}' introuvable dans {d}" + with open(path) as f: + lines = f.readlines() + if line_no > len(lines): + return f"Le script '{name}' n'a que {len(lines)} lignes." + lines[line_no - 1] = new_line_content.strip() + "\n" + with open(path, "w") as f: + f.writelines(lines) + return f"Ligne {line_no} du script '{name}' modifiée.\nNouveau contenu :\n{''.join(lines)}" + + # ── exec ────────────────────────────────────────────────────────────── + if action == "exec": + parts2 = rest.split(None, 1) + name = _safe_name(parts2[0]) if parts2 else "" + sargs = parts2[1] if len(parts2) > 1 else "" + if not name: + return "Précise le nom du script." + d = _ensure_dir(context) + path = os.path.join(d, name + ".sh") + if not os.path.exists(path): + return f"Script '{name}' introuvable. Utilise 'list' pour voir les scripts disponibles." + env = _build_env(context, d) + out = _run_script(f'"{path}" {sargs}', env=env, timeout=120) + _notify(context, name, out) + return out + + # ── run (inline) ────────────────────────────────────────────────────── + if action == "run": + if not rest: + return "Précise le contenu du script." + d = _ensure_dir(context) + content = rest.replace("\\n", "\n") + with tempfile.NamedTemporaryFile( + mode="w", suffix=".sh", delete=False, dir="/tmp" + ) as f: + f.write("#!/bin/bash\nset -e\n" + content) + tmpfile = f.name + os.chmod(tmpfile, stat.S_IRWXU) + env = _build_env(context, d) + out = _run_script(tmpfile, env=env, timeout=60) + os.unlink(tmpfile) + _notify(context, "", out) + return out + + # ── delete ──────────────────────────────────────────────────────────── + if action == "delete": + name = _safe_name(rest) + if not name: + return "Précise le nom du script." + d = _ensure_dir(context) + path = os.path.join(d, name + ".sh") + if not os.path.exists(path): + return f"Script '{name}' introuvable dans {d}" + os.unlink(path) + return f"Script '{name}' supprimé." + + return "Action inconnue. Disponible : list, show, save, exec, run, delete"