feat: collecte automatique des logs locaux au début de chaque créneau
- _collect_local_logs() appelle journalctl en local au démarrage du slot - collect_local_logs(since=) accessible comme méthode publique - Skill logwatch collect [since] pour collecte manuelle à la demande - Config: local_log_since, local_log_units, local_hostname Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Binary file not shown.
@@ -324,6 +324,9 @@ class LogWatchAgent(BaseAgent):
|
||||
if self._slot_end_time <= now:
|
||||
self._slot_end_time += timedelta(days=1)
|
||||
|
||||
# Collecter les logs locaux avant de commencer l'analyse
|
||||
self._collect_local_logs()
|
||||
|
||||
self._analysis_stop.clear()
|
||||
self._analysis_thread = threading.Thread(
|
||||
target=self._analysis_loop, daemon=True, name="logwatch-analysis"
|
||||
@@ -336,6 +339,72 @@ class LogWatchAgent(BaseAgent):
|
||||
logger.info("Fin de créneau signalée.")
|
||||
self._analysis_stop.set()
|
||||
|
||||
# ─── Collecte locale ─────────────────────────────────────────────────────
|
||||
|
||||
def collect_local_logs(self, since: str = 'yesterday') -> str:
|
||||
"""
|
||||
Collecte les logs de la machine locale via journalctl et les pré-filtre.
|
||||
Appelé automatiquement au début de chaque créneau, ou manuellement.
|
||||
Retourne un résumé de ce qui a été collecté.
|
||||
"""
|
||||
import subprocess
|
||||
import socket
|
||||
|
||||
local_hostname = self.config.get('local_hostname') or socket.getfqdn()
|
||||
units = self.config.get('local_log_units', []) # [] = tous les services
|
||||
since_str = since or self.config.get('local_log_since', 'yesterday')
|
||||
|
||||
cmd = ['journalctl', '--no-pager', '--output=short-iso', f'--since={since_str}']
|
||||
for unit in units:
|
||||
cmd += ['-u', unit]
|
||||
|
||||
try:
|
||||
result = subprocess.run(
|
||||
cmd, capture_output=True, text=True, timeout=60
|
||||
)
|
||||
raw_lines = result.stdout.splitlines()
|
||||
except subprocess.TimeoutExpired:
|
||||
logger.error("[local_logs] journalctl timeout")
|
||||
return "Erreur: journalctl timeout (60s)"
|
||||
except FileNotFoundError:
|
||||
logger.warning("[local_logs] journalctl non disponible sur cette machine")
|
||||
return "journalctl non disponible."
|
||||
except Exception as e:
|
||||
logger.error(f"[local_logs] {e}")
|
||||
return f"Erreur collecte locale: {e}"
|
||||
|
||||
if not raw_lines:
|
||||
return f"Aucun log local depuis '{since_str}'."
|
||||
|
||||
machine_id = self._register_machine(local_hostname)
|
||||
filtered = self._prefilter(raw_lines)
|
||||
|
||||
if filtered:
|
||||
now = datetime.now().isoformat()
|
||||
with self._get_db() as conn:
|
||||
conn.executemany(
|
||||
"INSERT INTO filtered_logs (machine_id, log_line, severity, received_at) VALUES (?,?,?,?)",
|
||||
[(machine_id, line, sev, now) for line, sev in filtered]
|
||||
)
|
||||
conn.execute(
|
||||
"UPDATE machines SET last_log_at=? WHERE id=?",
|
||||
(now, machine_id)
|
||||
)
|
||||
|
||||
msg = (
|
||||
f"[local] {local_hostname}: {len(filtered)}/{len(raw_lines)} lignes filtrées"
|
||||
+ (f" ({', '.join(units)})" if units else " (tous services)")
|
||||
)
|
||||
logger.info(msg)
|
||||
return msg
|
||||
|
||||
def _collect_local_logs(self):
|
||||
"""Wrapper silencieux appelé au début du slot."""
|
||||
try:
|
||||
self.collect_local_logs()
|
||||
except Exception as e:
|
||||
logger.error(f"[_collect_local_logs] {e}")
|
||||
|
||||
# ─── Boucle d'analyse ────────────────────────────────────────────────────
|
||||
|
||||
def _analysis_loop(self):
|
||||
|
||||
@@ -24,6 +24,9 @@
|
||||
"db_path": "/opt/agent_logwatch/data/logwatch.db",
|
||||
"system_prompt": "/opt/agent_logwatch/config/system_prompt.txt",
|
||||
"use_llm_coordinator": true,
|
||||
"local_log_since": "yesterday",
|
||||
"local_log_units": [],
|
||||
"local_hostname": "",
|
||||
"llm_profiles": {
|
||||
"cloud": "gpt-oss:120b-cloud"
|
||||
}
|
||||
|
||||
@@ -23,6 +23,7 @@ Tu reçois des instructions via MQTT (depuis Nexus) ou XMPP (directement).
|
||||
- `retention <jours>` : durée de conservation des logs filtrés
|
||||
- `analyze <hostname>` : lancer l'analyse d'une machine spécifique maintenant
|
||||
- `analyze_all` : lancer l'analyse complète de toutes les machines
|
||||
- `collect [since]` : collecter maintenant les logs locaux (ex: collect "1 hour ago")
|
||||
- `logs <hostname> [N]` : voir les N derniers logs filtrés d'une machine
|
||||
- `reset <hostname>` : réinitialiser l'analyse d'une machine
|
||||
|
||||
|
||||
Binary file not shown.
+8
-1
@@ -17,7 +17,7 @@ Usage LLM :
|
||||
import threading
|
||||
from datetime import datetime, timedelta
|
||||
|
||||
DESCRIPTION = "Contrôle LogWatch : schedule, analyse à la demande, statut, logs en attente"
|
||||
DESCRIPTION = "Contrôle LogWatch : schedule, analyse à la demande, statut, logs en attente, collecte locale"
|
||||
USAGE = (
|
||||
"SKILL:logwatch ARGS:status\n"
|
||||
"SKILL:logwatch ARGS:schedule show\n"
|
||||
@@ -26,6 +26,7 @@ USAGE = (
|
||||
"SKILL:logwatch ARGS:overage <minutes>\n"
|
||||
"SKILL:logwatch ARGS:analyze <hostname>\n"
|
||||
"SKILL:logwatch ARGS:analyze_all\n"
|
||||
"SKILL:logwatch ARGS:collect [since]\n"
|
||||
"SKILL:logwatch ARGS:retention <jours>\n"
|
||||
"SKILL:logwatch ARGS:logs <hostname> [N]\n"
|
||||
"SKILL:logwatch ARGS:reset <hostname>"
|
||||
@@ -239,6 +240,12 @@ def run(args: str, context) -> str:
|
||||
)
|
||||
return "\n".join(lines)
|
||||
|
||||
# ── collect [since] ───────────────────────────────────────────────────────
|
||||
if action == 'collect':
|
||||
since = rest.strip() or 'yesterday'
|
||||
result = context.agent.collect_local_logs(since=since)
|
||||
return f"✅ Collecte locale terminée:\n{result}"
|
||||
|
||||
# ── reset <hostname> ──────────────────────────────────────────────────────
|
||||
if action == 'reset':
|
||||
hostname = rest.strip()
|
||||
|
||||
Reference in New Issue
Block a user