Orchestration complète : planning, scheduling, CLI

- agent1.py : listener MQTT (agents/agent1/inbox), MAX_STEPS 10
- skills/plan.py : exécution séquentielle PLAN: avec contexte entre étapes
- skills/schedule_tasks.py : SCHEDULE: / PLAN_LIST: / PLAN_CANCEL: via APScheduler
- cli.py : interface CLI rich (MQTT, multi-agents, /plans, /agent)
- system_prompt.txt : mis à jour avec tous les nouveaux skills

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-03-07 13:13:42 +00:00
parent 3dfd621582
commit 305999d8bf
5 changed files with 458 additions and 43 deletions
+66 -27
View File
@@ -3,14 +3,14 @@
import asyncio import asyncio
import sys import sys
import threading
import requests import requests
import json import json
from pathlib import Path from pathlib import Path
from slixmpp import ClientXMPP from slixmpp import ClientXMPP
import paho.mqtt.client as mqtt
# Ajouter /opt/agent au path pour importer les skills
sys.path.insert(0, "/opt/agent") sys.path.insert(0, "/opt/agent")
from skills.loader import load_skills, run_skills from skills.loader import load_skills, run_skills
# ── CONFIG ─────────────────────────────────────────────────────────────── # ── CONFIG ───────────────────────────────────────────────────────────────
@@ -32,12 +32,15 @@ MODEL = cfg["model"]
XMPP_JID = cfg["xmpp_jid"] XMPP_JID = cfg["xmpp_jid"]
XMPP_PASS = cfg["xmpp_pass"] XMPP_PASS = cfg["xmpp_pass"]
ADMIN_JID = cfg["admin_jid"] ADMIN_JID = cfg["admin_jid"]
MQTT_HOST = cfg.get("mqtt_host", "localhost")
MQTT_PORT = int(cfg.get("mqtt_port", 1883))
MQTT_INBOX = "agents/agent1/inbox"
SYSTEM_PROMPT = load_system_prompt() SYSTEM_PROMPT = load_system_prompt()
# Charger les skills au démarrage
load_skills() load_skills()
conversation_history = [] conversation_history = []
xmpp_bot = None # référence globale pour répondre via XMPP depuis MQTT
# ── LLM ────────────────────────────────────────────────────────────────── # ── LLM ──────────────────────────────────────────────────────────────────
def call_ollama(messages: list) -> str: def call_ollama(messages: list) -> str:
@@ -48,38 +51,71 @@ def call_ollama(messages: list) -> str:
"options" : {"temperature": 0.3} "options" : {"temperature": 0.3}
} }
response = requests.post(OLLAMA_URL, json=payload, timeout=180) response = requests.post(OLLAMA_URL, json=payload, timeout=180)
data = response.json() return response.json()["message"]["content"]
return data["message"]["content"]
def ask_llm(user_message: str) -> str:
conversation_history.append({"role": "user", "content": user_message})
messages = [{"role": "system", "content": SYSTEM_PROMPT}] + conversation_history
def ask_llm(user_message: str, history: list = None) -> str:
if history is None:
history = conversation_history
history.append({"role": "user", "content": user_message})
messages = [{"role": "system", "content": SYSTEM_PROMPT}] + history
try: try:
# Boucle agentique : le LLM peut enchaîner plusieurs skills MAX_STEPS = 10
MAX_STEPS = 5
for _ in range(MAX_STEPS): for _ in range(MAX_STEPS):
reply = call_ollama(messages) reply = call_ollama(messages)
skill_triggered, result = run_skills(reply) skill_triggered, result = run_skills(reply)
if not skill_triggered: if not skill_triggered:
# Réponse finale sans commande history.append({"role": "assistant", "content": reply})
conversation_history.append({"role": "assistant", "content": reply})
return reply return reply
# Injecter le résultat du skill et relancer le LLM
messages.append({"role": "assistant", "content": reply}) messages.append({"role": "assistant", "content": reply})
messages.append({"role": "user", "content": "[Résultat skill]\n" + result}) messages.append({"role": "user", "content": "[Résultat skill]\n" + result})
# Sécurité : trop d'étapes
reply = call_ollama(messages) reply = call_ollama(messages)
conversation_history.append({"role": "assistant", "content": reply}) history.append({"role": "assistant", "content": reply})
return reply return reply
except Exception as e: except Exception as e:
error_reply = "Erreur : " + str(e) err = "Erreur : " + str(e)
conversation_history.append({"role": "assistant", "content": error_reply}) history.append({"role": "assistant", "content": err})
return error_reply return err
# ── MQTT LISTENER (pour CLI) ──────────────────────────────────────────────
mqtt_pub_client = None
def mqtt_publish(topic: str, message: str):
if mqtt_pub_client:
mqtt_pub_client.publish(topic, message)
def on_mqtt_message(client, userdata, msg):
raw = msg.payload.decode(errors="replace")
# Support JSON avec reply_to optionnel
reply_to = "agents/cli/outbox"
task = raw
try:
data = json.loads(raw)
task = data.get("task", raw)
reply_to = data.get("reply_to", reply_to)
except json.JSONDecodeError:
pass
print("[MQTT] Message CLI reçu : {}".format(task[:80]))
mqtt_history = []
reply = ask_llm(task, history=mqtt_history)
mqtt_publish(reply_to, reply)
print("[MQTT] Réponse envoyée sur {}".format(reply_to))
def start_mqtt_listener():
global mqtt_pub_client
mqtt_pub_client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2,
client_id="agent1_pub")
mqtt_pub_client.connect(MQTT_HOST, MQTT_PORT)
mqtt_pub_client.loop_start()
sub = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2, client_id="agent1_sub")
sub.on_message = on_mqtt_message
sub.connect(MQTT_HOST, MQTT_PORT)
sub.subscribe(MQTT_INBOX)
print("[MQTT] Agent1 écoute sur {}".format(MQTT_INBOX))
sub.loop_forever()
# ── BOT XMPP ───────────────────────────────────────────────────────────── # ── BOT XMPP ─────────────────────────────────────────────────────────────
class AgentBot(ClientXMPP): class AgentBot(ClientXMPP):
@@ -93,7 +129,7 @@ class AgentBot(ClientXMPP):
async def session_start(self, event): async def session_start(self, event):
self.send_presence() self.send_presence()
await self.get_roster() await self.get_roster()
self.send_message(mto=ADMIN_JID, mbody="Agent en ligne !", mtype='chat') self.send_message(mto=ADMIN_JID, mbody="Agent1 (orchestrateur) en ligne !", mtype='chat')
async def message(self, msg): async def message(self, msg):
if msg['type'] not in ('chat', 'normal'): if msg['type'] not in ('chat', 'normal'):
@@ -114,6 +150,9 @@ class AgentBot(ClientXMPP):
# ── MAIN ───────────────────────────────────────────────────────────────── # ── MAIN ─────────────────────────────────────────────────────────────────
if __name__ == "__main__": if __name__ == "__main__":
bot = AgentBot() mqtt_thread = threading.Thread(target=start_mqtt_listener, daemon=True)
bot.connect() mqtt_thread.start()
bot.loop.run_forever()
xmpp_bot = AgentBot()
xmpp_bot.connect()
xmpp_bot.loop.run_forever()
+169
View File
@@ -0,0 +1,169 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
CLI pour interagir avec les agents via MQTT.
Usage :
python3 cli.py # parle à agent1
python3 cli.py agent2_debian13 # parle directement à un agent
python3 cli.py --plans # voir les tâches planifiées
"""
import sys
import json
import time
import threading
import argparse
from pathlib import Path
import paho.mqtt.client as mqtt
from rich.console import Console
from rich.panel import Panel
from rich.prompt import Prompt
from rich.live import Live
from rich.spinner import Spinner
from rich.text import Text
from rich.rule import Rule
from rich import print as rprint
# ── CONFIG ──────────────────────────────────────────────────────────────
CONFIG_FILE = Path("/opt/agent/config/config.json")
REGISTRY_FILE = Path("/opt/agent/config/agents_registry.json")
cfg = json.loads(CONFIG_FILE.read_text())
MQTT_HOST = cfg.get("mqtt_host", "localhost")
MQTT_PORT = int(cfg.get("mqtt_port", 1883))
AGENT_INBOXES = {
"agent1": "agents/agent1/inbox",
}
try:
registry = json.loads(REGISTRY_FILE.read_text())
for name, info in registry.items():
AGENT_INBOXES[name] = info["mqtt_inbox"]
except Exception:
pass
CLI_OUTBOX = "agents/cli/outbox"
console = Console()
# ── MQTT ────────────────────────────────────────────────────────────────
response_event = threading.Event()
response_container = []
pub_client = None
sub_client = None
def on_message(client, userdata, msg):
response_container.clear()
response_container.append(msg.payload.decode(errors="replace"))
response_event.set()
def connect_mqtt():
global pub_client, sub_client
pub_client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2, client_id="cli_pub")
pub_client.connect(MQTT_HOST, MQTT_PORT)
pub_client.loop_start()
sub_client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2, client_id="cli_sub")
sub_client.on_message = on_message
sub_client.connect(MQTT_HOST, MQTT_PORT)
sub_client.subscribe(CLI_OUTBOX)
sub_client.loop_start()
def send_and_wait(agent: str, message: str, timeout: int = 180) -> str:
inbox = AGENT_INBOXES.get(agent)
if not inbox:
return "[Erreur] Agent inconnu : {}. Disponibles : {}".format(
agent, ", ".join(AGENT_INBOXES.keys()))
payload = json.dumps({"task": message, "reply_to": CLI_OUTBOX, "from": "cli"})
response_event.clear()
pub_client.publish(inbox, payload)
received = response_event.wait(timeout=timeout)
if received and response_container:
return response_container[0]
return "[Timeout] Pas de réponse de {} après {}s.".format(agent, timeout)
# ── AFFICHAGE ────────────────────────────────────────────────────────────
def print_response(agent: str, response: str):
console.print(Panel(
response,
title="[bold cyan]{}[/bold cyan]".format(agent),
border_style="cyan",
padding=(1, 2)
))
def print_user(message: str):
console.print(Panel(
"[bold white]{}[/bold white]".format(message),
title="[bold green]vous[/bold green]",
border_style="green",
padding=(0, 2)
))
def show_plans():
"""Affiche les tâches planifiées via agent1."""
connect_mqtt()
with console.status("[bold yellow]Récupération des plans...[/bold yellow]"):
result = send_and_wait("agent1", "PLAN_LIST:", timeout=30)
print_response("agent1 / plans", result)
# ── BOUCLE PRINCIPALE ────────────────────────────────────────────────────
def main_loop(agent: str):
connect_mqtt()
console.print(Rule("[bold blue]Agent CLI[/bold blue]"))
console.print("[dim]Agent cible : [bold]{}[/bold] | /reset | /plans | /quit[/dim]\n".format(agent))
while True:
try:
user_input = Prompt.ask("[bold green]>[/bold green]").strip()
except (KeyboardInterrupt, EOFError):
console.print("\n[dim]Au revoir.[/dim]")
break
if not user_input:
continue
if user_input == "/quit":
console.print("[dim]Au revoir.[/dim]")
break
if user_input == "/reset":
send_and_wait(agent, "!reset", timeout=10)
console.print("[dim]Conversation réinitialisée.[/dim]")
continue
if user_input == "/plans":
with console.status("[bold yellow]Récupération...[/bold yellow]"):
result = send_and_wait("agent1", "PLAN_LIST:", timeout=30)
print_response("plans", result)
continue
if user_input.startswith("/agent "):
agent = user_input.split(" ", 1)[1].strip()
console.print("[dim]Agent changé : [bold]{}[/bold][/dim]".format(agent))
continue
print_user(user_input)
with console.status("[bold yellow]En attente de {}...[/bold yellow]".format(agent)):
response = send_and_wait(agent, user_input)
print_response(agent, response)
# ── MAIN ─────────────────────────────────────────────────────────────────
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="CLI agents MQTT")
parser.add_argument("agent", nargs="?", default="agent1",
help="Agent cible (défaut: agent1)")
parser.add_argument("--plans", action="store_true",
help="Afficher les tâches planifiées")
args = parser.parse_args()
if args.plans:
show_plans()
else:
main_loop(args.agent)
+29 -16
View File
@@ -1,32 +1,45 @@
Tu es agent1, chef d'orchestre d'un réseau d'agents autonomes spécialisés. Tu es agent1, chef d'orchestre d'un réseau d'agents autonomes spécialisés.
Tu reçois les instructions de sylvain et tu décides de les traiter toi-même ou de les déléguer au bon agent spécialisé. Tu reçois les instructions de sylvain (via XMPP ou CLI) et tu décides de les traiter toi-même ou de les déléguer.
Les agents ne peuvent pas travailler en parallèle : tu exécutes les tâches séquentiellement.
Agents disponibles sous tes ordres : Agents disponibles sous tes ordres :
- agent2_debian13 : Administration Debian (apt, systemd, conteneurs LXC/Docker, KVM, réseau, sécurité système) - agent2_debian13 : Administration Debian (apt, systemd, conteneurs LXC/Docker, KVM, réseau, sécurité, exécution de commandes système)
Formats de commandes disponibles : Commandes disponibles :
DELEGATE: <agent> | <tâche> DELEGATE: <agent> | <tâche>
→ Déléguer une tâche à un agent spécialisé et attendre sa réponse → Déléguer une tâche unique à un agent spécialisé
→ Exemple : DELEGATE: agent2_debian13 | Comment mettre à jour les paquets Debian ? → Exemple : DELEGATE: agent2_debian13 | Vérifie l'espace disque
SEARCH: <requête web> PLAN: <agent> | <tâche1> ;; <agent> | <tâche2> ;; ...
Recherche web DuckDuckGo (max 5 résultats) Exécuter un plan de tâches séquentiel (le résultat de chaque étape est transmis à la suivante)
→ Exemple : PLAN: agent2_debian13 | apt update ;; agent2_debian13 | apt upgrade -y
READ: <url> SCHEDULE: <fréquence> | <agent> | <tâche>
Lire et convertir une page web en markdown Planifier une tâche récurrente
→ Fréquences : daily HH:MM | every Xh | every Xmin | weekly <lun|mar|mer|jeu|ven|sam|dim> HH:MM
→ Exemple : SCHEDULE: daily 03:00 | agent2_debian13 | apt update && apt upgrade -y
PLAN_LIST:
→ Afficher toutes les tâches planifiées
PLAN_CANCEL: <job_id>
→ Annuler une tâche planifiée
SEARCH: <requête>
→ Recherche web DuckDuckGo
REMEMBER: <clé> | <valeur> REMEMBER: <clé> | <valeur>
→ Mémoriser une information en base SQLite → Mémoriser une information
RECALL: <clé> RECALL: <clé>
→ Récupérer une information mémorisée → Récupérer une information mémorisée
⚠ RÈGLES : ⚠ RÈGLES :
- Si la demande concerne Debian, Linux, des conteneurs, des VMs ou l'administration système : utilise DELEGATE: agent2_debian13 - Tâche Debian/système → DELEGATE: agent2_debian13 (ou PLAN: pour plusieurs étapes)
- Si la demande concerne l'actualité, des événements récents ou des faits changeants : utilise SEARCH: - Tâche récurrente → SCHEDULE:
- Ne JAMAIS répondre de mémoire à une question d'actualité - Actualité/info récente → SEARCH:
- Les agents ne peuvent pas travailler en parallèle : délègue une tâche à la fois - Un seul agent à la fois (pas de parallélisme)
- Synthétise et transmets la réponse de l'agent spécialisé à sylvain - Transmets toujours le résultat des agents à l'utilisateur avec un résumé clair
- Réponds toujours en français. Sois concis mais précis. - Réponds toujours en français
+45
View File
@@ -0,0 +1,45 @@
"""
Skill : PLAN
Exécute un plan de tâches séquentiel entre plusieurs agents.
Les résultats de chaque étape sont passés en contexte à la suivante.
Format :
PLAN: <agent> | <tâche> ;; <agent> | <tâche> ;; ...
Exemple :
PLAN: agent2_debian13 | Vérifier l'espace disque ;; agent2_debian13 | Nettoyer les paquets inutiles
"""
from pathlib import Path
SKILL_NAME = "plan"
TRIGGER = "PLAN:"
def execute(args: str) -> str:
from skills.delegate import execute as delegate_exec
steps_raw = [s.strip() for s in args.split(";;") if s.strip()]
if not steps_raw:
return "Erreur : plan vide. Format : PLAN: <agent> | <tâche> ;; <agent> | <tâche>"
steps = []
for s in steps_raw:
if "|" not in s:
return "Erreur étape «{}» : format attendu <agent> | <tâche>".format(s)
agent, _, task = s.partition("|")
steps.append((agent.strip(), task.strip()))
report = ["Plan d'exécution ({} étape(s)) :".format(len(steps))]
context = ""
for i, (agent, task) in enumerate(steps, 1):
full_task = task
if context:
full_task = "{}\n[Contexte étape précédente]\n{}".format(task, context)
report.append("\n── Étape {}/{} → [{}] ──".format(i, len(steps), agent))
result = delegate_exec("{} | {}".format(agent, full_task))
report.append(result)
context = result # passe le résultat à l'étape suivante
report.append("\n── Plan terminé ──")
return "\n".join(report)
+149
View File
@@ -0,0 +1,149 @@
"""
Skill : SCHEDULE / PLAN_LIST / PLAN_CANCEL
Planification de tâches récurrentes entre agents.
Formats :
SCHEDULE: daily HH:MM | <agent> | <tâche>
SCHEDULE: every Xh | <agent> | <tâche>
SCHEDULE: every Xmin | <agent> | <tâche>
SCHEDULE: weekly <lun|mar|mer|jeu|ven|sam|dim> HH:MM | <agent> | <tâche>
PLAN_LIST:
PLAN_CANCEL: <job_id>
"""
import re
import json
from pathlib import Path
from datetime import datetime
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.triggers.cron import CronTrigger
from apscheduler.triggers.interval import IntervalTrigger
SKILL_NAME = "schedule_tasks"
TRIGGER = None
TRIGGERS = {
"SCHEDULE:": "schedule",
"PLAN_LIST:": "plan_list",
"PLAN_CANCEL:": "plan_cancel",
}
DB_PATH = Path("/opt/agent/scheduler.db")
_scheduler = None
DAYS_FR = {
"lun": "mon", "mar": "tue", "mer": "wed",
"jeu": "thu", "ven": "fri", "sam": "sat", "dim": "sun"
}
def _get_scheduler():
global _scheduler
if _scheduler is None:
jobstores = {"default": SQLAlchemyJobStore(url="sqlite:///{}".format(DB_PATH))}
_scheduler = BackgroundScheduler(jobstores=jobstores)
_scheduler.start()
return _scheduler
def _run_delegated_task(agent: str, task: str):
"""Exécutée par le scheduler : délègue la tâche à l'agent."""
from skills.delegate import execute as delegate_exec
import paho.mqtt.publish as publish
import json as _json
result = delegate_exec("{} | {}".format(agent, task))
print("[SCHEDULE] Tâche exécutée [{}{}] : {}".format(
datetime.now().strftime("%Y-%m-%d %H:%M"), agent, task[:60]))
# Notifier via MQTT sur le topic de notification
try:
cfg = _json.loads(Path("/opt/agent/config/config.json").read_text())
publish.single(
"agents/scheduler/notifications",
payload="[{}] {}\n{}".format(agent, task, result),
hostname=cfg.get("mqtt_host", "localhost"),
port=int(cfg.get("mqtt_port", 1883))
)
except Exception:
pass
def _parse_trigger(expr: str):
"""Parse l'expression de planification et retourne un trigger APScheduler."""
expr = expr.strip().lower()
# every Xh
m = re.match(r"every (\d+)h$", expr)
if m:
return IntervalTrigger(hours=int(m.group(1))), "toutes les {}h".format(m.group(1))
# every Xmin
m = re.match(r"every (\d+)min$", expr)
if m:
return IntervalTrigger(minutes=int(m.group(1))), "toutes les {}min".format(m.group(1))
# daily HH:MM
m = re.match(r"daily (\d{1,2}):(\d{2})$", expr)
if m:
h, mn = m.group(1), m.group(2)
return CronTrigger(hour=h, minute=mn), "tous les jours à {}:{}".format(h, mn)
# weekly <jour> HH:MM
m = re.match(r"weekly (\w+) (\d{1,2}):(\d{2})$", expr)
if m:
day_fr = m.group(1)
day_en = DAYS_FR.get(day_fr, day_fr)
h, mn = m.group(2), m.group(3)
return CronTrigger(day_of_week=day_en, hour=h, minute=mn), \
"chaque {} à {}:{}".format(day_fr, h, mn)
return None, None
def schedule(args: str) -> str:
parts = [p.strip() for p in args.split("|")]
if len(parts) < 3:
return ("Erreur : format attendu :\n"
"SCHEDULE: daily HH:MM | <agent> | <tâche>\n"
"SCHEDULE: every Xh | <agent> | <tâche>\n"
"SCHEDULE: weekly lun HH:MM | <agent> | <tâche>")
expr, agent, task = parts[0], parts[1], "|".join(parts[2:])
trigger, label = _parse_trigger(expr)
if trigger is None:
return "Expression invalide : «{}»\nFormats : daily HH:MM | every Xh | every Xmin | weekly <jour> HH:MM".format(expr)
sched = _get_scheduler()
job = sched.add_job(
_run_delegated_task,
trigger=trigger,
args=[agent, task],
name="{}{}".format(agent, task[:40])
)
return "Tâche planifiée [ID: {}]\nAgent : {}\nTâche : {}\nFréquence : {}\nProchain : {}".format(
job.id, agent, task, label,
job.next_run_time.strftime("%Y-%m-%d %H:%M") if job.next_run_time else "N/A"
)
def plan_list(args: str) -> str:
sched = _get_scheduler()
jobs = sched.get_jobs()
if not jobs:
return "Aucune tâche planifiée."
lines = ["Tâches planifiées ({}) :".format(len(jobs))]
for j in jobs:
next_run = j.next_run_time.strftime("%Y-%m-%d %H:%M") if j.next_run_time else "N/A"
lines.append("- [{}] {} | Prochain : {}".format(j.id[:8], j.name, next_run))
return "\n".join(lines)
def plan_cancel(args: str) -> str:
job_id = args.strip()
if not job_id:
return "Erreur : ID manquant. Utilisez PLAN_LIST: pour voir les IDs."
sched = _get_scheduler()
# Recherche par ID complet ou préfixe
for job in sched.get_jobs():
if job.id == job_id or job.id.startswith(job_id):
name = job.name
job.remove()
return "Tâche [{}] annulée : {}".format(job_id[:8], name)
return "Aucune tâche trouvée avec l'ID : {}".format(job_id)