Initial commit — agent_deploy v2.0

This commit is contained in:
2026-03-09 09:01:34 +00:00
commit f2da35e072
10 changed files with 768 additions and 0 deletions
+10
View File
@@ -0,0 +1,10 @@
venv/
__pycache__/
*.pyc
*.pyo
*.db
*.log
data/
*.egg-info/
.vault_pass
config/config.json
+66
View File
@@ -0,0 +1,66 @@
#!/usr/bin/env python3
"""
Agent Deploy — déploiement d'agents sur machines distantes ou locales via SSH.
Le nom de l'agent cible est choisi par l'utilisateur au déploiement.
"""
import os
import sys
import logging
sys.path.insert(0, "/opt")
from agents_core import BaseAgent, AgentContext, Message, MessageType
logger = logging.getLogger(__name__)
class AgentDeploy(BaseAgent):
AGENT_TYPE = "deploy"
DESCRIPTION = (
"Déploiement d'agents sur machines distantes ou locales via SSH. "
"Installe, configure et enregistre de nouveaux agents dans le système."
)
DEFAULT_CONFIG_PATH = "/opt/agent_deploy/config/config.json"
def get_skills_dir(self) -> str:
return os.path.join(os.path.dirname(__file__), "skills")
def on_start(self):
self.mqtt.send_to("nexus", f"Agent Deploy ({self.agent_id}) en ligne.")
def setup_extra_subscriptions(self):
self.mqtt.subscribe(
f"agents/{self.agent_id}/control",
self._on_control,
)
def _on_control(self, msg, topic: str):
from agents_core.message_bus import Message as Msg
payload = msg.payload if isinstance(msg, Msg) else str(msg)
result = self._handle_system_command(payload)
if result and isinstance(msg, Msg):
self.mqtt.reply(msg, result)
def handle_custom_command(self, cmd: str, args: str, source_msg=None):
if cmd == "report":
return self._build_report()
if cmd == "catalog":
from deployer import AgentCatalog
catalog = AgentCatalog(
self.config.get("catalog", "/opt/agent_deploy/config/catalog.json")
)
return catalog.summary()
return f"Commande inconnue : /{cmd}"
def _build_report(self) -> str:
stats = self.queue.daily_stats()
return (
f"── Rapport {self.agent_id} ──\n"
f"Déploiements : {stats['total']} total / "
f"{stats['completed']} OK / {stats['failed']} erreurs / "
f"durée moy. {stats['avg_duration_s']}s"
)
if __name__ == "__main__":
AgentDeploy().run()
+18
View File
@@ -0,0 +1,18 @@
[Unit]
Description=Agent Deploy — Déploiement d'agents
After=network.target mosquitto.service
Wants=mosquitto.service
[Service]
Type=simple
User=root
WorkingDirectory=/opt/agent_deploy
ExecStart=/opt/agent_deploy/venv/bin/python /opt/agent_deploy/agent_deploy.py
Restart=always
RestartSec=5
StandardOutput=journal
StandardError=journal
SyslogIdentifier=agent-deploy
[Install]
WantedBy=multi-user.target
+35
View File
@@ -0,0 +1,35 @@
Tu es un agent de déploiement. Tu installes et configures de nouveaux agents sur des machines distantes ou locales.
## Workflow de déploiement
Quand l'utilisateur veut déployer un agent, collecte ces informations :
1. **Type d'agent** : debian, ansible, deploy (ou autre si dans le catalogue)
2. **Nom** : nom unique choisi par l'utilisateur — ce sera son identifiant permanent (@nom dans les commandes)
3. **Cible** : IP ou hostname (ou "local" pour la machine courante)
4. **SSH** : utilisateur + méthode (password ou clé) + credential
5. **XMPP** : JID et mot de passe pour le nouvel agent
6. **MQTT** : adresse du broker
Une fois toutes les infos collectées, lance le déploiement avec SKILL:deploy.
## Skills disponibles
- **deploy** : lancer un déploiement complet (SSH ou local)
- **ssh** : exécuter une commande sur une machine distante
- **catalog** : gérer le catalogue des types d'agents déployables
- **mqtt_send** : communiquer avec d'autres agents
## Exemple de déploiement
Utilisateur : "Déploie un agent debian sur 192.168.1.50, nom 'debian-srv2'"
Tu collectes les infos manquantes (user SSH, password, JID XMPP...) puis :
SKILL:deploy ARGS:start debian debian-srv2 192.168.1.50 root password MonMDP debian-srv2@xmpp.ovh MonXMPP localhost
## Règles
- Le nom choisi est définitif — il sera utilisé pour appeler l'agent (@nom)
- Vérifie toujours la connectivité SSH avant de lancer un déploiement complet
- Les notifications de progression sont envoyées en temps réel à Nexus
- Réponds en français
- Demande confirmation avant de lancer un déploiement sur une machine de production
+341
View File
@@ -0,0 +1,341 @@
"""
Moteur de déploiement SSH — installe un agent sur une machine distante ou locale.
Le nom de l'agent est fourni par l'utilisateur et devient son identifiant permanent.
"""
import json
import logging
import os
import socket
import time
from dataclasses import dataclass, field
from typing import Optional, Callable
logger = logging.getLogger(__name__)
CATALOG_PATH = "/opt/agent_deploy/config/catalog.json"
@dataclass
class DeployConfig:
"""Paramètres d'un déploiement."""
agent_type: str # type d'agent : debian, ansible, deploy...
agent_name: str # nom choisi par l'utilisateur (ex: "debian-prod")
host: str # IP ou hostname cible
ssh_user: str # utilisateur SSH
ssh_auth: str # "password" ou "key"
ssh_credential: str # mot de passe ou chemin de clé privée
xmpp_jid: str
xmpp_password: str
mqtt_host: str
mqtt_port: int = 1883
local: bool = False # installation locale (pas de SSH)
extra: dict = field(default_factory=dict)
class AgentCatalog:
"""Catalogue des types d'agents déployables."""
def __init__(self, path: str = CATALOG_PATH):
self.path = path
self._data = self._load()
def _load(self) -> dict:
if not os.path.exists(self.path):
return self._default_catalog()
with open(self.path) as f:
return json.load(f)
def _default_catalog(self) -> dict:
return {
"nexus": {
"description": "Orchestrateur principal — coordonne tous les agents",
"repo": "https://github.com/youruser/nexus.git",
"install_path_tpl": "/opt/{agent_name}",
"service_name_tpl": "{agent_name}",
"main_script": "nexus.py",
"apt_deps": ["python3", "python3-pip", "python3-venv", "git", "mosquitto-clients"],
"pip_deps": ["apscheduler", "duckduckgo-search", "beautifulsoup4"],
"config_template": "nexus",
"extra_config": {
"schedules": {"scheduled_tasks": [], "daily_reports": []}
},
},
"debian": {
"description": "Agent d'administration système Debian",
"repo": "https://github.com/youruser/agent_debian.git",
"install_path_tpl": "/opt/{agent_name}",
"service_name_tpl": "{agent_name}",
"main_script": "agent_debian.py",
"apt_deps": ["python3", "python3-pip", "python3-venv", "git", "mosquitto-clients"],
"pip_deps": [],
"config_template": "debian",
"extra_config": {"work_hours": "07:00-23:00"},
},
"ansible": {
"description": "Agent d'automatisation Ansible",
"repo": "https://github.com/youruser/agent_ansible.git",
"install_path_tpl": "/opt/{agent_name}",
"service_name_tpl": "{agent_name}",
"main_script": "agent_ansible.py",
"apt_deps": ["python3", "python3-pip", "python3-venv", "git", "ansible", "mosquitto-clients"],
"pip_deps": [],
"config_template": "ansible",
"extra_config": {"work_hours": "07:00-23:00"},
},
"deploy": {
"description": "Agent de déploiement d'autres agents",
"repo": "https://github.com/youruser/agent_deploy.git",
"install_path_tpl": "/opt/{agent_name}",
"service_name_tpl": "{agent_name}",
"main_script": "agent_deploy.py",
"apt_deps": ["python3", "python3-pip", "python3-venv", "git", "python3-paramiko", "mosquitto-clients", "sshpass"],
"pip_deps": ["paramiko"],
"config_template": "deploy",
"extra_config": {
"work_hours": "08:00-20:00",
"catalog": "/opt/{agent_name}/config/catalog.json"
},
},
}
def get(self, agent_type: str) -> Optional[dict]:
return self._data.get(agent_type)
def list_types(self) -> list[str]:
return list(self._data.keys())
def summary(self) -> str:
lines = ["── Agents déployables ──────────────"]
for name, info in self._data.items():
lines.append(f" [{name}] {info['description']}")
return "\n".join(lines)
class Deployer:
"""
Orchestre le déploiement complet d'un agent.
Supporte SSH (distant) et local.
"""
def __init__(self, config: DeployConfig, progress_cb: Optional[Callable] = None):
self.cfg = config
self.catalog = AgentCatalog()
self._cb = progress_cb or (lambda msg: logger.info(msg))
self._ssh = None
def deploy(self) -> tuple[bool, str]:
"""Lance le déploiement. Retourne (succès, message)."""
agent_info = self.catalog.get(self.cfg.agent_type)
if not agent_info:
return False, f"Type d'agent inconnu : '{self.cfg.agent_type}'. Disponibles : {self.catalog.list_types()}"
install_path = agent_info["install_path_tpl"].format(agent_name=self.cfg.agent_name)
service_name = agent_info["service_name_tpl"].format(agent_name=self.cfg.agent_name)
main_script = agent_info["main_script"]
steps = [
("Connexion SSH", lambda: self._connect()),
("Dépendances système", lambda: self._install_apt(agent_info["apt_deps"])),
("Clonage du dépôt", lambda: self._clone_repo(agent_info["repo"], install_path)),
("Environnement Python", lambda: self._setup_venv(install_path, agent_info.get("pip_deps", []))),
("Configuration", lambda: self._write_config(install_path, agent_info["config_template"])),
("Service systemd", lambda: self._setup_service(install_path, service_name, main_script)),
("Démarrage", lambda: self._start_service(service_name)),
]
for step_name, step_fn in steps:
self._cb(f"{step_name}...")
try:
ok, msg = step_fn()
if not ok:
self._cb(f"{step_name} échoué : {msg}")
self._disconnect()
return False, f"Échec à l'étape '{step_name}' : {msg}"
self._cb(f"{step_name}")
except Exception as e:
self._disconnect()
return False, f"Exception lors de '{step_name}' : {e}"
self._disconnect()
return True, (
f"Agent '{self.cfg.agent_name}' ({self.cfg.agent_type}) déployé avec succès sur {self.cfg.host}.\n"
f"Service : {service_name}\n"
f"Chemin : {install_path}\n"
f"JID XMPP : {self.cfg.xmpp_jid}"
)
# ──────────────────────────────────────────────
# Étapes de déploiement
# ──────────────────────────────────────────────
def _connect(self) -> tuple[bool, str]:
if self.cfg.local:
return True, "local"
try:
import paramiko
self._ssh = paramiko.SSHClient()
self._ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
kwargs = dict(
hostname=self.cfg.host,
username=self.cfg.ssh_user,
timeout=15,
)
if self.cfg.ssh_auth == "password":
kwargs["password"] = self.cfg.ssh_credential
else:
kwargs["key_filename"] = self.cfg.ssh_credential
self._ssh.connect(**kwargs)
return True, "ok"
except Exception as e:
return False, str(e)
def _run_remote(self, cmd: str, timeout: int = 120) -> tuple[bool, str]:
if self.cfg.local:
import subprocess
result = subprocess.run(
cmd, shell=True, text=True,
capture_output=True, timeout=timeout
)
out = (result.stdout + result.stderr).strip()
return result.returncode == 0, out
else:
stdin, stdout, stderr = self._ssh.exec_command(cmd, timeout=timeout)
exit_code = stdout.channel.recv_exit_status()
out = stdout.read().decode() + stderr.read().decode()
return exit_code == 0, out.strip()
def _install_apt(self, packages: list) -> tuple[bool, str]:
if not packages:
return True, "aucune dépendance"
pkgs = " ".join(packages)
ok, out = self._run_remote(
f"DEBIAN_FRONTEND=noninteractive apt-get install -y -q {pkgs}",
timeout=300
)
return ok, out[-500:] if not ok else "ok"
def _clone_repo(self, repo: str, install_path: str) -> tuple[bool, str]:
cmd = (
f"if [ -d '{install_path}/.git' ]; then "
f" cd {install_path} && git pull; "
f"else "
f" git clone {repo} {install_path}; "
f"fi"
)
return self._run_remote(cmd, timeout=120)
def _setup_venv(self, install_path: str, extra_pip: list) -> tuple[bool, str]:
venv = f"{install_path}/venv"
cmds = [
f"python3 -m venv {venv}",
f"{venv}/bin/pip install --quiet --upgrade pip",
f"{venv}/bin/pip install --quiet -r {install_path}/requirements.txt",
]
if extra_pip:
cmds.append(f"{venv}/bin/pip install --quiet {' '.join(extra_pip)}")
for cmd in cmds:
ok, out = self._run_remote(cmd, timeout=180)
if not ok:
return False, out[-500:]
return True, "ok"
def _write_config(self, install_path: str, template: str) -> tuple[bool, str]:
"""Écrit config.json avec les paramètres fournis par l'utilisateur."""
# Récupère les extra_config du catalogue pour ce type d'agent
agent_info = self.catalog.get(self.cfg.agent_type) or {}
catalog_extra = agent_info.get("extra_config", {})
# Résout les templates dans les valeurs (ex: /opt/{agent_name}/config/catalog.json)
resolved_extra = {}
for k, v in catalog_extra.items():
if isinstance(v, str):
resolved_extra[k] = v.format(agent_name=self.cfg.agent_name)
else:
resolved_extra[k] = v
config = {
"agent_id": self.cfg.agent_name,
"xmpp": {
"jid": self.cfg.xmpp_jid,
"password": self.cfg.xmpp_password,
"admin_jids": self.cfg.extra.get("admin_jids", [self.cfg.extra.get("admin_jid", "")]),
"muc_room": self.cfg.extra.get("muc_room", "agents@conference.xmpp.ovh"),
"use_omemo": False,
},
"mqtt": {
"host": self.cfg.mqtt_host,
"port": self.cfg.mqtt_port,
"username": self.cfg.extra.get("mqtt_username"),
"password": self.cfg.extra.get("mqtt_password"),
"tls": self.cfg.extra.get("mqtt_tls", False),
},
"llm": {
"base_url": self.cfg.extra.get("ollama_url", "http://localhost:11434"),
"model": self.cfg.extra.get("model", "mistral"),
"temperature": 0.3,
},
"queue_db": f"{install_path}/data/queue.db",
"system_prompt": f"{install_path}/config/system_prompt.txt",
}
# Fusionne les extra_config du catalogue
config.update(resolved_extra)
# Les extra de l'utilisateur ont priorité
for k, v in self.cfg.extra.items():
if k not in ("admin_jid", "admin_jids", "muc_room", "mqtt_username",
"mqtt_password", "mqtt_tls", "ollama_url", "model"):
config[k] = v
config_json = json.dumps(config, indent=2, ensure_ascii=False)
# Écrit via heredoc pour éviter les problèmes d'échappement
cmd = (
f"mkdir -p {install_path}/config {install_path}/data && "
f"cat > {install_path}/config/config.json << 'DEPLOYEOF'\n"
f"{config_json}\n"
f"DEPLOYEOF"
)
return self._run_remote(cmd)
def _setup_service(self, install_path: str, service_name: str, main_script: str) -> tuple[bool, str]:
service_content = f"""[Unit]
Description=Agent {service_name}
After=network.target mosquitto.service
Wants=mosquitto.service
[Service]
Type=simple
User=root
WorkingDirectory={install_path}
ExecStart={install_path}/venv/bin/python {install_path}/{main_script}
Restart=always
RestartSec=5
StandardOutput=journal
StandardError=journal
SyslogIdentifier={service_name}
[Install]
WantedBy=multi-user.target
"""
cmd = (
f"cat > /etc/systemd/system/{service_name}.service << 'SVCEOF'\n"
f"{service_content}\n"
f"SVCEOF\n"
f"systemctl daemon-reload && "
f"systemctl enable {service_name}"
)
return self._run_remote(cmd)
def _start_service(self, service_name: str) -> tuple[bool, str]:
ok, out = self._run_remote(f"systemctl start {service_name}")
if not ok:
return False, out
time.sleep(3)
ok2, status = self._run_remote(f"systemctl is-active {service_name}")
return status.strip() == "active", status
def _disconnect(self):
if self._ssh:
try:
self._ssh.close()
except Exception:
pass
self._ssh = None
+3
View File
@@ -0,0 +1,3 @@
agents_core @ file:///opt/agents_core
paramiko>=3.0
requests>=2.28
+71
View File
@@ -0,0 +1,71 @@
"""
Skill CATALOG — gérer le catalogue des agents déployables.
Usage LLM :
SKILL:catalog ARGS:list
SKILL:catalog ARGS:show <type>
SKILL:catalog ARGS:add <type> | <json_config>
SKILL:catalog ARGS:remove <type>
"""
import json
import os
import sys
sys.path.insert(0, "/opt/agent_deploy")
from deployer import AgentCatalog, CATALOG_PATH
DESCRIPTION = "Gérer le catalogue des types d'agents déployables"
USAGE = "SKILL:catalog ARGS:list | show <type> | add <type>|<json> | remove <type>"
def run(args: str, context) -> str:
parts = args.strip().split(None, 1)
action = parts[0].lower() if parts else "list"
rest = parts[1] if len(parts) > 1 else ""
catalog = AgentCatalog()
if action == "list":
return catalog.summary()
if action == "show":
agent_type = rest.strip()
info = catalog.get(agent_type)
if not info:
return f"Type '{agent_type}' inconnu."
return json.dumps(info, indent=2, ensure_ascii=False)
if action == "add":
if "|" not in rest:
return "Format : add <type> | <json_config>"
agent_type, config_json = rest.split("|", 1)
agent_type = agent_type.strip()
try:
config = json.loads(config_json.strip())
# Charge le fichier existant et ajoute
if os.path.exists(CATALOG_PATH):
with open(CATALOG_PATH) as f:
data = json.load(f)
else:
data = catalog._default_catalog()
data[agent_type] = config
with open(CATALOG_PATH, "w") as f:
json.dump(data, f, indent=2, ensure_ascii=False)
return f"Type '{agent_type}' ajouté au catalogue."
except json.JSONDecodeError as e:
return f"JSON invalide : {e}"
if action == "remove":
agent_type = rest.strip()
if os.path.exists(CATALOG_PATH):
with open(CATALOG_PATH) as f:
data = json.load(f)
if agent_type in data:
del data[agent_type]
with open(CATALOG_PATH, "w") as f:
json.dump(data, f, indent=2, ensure_ascii=False)
return f"Type '{agent_type}' supprimé du catalogue."
return f"Type '{agent_type}' introuvable."
return "Action inconnue. Disponible : list, show, add, remove"
+125
View File
@@ -0,0 +1,125 @@
"""
Skill DEPLOY — déployer un agent sur une machine distante ou locale.
Le LLM collecte les paramètres via une conversation guidée, puis lance le déploiement.
Usage LLM :
SKILL:deploy ARGS:start <type> <nom> <host> <user> <auth:password|key> <credential> <xmpp_jid> <xmpp_pass> <mqtt_host>
SKILL:deploy ARGS:local <type> <nom> <xmpp_jid> <xmpp_pass> <mqtt_host>
SKILL:deploy ARGS:catalog
SKILL:deploy ARGS:status <host>
"""
import json
import os
import sys
sys.path.insert(0, "/opt/agent_deploy")
from deployer import Deployer, DeployConfig, AgentCatalog
DESCRIPTION = "Déployer un agent sur une machine distante (SSH) ou locale"
USAGE = (
"SKILL:deploy ARGS:catalog — liste les agents déployables\n"
"SKILL:deploy ARGS:start <type> <nom> <host> <user> password <mdp> <xmpp_jid> <xmpp_pass> <mqtt_host>\n"
"SKILL:deploy ARGS:local <type> <nom> <xmpp_jid> <xmpp_pass> <mqtt_host>"
)
def run(args: str, context) -> str:
parts = args.strip().split(None, 1)
action = parts[0].lower() if parts else "catalog"
rest = parts[1] if len(parts) > 1 else ""
if action == "catalog":
catalog = AgentCatalog()
return catalog.summary()
if action == "local":
# ARGS: local <type> <nom> <xmpp_jid> <xmpp_pass> <mqtt_host>
p = rest.split()
if len(p) < 5:
return "Format : local <type> <nom> <xmpp_jid> <xmpp_pass> <mqtt_host>"
agent_type, agent_name, xmpp_jid, xmpp_pass, mqtt_host = p[0], p[1], p[2], p[3], p[4]
cfg = DeployConfig(
agent_type=agent_type,
agent_name=agent_name,
host="localhost",
ssh_user="root",
ssh_auth="",
ssh_credential="",
xmpp_jid=xmpp_jid,
xmpp_password=xmpp_pass,
mqtt_host=mqtt_host,
local=True,
)
return _do_deploy(cfg, context)
if action == "start":
# ARGS: start <type> <nom> <host> <user> <auth> <credential> <xmpp_jid> <xmpp_pass> <mqtt_host>
p = rest.split()
if len(p) < 9:
return (
"Format : start <type> <nom> <host> <user> password|key <credential> "
"<xmpp_jid> <xmpp_pass> <mqtt_host>"
)
cfg = DeployConfig(
agent_type=p[0],
agent_name=p[1],
host=p[2],
ssh_user=p[3],
ssh_auth=p[4],
ssh_credential=p[5],
xmpp_jid=p[6],
xmpp_password=p[7],
mqtt_host=p[8],
mqtt_port=int(p[9]) if len(p) > 9 else 1883,
)
return _do_deploy(cfg, context)
if action == "status":
host = rest.strip()
if not host:
return "Précise l'hôte."
import subprocess
try:
result = subprocess.run(
f"ssh -o ConnectTimeout=5 -o StrictHostKeyChecking=no {host} 'systemctl list-units --type=service --state=active | grep agent'",
shell=True, text=True, capture_output=True, timeout=15
)
return result.stdout.strip() or "Aucun agent actif trouvé sur cet hôte."
except Exception as e:
return str(e)
return "Action inconnue. Disponible : catalog, start, local, status"
def _do_deploy(cfg: DeployConfig, context) -> str:
"""Lance le déploiement et notifie via MQTT."""
messages = []
def progress(msg: str):
messages.append(msg)
# Notifie en temps réel via MQTT
context.mqtt.send_to(
"nexus",
f"[Deploy {cfg.agent_name}] {msg}",
msg_type="result",
)
deployer = Deployer(cfg, progress_cb=progress)
success, result = deployer.deploy()
# Notification finale
if success:
# Notifie Nexus de l'enregistrement du nouvel agent
registration = json.dumps({
"agent_id": cfg.agent_name,
"agent_type": cfg.agent_type,
"host": cfg.host,
"xmpp_jid": cfg.xmpp_jid,
"mqtt_inbox": f"agents/{cfg.agent_name}/inbox",
})
context.mqtt.publish_raw("agents/nexus/inbox", registration)
return result
+13
View File
@@ -0,0 +1,13 @@
"""
Skill MQTT_SEND — publier sur un topic MQTT.
"""
DESCRIPTION = "Publier un message sur un topic MQTT (communication inter-agents)"
USAGE = "SKILL:mqtt_send ARGS:<topic> | <message>"
def run(args: str, context) -> str:
if "|" not in args:
return "Format : SKILL:mqtt_send ARGS:<topic> | <message>"
topic, message = args.split("|", 1)
context.mqtt.publish_raw(topic.strip(), message.strip())
return f"Message publié sur '{topic.strip()}'."
+86
View File
@@ -0,0 +1,86 @@
"""
Skill SSH — exécuter des commandes sur une machine distante via SSH.
Usage LLM :
SKILL:ssh ARGS:<host> <user> password <mdp> | <commande>
SKILL:ssh ARGS:<host> <user> key <chemin_cle> | <commande>
SKILL:ssh ARGS:copy <host> <user> password <mdp> | <fichier_local> <chemin_distant>
"""
import subprocess
DESCRIPTION = "Exécuter des commandes SSH sur une machine distante"
USAGE = "SKILL:ssh ARGS:<host> <user> password <mdp> | <commande> ou key <chemin_cle> | <commande>"
def _ssh_cmd(host: str, user: str, auth: str, credential: str, cmd: str, timeout: int = 30) -> str:
if auth == "password":
full_cmd = (
f"sshpass -p '{credential}' ssh "
f"-o StrictHostKeyChecking=no "
f"-o ConnectTimeout=10 "
f"{user}@{host} '{cmd}'"
)
else:
full_cmd = (
f"ssh -i {credential} "
f"-o StrictHostKeyChecking=no "
f"-o ConnectTimeout=10 "
f"{user}@{host} '{cmd}'"
)
try:
result = subprocess.run(
full_cmd, shell=True, text=True,
capture_output=True, timeout=timeout
)
out = (result.stdout + result.stderr).strip()
return out[:4000] if out else f"(code retour : {result.returncode})"
except subprocess.TimeoutExpired:
return f"Timeout ({timeout}s)"
except Exception as e:
return str(e)
def run(args: str, context) -> str:
if "|" not in args:
return "Format : <host> <user> password|key <credential> | <commande>"
left, command = args.split("|", 1)
command = command.strip()
parts = left.strip().split()
if len(parts) < 4:
return "Format : <host> <user> password|key <credential> | <commande>"
host, user, auth, credential = parts[0], parts[1], parts[2], parts[3]
if auth not in ("password", "key"):
return "Auth doit être 'password' ou 'key'"
# Sous-commande copy
if command.startswith("COPY "):
file_parts = command[5:].split()
if len(file_parts) < 2:
return "Format copy : COPY <fichier_local> <chemin_distant>"
local_file, remote_path = file_parts[0], file_parts[1]
if auth == "password":
scp_cmd = (
f"sshpass -p '{credential}' scp "
f"-o StrictHostKeyChecking=no "
f"{local_file} {user}@{host}:{remote_path}"
)
else:
scp_cmd = (
f"scp -i {credential} "
f"-o StrictHostKeyChecking=no "
f"{local_file} {user}@{host}:{remote_path}"
)
try:
result = subprocess.run(
scp_cmd, shell=True, text=True,
capture_output=True, timeout=60
)
return (result.stdout + result.stderr).strip() or f"Fichier copié vers {host}:{remote_path}"
except Exception as e:
return str(e)
return _ssh_cmd(host, user, auth, credential, command)