From 305999d8bfad8610ee25818716334aaa5132bbc7 Mon Sep 17 00:00:00 2001 From: sylvain Date: Sat, 7 Mar 2026 13:13:42 +0000 Subject: [PATCH] =?UTF-8?q?Orchestration=20compl=C3=A8te=20:=20planning,?= =?UTF-8?q?=20scheduling,=20CLI?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - agent1.py : listener MQTT (agents/agent1/inbox), MAX_STEPS 10 - skills/plan.py : exécution séquentielle PLAN: avec contexte entre étapes - skills/schedule_tasks.py : SCHEDULE: / PLAN_LIST: / PLAN_CANCEL: via APScheduler - cli.py : interface CLI rich (MQTT, multi-agents, /plans, /agent) - system_prompt.txt : mis à jour avec tous les nouveaux skills Co-Authored-By: Claude Sonnet 4.6 --- agent1.py | 93 ++++++++++++++------- cli.py | 169 +++++++++++++++++++++++++++++++++++++++ config/system_prompt.txt | 45 +++++++---- skills/plan.py | 45 +++++++++++ skills/schedule_tasks.py | 149 ++++++++++++++++++++++++++++++++++ 5 files changed, 458 insertions(+), 43 deletions(-) create mode 100644 cli.py create mode 100644 skills/plan.py create mode 100644 skills/schedule_tasks.py diff --git a/agent1.py b/agent1.py index 0764daa..f482599 100644 --- a/agent1.py +++ b/agent1.py @@ -3,14 +3,14 @@ import asyncio import sys +import threading import requests import json from pathlib import Path from slixmpp import ClientXMPP +import paho.mqtt.client as mqtt -# Ajouter /opt/agent au path pour importer les skills sys.path.insert(0, "/opt/agent") - from skills.loader import load_skills, run_skills # ── CONFIG ─────────────────────────────────────────────────────────────── @@ -32,12 +32,15 @@ MODEL = cfg["model"] XMPP_JID = cfg["xmpp_jid"] XMPP_PASS = cfg["xmpp_pass"] ADMIN_JID = cfg["admin_jid"] +MQTT_HOST = cfg.get("mqtt_host", "localhost") +MQTT_PORT = int(cfg.get("mqtt_port", 1883)) +MQTT_INBOX = "agents/agent1/inbox" SYSTEM_PROMPT = load_system_prompt() -# Charger les skills au démarrage load_skills() conversation_history = [] +xmpp_bot = None # référence globale pour répondre via XMPP depuis MQTT # ── LLM ────────────────────────────────────────────────────────────────── def call_ollama(messages: list) -> str: @@ -48,38 +51,71 @@ def call_ollama(messages: list) -> str: "options" : {"temperature": 0.3} } response = requests.post(OLLAMA_URL, json=payload, timeout=180) - data = response.json() - return data["message"]["content"] - -def ask_llm(user_message: str) -> str: - conversation_history.append({"role": "user", "content": user_message}) - messages = [{"role": "system", "content": SYSTEM_PROMPT}] + conversation_history + return response.json()["message"]["content"] +def ask_llm(user_message: str, history: list = None) -> str: + if history is None: + history = conversation_history + history.append({"role": "user", "content": user_message}) + messages = [{"role": "system", "content": SYSTEM_PROMPT}] + history try: - # Boucle agentique : le LLM peut enchaîner plusieurs skills - MAX_STEPS = 5 + MAX_STEPS = 10 for _ in range(MAX_STEPS): reply = call_ollama(messages) skill_triggered, result = run_skills(reply) - if not skill_triggered: - # Réponse finale sans commande - conversation_history.append({"role": "assistant", "content": reply}) + history.append({"role": "assistant", "content": reply}) return reply - - # Injecter le résultat du skill et relancer le LLM messages.append({"role": "assistant", "content": reply}) messages.append({"role": "user", "content": "[Résultat skill]\n" + result}) - - # Sécurité : trop d'étapes reply = call_ollama(messages) - conversation_history.append({"role": "assistant", "content": reply}) + history.append({"role": "assistant", "content": reply}) return reply - except Exception as e: - error_reply = "Erreur : " + str(e) - conversation_history.append({"role": "assistant", "content": error_reply}) - return error_reply + err = "Erreur : " + str(e) + history.append({"role": "assistant", "content": err}) + return err + +# ── MQTT LISTENER (pour CLI) ────────────────────────────────────────────── +mqtt_pub_client = None + +def mqtt_publish(topic: str, message: str): + if mqtt_pub_client: + mqtt_pub_client.publish(topic, message) + +def on_mqtt_message(client, userdata, msg): + raw = msg.payload.decode(errors="replace") + + # Support JSON avec reply_to optionnel + reply_to = "agents/cli/outbox" + task = raw + try: + data = json.loads(raw) + task = data.get("task", raw) + reply_to = data.get("reply_to", reply_to) + except json.JSONDecodeError: + pass + + print("[MQTT] Message CLI reçu : {}".format(task[:80])) + mqtt_history = [] + reply = ask_llm(task, history=mqtt_history) + mqtt_publish(reply_to, reply) + print("[MQTT] Réponse envoyée sur {}".format(reply_to)) + +def start_mqtt_listener(): + global mqtt_pub_client + + mqtt_pub_client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2, + client_id="agent1_pub") + mqtt_pub_client.connect(MQTT_HOST, MQTT_PORT) + mqtt_pub_client.loop_start() + + sub = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2, client_id="agent1_sub") + sub.on_message = on_mqtt_message + sub.connect(MQTT_HOST, MQTT_PORT) + sub.subscribe(MQTT_INBOX) + print("[MQTT] Agent1 écoute sur {}".format(MQTT_INBOX)) + sub.loop_forever() # ── BOT XMPP ───────────────────────────────────────────────────────────── class AgentBot(ClientXMPP): @@ -93,7 +129,7 @@ class AgentBot(ClientXMPP): async def session_start(self, event): self.send_presence() await self.get_roster() - self.send_message(mto=ADMIN_JID, mbody="Agent en ligne !", mtype='chat') + self.send_message(mto=ADMIN_JID, mbody="Agent1 (orchestrateur) en ligne !", mtype='chat') async def message(self, msg): if msg['type'] not in ('chat', 'normal'): @@ -114,6 +150,9 @@ class AgentBot(ClientXMPP): # ── MAIN ───────────────────────────────────────────────────────────────── if __name__ == "__main__": - bot = AgentBot() - bot.connect() - bot.loop.run_forever() + mqtt_thread = threading.Thread(target=start_mqtt_listener, daemon=True) + mqtt_thread.start() + + xmpp_bot = AgentBot() + xmpp_bot.connect() + xmpp_bot.loop.run_forever() diff --git a/cli.py b/cli.py new file mode 100644 index 0000000..15689e1 --- /dev/null +++ b/cli.py @@ -0,0 +1,169 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +""" +CLI pour interagir avec les agents via MQTT. + +Usage : + python3 cli.py # parle à agent1 + python3 cli.py agent2_debian13 # parle directement à un agent + python3 cli.py --plans # voir les tâches planifiées +""" + +import sys +import json +import time +import threading +import argparse +from pathlib import Path + +import paho.mqtt.client as mqtt +from rich.console import Console +from rich.panel import Panel +from rich.prompt import Prompt +from rich.live import Live +from rich.spinner import Spinner +from rich.text import Text +from rich.rule import Rule +from rich import print as rprint + +# ── CONFIG ────────────────────────────────────────────────────────────── +CONFIG_FILE = Path("/opt/agent/config/config.json") +REGISTRY_FILE = Path("/opt/agent/config/agents_registry.json") + +cfg = json.loads(CONFIG_FILE.read_text()) +MQTT_HOST = cfg.get("mqtt_host", "localhost") +MQTT_PORT = int(cfg.get("mqtt_port", 1883)) + +AGENT_INBOXES = { + "agent1": "agents/agent1/inbox", +} +try: + registry = json.loads(REGISTRY_FILE.read_text()) + for name, info in registry.items(): + AGENT_INBOXES[name] = info["mqtt_inbox"] +except Exception: + pass + +CLI_OUTBOX = "agents/cli/outbox" +console = Console() + +# ── MQTT ──────────────────────────────────────────────────────────────── +response_event = threading.Event() +response_container = [] +pub_client = None +sub_client = None + +def on_message(client, userdata, msg): + response_container.clear() + response_container.append(msg.payload.decode(errors="replace")) + response_event.set() + +def connect_mqtt(): + global pub_client, sub_client + + pub_client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2, client_id="cli_pub") + pub_client.connect(MQTT_HOST, MQTT_PORT) + pub_client.loop_start() + + sub_client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2, client_id="cli_sub") + sub_client.on_message = on_message + sub_client.connect(MQTT_HOST, MQTT_PORT) + sub_client.subscribe(CLI_OUTBOX) + sub_client.loop_start() + +def send_and_wait(agent: str, message: str, timeout: int = 180) -> str: + inbox = AGENT_INBOXES.get(agent) + if not inbox: + return "[Erreur] Agent inconnu : {}. Disponibles : {}".format( + agent, ", ".join(AGENT_INBOXES.keys())) + + payload = json.dumps({"task": message, "reply_to": CLI_OUTBOX, "from": "cli"}) + response_event.clear() + pub_client.publish(inbox, payload) + + received = response_event.wait(timeout=timeout) + if received and response_container: + return response_container[0] + return "[Timeout] Pas de réponse de {} après {}s.".format(agent, timeout) + +# ── AFFICHAGE ──────────────────────────────────────────────────────────── +def print_response(agent: str, response: str): + console.print(Panel( + response, + title="[bold cyan]{}[/bold cyan]".format(agent), + border_style="cyan", + padding=(1, 2) + )) + +def print_user(message: str): + console.print(Panel( + "[bold white]{}[/bold white]".format(message), + title="[bold green]vous[/bold green]", + border_style="green", + padding=(0, 2) + )) + +def show_plans(): + """Affiche les tâches planifiées via agent1.""" + connect_mqtt() + with console.status("[bold yellow]Récupération des plans...[/bold yellow]"): + result = send_and_wait("agent1", "PLAN_LIST:", timeout=30) + print_response("agent1 / plans", result) + +# ── BOUCLE PRINCIPALE ──────────────────────────────────────────────────── +def main_loop(agent: str): + connect_mqtt() + + console.print(Rule("[bold blue]Agent CLI[/bold blue]")) + console.print("[dim]Agent cible : [bold]{}[/bold] | /reset | /plans | /quit[/dim]\n".format(agent)) + + while True: + try: + user_input = Prompt.ask("[bold green]>[/bold green]").strip() + except (KeyboardInterrupt, EOFError): + console.print("\n[dim]Au revoir.[/dim]") + break + + if not user_input: + continue + + if user_input == "/quit": + console.print("[dim]Au revoir.[/dim]") + break + + if user_input == "/reset": + send_and_wait(agent, "!reset", timeout=10) + console.print("[dim]Conversation réinitialisée.[/dim]") + continue + + if user_input == "/plans": + with console.status("[bold yellow]Récupération...[/bold yellow]"): + result = send_and_wait("agent1", "PLAN_LIST:", timeout=30) + print_response("plans", result) + continue + + if user_input.startswith("/agent "): + agent = user_input.split(" ", 1)[1].strip() + console.print("[dim]Agent changé : [bold]{}[/bold][/dim]".format(agent)) + continue + + print_user(user_input) + + with console.status("[bold yellow]En attente de {}...[/bold yellow]".format(agent)): + response = send_and_wait(agent, user_input) + + print_response(agent, response) + +# ── MAIN ───────────────────────────────────────────────────────────────── +if __name__ == "__main__": + parser = argparse.ArgumentParser(description="CLI agents MQTT") + parser.add_argument("agent", nargs="?", default="agent1", + help="Agent cible (défaut: agent1)") + parser.add_argument("--plans", action="store_true", + help="Afficher les tâches planifiées") + args = parser.parse_args() + + if args.plans: + show_plans() + else: + main_loop(args.agent) diff --git a/config/system_prompt.txt b/config/system_prompt.txt index d6915fa..1d069a6 100644 --- a/config/system_prompt.txt +++ b/config/system_prompt.txt @@ -1,32 +1,45 @@ Tu es agent1, chef d'orchestre d'un réseau d'agents autonomes spécialisés. -Tu reçois les instructions de sylvain et tu décides de les traiter toi-même ou de les déléguer au bon agent spécialisé. +Tu reçois les instructions de sylvain (via XMPP ou CLI) et tu décides de les traiter toi-même ou de les déléguer. +Les agents ne peuvent pas travailler en parallèle : tu exécutes les tâches séquentiellement. Agents disponibles sous tes ordres : -- agent2_debian13 : Administration Debian (apt, systemd, conteneurs LXC/Docker, KVM, réseau, sécurité système) +- agent2_debian13 : Administration Debian (apt, systemd, conteneurs LXC/Docker, KVM, réseau, sécurité, exécution de commandes système) -Formats de commandes disponibles : +Commandes disponibles : DELEGATE: | - → Déléguer une tâche à un agent spécialisé et attendre sa réponse - → Exemple : DELEGATE: agent2_debian13 | Comment mettre à jour les paquets Debian ? + → Déléguer une tâche unique à un agent spécialisé + → Exemple : DELEGATE: agent2_debian13 | Vérifie l'espace disque -SEARCH: - → Recherche web DuckDuckGo (max 5 résultats) +PLAN: | ;; | ;; ... + → Exécuter un plan de tâches séquentiel (le résultat de chaque étape est transmis à la suivante) + → Exemple : PLAN: agent2_debian13 | apt update ;; agent2_debian13 | apt upgrade -y -READ: - → Lire et convertir une page web en markdown +SCHEDULE: | | + → Planifier une tâche récurrente + → Fréquences : daily HH:MM | every Xh | every Xmin | weekly HH:MM + → Exemple : SCHEDULE: daily 03:00 | agent2_debian13 | apt update && apt upgrade -y + +PLAN_LIST: + → Afficher toutes les tâches planifiées + +PLAN_CANCEL: + → Annuler une tâche planifiée + +SEARCH: + → Recherche web DuckDuckGo REMEMBER: | - → Mémoriser une information en base SQLite + → Mémoriser une information RECALL: → Récupérer une information mémorisée ⚠ RÈGLES : -- Si la demande concerne Debian, Linux, des conteneurs, des VMs ou l'administration système : utilise DELEGATE: agent2_debian13 -- Si la demande concerne l'actualité, des événements récents ou des faits changeants : utilise SEARCH: -- Ne JAMAIS répondre de mémoire à une question d'actualité -- Les agents ne peuvent pas travailler en parallèle : délègue une tâche à la fois -- Synthétise et transmets la réponse de l'agent spécialisé à sylvain -- Réponds toujours en français. Sois concis mais précis. +- Tâche Debian/système → DELEGATE: agent2_debian13 (ou PLAN: pour plusieurs étapes) +- Tâche récurrente → SCHEDULE: +- Actualité/info récente → SEARCH: +- Un seul agent à la fois (pas de parallélisme) +- Transmets toujours le résultat des agents à l'utilisateur avec un résumé clair +- Réponds toujours en français diff --git a/skills/plan.py b/skills/plan.py new file mode 100644 index 0000000..a28e10e --- /dev/null +++ b/skills/plan.py @@ -0,0 +1,45 @@ +""" +Skill : PLAN +Exécute un plan de tâches séquentiel entre plusieurs agents. +Les résultats de chaque étape sont passés en contexte à la suivante. + +Format : + PLAN: | ;; | ;; ... + +Exemple : + PLAN: agent2_debian13 | Vérifier l'espace disque ;; agent2_debian13 | Nettoyer les paquets inutiles +""" +from pathlib import Path + +SKILL_NAME = "plan" +TRIGGER = "PLAN:" + +def execute(args: str) -> str: + from skills.delegate import execute as delegate_exec + + steps_raw = [s.strip() for s in args.split(";;") if s.strip()] + if not steps_raw: + return "Erreur : plan vide. Format : PLAN: | ;; | " + + steps = [] + for s in steps_raw: + if "|" not in s: + return "Erreur étape «{}» : format attendu | ".format(s) + agent, _, task = s.partition("|") + steps.append((agent.strip(), task.strip())) + + report = ["Plan d'exécution ({} étape(s)) :".format(len(steps))] + context = "" + + for i, (agent, task) in enumerate(steps, 1): + full_task = task + if context: + full_task = "{}\n[Contexte étape précédente]\n{}".format(task, context) + + report.append("\n── Étape {}/{} → [{}] ──".format(i, len(steps), agent)) + result = delegate_exec("{} | {}".format(agent, full_task)) + report.append(result) + context = result # passe le résultat à l'étape suivante + + report.append("\n── Plan terminé ──") + return "\n".join(report) diff --git a/skills/schedule_tasks.py b/skills/schedule_tasks.py new file mode 100644 index 0000000..10d4499 --- /dev/null +++ b/skills/schedule_tasks.py @@ -0,0 +1,149 @@ +""" +Skill : SCHEDULE / PLAN_LIST / PLAN_CANCEL +Planification de tâches récurrentes entre agents. + +Formats : + SCHEDULE: daily HH:MM | | + SCHEDULE: every Xh | | + SCHEDULE: every Xmin | | + SCHEDULE: weekly HH:MM | | + + PLAN_LIST: + PLAN_CANCEL: +""" +import re +import json +from pathlib import Path +from datetime import datetime + +from apscheduler.schedulers.background import BackgroundScheduler +from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore +from apscheduler.triggers.cron import CronTrigger +from apscheduler.triggers.interval import IntervalTrigger + +SKILL_NAME = "schedule_tasks" +TRIGGER = None +TRIGGERS = { + "SCHEDULE:": "schedule", + "PLAN_LIST:": "plan_list", + "PLAN_CANCEL:": "plan_cancel", +} + +DB_PATH = Path("/opt/agent/scheduler.db") +_scheduler = None + +DAYS_FR = { + "lun": "mon", "mar": "tue", "mer": "wed", + "jeu": "thu", "ven": "fri", "sam": "sat", "dim": "sun" +} + +def _get_scheduler(): + global _scheduler + if _scheduler is None: + jobstores = {"default": SQLAlchemyJobStore(url="sqlite:///{}".format(DB_PATH))} + _scheduler = BackgroundScheduler(jobstores=jobstores) + _scheduler.start() + return _scheduler + +def _run_delegated_task(agent: str, task: str): + """Exécutée par le scheduler : délègue la tâche à l'agent.""" + from skills.delegate import execute as delegate_exec + import paho.mqtt.publish as publish + import json as _json + + result = delegate_exec("{} | {}".format(agent, task)) + print("[SCHEDULE] Tâche exécutée [{} → {}] : {}".format( + datetime.now().strftime("%Y-%m-%d %H:%M"), agent, task[:60])) + + # Notifier via MQTT sur le topic de notification + try: + cfg = _json.loads(Path("/opt/agent/config/config.json").read_text()) + publish.single( + "agents/scheduler/notifications", + payload="[{}] {}\n{}".format(agent, task, result), + hostname=cfg.get("mqtt_host", "localhost"), + port=int(cfg.get("mqtt_port", 1883)) + ) + except Exception: + pass + +def _parse_trigger(expr: str): + """Parse l'expression de planification et retourne un trigger APScheduler.""" + expr = expr.strip().lower() + + # every Xh + m = re.match(r"every (\d+)h$", expr) + if m: + return IntervalTrigger(hours=int(m.group(1))), "toutes les {}h".format(m.group(1)) + + # every Xmin + m = re.match(r"every (\d+)min$", expr) + if m: + return IntervalTrigger(minutes=int(m.group(1))), "toutes les {}min".format(m.group(1)) + + # daily HH:MM + m = re.match(r"daily (\d{1,2}):(\d{2})$", expr) + if m: + h, mn = m.group(1), m.group(2) + return CronTrigger(hour=h, minute=mn), "tous les jours à {}:{}".format(h, mn) + + # weekly HH:MM + m = re.match(r"weekly (\w+) (\d{1,2}):(\d{2})$", expr) + if m: + day_fr = m.group(1) + day_en = DAYS_FR.get(day_fr, day_fr) + h, mn = m.group(2), m.group(3) + return CronTrigger(day_of_week=day_en, hour=h, minute=mn), \ + "chaque {} à {}:{}".format(day_fr, h, mn) + + return None, None + +def schedule(args: str) -> str: + parts = [p.strip() for p in args.split("|")] + if len(parts) < 3: + return ("Erreur : format attendu :\n" + "SCHEDULE: daily HH:MM | | \n" + "SCHEDULE: every Xh | | \n" + "SCHEDULE: weekly lun HH:MM | | ") + + expr, agent, task = parts[0], parts[1], "|".join(parts[2:]) + trigger, label = _parse_trigger(expr) + if trigger is None: + return "Expression invalide : «{}»\nFormats : daily HH:MM | every Xh | every Xmin | weekly HH:MM".format(expr) + + sched = _get_scheduler() + job = sched.add_job( + _run_delegated_task, + trigger=trigger, + args=[agent, task], + name="{} → {}".format(agent, task[:40]) + ) + + return "Tâche planifiée [ID: {}]\nAgent : {}\nTâche : {}\nFréquence : {}\nProchain : {}".format( + job.id, agent, task, label, + job.next_run_time.strftime("%Y-%m-%d %H:%M") if job.next_run_time else "N/A" + ) + +def plan_list(args: str) -> str: + sched = _get_scheduler() + jobs = sched.get_jobs() + if not jobs: + return "Aucune tâche planifiée." + lines = ["Tâches planifiées ({}) :".format(len(jobs))] + for j in jobs: + next_run = j.next_run_time.strftime("%Y-%m-%d %H:%M") if j.next_run_time else "N/A" + lines.append("- [{}] {} | Prochain : {}".format(j.id[:8], j.name, next_run)) + return "\n".join(lines) + +def plan_cancel(args: str) -> str: + job_id = args.strip() + if not job_id: + return "Erreur : ID manquant. Utilisez PLAN_LIST: pour voir les IDs." + sched = _get_scheduler() + # Recherche par ID complet ou préfixe + for job in sched.get_jobs(): + if job.id == job_id or job.id.startswith(job_id): + name = job.name + job.remove() + return "Tâche [{}] annulée : {}".format(job_id[:8], name) + return "Aucune tâche trouvée avec l'ID : {}".format(job_id)