144f481320
- skills/reporting.py : REPORT: / REPORT_ERRORS: avec historique SQLite - skills/delegate.py : log des exécutions + détection erreurs + notification MQTT - skills/schedule_tasks.py : log des tâches planifiées - agent1.py : abonnement agents/errors + agents/scheduler/notifications → alerte XMPP - cli.py : commandes /report et /errors - system_prompt.txt : REPORT: et REPORT_ERRORS: ajoutés Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
157 lines
5.5 KiB
Python
157 lines
5.5 KiB
Python
"""
Skill: SCHEDULE / PLAN_LIST / PLAN_CANCEL
Scheduling of recurring tasks between agents.

Formats:
  SCHEDULE: daily HH:MM | <agent> | <task>
  SCHEDULE: every Xh | <agent> | <task>
  SCHEDULE: every Xmin | <agent> | <task>
  SCHEDULE: weekly <lun|mar|mer|jeu|ven|sam|dim> HH:MM | <agent> | <task>

  PLAN_LIST:
  PLAN_CANCEL: <job_id>
"""
|
|
import re
|
|
import json
|
|
from pathlib import Path
|
|
from datetime import datetime
|
|
|
|
from apscheduler.schedulers.background import BackgroundScheduler
|
|
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
|
|
from apscheduler.triggers.cron import CronTrigger
|
|
from apscheduler.triggers.interval import IntervalTrigger
|
|
|
|
# Name of this skill.
SKILL_NAME = "schedule_tasks"
# No single trigger prefix; presumably left unset because this skill declares
# several prefixes in TRIGGERS instead (loader not visible here — confirm).
TRIGGER = None
# Command prefix -> name of the function in this file that handles it.
TRIGGERS = dict((
    ("SCHEDULE:", "schedule"),
    ("PLAN_LIST:", "plan_list"),
    ("PLAN_CANCEL:", "plan_cancel"),
))
|
|
|
|
# SQLite file handed to the APScheduler job store in _get_scheduler().
DB_PATH = Path("/opt/agent") / "scheduler.db"
# Module-wide scheduler instance; stays None until _get_scheduler() builds it.
_scheduler = None

# French weekday abbreviations -> the English abbreviations passed to
# CronTrigger(day_of_week=...).
DAYS_FR = dict(zip(
    ("lun", "mar", "mer", "jeu", "ven", "sam", "dim"),
    ("mon", "tue", "wed", "thu", "fri", "sat", "sun"),
))
|
|
|
|
def _get_scheduler():
    """Return the module-wide BackgroundScheduler, creating and starting it on first use.

    The scheduler stores its jobs in a SQLAlchemyJobStore whose URL is
    ``sqlite:///<DB_PATH>``.

    Returns:
        The started BackgroundScheduler shared by every caller in this module.

    Raises:
        Any exception raised while building the job store or starting the
        scheduler propagates to the caller. In that case the module-level
        instance stays ``None`` so the next call tries again.
    """
    global _scheduler
    if _scheduler is None:
        store_url = "sqlite:///{}".format(DB_PATH)
        candidate = BackgroundScheduler(
            jobstores={"default": SQLAlchemyJobStore(url=store_url)}
        )
        # Publish the instance only after start() succeeded: caching it first
        # would hand out a scheduler that never runs if start() raised.
        candidate.start()
        _scheduler = candidate
    return _scheduler
