Files
nexus/scheduler.py
T
sylvain dc7f395aa8 feat: script scheduling, execution notifications, LLM coordinator
- Add /script command (run/schedule/unschedule/schedules/list)
- Add scheduler.add_script_job() with 'once HH:MM' one-shot support
- Subscribe to agents/scripts/execution and notify admins via XMPP
- Integrate LLMCoordinator (local acquire/release for Nexus)
- Update /help with script commands

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-15 19:48:59 +00:00

193 lines
7.1 KiB
Python

"""
Scheduler de Nexus — gère les tâches planifiées et les rapports automatiques.
Basé sur APScheduler.
"""
import logging
import uuid
from datetime import datetime, timedelta
from typing import Callable, Optional
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger
from apscheduler.triggers.date import DateTrigger
from apscheduler.triggers.interval import IntervalTrigger
logger = logging.getLogger(__name__)
class NexusScheduler:
def __init__(
self,
send_task_callback: Callable[[str, str], None],
request_report_callback: Callable[[str], None],
send_script_callback: Optional[Callable[[str, str], None]] = None,
):
self._scheduler = BackgroundScheduler(timezone="Europe/Paris")
self._send_task = send_task_callback
self._request_report = request_report_callback
self._send_script = send_script_callback
self._jobs: dict[str, dict] = {} # job_id → metadata
def start(self, config: dict):
"""Démarre le scheduler avec la config initiale."""
self._scheduler.start()
# Tâches planifiées initiales depuis config
for job in config.get("scheduled_tasks", []):
try:
self.add_job(
frequency=job["frequency"],
agent_id=job["agent"],
task=job["task"],
job_id=job.get("id"),
)
except Exception as e:
logger.error(f"[Scheduler] Erreur chargement tâche {job}: {e}")
# Rapports automatiques
for report in config.get("daily_reports", []):
try:
self._add_report_job(
agent_id=report["agent"],
time_str=report["time"], # "08:00"
)
except Exception as e:
logger.error(f"[Scheduler] Erreur rapport {report}: {e}")
logger.info(f"[Scheduler] {len(self._jobs)} job(s) chargé(s)")
def add_job(self, frequency: str, agent_id: str, task: str,
job_id: Optional[str] = None) -> str:
"""
Ajoute une tâche planifiée.
Formats de fréquence supportés :
daily HH:MM → tous les jours à HH:MM
every Xh → toutes les X heures
every Xmin → toutes les X minutes
weekly <day> HH:MM → une fois par semaine (lun/mar/...)
"""
job_id = job_id or str(uuid.uuid4())[:8]
trigger = self._parse_frequency(frequency)
self._scheduler.add_job(
func=self._send_task,
trigger=trigger,
args=[agent_id, task],
id=job_id,
replace_existing=True,
)
self._jobs[job_id] = {
"id": job_id,
"frequency": frequency,
"agent": agent_id,
"task": task,
}
logger.info(f"[Scheduler] Job {job_id} : [{frequency}] @{agent_id}{task}")
return job_id
def _add_report_job(self, agent_id: str, time_str: str) -> str:
"""Planifie une demande de rapport quotidien."""
job_id = f"report_{agent_id}"
hour, minute = map(int, time_str.split(":"))
self._scheduler.add_job(
func=self._request_report,
trigger=CronTrigger(hour=hour, minute=minute),
args=[agent_id],
id=job_id,
replace_existing=True,
)
self._jobs[job_id] = {
"id": job_id,
"frequency": f"daily {time_str}",
"agent": agent_id,
"task": "[rapport quotidien]",
}
return job_id
def add_script_job(self, frequency: str, agent_id: str,
script_name: str, script_args: str = "",
job_id: Optional[str] = None) -> str:
"""
Planifie l'exécution d'un script sur un agent.
Utilise la même syntaxe de fréquence que add_job + 'once HH:MM'.
"""
if self._send_script is None:
raise RuntimeError("send_script_callback non configuré")
job_id = job_id or f"script_{agent_id}_{script_name}_{str(uuid.uuid4())[:4]}"
trigger = self._parse_frequency(frequency)
args_str = script_args.strip()
self._scheduler.add_job(
func=self._send_script,
trigger=trigger,
args=[agent_id, f"{script_name} {args_str}".strip()],
id=job_id,
replace_existing=True,
)
self._jobs[job_id] = {
"id": job_id,
"frequency": frequency,
"agent": agent_id,
"task": f"[script] {script_name} {args_str}".strip(),
"type": "script",
}
logger.info(f"[Scheduler] Script job {job_id} : [{frequency}] @{agent_id}{script_name}")
return job_id
def cancel_job(self, job_id: str) -> bool:
try:
self._scheduler.remove_job(job_id)
self._jobs.pop(job_id, None)
return True
except Exception:
return False
def list_jobs(self) -> str:
if not self._jobs:
return "Aucune tâche planifiée."
lines = ["── Tâches planifiées ────────────────"]
for j in self._jobs.values():
lines.append(f" [{j['id']}] {j['frequency']} → @{j['agent']} : {j['task']}")
return "\n".join(lines)
def _parse_frequency(self, frequency: str):
"""Parse une fréquence en trigger APScheduler."""
parts = frequency.strip().split()
# daily HH:MM
if parts[0] == "daily" and len(parts) >= 2:
hour, minute = map(int, parts[1].split(":"))
return CronTrigger(hour=hour, minute=minute)
# weekly lun HH:MM
if parts[0] == "weekly" and len(parts) >= 3:
day_map = {
"lun": "mon", "mar": "tue", "mer": "wed",
"jeu": "thu", "ven": "fri", "sam": "sat", "dim": "sun",
}
day = day_map.get(parts[1].lower(), parts[1])
hour, minute = map(int, parts[2].split(":"))
return CronTrigger(day_of_week=day, hour=hour, minute=minute)
# every Xh
if parts[0] == "every" and len(parts) >= 2:
val = parts[1]
if val.endswith("h"):
return IntervalTrigger(hours=int(val[:-1]))
if val.endswith("min"):
return IntervalTrigger(minutes=int(val[:-3]))
if val.endswith("m"):
return IntervalTrigger(minutes=int(val[:-1]))
# once HH:MM → exécution unique à la prochaine occurrence de HH:MM
if parts[0] == "once" and len(parts) >= 2:
hour, minute = map(int, parts[1].split(":"))
now = datetime.now()
run_at = now.replace(hour=hour, minute=minute, second=0, microsecond=0)
if run_at <= now:
run_at += timedelta(days=1)
return DateTrigger(run_date=run_at)
raise ValueError(f"Format de fréquence non reconnu : '{frequency}'")