#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
CLI for talking to agents over MQTT.

Usage:
    python3 cli.py                    # talk to agent1
    python3 cli.py agent2_debian13    # talk directly to a given agent
    python3 cli.py --plans            # show the scheduled tasks
"""

import sys
import json
import time
import threading
import argparse
from pathlib import Path

import paho.mqtt.client as mqtt
from rich.console import Console
from rich.markup import escape
from rich.panel import Panel
from rich.prompt import Prompt
from rich.live import Live
from rich.spinner import Spinner
from rich.text import Text
from rich.rule import Rule
from rich import print as rprint

# ── CONFIG ──────────────────────────────────────────────────────────────
CONFIG_FILE = Path("/opt/agent/config/config.json")
REGISTRY_FILE = Path("/opt/agent/config/agents_registry.json")

# The main configuration is mandatory: a missing or invalid file aborts here.
cfg = json.loads(CONFIG_FILE.read_text(encoding="utf-8"))
MQTT_HOST = cfg.get("mqtt_host", "localhost")
MQTT_PORT = int(cfg.get("mqtt_port", 1883))

# Agent name -> MQTT topic on which that agent receives its tasks.
AGENT_INBOXES = {
    "agent1": "agents/agent1/inbox",
}

# The registry is best-effort: entries read before a failure are kept and the
# CLI still starts with whatever agents are known at that point.
try:
    registry = json.loads(REGISTRY_FILE.read_text(encoding="utf-8"))
    for name, info in registry.items():
        AGENT_INBOXES[name] = info["mqtt_inbox"]
except FileNotFoundError:
    # No registry file: only the built-in agents are available.
    pass
except (OSError, ValueError, AttributeError, KeyError, TypeError) as exc:
    # Unreadable or malformed registry: report it instead of hiding it, but
    # do not prevent the CLI from starting.
    print(
        "[Avertissement] Registre des agents ignoré ({}) : {!r}".format(
            REGISTRY_FILE, exc),
        file=sys.stderr,
    )

# Topic on which agents are asked to publish their replies to this CLI.
CLI_OUTBOX = "agents/cli/outbox"

console = Console()

# ── MQTT ────────────────────────────────────────────────────────────────
# Set by on_message when a reply arrives; cleared before each request.
response_event = threading.Event()
# Holds at most one element: the text of the most recent reply.
response_container = []
# Created by connect_mqtt(); None until then and after disconnect_mqtt().
pub_client = None
sub_client = None


def on_connect(client, userdata, flags, reason_code, properties):
    """Subscribe to the CLI outbox every time a connection is established.

    Installed on the subscriber client so that the subscription is issued
    again after a reconnection, not only once at start-up.
    """
    if not reason_code.is_failure:
        client.subscribe(CLI_OUTBOX)


def on_message(client, userdata, msg):
    """Store the payload of an incoming reply and wake up the waiting caller.

    Only the latest message is kept. Bytes that cannot be decoded are
    replaced rather than raising.
    """
    reply_text = msg.payload.decode(errors="replace")
    response_container.clear()
    response_container.append(reply_text)
    # Signal only once the container holds the new reply.
    response_event.set()


def connect_mqtt() -> None:
    """Create and start the publisher and subscriber MQTT clients.

    Both clients connect to MQTT_HOST:MQTT_PORT and run their network loop
    in a background thread. The subscriber listens on CLI_OUTBOX.

    Raises:
        OSError: if the connection to the broker cannot be opened.
    """
    global pub_client, sub_client

    # NOTE(review): the client ids are fixed, so two CLI instances running at
    # the same time use the same ids (and the same reply topic) — confirm
    # that only one instance is expected at a time.
    pub_client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2, client_id="cli_pub")
    pub_client.connect(MQTT_HOST, MQTT_PORT)
    pub_client.loop_start()

    sub_client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2, client_id="cli_sub")
    sub_client.on_connect = on_connect
    sub_client.on_message = on_message
    sub_client.connect(MQTT_HOST, MQTT_PORT)
    # Requested here as well so the subscription is issued before this
    # function returns, i.e. before any request can be published;
    # on_connect takes care of re-subscribing later on.
    sub_client.subscribe(CLI_OUTBOX)
    sub_client.loop_start()


def disconnect_mqtt() -> None:
    """Disconnect both MQTT clients and stop their background loops.

    Safe to call when connect_mqtt() was never called or failed half-way.
    """
    global pub_client, sub_client

    for client in (pub_client, sub_client):
        if client is None:
            continue
        try:
            client.disconnect()
            client.loop_stop()
        except OSError:
            # Best-effort teardown: the program is exiting anyway.
            pass
    pub_client = None
    sub_client = None


def send_and_wait(agent: str, message: str, timeout: int = 180) -> str:
    """Publish a task to an agent and block until a reply arrives.

    Args:
        agent: name of the target agent, a key of AGENT_INBOXES.
        message: text sent as the "task" field of the JSON payload.
        timeout: maximum number of seconds to wait for the reply.

    Returns:
        The reply text, or a message starting with "[Erreur]" when the agent
        is unknown, or with "[Timeout]" when nothing arrived in time.
    """
    inbox = AGENT_INBOXES.get(agent)
    if not inbox:
        return "[Erreur] Agent inconnu : {}. Disponibles : {}".format(
            agent, ", ".join(AGENT_INBOXES.keys()))

    payload = json.dumps({"task": message, "reply_to": CLI_OUTBOX, "from": "cli"})

    # NOTE: requests carry no correlation id and all replies share one topic,
    # so a late reply to an earlier timed-out request that arrives during
    # this wait is returned as the answer to this request.
    response_event.clear()
    pub_client.publish(inbox, payload)

    received = response_event.wait(timeout=timeout)
    if received and response_container:
        return response_container[0]
    return "[Timeout] Pas de réponse de {} après {}s.".format(agent, timeout)


# ── DISPLAY ─────────────────────────────────────────────────────────────
def print_response(agent: str, response: str) -> None:
    """Display a reply in a cyan panel titled with the agent name.

    The reply is shown verbatim: it is wrapped in a Text object so that
    square brackets in it are not interpreted as Rich markup.
    """
    console.print(Panel(
        Text(response),
        title="[bold cyan]{}[/bold cyan]".format(escape(agent)),
        border_style="cyan",
        padding=(1, 2),
    ))


def print_user(message: str) -> None:
    """Echo the user's message in a green panel, shown verbatim."""
    console.print(Panel(
        Text(message, style="bold white"),
        title="[bold green]vous[/bold green]",
        border_style="green",
        padding=(0, 2),
    ))