|
|
|
|
def _run_delegated_task(agent: str, task: str):
    """Run by the scheduler: delegate `task` to `agent`, log the run, notify over MQTT.

    Steps:
      1. Call ``skills.delegate.execute`` with ``"<agent> | <task>"`` and time it.
      2. Mark the run as ``"error"`` when the reply contains "erreur" or
         "timeout" (case-insensitive), otherwise ``"success"``.
      3. Pass the outcome to ``skills.reporting.log_execution`` and print a
         one-line summary.
      4. Publish a JSON notification on ``agents/scheduler/notifications``
         using the MQTT host/port from ``/opt/agent/config/config.json``
         (defaults: localhost, 1883). The result is truncated to 500 characters.

    The notification is best effort: any failure there is printed and does
    not make the scheduled job fail.

    Args:
        agent: Name of the agent the task is delegated to.
        task: Task text forwarded to that agent.
    """
    # Project modules are imported lazily, inside the function.
    import time as _time
    from skills.delegate import execute as delegate_exec
    from skills.reporting import log_execution
    import json as _json

    start = _time.time()
    result = delegate_exec("{} | {}".format(agent, task))
    duration = _time.time() - start
    ts = datetime.now().strftime("%Y-%m-%d %H:%M")

    lowered = result.lower()
    status = "error" if "erreur" in lowered or "timeout" in lowered else "success"
    log_execution("schedule", agent, task, status, result, duration)
    print("[SCHEDULE] Tâche exécutée [{} → {}] statut={} : {}".format(ts, agent, status, task[:60]))

    # Notify via MQTT (best effort)
    try:
        # Imported here so that a missing MQTT library only disables the
        # notification instead of preventing the task from running.
        import paho.mqtt.publish as publish

        cfg = _json.loads(Path("/opt/agent/config/config.json").read_text())
        host = cfg.get("mqtt_host", "localhost")
        port = int(cfg.get("mqtt_port", 1883))
        payload = _json.dumps({"agent": agent, "task": task, "status": status,
                               "result": result[:500], "timestamp": ts})
        publish.single("agents/scheduler/notifications", payload=payload,
                       hostname=host, port=port)
    except Exception as exc:
        # Deliberately broad: the task itself already ran and was logged, so a
        # notification problem is reported rather than raised.
        print("[SCHEDULE] Notification MQTT impossible : {}".format(exc))
|
|
|
|
def _parse_trigger(expr: str):
    """Parse a schedule expression and return an APScheduler trigger.

    Accepted forms (case-insensitive, surrounding whitespace ignored):
        every Xh
        every Xmin
        daily HH:MM
        weekly <day> HH:MM   (French abbreviation from DAYS_FR; any other
                              word is passed to CronTrigger unchanged)

    Args:
        expr: The schedule expression.

    Returns:
        ``(trigger, label)`` where ``label`` is the French description shown
        to the user, or ``(None, None)`` when the expression is not
        recognised or describes an impossible schedule (zero interval, hour
        above 23, minute above 59, weekday rejected by CronTrigger).
    """
    expr = expr.strip().lower()

    # every Xh / every Xmin
    m = re.fullmatch(r"every (\d+)(h|min)", expr)
    if m:
        count, unit = m.group(1), m.group(2)
        # IntervalTrigger silently turns a zero-length interval into a
        # one-second interval, so "every 0h" must be rejected here.
        if int(count) == 0:
            return None, None
        span = {"hours": int(count)} if unit == "h" else {"minutes": int(count)}
        try:
            trigger = IntervalTrigger(**span)
        except OverflowError:
            # Interval too large to be represented as a timedelta.
            return None, None
        return trigger, "toutes les {}{}".format(count, unit)

    # daily HH:MM
    m = re.fullmatch(r"daily (\d{1,2}):(\d{2})", expr)
    if m:
        h, mn = m.group(1), m.group(2)
        if int(h) > 23 or int(mn) > 59:
            return None, None
        return CronTrigger(hour=h, minute=mn), "tous les jours à {}:{}".format(h, mn)

    # weekly <day> HH:MM
    m = re.fullmatch(r"weekly (\w+) (\d{1,2}):(\d{2})", expr)
    if m:
        day_fr, h, mn = m.group(1), m.group(2), m.group(3)
        if int(h) > 23 or int(mn) > 59:
            return None, None
        day_en = DAYS_FR.get(day_fr, day_fr)
        try:
            trigger = CronTrigger(day_of_week=day_en, hour=h, minute=mn)
        except ValueError:
            # CronTrigger rejects weekday names it does not know.
            return None, None
        return trigger, "chaque {} à {}:{}".format(day_fr, h, mn)

    return None, None
