feat: amélioration scripts bash, scan réseau, fix cron, README
- system_prompt: section scripts bash (commandes interdites, mosquitto_pub, bonnes pratiques) - script.py: nettoyage guillemets échappés à la sauvegarde - network.py: nouvelle action scan (nmap/arp-scan/arp fallback), auto-détection subnet - cron.py: _get_current_crontab() évite d'écrire "(aucune sortie)" dans le crontab - README créé Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
+21
-7
@@ -35,6 +35,16 @@ def _run(cmd: str, timeout: int = 10) -> str:
|
||||
return str(e)
|
||||
|
||||
|
||||
def _get_current_crontab() -> str:
|
||||
"""Retourne le crontab actuel, ou chaîne vide si inexistant."""
|
||||
result = subprocess.run(
|
||||
"crontab -l", shell=True, text=True, capture_output=True
|
||||
)
|
||||
if result.returncode != 0:
|
||||
return ""
|
||||
return result.stdout.strip()
|
||||
|
||||
|
||||
def run(args: str, context) -> str:
|
||||
parts = args.strip().split(None, 1)
|
||||
action = parts[0].lower() if parts else "list"
|
||||
@@ -54,26 +64,28 @@ def run(args: str, context) -> str:
|
||||
command = " ".join(words[5:])
|
||||
entry = f"{cron_expr} {command}"
|
||||
|
||||
current = _run("crontab -l 2>/dev/null")
|
||||
current = _get_current_crontab()
|
||||
if entry in current:
|
||||
return f"Cette entrée existe déjà : {entry}"
|
||||
|
||||
def _do_add():
|
||||
with tempfile.NamedTemporaryFile(mode="w", suffix=".cron", delete=False) as f:
|
||||
if current and "no crontab" not in current.lower():
|
||||
if current:
|
||||
f.write(current + "\n")
|
||||
f.write(entry + "\n")
|
||||
tmpfile = f.name
|
||||
out = _run(f"crontab {tmpfile}")
|
||||
result = subprocess.run(f"crontab {tmpfile}", shell=True, text=True, capture_output=True)
|
||||
os.unlink(tmpfile)
|
||||
return f"Entrée ajoutée : {entry}\n{out}"
|
||||
if result.returncode != 0:
|
||||
return f"❌ Erreur crontab : {(result.stdout + result.stderr).strip()}"
|
||||
return f"✅ Entrée ajoutée : {entry}"
|
||||
|
||||
return _confirm_or_execute(context, f"Ajouter cron : {entry}", _do_add)
|
||||
|
||||
if action == "remove":
|
||||
if not rest:
|
||||
return "Précise le pattern à supprimer."
|
||||
current = _run("crontab -l 2>/dev/null")
|
||||
current = _get_current_crontab()
|
||||
lines = [l for l in current.splitlines() if rest not in l]
|
||||
removed_count = len(current.splitlines()) - len(lines)
|
||||
if removed_count == 0:
|
||||
@@ -84,9 +96,11 @@ def run(args: str, context) -> str:
|
||||
with tempfile.NamedTemporaryFile(mode="w", suffix=".cron", delete=False) as f:
|
||||
f.write(new_cron + "\n")
|
||||
tmpfile = f.name
|
||||
out = _run(f"crontab {tmpfile}")
|
||||
result = subprocess.run(f"crontab {tmpfile}", shell=True, text=True, capture_output=True)
|
||||
os.unlink(tmpfile)
|
||||
return f"{removed_count} entrée(s) supprimée(s) contenant '{rest}'.\n{out}"
|
||||
if result.returncode != 0:
|
||||
return f"❌ Erreur crontab : {(result.stdout + result.stderr).strip()}"
|
||||
return f"✅ {removed_count} entrée(s) supprimée(s) contenant '{rest}'."
|
||||
|
||||
return _confirm_or_execute(context, f"Supprimer {removed_count} cron contenant '{rest}'", _do_remove)
|
||||
|
||||
|
||||
+29
-2
@@ -2,6 +2,8 @@
|
||||
Skill NETWORK — administration réseau.
|
||||
|
||||
Usage LLM :
|
||||
SKILL:network ARGS:scan [subnet] — découverte des hôtes (ex: scan 192.168.7.0/24)
|
||||
SKILL:network ARGS:arp — table ARP locale (hôtes récents)
|
||||
SKILL:network ARGS:ip [show|route|link]
|
||||
SKILL:network ARGS:ping <hôte> [count]
|
||||
SKILL:network ARGS:traceroute <hôte>
|
||||
@@ -20,8 +22,8 @@ Usage LLM :
|
||||
"""
|
||||
import subprocess
|
||||
|
||||
DESCRIPTION = "Administration réseau : ip, ping, traceroute, DNS, ports, firewall ufw/iptables"
|
||||
USAGE = "SKILL:network ARGS:ip | ping <host> | traceroute <host> | dns <host> | ports | connections | firewall status|allow|deny|list | wget <url>"
|
||||
DESCRIPTION = "Administration réseau : scan réseau, table ARP, ip, ping, traceroute, DNS, ports, firewall ufw/iptables"
|
||||
USAGE = "SKILL:network ARGS:scan [subnet] | arp | ip | ping <host> | traceroute <host> | dns <host> | ports | connections | firewall status|allow|deny|list | wget <url>"
|
||||
|
||||
|
||||
def _run(cmd: str, timeout: int = 20) -> str:
|
||||
@@ -167,6 +169,31 @@ def run(args: str, context) -> str:
|
||||
return "Précise l'URL."
|
||||
return _run(f"curl -sI {url} | head -10")
|
||||
|
||||
if action == "scan":
|
||||
# Détermine le subnet à scanner
|
||||
subnet = rest.strip()
|
||||
if not subnet:
|
||||
# Auto-détecte le subnet depuis l'IP locale
|
||||
iface_out = _run("ip -br addr show | grep -v '^lo' | head -1")
|
||||
# Extrait le CIDR (ex: 192.168.7.5/24 → 192.168.7.0/24)
|
||||
import re
|
||||
m = re.search(r'(\d+\.\d+\.\d+)\.\d+/(\d+)', iface_out)
|
||||
if m:
|
||||
subnet = f"{m.group(1)}.0/{m.group(2)}"
|
||||
else:
|
||||
subnet = "192.168.0.0/24"
|
||||
# Préfère nmap si dispo, sinon ping sweep
|
||||
nmap_check = _run("which nmap")
|
||||
if nmap_check and "nmap" in nmap_check:
|
||||
return _run(f"nmap -sn --host-timeout 3s {subnet} -oG - | grep 'Up$' | awk '{{print $2, $3}}'", timeout=60)
|
||||
# Fallback : arp-scan
|
||||
arpscan_check = _run("which arp-scan")
|
||||
if arpscan_check and "arp-scan" in arpscan_check:
|
||||
return _run(f"arp-scan {subnet}", timeout=30)
|
||||
# Fallback : table ARP après ping broadcast
|
||||
_run(f"ping -c 1 -b {subnet.rsplit('.',1)[0]}.255 2>/dev/null || true", timeout=5)
|
||||
return _run("arp -n")
|
||||
|
||||
if action == "arp":
|
||||
return _run("arp -n")
|
||||
|
||||
|
||||
+1
-1
@@ -147,7 +147,7 @@ def run(args: str, context) -> str:
|
||||
return "Format : save <nom> | <contenu du script>"
|
||||
name_raw, content = rest.split("|", 1)
|
||||
name = _safe_name(name_raw)
|
||||
content = content.strip().replace("\\n", "\n")
|
||||
content = content.strip().replace("\\n", "\n").replace('\\"', '"').replace("\\'", "'")
|
||||
|
||||
if not name:
|
||||
return "Nom de script invalide."
|
||||
|
||||
Reference in New Issue
Block a user