""" Moteur de déploiement SSH — installe un agent sur une machine distante ou locale. Le nom de l'agent est fourni par l'utilisateur et devient son identifiant permanent. """ import json import logging import os import socket import time from dataclasses import dataclass, field from typing import Optional, Callable logger = logging.getLogger(__name__) CATALOG_PATH = "/opt/agent_deploy/config/catalog.json" @dataclass class DeployConfig: """Paramètres d'un déploiement.""" agent_type: str # type d'agent : debian, ansible, deploy... ou "custom" agent_name: str # nom choisi par l'utilisateur (ex: "debian-prod") host: str # IP ou hostname cible ssh_user: str # utilisateur SSH ssh_auth: str # "password" ou "key" ssh_credential: str # mot de passe ou chemin de clé privée xmpp_jid: str xmpp_password: str mqtt_host: str mqtt_port: int = 1883 local: bool = False # installation locale (pas de SSH) extra: dict = field(default_factory=dict) git_url: Optional[str] = None # bypass catalogue : URL git directe main_script: Optional[str] = None # script principal (None = auto-détection) apt_deps: list = field(default_factory=list) # override des dépendances apt class AgentCatalog: """Catalogue des types d'agents déployables.""" def __init__(self, path: str = CATALOG_PATH): self.path = path self._data = self._load() def _load(self) -> dict: if not os.path.exists(self.path): return self._default_catalog() with open(self.path) as f: return json.load(f) def _default_catalog(self) -> dict: return { "nexus": { "description": "Orchestrateur principal — coordonne tous les agents", "repo": "https://git.piaf.im/sylvain/nexus.git", "install_path_tpl": "/opt/{agent_name}", "service_name_tpl": "{agent_name}", "main_script": "nexus.py", "apt_deps": ["python3", "python3-pip", "python3-venv", "git", "mosquitto-clients"], "pip_deps": ["apscheduler", "duckduckgo-search", "beautifulsoup4"], "config_template": "nexus", "extra_config": { "schedules": {"scheduled_tasks": [], "daily_reports": []} }, }, "debian": { "description": "Agent d'administration système Debian", "repo": "https://git.piaf.im/sylvain/agent_debian.git", "install_path_tpl": "/opt/{agent_name}", "service_name_tpl": "{agent_name}", "main_script": "agent_debian.py", "apt_deps": ["python3", "python3-pip", "python3-venv", "git", "mosquitto-clients"], "pip_deps": [], "config_template": "debian", "extra_config": {"work_hours": "07:00-23:00"}, }, "ansible": { "description": "Agent d'automatisation Ansible", "repo": "https://git.piaf.im/sylvain/agent_ansible.git", "install_path_tpl": "/opt/{agent_name}", "service_name_tpl": "{agent_name}", "main_script": "agent_ansible.py", "apt_deps": ["python3", "python3-pip", "python3-venv", "git", "ansible", "mosquitto-clients"], "pip_deps": [], "config_template": "ansible", "extra_config": {"work_hours": "07:00-23:00"}, }, "deploy": { "description": "Agent de déploiement d'autres agents", "repo": "https://git.piaf.im/sylvain/agent_deploy.git", "install_path_tpl": "/opt/{agent_name}", "service_name_tpl": "{agent_name}", "main_script": "agent_deploy.py", "apt_deps": ["python3", "python3-pip", "python3-venv", "git", "python3-paramiko", "mosquitto-clients", "sshpass"], "pip_deps": ["paramiko"], "config_template": "deploy", "extra_config": { "work_hours": "08:00-20:00", "catalog": "/opt/{agent_name}/config/catalog.json" }, }, "hal": { "description": "Agent HAL — contrôle système complet + édition code/fichiers", "repo": "https://git.piaf.im/sylvain/agent_hal.git", "install_path_tpl": "/opt/{agent_name}", "service_name_tpl": "{agent_name}", "main_script": "agent_hal.py", "apt_deps": ["python3", "python3-pip", "python3-venv", "git", "mosquitto-clients", "sshpass"], "pip_deps": ["slixmpp-omemo"], "config_template": "debian", "extra_config": {"work_hours": "00:00-23:59"}, }, } def get(self, agent_type: str) -> Optional[dict]: return self._data.get(agent_type) def list_types(self) -> list[str]: return list(self._data.keys()) def summary(self) -> str: lines = ["── Agents déployables ──────────────"] for name, info in self._data.items(): lines.append(f" [{name}] {info['description']}") return "\n".join(lines) class Deployer: """ Orchestre le déploiement complet d'un agent. Supporte SSH (distant) et local. """ def __init__(self, config: DeployConfig, progress_cb: Optional[Callable] = None): self.cfg = config self.catalog = AgentCatalog() self._cb = progress_cb or (lambda msg: logger.info(msg)) self._ssh = None # Dépendances apt minimales pour tout agent Python custom _DEFAULT_APT_DEPS = ["python3", "python3-pip", "python3-venv", "git"] def deploy(self) -> tuple[bool, str]: """Lance le déploiement. Retourne (succès, message).""" # ── Mode "from_git" : repo arbitraire, sans catalogue ────────────── if self.cfg.git_url: return self._deploy_from_git() # ── Mode catalogue ────────────────────────────────────────────────── agent_info = self.catalog.get(self.cfg.agent_type) if not agent_info: return False, f"Type d'agent inconnu : '{self.cfg.agent_type}'. Disponibles : {self.catalog.list_types()}" install_path = agent_info["install_path_tpl"].format(agent_name=self.cfg.agent_name) service_name = agent_info["service_name_tpl"].format(agent_name=self.cfg.agent_name) main_script = agent_info["main_script"] steps = [ ("Connexion SSH", lambda: self._connect()), ("Dépendances système", lambda: self._install_apt(agent_info["apt_deps"])), ("Clonage du dépôt", lambda: self._clone_repo(agent_info["repo"], install_path)), ("Environnement Python", lambda: self._setup_venv(install_path, agent_info.get("pip_deps", []))), ("Configuration", lambda: self._write_config(install_path, agent_info["config_template"])), ("Service systemd", lambda: self._setup_service(install_path, service_name, main_script)), ("Démarrage", lambda: self._start_service(service_name)), ] for step_name, step_fn in steps: self._cb(f"⏳ {step_name}...") try: ok, msg = step_fn() if not ok: self._cb(f"❌ {step_name} échoué : {msg}") self._disconnect() return False, f"Échec à l'étape '{step_name}' : {msg}" self._cb(f"✓ {step_name}") except Exception as e: self._disconnect() return False, f"Exception lors de '{step_name}' : {e}" self._disconnect() return True, ( f"Agent '{self.cfg.agent_name}' ({self.cfg.agent_type}) déployé avec succès sur {self.cfg.host}.\n" f"Service : {service_name}\n" f"Chemin : {install_path}\n" f"JID XMPP : {self.cfg.xmpp_jid}" ) def _deploy_from_git(self) -> tuple[bool, str]: """Déploie un agent depuis une URL git arbitraire (sans catalogue).""" install_path = f"/opt/{self.cfg.agent_name}" service_name = self.cfg.agent_name apt_deps = self.cfg.apt_deps or self._DEFAULT_APT_DEPS # Le main_script sera détecté après le clone si non fourni main_script_holder = [self.cfg.main_script] def detect_script(): if main_script_holder[0]: return True, main_script_holder[0] ok, result = self._detect_main_script(install_path) if ok: main_script_holder[0] = result return ok, result steps = [ ("Connexion SSH", lambda: self._connect()), ("Dépendances système", lambda: self._install_apt(apt_deps)), ("Clonage du dépôt", lambda: self._clone_repo(self.cfg.git_url, install_path)), ("Détection script", detect_script), ("Environnement Python", lambda: self._setup_venv(install_path, [])), ("Configuration", lambda: self._write_config(install_path, "custom")), ("Service systemd", lambda: self._setup_service(install_path, service_name, main_script_holder[0])), ("Démarrage", lambda: self._start_service(service_name)), ] for step_name, step_fn in steps: self._cb(f"⏳ {step_name}...") try: ok, msg = step_fn() if not ok: self._cb(f"❌ {step_name} échoué : {msg}") self._disconnect() return False, f"Échec à l'étape '{step_name}' : {msg}" self._cb(f"✓ {step_name}" + (f" → {msg}" if step_name == "Détection script" else "")) except Exception as e: self._disconnect() return False, f"Exception lors de '{step_name}' : {e}" self._disconnect() return True, ( f"Agent '{self.cfg.agent_name}' déployé depuis {self.cfg.git_url}\n" f"Script : {main_script_holder[0]}\n" f"Service : {service_name}\n" f"Chemin : {install_path}\n" f"JID XMPP : {self.cfg.xmpp_jid}" ) def _detect_main_script(self, install_path: str) -> tuple[bool, str]: """Détecte automatiquement le script principal dans le repo cloné.""" # Priorité : agent_*.py > main.py > premier .py avec __main__ cmd = ( f"cd {install_path} && (" f"ls agent_*.py 2>/dev/null | head -1 || " f"ls main.py 2>/dev/null || " f"grep -rl '__main__' *.py 2>/dev/null | head -1 || " f"ls *.py 2>/dev/null | grep -v 'setup.py\\|conf' | head -1" f")" ) ok, out = self._run_remote(cmd) script = out.strip().splitlines()[0].strip() if out.strip() else "" if not script: return False, "Aucun script Python trouvé à la racine du dépôt" return True, script # ────────────────────────────────────────────── # Étapes de déploiement # ────────────────────────────────────────────── def _connect(self) -> tuple[bool, str]: if self.cfg.local: return True, "local" try: import paramiko self._ssh = paramiko.SSHClient() self._ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) kwargs = dict( hostname=self.cfg.host, username=self.cfg.ssh_user, timeout=15, ) if self.cfg.ssh_auth == "password": kwargs["password"] = self.cfg.ssh_credential else: kwargs["key_filename"] = self.cfg.ssh_credential self._ssh.connect(**kwargs) return True, "ok" except Exception as e: return False, str(e) def _run_remote(self, cmd: str, timeout: int = 120) -> tuple[bool, str]: if self.cfg.local: import subprocess result = subprocess.run( cmd, shell=True, text=True, capture_output=True, timeout=timeout ) out = (result.stdout + result.stderr).strip() return result.returncode == 0, out else: stdin, stdout, stderr = self._ssh.exec_command(cmd, timeout=timeout) exit_code = stdout.channel.recv_exit_status() out = stdout.read().decode() + stderr.read().decode() return exit_code == 0, out.strip() def _install_apt(self, packages: list) -> tuple[bool, str]: if not packages: return True, "aucune dépendance" pkgs = " ".join(packages) ok, out = self._run_remote( f"DEBIAN_FRONTEND=noninteractive apt-get install -y -q {pkgs}", timeout=300 ) return ok, out[-500:] if not ok else "ok" def _clone_repo(self, repo: str, install_path: str) -> tuple[bool, str]: cmd = ( f"if [ -d '{install_path}/.git' ]; then " f" cd {install_path} && git pull; " f"else " f" git clone {repo} {install_path}; " f"fi" ) return self._run_remote(cmd, timeout=120) def _setup_venv(self, install_path: str, extra_pip: list) -> tuple[bool, str]: venv = f"{install_path}/venv" cmds = [ f"python3 -m venv {venv}", f"{venv}/bin/pip install --quiet --upgrade pip", f"{venv}/bin/pip install --quiet -r {install_path}/requirements.txt", ] if extra_pip: cmds.append(f"{venv}/bin/pip install --quiet {' '.join(extra_pip)}") for cmd in cmds: ok, out = self._run_remote(cmd, timeout=180) if not ok: return False, out[-500:] return True, "ok" def _write_config(self, install_path: str, template: str) -> tuple[bool, str]: """Écrit config.json avec les paramètres fournis par l'utilisateur.""" # Pour un déploiement custom (from_git), pas d'infos catalogue if template == "custom": agent_info, catalog_extra = {}, {} else: # Récupère les extra_config du catalogue pour ce type d'agent agent_info = self.catalog.get(self.cfg.agent_type) or {} catalog_extra = agent_info.get("extra_config", {}) # Résout les templates dans les valeurs (ex: /opt/{agent_name}/config/catalog.json) resolved_extra = {} for k, v in catalog_extra.items(): if isinstance(v, str): resolved_extra[k] = v.format(agent_name=self.cfg.agent_name) else: resolved_extra[k] = v config = { "agent_id": self.cfg.agent_name, "xmpp": { "jid": self.cfg.xmpp_jid, "password": self.cfg.xmpp_password, "admin_jids": self.cfg.extra.get("admin_jids", [self.cfg.extra.get("admin_jid", "")]), "muc_room": self.cfg.extra.get("muc_room", "agents@muc.xmpp.ovh"), "use_omemo": False, }, "mqtt": { "host": self.cfg.mqtt_host, "port": self.cfg.mqtt_port, "username": self.cfg.extra.get("mqtt_username"), "password": self.cfg.extra.get("mqtt_password"), "tls": self.cfg.extra.get("mqtt_tls", False), }, "llm": { "base_url": self.cfg.extra.get("ollama_url", "http://localhost:11434"), "model": self.cfg.extra.get("model", "mistral"), "temperature": 0.3, }, "queue_db": f"{install_path}/data/queue.db", "system_prompt": f"{install_path}/config/system_prompt.txt", } # Fusionne les extra_config du catalogue config.update(resolved_extra) # Les extra de l'utilisateur ont priorité for k, v in self.cfg.extra.items(): if k not in ("admin_jid", "admin_jids", "muc_room", "mqtt_username", "mqtt_password", "mqtt_tls", "ollama_url", "model"): config[k] = v config_json = json.dumps(config, indent=2, ensure_ascii=False) # Écrit via heredoc pour éviter les problèmes d'échappement cmd = ( f"mkdir -p {install_path}/config {install_path}/data && " f"cat > {install_path}/config/config.json << 'DEPLOYEOF'\n" f"{config_json}\n" f"DEPLOYEOF" ) return self._run_remote(cmd) def _setup_service(self, install_path: str, service_name: str, main_script: str) -> tuple[bool, str]: service_content = f"""[Unit] Description=Agent {service_name} After=network.target mosquitto.service Wants=mosquitto.service [Service] Type=simple User=root WorkingDirectory={install_path} ExecStart={install_path}/venv/bin/python {install_path}/{main_script} Restart=always RestartSec=5 StandardOutput=journal StandardError=journal SyslogIdentifier={service_name} [Install] WantedBy=multi-user.target """ cmd = ( f"cat > /etc/systemd/system/{service_name}.service << 'SVCEOF'\n" f"{service_content}\n" f"SVCEOF\n" f"systemctl daemon-reload && " f"systemctl enable {service_name}" ) return self._run_remote(cmd) def _start_service(self, service_name: str) -> tuple[bool, str]: ok, out = self._run_remote(f"systemctl start {service_name}") if not ok: return False, out time.sleep(3) ok2, status = self._run_remote(f"systemctl is-active {service_name}") return status.strip() == "active", status def _disconnect(self): if self._ssh: try: self._ssh.close() except Exception: pass self._ssh = None