"""Script skill: a per-agent library of bash scripts.

Each agent has its own scripts directory. It is taken from "scripts_dir" in
the agent configuration when set; otherwise it is derived from "queue_db"
(``<install>/scripts``), and as a last resort ``/opt/<agent_id>/scripts``.

Every script is started with these extra environment variables:
    MQTT_BROKER, MQTT_PORT, MQTT_REPLY_TOPIC, AGENT_ID, SCRIPTS_DIR

so a script can publish its result directly:
    mosquitto_pub -h $MQTT_BROKER -t $MQTT_REPLY_TOPIC -m "my result"

LLM usage:
    SKILL:script ARGS:list
    SKILL:script ARGS:show <name>
    SKILL:script ARGS:save <name> | <bash content>
    SKILL:script ARGS:exec <name> [args...]
    SKILL:script ARGS:run <bash content>
    SKILL:script ARGS:delete <name>
"""

import os
import shlex
import stat
import subprocess
import tempfile

DESCRIPTION = "Bibliothèque de scripts bash : sauvegarder, lister, afficher, exécuter"
USAGE = (
    "SKILL:script ARGS:list\n"
    "SKILL:script ARGS:show <nom>\n"
    "SKILL:script ARGS:save <nom> | <contenu>\n"
    "SKILL:script ARGS:exec <nom> [args]\n"
    "SKILL:script ARGS:run <contenu bash>\n"
    "SKILL:script ARGS:delete <nom>"
)

# Shared user-facing messages (kept in French like the rest of the skill).
_MSG_NEED_NAME = "Précise le nom du script."
_MSG_UNKNOWN = "Action inconnue. Disponible : list, show, save, exec, run, delete"


def _scripts_dir(context) -> str:
    """Return the scripts directory for this agent (without creating it).

    Resolution order: ``config["scripts_dir"]``, then ``<install>/scripts``
    where ``<install>`` is two directory levels above ``config["queue_db"]``,
    then ``/opt/<agent_id>/scripts``.
    """
    if context.config.get("scripts_dir"):
        return context.config["scripts_dir"]
    queue_db = context.config.get("queue_db", "")
    if queue_db:
        install = os.path.dirname(os.path.dirname(queue_db))
        return os.path.join(install, "scripts")
    return f"/opt/{context.agent_id}/scripts"


def _ensure_dir(context) -> str:
    """Create the agent's scripts directory if needed and return its path."""
    d = _scripts_dir(context)
    os.makedirs(d, exist_ok=True)
    return d


def _safe_name(name: str) -> str:
    """Return *name* reduced to a single path component.

    Slashes are replaced by underscores so the result can never point outside
    the scripts directory. An empty result means "no name given".
    """
    return os.path.basename(name.strip().replace("/", "_"))


def _script_path(scripts_dir: str, name: str) -> str:
    """Return the on-disk path of script *name* inside *scripts_dir*."""
    return os.path.join(scripts_dir, name + ".sh")


def _build_env(context, scripts_dir: str) -> dict:
    """Build the environment passed to scripts.

    Starts from a copy of the current process environment and adds the MQTT
    and agent variables. Values are coerced to ``str`` because environment
    values must be strings.
    """
    env = os.environ.copy()
    # ``or {}`` tolerates an explicit ``"mqtt": null`` in the configuration.
    mc = context.config.get("mqtt") or {}
    env["MQTT_BROKER"] = str(mc.get("host", "localhost"))
    env["MQTT_PORT"] = str(mc.get("port", 1883))
    env["MQTT_REPLY_TOPIC"] = "agents/nexus/inbox"
    env["AGENT_ID"] = str(context.agent_id)
    env["SCRIPTS_DIR"] = scripts_dir
    return env


def _run_script(cmd: str, env: dict, timeout: int = 120) -> str:
    """Run *cmd* through ``/bin/bash`` and return its combined output.

    stdout and stderr are concatenated (stdout first), stripped, and truncated
    to 4000 characters. When there is no output the return code is reported
    instead. A timeout or any other failure is returned as a message rather
    than raised.
    """
    try:
        result = subprocess.run(
            cmd,
            shell=True,
            text=True,
            # Undecodable bytes must not discard the whole output.
            errors="replace",
            capture_output=True,
            timeout=timeout,
            env=env,
            executable="/bin/bash",
        )
    except subprocess.TimeoutExpired:
        return f"Timeout ({timeout}s dépassé)"
    except Exception as e:  # boundary: the caller expects a string, never a raise
        return str(e)

    out = (result.stdout + result.stderr).strip()
    if len(out) > 4000:
        out = out[:4000] + "\n... [tronqué]"
    return out or f"(code retour : {result.returncode})"


