Files
sylvain ea1c67b33f Initial commit — Agent HAL v1.0
Agent système complet remplaçant agent_debian :
- 20 skills : apt, systemd, cron, process, network, user, sysinfo,
  journal, container, shell, filesystem (enhanced), git, ssh,
  web_fetch, todo, script, mqtt_send, mqtt_subscribe, muc_send, agents_status
- filesystem : read avec numéros de lignes, edit, multiedit (style SHAI)
- git : status, log, diff, add, commit, push, pull, clone, branch, checkout
- ssh : exécution distante + SCP (password ou clé)
- web_fetch : GET/HEAD/POST avec nettoyage HTML
- todo : liste de tâches en mémoire
2026-03-22 21:53:00 +00:00

255 lines
9.2 KiB
Python

"""
Skill FILESYSTEM — opérations avancées sur le système de fichiers.
Usage LLM :
SKILL:filesystem ARGS:ls <chemin>
SKILL:filesystem ARGS:cat <fichier>
SKILL:filesystem ARGS:read <fichier> (avec numéros de lignes)
SKILL:filesystem ARGS:write <fichier> | <contenu>
SKILL:filesystem ARGS:edit <fichier> | <ancien> -> <nouveau>
SKILL:filesystem ARGS:multiedit <fichier> | <anc1> -> <nouv1> ;; <anc2> -> <nouv2>
SKILL:filesystem ARGS:append <fichier> | <contenu>
SKILL:filesystem ARGS:delete <chemin>
SKILL:filesystem ARGS:mkdir <chemin>
SKILL:filesystem ARGS:move <src> | <dst>
SKILL:filesystem ARGS:copy <src> | <dst>
SKILL:filesystem ARGS:chmod <mode> <chemin>
SKILL:filesystem ARGS:chown <owner> <chemin>
SKILL:filesystem ARGS:find <chemin> <pattern>
SKILL:filesystem ARGS:grep <pattern> <fichier>
SKILL:filesystem ARGS:df
SKILL:filesystem ARGS:du <chemin>
SKILL:filesystem ARGS:stat <chemin>
SKILL:filesystem ARGS:tail <fichier> [N]
SKILL:filesystem ARGS:head <fichier> [N]
"""
import os
import subprocess
DESCRIPTION = "Opérations filesystem avancées : ls, read, write, edit, multiedit, delete, find, grep, git-style édition"
USAGE = (
"SKILL:filesystem ARGS:ls <path> | read <file> | write <file>|<content> | "
"edit <file>|<old> -> <new> | multiedit <file>|<old1> -> <new1> ;; <old2> -> <new2> | "
"delete <path> | find <path> <pattern> | grep <pattern> <file> | df | du <path>"
)
FORBIDDEN = ["/proc", "/sys", "/dev", "/run/systemd"]
def _safe_path(path: str) -> bool:
path = os.path.realpath(path)
return not any(path.startswith(f) for f in FORBIDDEN)
def _run(cmd: str, timeout: int = 15) -> str:
try:
result = subprocess.run(
cmd, shell=True, text=True,
capture_output=True, timeout=timeout
)
out = (result.stdout + result.stderr).strip()
return out[:4000] if out else "(aucune sortie)"
except subprocess.TimeoutExpired:
return f"Timeout ({timeout}s)"
except Exception as e:
return str(e)
def run(args: str, context) -> str:
parts = args.strip().split(None, 1)
action = parts[0].lower() if parts else ""
rest = parts[1] if len(parts) > 1 else ""
if action == "ls":
path = rest or "."
return _run(f"ls -lah {path}")
if action == "cat":
if not rest:
return "Précise le fichier."
if not _safe_path(rest):
return f"Accès refusé : {rest}"
return _run(f"cat {rest}")
if action == "read":
# Lecture avec numéros de lignes (style SHAI)
if not rest:
return "Précise le fichier."
if not _safe_path(rest):
return f"Accès refusé : {rest}"
try:
with open(rest.strip(), "r", errors="replace") as f:
lines = f.readlines()
numbered = "".join(f"{i+1:4d} {line}" for i, line in enumerate(lines))
if len(numbered) > 6000:
numbered = numbered[:6000] + f"\n... (tronqué, {len(lines)} lignes total)"
return numbered or "(fichier vide)"
except Exception as e:
return str(e)
if action == "edit":
# Search & replace : edit <fichier> | <ancien> -> <nouveau>
if "|" not in rest:
return "Format : edit <fichier> | <ancien> -> <nouveau>"
filepath, change = rest.split("|", 1)
filepath = filepath.strip()
if not _safe_path(filepath):
return f"Accès refusé : {filepath}"
if " -> " not in change:
return "Format : edit <fichier> | <ancien> -> <nouveau>"
old_text, new_text = change.split(" -> ", 1)
old_text = old_text.strip()
new_text = new_text.strip()
try:
with open(filepath, "r", errors="replace") as f:
content = f.read()
if old_text not in content:
return f"Texte non trouvé dans {filepath} : {old_text[:80]!r}"
count = content.count(old_text)
new_content = content.replace(old_text, new_text)
with open(filepath, "w") as f:
f.write(new_content)
return f"Édition OK : {count} remplacement(s) dans {filepath}"
except Exception as e:
return str(e)
if action == "multiedit":
# Plusieurs search & replace : multiedit <fichier> | old1 -> new1 ;; old2 -> new2
if "|" not in rest:
return "Format : multiedit <fichier> | <anc1> -> <nouv1> ;; <anc2> -> <nouv2>"
filepath, changes_str = rest.split("|", 1)
filepath = filepath.strip()
if not _safe_path(filepath):
return f"Accès refusé : {filepath}"
try:
with open(filepath, "r", errors="replace") as f:
content = f.read()
results = []
for change in changes_str.split(";;"):
change = change.strip()
if " -> " not in change:
continue
old_text, new_text = change.split(" -> ", 1)
old_text = old_text.strip()
new_text = new_text.strip()
count = content.count(old_text)
if count == 0:
results.append(f"Non trouvé : {old_text[:60]!r}")
else:
content = content.replace(old_text, new_text)
results.append(f"{count} remplacement(s) : {old_text[:40]!r}")
with open(filepath, "w") as f:
f.write(content)
return f"Multiedit OK dans {filepath} :\n" + "\n".join(results)
except Exception as e:
return str(e)
if action == "tail":
parts2 = rest.split()
filepath = parts2[0] if parts2 else ""
n = parts2[1] if len(parts2) > 1 else "50"
return _run(f"tail -n {n} {filepath}")
if action == "head":
parts2 = rest.split()
filepath = parts2[0] if parts2 else ""
n = parts2[1] if len(parts2) > 1 else "30"
return _run(f"head -n {n} {filepath}")
if action == "write":
if "|" not in rest:
return "Format : write <fichier> | <contenu>"
filepath, content = rest.split("|", 1)
filepath = filepath.strip()
content = content.strip()
if not _safe_path(filepath):
return f"Accès refusé : {filepath}"
try:
os.makedirs(os.path.dirname(os.path.abspath(filepath)), exist_ok=True)
with open(filepath, "w") as f:
f.write(content)
return f"Fichier écrit : {filepath} ({len(content)} caractères)"
except Exception as e:
return str(e)
if action == "append":
if "|" not in rest:
return "Format : append <fichier> | <contenu>"
filepath, content = rest.split("|", 1)
filepath = filepath.strip()
if not _safe_path(filepath):
return f"Accès refusé : {filepath}"
try:
with open(filepath, "a") as f:
f.write(content.strip() + "\n")
return f"Contenu ajouté à {filepath}"
except Exception as e:
return str(e)
if action == "delete":
if not rest:
return "Précise le chemin."
if not _safe_path(rest):
return f"Accès refusé : {rest}"
if os.path.isdir(rest):
return _run(f"rm -rf {rest}")
return _run(f"rm -f {rest}")
if action == "mkdir":
if not rest:
return "Précise le chemin."
return _run(f"mkdir -p {rest}")
if action == "move":
if "|" not in rest:
return "Format : move <src> | <dst>"
src, dst = rest.split("|", 1)
return _run(f"mv {src.strip()} {dst.strip()}")
if action == "copy":
if "|" not in rest:
return "Format : copy <src> | <dst>"
src, dst = rest.split("|", 1)
return _run(f"cp -r {src.strip()} {dst.strip()}")
if action == "chmod":
parts2 = rest.split(None, 1)
if len(parts2) < 2:
return "Format : chmod <mode> <chemin>"
return _run(f"chmod {parts2[0]} {parts2[1]}")
if action == "chown":
parts2 = rest.split(None, 1)
if len(parts2) < 2:
return "Format : chown <owner:group> <chemin>"
return _run(f"chown -R {parts2[0]} {parts2[1]}")
if action == "find":
parts2 = rest.split(None, 1)
path = parts2[0] if parts2 else "."
pattern = parts2[1] if len(parts2) > 1 else "*"
return _run(f"find {path} -name '{pattern}' 2>/dev/null | head -50")
if action == "grep":
parts2 = rest.split(None, 1)
if len(parts2) < 2:
return "Format : grep <pattern> <fichier>"
return _run(f"grep -rn '{parts2[0]}' {parts2[1]} 2>/dev/null | head -50")
if action == "df":
return _run("df -h")
if action == "du":
path = rest or "."
return _run(f"du -sh {path}/* 2>/dev/null | sort -rh | head -20")
if action == "stat":
if not rest:
return "Précise le chemin."
return _run(f"stat {rest}")
return (
"Action inconnue. Disponible : ls, cat, read, tail, head, write, append, edit, multiedit, "
"delete, mkdir, move, copy, chmod, chown, find, grep, df, du, stat"
)