""" Skill : SCHEDULE / PLAN_LIST / PLAN_CANCEL Planification de tâches récurrentes entre agents. Formats : SCHEDULE: daily HH:MM | | SCHEDULE: every Xh | | SCHEDULE: every Xmin | | SCHEDULE: weekly HH:MM | | PLAN_LIST: PLAN_CANCEL: """ import re import json from pathlib import Path from datetime import datetime from apscheduler.schedulers.background import BackgroundScheduler from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore from apscheduler.triggers.cron import CronTrigger from apscheduler.triggers.interval import IntervalTrigger SKILL_NAME = "schedule_tasks" TRIGGER = None TRIGGERS = { "SCHEDULE:": "schedule", "PLAN_LIST:": "plan_list", "PLAN_CANCEL:": "plan_cancel", } DB_PATH = Path("/opt/agent/scheduler.db") _scheduler = None DAYS_FR = { "lun": "mon", "mar": "tue", "mer": "wed", "jeu": "thu", "ven": "fri", "sam": "sat", "dim": "sun" } def _get_scheduler(): global _scheduler if _scheduler is None: jobstores = {"default": SQLAlchemyJobStore(url="sqlite:///{}".format(DB_PATH))} _scheduler = BackgroundScheduler(jobstores=jobstores) _scheduler.start() return _scheduler def _run_delegated_task(agent: str, task: str): """Exécutée par le scheduler : délègue la tâche à l'agent.""" import time as _time from skills.delegate import execute as delegate_exec from skills.reporting import log_execution import paho.mqtt.publish as publish import json as _json start = _time.time() result = delegate_exec("{} | {}".format(agent, task)) duration = _time.time() - start ts = datetime.now().strftime("%Y-%m-%d %H:%M") status = "error" if "erreur" in result.lower() or "timeout" in result.lower() else "success" log_execution("schedule", agent, task, status, result, duration) print("[SCHEDULE] Tâche exécutée [{} → {}] statut={} : {}".format(ts, agent, status, task[:60])) # Notifier via MQTT try: cfg = _json.loads(Path("/opt/agent/config/config.json").read_text()) host = cfg.get("mqtt_host", "localhost") port = int(cfg.get("mqtt_port", 1883)) payload = _json.dumps({"agent": agent, "task": task, "status": status, "result": result[:500], "timestamp": ts}) publish.single("agents/scheduler/notifications", payload=payload, hostname=host, port=port) except Exception: pass def _parse_trigger(expr: str): """Parse l'expression de planification et retourne un trigger APScheduler.""" expr = expr.strip().lower() # every Xh m = re.match(r"every (\d+)h$", expr) if m: return IntervalTrigger(hours=int(m.group(1))), "toutes les {}h".format(m.group(1)) # every Xmin m = re.match(r"every (\d+)min$", expr) if m: return IntervalTrigger(minutes=int(m.group(1))), "toutes les {}min".format(m.group(1)) # daily HH:MM m = re.match(r"daily (\d{1,2}):(\d{2})$", expr) if m: h, mn = m.group(1), m.group(2) return CronTrigger(hour=h, minute=mn), "tous les jours à {}:{}".format(h, mn) # weekly HH:MM m = re.match(r"weekly (\w+) (\d{1,2}):(\d{2})$", expr) if m: day_fr = m.group(1) day_en = DAYS_FR.get(day_fr, day_fr) h, mn = m.group(2), m.group(3) return CronTrigger(day_of_week=day_en, hour=h, minute=mn), \ "chaque {} à {}:{}".format(day_fr, h, mn) return None, None def schedule(args: str) -> str: parts = [p.strip() for p in args.split("|")] if len(parts) < 3: return ("Erreur : format attendu :\n" "SCHEDULE: daily HH:MM | | \n" "SCHEDULE: every Xh | | \n" "SCHEDULE: weekly lun HH:MM | | ") expr, agent, task = parts[0], parts[1], "|".join(parts[2:]) trigger, label = _parse_trigger(expr) if trigger is None: return "Expression invalide : «{}»\nFormats : daily HH:MM | every Xh | every Xmin | weekly HH:MM".format(expr) sched = _get_scheduler() job = sched.add_job( _run_delegated_task, trigger=trigger, args=[agent, task], name="{} → {}".format(agent, task[:40]) ) return "Tâche planifiée [ID: {}]\nAgent : {}\nTâche : {}\nFréquence : {}\nProchain : {}".format( job.id, agent, task, label, job.next_run_time.strftime("%Y-%m-%d %H:%M") if job.next_run_time else "N/A" ) def plan_list(args: str) -> str: sched = _get_scheduler() jobs = sched.get_jobs() if not jobs: return "Aucune tâche planifiée." lines = ["Tâches planifiées ({}) :".format(len(jobs))] for j in jobs: next_run = j.next_run_time.strftime("%Y-%m-%d %H:%M") if j.next_run_time else "N/A" lines.append("- [{}] {} | Prochain : {}".format(j.id[:8], j.name, next_run)) return "\n".join(lines) def plan_cancel(args: str) -> str: job_id = args.strip() if not job_id: return "Erreur : ID manquant. Utilisez PLAN_LIST: pour voir les IDs." sched = _get_scheduler() # Recherche par ID complet ou préfixe for job in sched.get_jobs(): if job.id == job_id or job.id.startswith(job_id): name = job.name job.remove() return "Tâche [{}] annulée : {}".format(job_id[:8], name) return "Aucune tâche trouvée avec l'ID : {}".format(job_id)