def _action_list(rest: str, context) -> str:
    """List the ``*.sh`` files of the scripts directory with their sizes."""
    d = _ensure_dir(context)
    files = sorted(f for f in os.listdir(d) if f.endswith(".sh"))
    if not files:
        return f"Aucun script dans {d}"
    lines = [f"Scripts disponibles ({d}) :"]
    for f in files:
        size = os.path.getsize(os.path.join(d, f))
        lines.append(f" {f[:-3]:30s} ({size} octets)")
    return "\n".join(lines)


def _action_show(rest: str, context) -> str:
    """Return the content of the named script."""
    name = _safe_name(rest)
    if not name:
        return _MSG_NEED_NAME
    d = _ensure_dir(context)
    path = _script_path(d, name)
    if not os.path.exists(path):
        return f"Script '{name}' introuvable dans {d}"
    with open(path, encoding="utf-8", errors="replace") as f:
        content = f.read()
    return f"── {name}.sh ──\n{content}"


def _action_save(rest: str, context) -> str:
    """Create or overwrite a script from ``<name> | <content>``.

    Literal ``\\n`` sequences in the content become real newlines, and a bash
    shebang is prepended when the content does not start with one.
    """
    if "|" not in rest:
        return "Format : save <nom> | <contenu>"
    name_raw, content = rest.split("|", 1)
    name = _safe_name(name_raw)
    if not name:
        # Without this guard an empty name would create a file called ".sh".
        return _MSG_NEED_NAME
    content = content.strip().replace("\\n", "\n")
    d = _ensure_dir(context)
    path = _script_path(d, name)
    existed = os.path.exists(path)
    with open(path, "w", encoding="utf-8") as f:
        if not content.startswith("#!"):
            f.write("#!/bin/bash\n")
        f.write(content + "\n")
    # rwx for the owner, read-only for group and others.
    os.chmod(path, stat.S_IRWXU | stat.S_IRGRP | stat.S_IROTH)
    verb = "mis à jour" if existed else "créé"
    return f"Script '{name}' {verb} : {path}"


def _action_exec(rest: str, context) -> str:
    """Execute a saved script, optionally with arguments (120 s timeout)."""
    pieces = rest.split(None, 1)
    name = _safe_name(pieces[0]) if pieces else ""
    sargs = pieces[1] if len(pieces) > 1 else ""
    if not name:
        return _MSG_NEED_NAME
    d = _ensure_dir(context)
    path = _script_path(d, name)
    if not os.path.exists(path):
        return (
            f"Script '{name}' introuvable. "
            "Utilise 'list' pour voir les scripts disponibles."
        )
    env = _build_env(context, d)
    # The path is quoted so that special characters in the scripts directory
    # cannot break the command line. The argument string is intentionally left
    # to the shell (quoting, $VAR expansion), as before.
    return _run_script(f"{shlex.quote(path)} {sargs}", env=env, timeout=120)


def _action_run(rest: str, context) -> str:
    """Execute inline bash content from a temporary script (60 s timeout)."""
    if not rest:
        return "Précise le contenu du script."
    d = _ensure_dir(context)
    content = rest.replace("\\n", "\n")
    tmpfile = None
    try:
        with tempfile.NamedTemporaryFile(
            mode="w", suffix=".sh", delete=False, dir="/tmp", encoding="utf-8"
        ) as f:
            tmpfile = f.name
            f.write("#!/bin/bash\nset -e\n" + content)
        os.chmod(tmpfile, stat.S_IRWXU)
        env = _build_env(context, d)
        return _run_script(shlex.quote(tmpfile), env=env, timeout=60)
    finally:
        # Always remove the temp file, even when chmod or env building fails;
        # tolerate a script that already removed itself.
        if tmpfile is not None:
            try:
                os.unlink(tmpfile)
            except OSError:
                pass


def _action_delete(rest: str, context) -> str:
    """Delete the named script."""
    name = _safe_name(rest)
    if not name:
        return _MSG_NEED_NAME
    d = _ensure_dir(context)
    path = _script_path(d, name)
    if not os.path.exists(path):
        return f"Script '{name}' introuvable dans {d}"
    os.unlink(path)
    return f"Script '{name}' supprimé."


# Action keyword -> handler(rest, context).
_ACTIONS = {
    "list": _action_list,
    "show": _action_show,
    "save": _action_save,
    "exec": _action_exec,
    "run": _action_run,
    "delete": _action_delete,
}


def run(args: str, context) -> str:
    """Skill entry point: dispatch ``"<action> [rest]"`` to its handler.

    Args:
        args: Raw argument string; the first whitespace-separated word is the
            action (case-insensitive). Empty or ``None`` means ``list``.
        context: Object exposing ``config`` (a dict) and ``agent_id``.

    Returns:
        A human-readable result or error message.
    """
    parts = (args or "").strip().split(None, 1)
    action = parts[0].lower() if parts else "list"
    rest = parts[1] if len(parts) > 1 else ""
    handler = _ACTIONS.get(action)
    if handler is None:
        return _MSG_UNKNOWN
    return handler(rest, context)