diff --git a/skills/script.py b/skills/script.py index 9baf4f0..bd4e4f2 100644 --- a/skills/script.py +++ b/skills/script.py @@ -1,132 +1,179 @@ """ -Skill SCRIPT — créer et exécuter un script bash, avec renvoi du résultat via MQTT. +Skill SCRIPT — bibliothèque de scripts bash par agent. + +Chaque agent dispose de son propre dossier scripts/ (configurable via +"scripts_dir" dans config.json, sinon /opt//scripts). L'environnement du script expose automatiquement : - MQTT_BROKER, MQTT_REPLY_TOPIC, AGENT_ID + MQTT_BROKER, MQTT_PORT, MQTT_REPLY_TOPIC, AGENT_ID, SCRIPTS_DIR Ainsi un script peut publier son résultat directement : mosquitto_pub -h $MQTT_BROKER -t $MQTT_REPLY_TOPIC -m "mon résultat" Usage LLM : - SKILL:script ARGS:run | - SKILL:script ARGS:save | - SKILL:script ARGS:exec [args] SKILL:script ARGS:list SKILL:script ARGS:show + SKILL:script ARGS:save | + SKILL:script ARGS:exec [args...] + SKILL:script ARGS:run | SKILL:script ARGS:delete """ import os -import subprocess import stat +import subprocess +import tempfile -DESCRIPTION = "Créer/exécuter des scripts bash avec renvoi du résultat via MQTT" -USAGE = "SKILL:script ARGS:run| | save | | exec | list | show " - -SCRIPTS_DIR = "/opt/agent_debian/scripts" +DESCRIPTION = "Bibliothèque de scripts bash : sauvegarder, lister, afficher, exécuter" +USAGE = ( + "SKILL:script ARGS:list\n" + "SKILL:script ARGS:show \n" + "SKILL:script ARGS:save | \n" + "SKILL:script ARGS:exec [args]\n" + "SKILL:script ARGS:run | \n" + "SKILL:script ARGS:delete " +) -def _ensure_dir(): - os.makedirs(SCRIPTS_DIR, exist_ok=True) +def _scripts_dir(context) -> str: + """Détermine le répertoire scripts de cet agent.""" + if context.config.get("scripts_dir"): + return context.config["scripts_dir"] + queue_db = context.config.get("queue_db", "") + if queue_db: + install = os.path.dirname(os.path.dirname(queue_db)) + return os.path.join(install, "scripts") + return f"/opt/{context.agent_id}/scripts" -def _run(cmd: str, env: dict = None, timeout: int = 60) -> str: - try: - result = subprocess.run( - cmd, shell=True, text=True, - capture_output=True, timeout=timeout, - env=env, executable="/bin/bash" - ) - out = (result.stdout + result.stderr).strip() - if len(out) > 4000: - out = out[:4000] + "\n... [tronqué]" - return out or f"(code retour : {result.returncode})" - except subprocess.TimeoutExpired: - return f"Timeout ({timeout}s)" - except Exception as e: - return str(e) +def _ensure_dir(context) -> str: + d = _scripts_dir(context) + os.makedirs(d, exist_ok=True) + return d -def _build_env(context) -> dict: - """Environnement injecté dans chaque script.""" +def _safe_name(name: str) -> str: + """Empêche les traversées de répertoire.""" + return os.path.basename(name.strip().replace("/", "_")) + + +def _build_env(context, scripts_dir: str) -> dict: env = os.environ.copy() mc = context.config.get("mqtt", {}) env["MQTT_BROKER"] = mc.get("host", "localhost") env["MQTT_PORT"] = str(mc.get("port", 1883)) env["MQTT_REPLY_TOPIC"] = "agents/nexus/inbox" env["AGENT_ID"] = context.agent_id + env["SCRIPTS_DIR"] = scripts_dir return env +def _run_script(cmd: str, env: dict, timeout: int = 120) -> str: + try: + result = subprocess.run( + cmd, shell=True, text=True, + capture_output=True, timeout=timeout, + env=env, executable="/bin/bash", + ) + out = (result.stdout + result.stderr).strip() + if len(out) > 4000: + out = out[:4000] + "\n... [tronqué]" + return out or f"(code retour : {result.returncode})" + except subprocess.TimeoutExpired: + return f"Timeout ({timeout}s dépassé)" + except Exception as e: + return str(e) + + def run(args: str, context) -> str: - _ensure_dir() - parts = args.strip().split(None, 1) - action = parts[0].lower() if parts else "run" + parts = args.strip().split(None, 1) + action = parts[0].lower() if parts else "list" rest = parts[1] if len(parts) > 1 else "" + # ── list ────────────────────────────────────────────────────────────── + if action == "list": + d = _ensure_dir(context) + files = sorted(f for f in os.listdir(d) if f.endswith(".sh")) + if not files: + return f"Aucun script dans {d}" + lines = [f"Scripts disponibles ({d}) :"] + for f in files: + path = os.path.join(d, f) + size = os.path.getsize(path) + lines.append(f" {f[:-3]:30s} ({size} octets)") + return "\n".join(lines) + + # ── show ────────────────────────────────────────────────────────────── + if action == "show": + name = _safe_name(rest) + if not name: + return "Précise le nom du script." + d = _ensure_dir(context) + path = os.path.join(d, name + ".sh") + if not os.path.exists(path): + return f"Script '{name}' introuvable dans {d}" + with open(path) as f: + content = f.read() + return f"── {name}.sh ──\n{content}" + + # ── save ────────────────────────────────────────────────────────────── + if action == "save": + if "|" not in rest: + return "Format : save | " + name_raw, content = rest.split("|", 1) + name = _safe_name(name_raw) + content = content.strip().replace("\\n", "\n") + d = _ensure_dir(context) + path = os.path.join(d, name + ".sh") + existed = os.path.exists(path) + with open(path, "w") as f: + if not content.startswith("#!"): + f.write("#!/bin/bash\n") + f.write(content + "\n") + os.chmod(path, stat.S_IRWXU | stat.S_IRGRP | stat.S_IROTH) + verb = "mis à jour" if existed else "créé" + return f"Script '{name}' {verb} : {path}" + + # ── exec ────────────────────────────────────────────────────────────── + if action == "exec": + parts2 = rest.split(None, 1) + name = _safe_name(parts2[0]) if parts2 else "" + sargs = parts2[1] if len(parts2) > 1 else "" + if not name: + return "Précise le nom du script." + d = _ensure_dir(context) + path = os.path.join(d, name + ".sh") + if not os.path.exists(path): + return f"Script '{name}' introuvable. Utilise 'list' pour voir les scripts disponibles." + env = _build_env(context, d) + return _run_script(f'"{path}" {sargs}', env=env, timeout=120) + + # ── run (inline) ────────────────────────────────────────────────────── if action == "run": - # Exécution directe d'un script inline if not rest: return "Précise le contenu du script." + d = _ensure_dir(context) content = rest.replace("\\n", "\n") - # Fichier temporaire - import tempfile with tempfile.NamedTemporaryFile( mode="w", suffix=".sh", delete=False, dir="/tmp" ) as f: f.write("#!/bin/bash\nset -e\n" + content) tmpfile = f.name os.chmod(tmpfile, stat.S_IRWXU) - env = _build_env(context) - out = _run(tmpfile, env=env, timeout=60) + env = _build_env(context, d) + out = _run_script(tmpfile, env=env, timeout=60) os.unlink(tmpfile) return out - if action == "save": - if "|" not in rest: - return "Format : save | " - name, content = rest.split("|", 1) - name = name.strip().replace("/", "_") # Sécurité - content = content.strip().replace("\\n", "\n") - path = os.path.join(SCRIPTS_DIR, name + ".sh") - with open(path, "w") as f: - f.write("#!/bin/bash\n" + content) - os.chmod(path, stat.S_IRWXU) - return f"Script sauvegardé : {path}" - - if action == "exec": - parts2 = rest.split(None, 1) - name = parts2[0] if parts2 else "" - sargs = parts2[1] if len(parts2) > 1 else "" - if not name: - return "Précise le nom du script." - path = os.path.join(SCRIPTS_DIR, name + ".sh") - if not os.path.exists(path): - return f"Script '{name}' introuvable dans {SCRIPTS_DIR}" - env = _build_env(context) - return _run(f"{path} {sargs}", env=env, timeout=120) - - if action == "list": - files = [f for f in os.listdir(SCRIPTS_DIR) if f.endswith(".sh")] - return "\n".join(files) if files else "Aucun script sauvegardé." - - if action == "show": - name = rest.strip() - if not name: - return "Précise le nom du script." - path = os.path.join(SCRIPTS_DIR, name + ".sh") - if not os.path.exists(path): - return f"Script '{name}' introuvable." - with open(path) as f: - return f.read() - + # ── delete ──────────────────────────────────────────────────────────── if action == "delete": - name = rest.strip() + name = _safe_name(rest) if not name: return "Précise le nom du script." - path = os.path.join(SCRIPTS_DIR, name + ".sh") - if os.path.exists(path): - os.unlink(path) - return f"Script '{name}' supprimé." - return f"Script '{name}' introuvable." + d = _ensure_dir(context) + path = os.path.join(d, name + ".sh") + if not os.path.exists(path): + return f"Script '{name}' introuvable dans {d}" + os.unlink(path) + return f"Script '{name}' supprimé." - return "Action inconnue. Disponible : run, save, exec, list, show, delete" + return "Action inconnue. Disponible : list, show, save, exec, run, delete"