Files
agent2_deploy/agent2_deploy.py
T
root ee6b671e2e deploy.py : choix local/distant interactif + agent_name partout
agent2_deploy : MQTT Last Will + publish retain sur agents/status/{name}

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-07 21:24:34 +00:00

396 lines
16 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Agent2_Deploy — Agent de déploiement interactif via XMPP.
Interaction guidée (machine à états) déclenchée par "!deploy".
L'agent pose les questions nécessaires, déploie via SSH,
puis notifie agent1 du nouvel agent via MQTT.
"""
import asyncio
import json
import threading
from pathlib import Path
from slixmpp import ClientXMPP
import paho.mqtt.client as mqtt
from deployer import deploy_agent, load_catalog
# ── CONFIG ────────────────────────────────────────────────────────────────
CONFIG_FILE = Path("/opt/agent2_deploy/config/config.json")


def load_config():
    """Read ``CONFIG_FILE`` as UTF-8 text and return the decoded JSON value."""
    raw_text = CONFIG_FILE.read_text(encoding="utf-8")
    return json.loads(raw_text)
# The configuration is loaded once, when the module is imported; a key that is
# missing from the JSON document raises KeyError on the corresponding line.
cfg = load_config()
XMPP_JID = cfg["xmpp_jid"]  # JID the bot logs in with
XMPP_PASS = cfg["xmpp_pass"]  # password paired with XMPP_JID
ADMIN_JID = cfg["admin_jid"]  # the only sender whose messages are processed
MQTT_HOST = cfg["mqtt_host"]  # broker the publishing client connects to
MQTT_PORT = int(cfg["mqtt_port"])  # converted to int before use
MQTT_CLIENT = cfg["mqtt_client_id"]  # agent identifier; also part of the status topic
AGENT1_INBOX = cfg["agent1_inbox"]  # not referenced in the visible code; presumably agent1's inbox topic
# ── MQTT ──────────────────────────────────────────────────────────────────
# Publishing client shared by the whole module; None until start_mqtt() sets it.
_mqtt_pub = None


def mqtt_publish(topic: str, message: str):
    """Publish ``message`` on ``topic``; do nothing while no client is set."""
    client = _mqtt_pub
    if not client:
        return
    client.publish(topic, message)
def start_mqtt():
    """Create the publishing MQTT client, connect it and announce this agent.

    A retained Last Will ("offline") is registered on
    ``agents/status/<client id>``. The matching retained "online" status is
    published from the ``on_connect`` callback rather than once after
    ``connect()``: the network loop reconnects automatically, and without the
    callback the broker would keep the retained "offline" will forever after
    the first unexpected disconnect.

    The registration on ``agents/register`` is still sent once, right after
    the network loop has been started.
    """
    global _mqtt_pub

    status_topic = "agents/status/{}".format(MQTT_CLIENT)
    offline_payload = json.dumps({"status": "offline", "agent": MQTT_CLIENT})
    online_payload = json.dumps({
        "status": "online",
        "agent": MQTT_CLIENT,
        "jid": XMPP_JID,
        "mqtt_inbox": cfg["mqtt_inbox"],
    })

    def _announce_online(client, userdata, flags, reason_code, properties):
        # Runs after every successful (re)connection, so the retained status
        # always overrides a previously delivered Last Will.
        if reason_code.is_failure:
            print("[MQTT] Connexion refusée : {}".format(reason_code))
            return
        client.publish(status_topic, online_payload, retain=True)

    client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2,
                         client_id=MQTT_CLIENT + "_pub")
    # The will must be set before connect() so it is part of the CONNECT packet.
    client.will_set(status_topic, offline_payload, retain=True)
    client.on_connect = _announce_online

    _mqtt_pub = client
    client.connect(MQTT_HOST, MQTT_PORT)
    client.loop_start()
    register_to_agent1()
def register_to_agent1():
    """Publish this agent's own declaration on the ``agents/register`` topic."""
    declaration = dict(
        agent=MQTT_CLIENT,
        jid=XMPP_JID,
        mqtt_inbox=cfg["mqtt_inbox"],
        speciality="Déploiement d'agents : installe et configure d'autres agents sur des machines distantes ou locales via SSH",
    )
    mqtt_publish("agents/register", json.dumps(declaration))
    print("[REGISTER] Déclaration envoyée à agent1.")
