diff --git a/skills/script.py b/skills/script.py new file mode 100644 index 0000000..bd4e4f2 --- /dev/null +++ b/skills/script.py @@ -0,0 +1,179 @@ +""" +Skill SCRIPT — bibliothèque de scripts bash par agent. + +Chaque agent dispose de son propre dossier scripts/ (configurable via +"scripts_dir" dans config.json, sinon /opt//scripts). + +L'environnement du script expose automatiquement : + MQTT_BROKER, MQTT_PORT, MQTT_REPLY_TOPIC, AGENT_ID, SCRIPTS_DIR + +Ainsi un script peut publier son résultat directement : + mosquitto_pub -h $MQTT_BROKER -t $MQTT_REPLY_TOPIC -m "mon résultat" + +Usage LLM : + SKILL:script ARGS:list + SKILL:script ARGS:show + SKILL:script ARGS:save | + SKILL:script ARGS:exec [args...] + SKILL:script ARGS:run | + SKILL:script ARGS:delete +""" +import os +import stat +import subprocess +import tempfile + +DESCRIPTION = "Bibliothèque de scripts bash : sauvegarder, lister, afficher, exécuter" +USAGE = ( + "SKILL:script ARGS:list\n" + "SKILL:script ARGS:show \n" + "SKILL:script ARGS:save | \n" + "SKILL:script ARGS:exec [args]\n" + "SKILL:script ARGS:run | \n" + "SKILL:script ARGS:delete " +) + + +def _scripts_dir(context) -> str: + """Détermine le répertoire scripts de cet agent.""" + if context.config.get("scripts_dir"): + return context.config["scripts_dir"] + queue_db = context.config.get("queue_db", "") + if queue_db: + install = os.path.dirname(os.path.dirname(queue_db)) + return os.path.join(install, "scripts") + return f"/opt/{context.agent_id}/scripts" + + +def _ensure_dir(context) -> str: + d = _scripts_dir(context) + os.makedirs(d, exist_ok=True) + return d + + +def _safe_name(name: str) -> str: + """Empêche les traversées de répertoire.""" + return os.path.basename(name.strip().replace("/", "_")) + + +def _build_env(context, scripts_dir: str) -> dict: + env = os.environ.copy() + mc = context.config.get("mqtt", {}) + env["MQTT_BROKER"] = mc.get("host", "localhost") + env["MQTT_PORT"] = str(mc.get("port", 1883)) + env["MQTT_REPLY_TOPIC"] = "agents/nexus/inbox" + env["AGENT_ID"] = context.agent_id + env["SCRIPTS_DIR"] = scripts_dir + return env + + +def _run_script(cmd: str, env: dict, timeout: int = 120) -> str: + try: + result = subprocess.run( + cmd, shell=True, text=True, + capture_output=True, timeout=timeout, + env=env, executable="/bin/bash", + ) + out = (result.stdout + result.stderr).strip() + if len(out) > 4000: + out = out[:4000] + "\n... [tronqué]" + return out or f"(code retour : {result.returncode})" + except subprocess.TimeoutExpired: + return f"Timeout ({timeout}s dépassé)" + except Exception as e: + return str(e) + + +def run(args: str, context) -> str: + parts = args.strip().split(None, 1) + action = parts[0].lower() if parts else "list" + rest = parts[1] if len(parts) > 1 else "" + + # ── list ────────────────────────────────────────────────────────────── + if action == "list": + d = _ensure_dir(context) + files = sorted(f for f in os.listdir(d) if f.endswith(".sh")) + if not files: + return f"Aucun script dans {d}" + lines = [f"Scripts disponibles ({d}) :"] + for f in files: + path = os.path.join(d, f) + size = os.path.getsize(path) + lines.append(f" {f[:-3]:30s} ({size} octets)") + return "\n".join(lines) + + # ── show ────────────────────────────────────────────────────────────── + if action == "show": + name = _safe_name(rest) + if not name: + return "Précise le nom du script." + d = _ensure_dir(context) + path = os.path.join(d, name + ".sh") + if not os.path.exists(path): + return f"Script '{name}' introuvable dans {d}" + with open(path) as f: + content = f.read() + return f"── {name}.sh ──\n{content}" + + # ── save ────────────────────────────────────────────────────────────── + if action == "save": + if "|" not in rest: + return "Format : save | " + name_raw, content = rest.split("|", 1) + name = _safe_name(name_raw) + content = content.strip().replace("\\n", "\n") + d = _ensure_dir(context) + path = os.path.join(d, name + ".sh") + existed = os.path.exists(path) + with open(path, "w") as f: + if not content.startswith("#!"): + f.write("#!/bin/bash\n") + f.write(content + "\n") + os.chmod(path, stat.S_IRWXU | stat.S_IRGRP | stat.S_IROTH) + verb = "mis à jour" if existed else "créé" + return f"Script '{name}' {verb} : {path}" + + # ── exec ────────────────────────────────────────────────────────────── + if action == "exec": + parts2 = rest.split(None, 1) + name = _safe_name(parts2[0]) if parts2 else "" + sargs = parts2[1] if len(parts2) > 1 else "" + if not name: + return "Précise le nom du script." + d = _ensure_dir(context) + path = os.path.join(d, name + ".sh") + if not os.path.exists(path): + return f"Script '{name}' introuvable. Utilise 'list' pour voir les scripts disponibles." + env = _build_env(context, d) + return _run_script(f'"{path}" {sargs}', env=env, timeout=120) + + # ── run (inline) ────────────────────────────────────────────────────── + if action == "run": + if not rest: + return "Précise le contenu du script." + d = _ensure_dir(context) + content = rest.replace("\\n", "\n") + with tempfile.NamedTemporaryFile( + mode="w", suffix=".sh", delete=False, dir="/tmp" + ) as f: + f.write("#!/bin/bash\nset -e\n" + content) + tmpfile = f.name + os.chmod(tmpfile, stat.S_IRWXU) + env = _build_env(context, d) + out = _run_script(tmpfile, env=env, timeout=60) + os.unlink(tmpfile) + return out + + # ── delete ──────────────────────────────────────────────────────────── + if action == "delete": + name = _safe_name(rest) + if not name: + return "Précise le nom du script." + d = _ensure_dir(context) + path = os.path.join(d, name + ".sh") + if not os.path.exists(path): + return f"Script '{name}' introuvable dans {d}" + os.unlink(path) + return f"Script '{name}' supprimé." + + return "Action inconnue. Disponible : list, show, save, exec, run, delete"