|
|
|
|
def schedule(args: str) -> str:
    """Handle ``SCHEDULE: <expression> | <agent> | <task>``.

    Parses the three pipe-separated fields, builds the trigger with
    ``_parse_trigger`` and registers ``_run_delegated_task(agent, task)`` on
    the shared scheduler.

    Args:
        args: Text following the ``SCHEDULE:`` prefix.

    Returns:
        A French confirmation containing the job ID, agent, task, frequency
        label and next run time, or an error message when a field is missing
        or empty, or when the schedule expression is invalid.
    """
    usage = ("Erreur : format attendu :\n"
             "SCHEDULE: daily HH:MM | <agent> | <tâche>\n"
             "SCHEDULE: every Xh | <agent> | <tâche>\n"
             "SCHEDULE: weekly lun HH:MM | <agent> | <tâche>")

    # Split on the first two pipes only, so a task that itself contains "|"
    # keeps its text exactly as written.
    parts = [p.strip() for p in args.split("|", 2)]
    if len(parts) < 3 or not parts[1] or not parts[2]:
        # Missing or empty agent/task: nothing meaningful to schedule.
        return usage
    expr, agent, task = parts

    try:
        trigger, label = _parse_trigger(expr)
    except ValueError:
        # CronTrigger raises ValueError for out-of-range fields or unknown
        # weekday names; report it as an invalid expression instead.
        trigger, label = None, None
    if trigger is None:
        return "Expression invalide : «{}»\nFormats : daily HH:MM | every Xh | every Xmin | weekly <jour> HH:MM".format(expr)

    sched = _get_scheduler()
    job = sched.add_job(
        _run_delegated_task,
        trigger=trigger,
        args=[agent, task],
        name="{} → {}".format(agent, task[:40])
    )

    return "Tâche planifiée [ID: {}]\nAgent : {}\nTâche : {}\nFréquence : {}\nProchain : {}".format(
        job.id, agent, task, label,
        job.next_run_time.strftime("%Y-%m-%d %H:%M") if job.next_run_time else "N/A"
    )
|
|
|
|
def plan_list(args: str) -> str:
    """Handle ``PLAN_LIST:`` by describing every job known to the scheduler.

    Args:
        args: Ignored.

    Returns:
        "Aucune tâche planifiée." when there is no job; otherwise a header
        line with the job count followed by one line per job showing the
        first 8 characters of its ID, its name and its next run time
        ("N/A" when the job has none).
    """
    pending = _get_scheduler().get_jobs()
    if not pending:
        return "Aucune tâche planifiée."

    def _describe(job):
        # One bullet line for a single job.
        when = "N/A"
        if job.next_run_time:
            when = job.next_run_time.strftime("%Y-%m-%d %H:%M")
        return "- [{}] {} | Prochain : {}".format(job.id[:8], job.name, when)

    header = "Tâches planifiées ({}) :".format(len(pending))
    return "\n".join([header] + [_describe(job) for job in pending])
|
|
|
|
def plan_cancel(args: str) -> str:
    """Handle ``PLAN_CANCEL: <job_id>`` by removing one scheduled job.

    The ID may be the full job ID or a prefix of it (``plan_list`` only
    displays the first 8 characters). An exact match always wins. A prefix
    is accepted only when it designates exactly one job, so an ambiguous
    prefix never cancels an arbitrary job.

    Args:
        args: Text following the ``PLAN_CANCEL:`` prefix.

    Returns:
        A French message: confirmation of the cancellation, or an error when
        the ID is missing, matches nothing, or matches several jobs.
    """
    job_id = args.strip()
    if not job_id:
        return "Erreur : ID manquant. Utilisez PLAN_LIST: pour voir les IDs."

    jobs = _get_scheduler().get_jobs()

    # Look for an exact ID first, then fall back to prefix matching.
    matches = [job for job in jobs if job.id == job_id]
    if not matches:
        matches = [job for job in jobs if job.id.startswith(job_id)]

    if not matches:
        return "Aucune tâche trouvée avec l'ID : {}".format(job_id)
    if len(matches) > 1:
        return "ID ambigu : {} tâches correspondent à «{}». Précisez l'ID (voir PLAN_LIST:).".format(
            len(matches), job_id
        )

    job = matches[0]
    name = job.name
    job.remove()
    return "Tâche [{}] annulée : {}".format(job_id[:8], name)
|