def notify_agent1(agent_name: str, agent_type: str, host: str, xmpp_jid: str, speciality: str):
    """Publish a freshly deployed agent's description on ``agents/register``.

    The agent's MQTT inbox is derived from its name as
    ``agents/<agent_name>/inbox``.
    """
    inbox_topic = f"agents/{agent_name}/inbox"
    announcement = dict(
        agent=agent_name,
        jid=xmpp_jid,
        mqtt_inbox=inbox_topic,
        speciality=speciality,
        host=host,
        type=agent_type,
    )
    mqtt_publish("agents/register", json.dumps(announcement))
# ── MACHINE À ÉTATS ───────────────────────────────────────────────────────
# sessions[jid] = dict avec toutes les infos collectées + état courant
# Dialogue states in the order handle_session() walks through them.
# "enter_name" was missing from this list although handle_session() switches
# to it right after "choose_agent".
STEPS = [
    "choose_agent",
    "enter_name",
    "enter_ip",
    "enter_user",
    "enter_auth",
    "enter_credential",
    "enter_xmpp_jid",
    "enter_xmpp_pass",
    "enter_mqtt_host",
    "confirm",
]

# Active deployment dialogues, keyed by the sender's bare JID.
sessions: dict = {}
def list_agents() -> str:
catalog = load_catalog()
lines = ["Agents disponibles :"]
for i, (name, info) in enumerate(catalog.items(), 1):
lines.append(" {}. {}{}".format(i, name, info["description"]))
lines.append("\nTapez le numéro ou le nom de l'agent à déployer.")
return "\n".join(lines)
def start_session(jid: str) -> str:
    """Open (or reset) the deployment dialogue for ``jid``.

    The new session starts in the ``choose_agent`` state with every collected
    field set to None. Returns the greeting followed by the agent list.
    """
    # Fields filled in step by step; auth_type becomes "password" or "key".
    pending_fields = (
        "agent_type",
        "agent_name",
        "target_ip",
        "ssh_user",
        "auth_type",
        "ssh_password",
        "ssh_key_path",
        "xmpp_jid",
        "xmpp_pass",
        "mqtt_host",
    )
    fresh = {"state": "choose_agent"}
    fresh.update(dict.fromkeys(pending_fields))
    sessions[jid] = fresh

    greeting = "Déploiement d'agent démarré. Tapez !annuler pour abandonner.\n\n"
    return greeting + list_agents()
def _deployment_summary(session: dict, agent_info: dict) -> str:
    """Build the recap shown to the user before the final confirmation."""
    return (
        "Récapitulatif du déploiement :\n"
        " Type : {agent_type}\n"
        " Nom : {agent_name}\n"
        " Repo : {repo}\n"
        " Machine : {ssh_user}@{target_ip}\n"
        " Auth SSH : {auth}\n"
        " XMPP JID : {xmpp_jid}\n"
        " MQTT broker: {mqtt_host}\n"
        " MQTT inbox : agents/{agent_name}/inbox\n"
        " Service : {agent_name}.service\n\n"
        "Confirmer le déploiement ? (oui / non)"
    ).format(
        agent_type=session["agent_type"],
        agent_name=session["agent_name"],
        repo=agent_info["repo_url"],
        ssh_user=session["ssh_user"],
        target_ip=session["target_ip"],
        auth="clé SSH" if session["auth_type"] == "key" else "mot de passe",
        xmpp_jid=session["xmpp_jid"],
        mqtt_host=session["mqtt_host"],
    )


