Files
sylvain 47e93c2afa Agent2_Ansible : spécialiste Ansible pour l'automatisation du réseau local
- agent2_ansible.py : script principal (XMPP + MQTT)
- config/system_prompt.txt : prompt spécialisé Ansible
- skills/ansible_exec.py : ANSIBLE: (ad-hoc) + PLAYBOOK: (playbooks)
- skills/memory.py, prompt_memory.py : chemins corrigés pour agent2_ansible
- ansible/ansible.cfg + inventory/hosts : configuration de base

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-07 16:22:34 +00:00

108 lines
3.1 KiB
Python

"""
Skill : ANSIBLE / PLAYBOOK
Exécute des commandes Ansible (ad-hoc ou playbooks) sur les hôtes du réseau local.
Commandes :
ANSIBLE: <arguments>
→ Exécute ansible avec les arguments fournis
→ Exemples :
ANSIBLE: all -m ping
ANSIBLE: all -m shell -a "uptime"
ANSIBLE: webservers -m apt -a "name=nginx state=present" --become
PLAYBOOK: <playbook.yml> [options]
→ Exécute ansible-playbook depuis le dossier playbooks/
→ Exemples :
PLAYBOOK: site.yml
PLAYBOOK: deploy.yml --limit webservers --tags packages
"""
import subprocess
from pathlib import Path
SKILL_NAME = "ansible_exec"
TRIGGER = None
TRIGGERS = {
"ANSIBLE:": "ansible_adhoc",
"PLAYBOOK:": "ansible_playbook",
}
ANSIBLE_DIR = Path("/opt/agent2_ansible/ansible")
PLAYBOOKS_DIR = ANSIBLE_DIR / "playbooks"
ANSIBLE_CFG = ANSIBLE_DIR / "ansible.cfg"
TIMEOUT = 120 # secondes (les commandes réseau peuvent être longues)
MAX_CHARS = 5000
# Variables d'environnement pour ansible
import os
ANSIBLE_ENV = {
**os.environ,
"ANSIBLE_CONFIG": str(ANSIBLE_CFG),
"ANSIBLE_FORCE_COLOR": "0",
"ANSIBLE_NOCOLOR": "1",
}
def _run(cmd: list) -> str:
try:
result = subprocess.run(
cmd,
capture_output=True,
text=True,
timeout=TIMEOUT,
env=ANSIBLE_ENV,
cwd=str(ANSIBLE_DIR),
)
stdout = result.stdout.strip()
stderr = result.stderr.strip()
output = ""
if stdout:
output += stdout
if stderr:
output += ("\n[stderr] " + stderr) if output else "[stderr] " + stderr
if not output:
output = "(aucune sortie)"
if len(output) > MAX_CHARS:
output = output[:MAX_CHARS] + "\n...[tronqué]"
status = "OK" if result.returncode == 0 else "Erreur (code {})".format(result.returncode)
return "[{}] $ {}\n{}".format(status, " ".join(cmd), output)
except subprocess.TimeoutExpired:
return "Timeout : la commande a dépassé {}s.".format(TIMEOUT)
except FileNotFoundError:
return "Erreur : ansible n'est pas installé ou introuvable dans le PATH."
except Exception as e:
return "Erreur d'exécution : {}".format(e)
def ansible_adhoc(args: str) -> str:
args = args.strip()
if not args:
return "Erreur : arguments vides. Exemple : ANSIBLE: all -m ping"
cmd = ["ansible"] + args.split()
return _run(cmd)
def ansible_playbook(args: str) -> str:
args = args.strip()
if not args:
return "Erreur : playbook non spécifié. Exemple : PLAYBOOK: site.yml"
parts = args.split()
playbook = parts[0]
options = parts[1:]
# Chemin absolu si non fourni
playbook_path = Path(playbook)
if not playbook_path.is_absolute():
playbook_path = PLAYBOOKS_DIR / playbook
if not playbook_path.exists():
return "Erreur : playbook introuvable : {}".format(playbook_path)
cmd = ["ansible-playbook", str(playbook_path)] + options
return _run(cmd)