feat: add script library skill to all agents

Generic script.py skill (list/show/save/exec/run/delete) with dynamic
scripts dir derived from queue_db path or config scripts_dir key.
Each agent gets its own /opt/<agent>/scripts/ folder.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-03-15 19:38:15 +00:00
parent 6e29721834
commit 6f87ee908c
+129 -82
View File
@@ -1,132 +1,179 @@
""" """
Skill SCRIPT — créer et exécuter un script bash, avec renvoi du résultat via MQTT. Skill SCRIPT — bibliothèque de scripts bash par agent.
Chaque agent dispose de son propre dossier scripts/ (configurable via
"scripts_dir" dans config.json, sinon /opt/<install_dir>/scripts).
L'environnement du script expose automatiquement : L'environnement du script expose automatiquement :
MQTT_BROKER, MQTT_REPLY_TOPIC, AGENT_ID MQTT_BROKER, MQTT_PORT, MQTT_REPLY_TOPIC, AGENT_ID, SCRIPTS_DIR
Ainsi un script peut publier son résultat directement : Ainsi un script peut publier son résultat directement :
mosquitto_pub -h $MQTT_BROKER -t $MQTT_REPLY_TOPIC -m "mon résultat" mosquitto_pub -h $MQTT_BROKER -t $MQTT_REPLY_TOPIC -m "mon résultat"
Usage LLM : Usage LLM :
SKILL:script ARGS:run | <contenu du script>
SKILL:script ARGS:save <nom> | <contenu>
SKILL:script ARGS:exec <nom> [args]
SKILL:script ARGS:list SKILL:script ARGS:list
SKILL:script ARGS:show <nom> SKILL:script ARGS:show <nom>
SKILL:script ARGS:save <nom> | <contenu>
SKILL:script ARGS:exec <nom> [args...]
SKILL:script ARGS:run | <contenu inline>
SKILL:script ARGS:delete <nom> SKILL:script ARGS:delete <nom>
""" """
import os import os
import subprocess
import stat import stat
import subprocess
import tempfile
DESCRIPTION = "Créer/exécuter des scripts bash avec renvoi du résultat via MQTT" DESCRIPTION = "Bibliothèque de scripts bash : sauvegarder, lister, afficher, exécuter"
USAGE = "SKILL:script ARGS:run|<contenu> | save <nom>|<contenu> | exec <nom> | list | show <nom>" USAGE = (
"SKILL:script ARGS:list\n"
SCRIPTS_DIR = "/opt/agent_debian/scripts" "SKILL:script ARGS:show <nom>\n"
"SKILL:script ARGS:save <nom> | <contenu>\n"
"SKILL:script ARGS:exec <nom> [args]\n"
def _ensure_dir(): "SKILL:script ARGS:run | <contenu inline>\n"
os.makedirs(SCRIPTS_DIR, exist_ok=True) "SKILL:script ARGS:delete <nom>"
def _run(cmd: str, env: dict = None, timeout: int = 60) -> str:
try:
result = subprocess.run(
cmd, shell=True, text=True,
capture_output=True, timeout=timeout,
env=env, executable="/bin/bash"
) )
out = (result.stdout + result.stderr).strip()
if len(out) > 4000:
out = out[:4000] + "\n... [tronqué]"
return out or f"(code retour : {result.returncode})"
except subprocess.TimeoutExpired:
return f"Timeout ({timeout}s)"
except Exception as e:
return str(e)
def _build_env(context) -> dict: def _scripts_dir(context) -> str:
"""Environnement injecté dans chaque script.""" """Détermine le répertoire scripts de cet agent."""
if context.config.get("scripts_dir"):
return context.config["scripts_dir"]
queue_db = context.config.get("queue_db", "")
if queue_db:
install = os.path.dirname(os.path.dirname(queue_db))
return os.path.join(install, "scripts")
return f"/opt/{context.agent_id}/scripts"
def _ensure_dir(context) -> str:
d = _scripts_dir(context)
os.makedirs(d, exist_ok=True)
return d
def _safe_name(name: str) -> str:
"""Empêche les traversées de répertoire."""
return os.path.basename(name.strip().replace("/", "_"))
def _build_env(context, scripts_dir: str) -> dict:
env = os.environ.copy() env = os.environ.copy()
mc = context.config.get("mqtt", {}) mc = context.config.get("mqtt", {})
env["MQTT_BROKER"] = mc.get("host", "localhost") env["MQTT_BROKER"] = mc.get("host", "localhost")
env["MQTT_PORT"] = str(mc.get("port", 1883)) env["MQTT_PORT"] = str(mc.get("port", 1883))
env["MQTT_REPLY_TOPIC"] = "agents/nexus/inbox" env["MQTT_REPLY_TOPIC"] = "agents/nexus/inbox"
env["AGENT_ID"] = context.agent_id env["AGENT_ID"] = context.agent_id
env["SCRIPTS_DIR"] = scripts_dir
return env return env
def _run_script(cmd: str, env: dict, timeout: int = 120) -> str:
try:
result = subprocess.run(
cmd, shell=True, text=True,
capture_output=True, timeout=timeout,
env=env, executable="/bin/bash",
)
out = (result.stdout + result.stderr).strip()
if len(out) > 4000:
out = out[:4000] + "\n... [tronqué]"
return out or f"(code retour : {result.returncode})"
except subprocess.TimeoutExpired:
return f"Timeout ({timeout}s dépassé)"
except Exception as e:
return str(e)
def run(args: str, context) -> str: def run(args: str, context) -> str:
_ensure_dir()
parts = args.strip().split(None, 1) parts = args.strip().split(None, 1)
action = parts[0].lower() if parts else "run" action = parts[0].lower() if parts else "list"
rest = parts[1] if len(parts) > 1 else "" rest = parts[1] if len(parts) > 1 else ""
# ── list ──────────────────────────────────────────────────────────────
if action == "list":
d = _ensure_dir(context)
files = sorted(f for f in os.listdir(d) if f.endswith(".sh"))
if not files:
return f"Aucun script dans {d}"
lines = [f"Scripts disponibles ({d}) :"]
for f in files:
path = os.path.join(d, f)
size = os.path.getsize(path)
lines.append(f" {f[:-3]:30s} ({size} octets)")
return "\n".join(lines)
# ── show ──────────────────────────────────────────────────────────────
if action == "show":
name = _safe_name(rest)
if not name:
return "Précise le nom du script."
d = _ensure_dir(context)
path = os.path.join(d, name + ".sh")
if not os.path.exists(path):
return f"Script '{name}' introuvable dans {d}"
with open(path) as f:
content = f.read()
return f"── {name}.sh ──\n{content}"
# ── save ──────────────────────────────────────────────────────────────
if action == "save":
if "|" not in rest:
return "Format : save <nom> | <contenu du script>"
name_raw, content = rest.split("|", 1)
name = _safe_name(name_raw)
content = content.strip().replace("\\n", "\n")
d = _ensure_dir(context)
path = os.path.join(d, name + ".sh")
existed = os.path.exists(path)
with open(path, "w") as f:
if not content.startswith("#!"):
f.write("#!/bin/bash\n")
f.write(content + "\n")
os.chmod(path, stat.S_IRWXU | stat.S_IRGRP | stat.S_IROTH)
verb = "mis à jour" if existed else "créé"
return f"Script '{name}' {verb} : {path}"
# ── exec ──────────────────────────────────────────────────────────────
if action == "exec":
parts2 = rest.split(None, 1)
name = _safe_name(parts2[0]) if parts2 else ""
sargs = parts2[1] if len(parts2) > 1 else ""
if not name:
return "Précise le nom du script."
d = _ensure_dir(context)
path = os.path.join(d, name + ".sh")
if not os.path.exists(path):
return f"Script '{name}' introuvable. Utilise 'list' pour voir les scripts disponibles."
env = _build_env(context, d)
return _run_script(f'"{path}" {sargs}', env=env, timeout=120)
# ── run (inline) ──────────────────────────────────────────────────────
if action == "run": if action == "run":
# Exécution directe d'un script inline
if not rest: if not rest:
return "Précise le contenu du script." return "Précise le contenu du script."
d = _ensure_dir(context)
content = rest.replace("\\n", "\n") content = rest.replace("\\n", "\n")
# Fichier temporaire
import tempfile
with tempfile.NamedTemporaryFile( with tempfile.NamedTemporaryFile(
mode="w", suffix=".sh", delete=False, dir="/tmp" mode="w", suffix=".sh", delete=False, dir="/tmp"
) as f: ) as f:
f.write("#!/bin/bash\nset -e\n" + content) f.write("#!/bin/bash\nset -e\n" + content)
tmpfile = f.name tmpfile = f.name
os.chmod(tmpfile, stat.S_IRWXU) os.chmod(tmpfile, stat.S_IRWXU)
env = _build_env(context) env = _build_env(context, d)
out = _run(tmpfile, env=env, timeout=60) out = _run_script(tmpfile, env=env, timeout=60)
os.unlink(tmpfile) os.unlink(tmpfile)
return out return out
if action == "save": # ── delete ────────────────────────────────────────────────────────────
if "|" not in rest:
return "Format : save <nom> | <contenu du script>"
name, content = rest.split("|", 1)
name = name.strip().replace("/", "_") # Sécurité
content = content.strip().replace("\\n", "\n")
path = os.path.join(SCRIPTS_DIR, name + ".sh")
with open(path, "w") as f:
f.write("#!/bin/bash\n" + content)
os.chmod(path, stat.S_IRWXU)
return f"Script sauvegardé : {path}"
if action == "exec":
parts2 = rest.split(None, 1)
name = parts2[0] if parts2 else ""
sargs = parts2[1] if len(parts2) > 1 else ""
if not name:
return "Précise le nom du script."
path = os.path.join(SCRIPTS_DIR, name + ".sh")
if not os.path.exists(path):
return f"Script '{name}' introuvable dans {SCRIPTS_DIR}"
env = _build_env(context)
return _run(f"{path} {sargs}", env=env, timeout=120)
if action == "list":
files = [f for f in os.listdir(SCRIPTS_DIR) if f.endswith(".sh")]
return "\n".join(files) if files else "Aucun script sauvegardé."
if action == "show":
name = rest.strip()
if not name:
return "Précise le nom du script."
path = os.path.join(SCRIPTS_DIR, name + ".sh")
if not os.path.exists(path):
return f"Script '{name}' introuvable."
with open(path) as f:
return f.read()
if action == "delete": if action == "delete":
name = rest.strip() name = _safe_name(rest)
if not name: if not name:
return "Précise le nom du script." return "Précise le nom du script."
path = os.path.join(SCRIPTS_DIR, name + ".sh") d = _ensure_dir(context)
if os.path.exists(path): path = os.path.join(d, name + ".sh")
if not os.path.exists(path):
return f"Script '{name}' introuvable dans {d}"
os.unlink(path) os.unlink(path)
return f"Script '{name}' supprimé." return f"Script '{name}' supprimé."
return f"Script '{name}' introuvable."
return "Action inconnue. Disponible : run, save, exec, list, show, delete" return "Action inconnue. Disponible : list, show, save, exec, run, delete"