def handle_session(jid: str, text: str) -> str | None:
    """Advance the deployment dialogue of ``jid`` with the message ``text``.

    Args:
        jid: Key of the session in ``sessions``.
        text: Raw message body; surrounding whitespace is stripped.

    Returns:
        The reply to send back. ``None`` is returned in two cases: no session
        exists for ``jid``, or the user has just confirmed and the caller must
        start the deployment (the session state is then ``"deploying"``).

    Changes compared with the previous behaviour:
        * mandatory answers (IP, SSH user, SSH credential, XMPP password) are
          re-asked instead of being stored empty;
        * at the confirmation step only an explicit "non" cancels, anything
          unrecognised re-asks the question instead of discarding the session;
        * while a deployment is running the session is left untouched, since
          deleting it would not stop the deployment;
        * the catalog is only loaded by the states that need it.
    """
    import re

    session = sessions.get(jid)
    if session is None:
        return None

    text = text.strip()
    state = session["state"]

    # A running deployment cannot be cancelled from here; say so instead of
    # pretending it was cancelled.
    if state == "deploying":
        return "Un déploiement est déjà en cours, merci de patienter jusqu'à sa fin."

    if text.lower() in ("!annuler", "annuler", "cancel"):
        del sessions[jid]
        return "Déploiement annulé."

    # ── choose_agent ────────────────────────────────────────────────────────
    if state == "choose_agent":
        catalog = load_catalog()
        agents = list(catalog)
        # isdecimal() rather than isdigit(): the latter accepts characters
        # such as "²" that int() then rejects with ValueError.
        if text.isdecimal():
            idx = int(text) - 1
            if not 0 <= idx < len(agents):
                return "Numéro invalide. Choisissez entre 1 et {}.".format(len(agents))
            session["agent_type"] = agents[idx]
        elif text in catalog:
            session["agent_type"] = text
        else:
            return "Agent inconnu. Tapez le numéro ou le nom exact.\n\n" + list_agents()
        session["state"] = "enter_name"
        return (
            "Agent choisi : {}.\n\n"
            "Quel nom donner à cet agent ?\n"
            "Ce nom sera utilisé dans vos commandes (ex: \"mets à jour trouducul\"),\n"
            "comme nom du service systemd et comme identifiant MQTT.\n"
            "Lettres, chiffres, tirets et underscores uniquement."
        ).format(session["agent_type"])

    # ── enter_name ──────────────────────────────────────────────────────────
    if state == "enter_name":
        name = text.lower()
        if not re.fullmatch(r"[a-z0-9_-]+", name):
            return "Nom invalide. Utilisez uniquement des lettres, chiffres, tirets ou underscores."
        session["agent_name"] = name
        session["state"] = "enter_ip"
        return "Nom choisi : {}.\n\nAdresse IP de la machine cible ?".format(name)

    # ── enter_ip ────────────────────────────────────────────────────────────
    if state == "enter_ip":
        if not text:
            return "Adresse IP vide. Adresse IP de la machine cible ?"
        session["target_ip"] = text
        session["state"] = "enter_user"
        return "Nom d'utilisateur SSH sur {} ?".format(text)

    # ── enter_user ──────────────────────────────────────────────────────────
    if state == "enter_user":
        if not text:
            return "Nom d'utilisateur vide. Nom d'utilisateur SSH sur {} ?".format(
                session["target_ip"])
        session["ssh_user"] = text
        session["state"] = "enter_auth"
        return (
            "Méthode d'authentification SSH ?\n"
            " 1. Mot de passe\n"
            " 2. Clé SSH (chemin local)"
        )

    # ── enter_auth ──────────────────────────────────────────────────────────
    if state == "enter_auth":
        choice = text.lower()
        if choice in ("1", "mot de passe", "password"):
            session["auth_type"] = "password"
            session["state"] = "enter_credential"
            return "Mot de passe SSH pour {}@{} :".format(
                session["ssh_user"], session["target_ip"])
        if choice in ("2", "clé", "clé ssh", "key"):
            session["auth_type"] = "key"
            session["state"] = "enter_credential"
            return "Chemin local de la clé SSH (ex: /root/.ssh/id_rsa) :"
        return "Répondez 1 (mot de passe) ou 2 (clé SSH)."

    # ── enter_credential ────────────────────────────────────────────────────
    if state == "enter_credential":
        uses_password = session["auth_type"] == "password"
        if not text:
            if uses_password:
                return "Mot de passe vide. Mot de passe SSH pour {}@{} :".format(
                    session["ssh_user"], session["target_ip"])
            return "Chemin vide. Chemin local de la clé SSH (ex: /root/.ssh/id_rsa) :"
        if uses_password:
            session["ssh_password"] = text
        else:
            session["ssh_key_path"] = text
        session["state"] = "enter_xmpp_jid"
        default_jid = "{}@xmpp.ovh".format(session["agent_name"])
        return (
            "Adresse XMPP (JID) pour cet agent ?\n"
            "(entrée vide = {})".format(default_jid)
        )

    # ── enter_xmpp_jid ──────────────────────────────────────────────────────
    if state == "enter_xmpp_jid":
        # An empty answer deliberately selects the default JID.
        default_jid = "{}@xmpp.ovh".format(session["agent_name"])
        session["xmpp_jid"] = text if text else default_jid
        session["state"] = "enter_xmpp_pass"
        return "Mot de passe XMPP pour {} :".format(session["xmpp_jid"])

    # ── enter_xmpp_pass ─────────────────────────────────────────────────────
    if state == "enter_xmpp_pass":
        if not text:
            return "Mot de passe vide. Mot de passe XMPP pour {} :".format(
                session["xmpp_jid"])
        session["xmpp_pass"] = text
        session["state"] = "enter_mqtt_host"
        return (
            "Adresse du broker MQTT pour cet agent ?\n"
            "(laisser vide pour utiliser l'IP de la machine distante : {})".format(
                session["target_ip"])
        )

    # ── enter_mqtt_host ─────────────────────────────────────────────────────
    if state == "enter_mqtt_host":
        # Look the agent up before mutating the session, so a failed lookup
        # does not leave the dialogue half-advanced.
        agent_info = load_catalog()[session["agent_type"]]
        # An empty answer deliberately falls back to the target machine.
        session["mqtt_host"] = text if text else session["target_ip"]
        session["state"] = "confirm"
        return _deployment_summary(session, agent_info)

    # ── confirm ─────────────────────────────────────────────────────────────
    if state == "confirm":
        answer = text.lower()
        if answer in ("oui", "o", "yes", "y"):
            session["state"] = "deploying"
            return None  # Signal: the caller starts the deployment.
        if answer in ("non", "n", "no"):
            del sessions[jid]
            return "Déploiement annulé."
        return "Réponse non reconnue. Confirmer le déploiement ? (oui / non)"

    return "État inconnu. Tapez !annuler pour recommencer."