def show_plans() -> None:
    """Display the scheduled tasks, as reported by agent1."""
    try:
        connect_mqtt()
        with console.status("[bold yellow]Récupération des plans...[/bold yellow]"):
            result = send_and_wait("agent1", "PLAN_LIST:", timeout=30)
        print_response("agent1 / plans", result)
    finally:
        disconnect_mqtt()


# ── MAIN LOOP ───────────────────────────────────────────────────────────
# Slash commands that are always answered by agent1:
# command -> (task sent, status text while waiting, panel title).
_AGENT1_COMMANDS = {
    "/plans": (
        "PLAN_LIST:",
        "[bold yellow]Récupération...[/bold yellow]",
        "plans",
    ),
    "/report": (
        "REPORT:",
        "[bold yellow]Génération rapport...[/bold yellow]",
        "rapport",
    ),
    "/errors": (
        "REPORT_ERRORS:",
        "[bold yellow]Récupération erreurs...[/bold yellow]",
        "erreurs",
    ),
}


def _available_agents() -> str:
    """Return the known agent names as markup-safe, comma-separated text."""
    return escape(", ".join(AGENT_INBOXES))


def main_loop(agent: str) -> None:
    """Run the interactive prompt until the user quits.

    Args:
        agent: name of the agent that receives free-text messages; it can be
            changed during the session with "/agent <name>".
    """
    try:
        connect_mqtt()
        console.print(Rule("[bold blue]Agent CLI[/bold blue]"))
        console.print("[dim]Agent cible : [bold]{}[/bold] | /reset | /plans | /report | /errors | /agent | /quit[/dim]\n".format(escape(agent)))

        while True:
            try:
                user_input = Prompt.ask("[bold green]>[/bold green]").strip()
            except (KeyboardInterrupt, EOFError):
                console.print("\n[dim]Au revoir.[/dim]")
                break

            if not user_input:
                continue

            if user_input == "/quit":
                console.print("[dim]Au revoir.[/dim]")
                break

            if user_input == "/reset":
                send_and_wait(agent, "!reset", timeout=10)
                console.print("[dim]Conversation réinitialisée.[/dim]")
                continue

            if user_input in _AGENT1_COMMANDS:
                task, status_text, title = _AGENT1_COMMANDS[user_input]
                with console.status(status_text):
                    result = send_and_wait("agent1", task, timeout=30)
                print_response(title, result)
                continue

            # The input is already stripped, so a bare "/agent" never ends
            # with a space and has to be matched explicitly; otherwise it
            # would be forwarded to the agent as an ordinary message.
            if user_input == "/agent" or user_input.startswith("/agent "):
                requested = user_input[len("/agent"):].strip()
                if not requested:
                    console.print("[dim]Agent actuel : [bold]{}[/bold]. Disponibles : {}[/dim]".format(
                        escape(agent), _available_agents()))
                elif requested in AGENT_INBOXES:
                    agent = requested
                    console.print("[dim]Agent changé : [bold]{}[/bold][/dim]".format(escape(agent)))
                else:
                    # Keep the current agent rather than switching to a name
                    # every later request would be refused for.
                    console.print("[dim]Agent inconnu : [bold]{}[/bold]. Disponibles : {}[/dim]".format(
                        escape(requested), _available_agents()))
                continue

            print_user(user_input)
            with console.status("[bold yellow]En attente de {}...[/bold yellow]".format(escape(agent))):
                response = send_and_wait(agent, user_input)
            print_response(agent, response)
    finally:
        disconnect_mqtt()


# ── MAIN ────────────────────────────────────────────────────────────────
if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="CLI agents MQTT")
    parser.add_argument("agent", nargs="?", default="agent1",
                        help="Agent cible (défaut: agent1)")
    parser.add_argument("--plans", action="store_true",
                        help="Afficher les tâches planifiées")
    args = parser.parse_args()

    if args.plans:
        show_plans()
    else:
        main_loop(args.agent)