# ── BOT XMPP ─────────────────────────────────────────────────────────────
class DeployBot(ClientXMPP):
    """XMPP bot that drives the guided deployment dialogue for the admin.

    Only messages whose bare sender JID equals ``ADMIN_JID`` are processed.
    Deployments run in a background thread; every XMPP message produced by
    that thread is handed back to the event loop, because the XMPP client is
    driven by asyncio and must not be used directly from another thread.
    """

    def __init__(self):
        ClientXMPP.__init__(self, XMPP_JID, XMPP_PASS)
        self.add_event_handler("session_start", self.session_start)
        self.add_event_handler("message", self.on_message)
        self.register_plugin('xep_0030')
        self.register_plugin('xep_0199')

    def reply(self, jid: str, body: str):
        """Send ``body`` to ``jid`` as a chat message (event-loop thread only)."""
        self.send_message(mto=jid, mbody=body, mtype='chat')

    def _reply_threadsafe(self, jid: str, body: str):
        """Schedule ``reply`` on the event loop; safe to call from any thread."""
        self.loop.call_soon_threadsafe(self.reply, jid, body)

    async def session_start(self, event):
        """Send presence, fetch the roster and greet the admin."""
        self.send_presence()
        await self.get_roster()
        self.reply(ADMIN_JID, (
            "Agent2_Deploy en ligne !\n"
            "Tapez !deploy pour déployer un agent via SSH.\n"
            "Tapez !agents pour voir les agents disponibles."
        ))

    async def on_message(self, msg):
        """Dispatch an incoming message to a command or the active session."""
        if msg['type'] not in ('chat', 'normal'):
            return
        jid = str(msg['from']).split('/')[0]
        if jid != ADMIN_JID:
            return
        text = msg['body'].strip()

        # Stanzas without a body (e.g. typing notifications) must not trigger
        # the help text. Inside a session they are still forwarded, because an
        # empty answer selects a default value in some steps.
        if not text and jid not in sessions:
            return

        # Global commands
        if text == "!agents":
            self.reply(jid, list_agents())
            return
        if text == "!deploy":
            self.reply(jid, start_session(jid))
            return
        if text == "!help":
            self.reply(jid, (
                "Commandes disponibles :\n"
                " !deploy — Démarrer un déploiement guidé\n"
                " !agents — Lister les agents déployables\n"
                " !annuler — Annuler le déploiement en cours\n"
                " !help — Afficher cette aide"
            ))
            return

        # Deployment dialogue in progress?
        if jid in sessions:
            reply = handle_session(jid, text)
            if reply is not None:
                self.reply(jid, reply)
                return
            # reply is None: the user confirmed, start the deployment.
            session = sessions[jid]
            self.reply(jid, "Déploiement en cours... Cela peut prendre plusieurs minutes.")
            threading.Thread(
                target=self._run_deploy, args=(jid, session), daemon=True
            ).start()
            return

        # Outside any session: basic help
        self.reply(jid, (
            "Je suis agent2_deploy, spécialisé dans le déploiement d'agents.\n"
            "Tapez !deploy pour déployer un agent, ou !help pour l'aide."
        ))

    def _run_deploy(self, jid: str, session: dict):
        """Run one deployment in a worker thread and report its outcome.

        Any exception is reported to the admin as a failure instead of killing
        the thread silently. The session is removed afterwards, but only if it
        is still the one this deployment was started from.
        """
        def progress(message):
            self._reply_threadsafe(jid, "[{}]".format(message))

        try:
            catalog = load_catalog()
            success, result = deploy_agent(
                host=session["target_ip"],
                ssh_user=session["ssh_user"],
                agent_type=session["agent_type"],
                agent_name=session["agent_name"],
                xmpp_jid=session["xmpp_jid"],
                xmpp_pass=session["xmpp_pass"],
                mqtt_host=session["mqtt_host"],
                progress_cb=progress,
                ssh_password=session.get("ssh_password"),
                ssh_key_path=session.get("ssh_key_path"),
            )
            if success:
                speciality = catalog[session["agent_type"]]["description"]
                notify_agent1(
                    agent_name=session["agent_name"],
                    agent_type=session["agent_type"],
                    host=session["target_ip"],
                    xmpp_jid=session["xmpp_jid"],
                    speciality=speciality,
                )
                outcome = result
            else:
                outcome = "Déploiement échoué : " + result
        except Exception as exc:  # thread boundary: report instead of dying silently
            print("[DEPLOY] Erreur inattendue : {!r}".format(exc))
            outcome = "Déploiement échoué : {}".format(exc)
        finally:
            # Identity check: a session restarted with !deploy in the meantime
            # must not be deleted by this older deployment.
            if sessions.get(jid) is session:
                del sessions[jid]
        self._reply_threadsafe(jid, outcome)
# ── MAIN ─────────────────────────────────────────────────────────────────
if __name__ == "__main__":
    # MQTT setup runs in a daemon thread alongside the XMPP bot.
    threading.Thread(target=start_mqtt, daemon=True).start()

    # The XMPP bot owns the main thread and its event loop.
    bot = DeployBot()
    bot.connect()
    bot.loop.run_forever()