Ajout pause/resume/stats pour commandes !agentON/OFF

- Contrôle via topic agents/agent2_deploy/control (pause/resume/report)
- Stats de déploiements (total/success/errors) envoyées sur agents/daily_report
- Vérification flag paused avant démarrage d'un déploiement

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
root
2026-03-08 15:42:00 +00:00
parent ae0064a2a7
commit 7f8086444b
+60 -3
View File
@@ -12,6 +12,7 @@ import asyncio
import json import json
import threading import threading
from pathlib import Path from pathlib import Path
from datetime import datetime
from slixmpp import ClientXMPP from slixmpp import ClientXMPP
import paho.mqtt.client as mqtt import paho.mqtt.client as mqtt
@@ -33,6 +34,11 @@ MQTT_PORT = int(cfg["mqtt_port"])
MQTT_CLIENT = cfg["mqtt_client_id"] MQTT_CLIENT = cfg["mqtt_client_id"]
AGENT1_INBOX = cfg["agent1_inbox"] AGENT1_INBOX = cfg["agent1_inbox"]
# ── ÉTAT GLOBAL ───────────────────────────────────────────────────────────
paused = False
start_time = datetime.now()
deploy_stats = {"total": 0, "success": 0, "errors": 0}
# ── MQTT ────────────────────────────────────────────────────────────────── # ── MQTT ──────────────────────────────────────────────────────────────────
_mqtt_pub = None _mqtt_pub = None
@@ -40,12 +46,47 @@ def mqtt_publish(topic: str, message: str):
if _mqtt_pub: if _mqtt_pub:
_mqtt_pub.publish(topic, message) _mqtt_pub.publish(topic, message)
def on_control_message(client, userdata, msg):
"""Gère les commandes de contrôle : pause / resume / report."""
global paused
try:
data = json.loads(msg.payload.decode(errors="replace"))
command = data.get("command", "")
print(f"[CONTROL] Commande reçue : {command}")
if command == "pause":
paused = True
print("[CONTROL] Mis en pause.")
elif command == "resume":
paused = False
print("[CONTROL] Reprise.")
elif command == "report":
uptime_s = int((datetime.now() - start_time).total_seconds())
payload = json.dumps({
"agent" : MQTT_CLIENT,
"timestamp" : datetime.now().isoformat(timespec='seconds'),
"uptime_s" : uptime_s,
"paused" : paused,
"tasks_today" : deploy_stats["total"],
"success" : deploy_stats["success"],
"errors" : deploy_stats["errors"],
"avg_duration_s": 0,
"last_error" : None,
"pending" : 0,
}, ensure_ascii=False)
mqtt_publish("agents/daily_report", payload)
print("[CONTROL] Rapport envoyé.")
except Exception as e:
print(f"[CONTROL] Erreur : {e}")
def start_mqtt(): def start_mqtt():
global _mqtt_pub global _mqtt_pub
import json as _json
_status_topic = "agents/status/{}".format(MQTT_CLIENT) _status_topic = "agents/status/{}".format(MQTT_CLIENT)
_offline_payload = _json.dumps({"status": "offline", "agent": MQTT_CLIENT}) _offline_payload = json.dumps({"status": "offline", "agent": MQTT_CLIENT})
_online_payload = _json.dumps({ _online_payload = json.dumps({
"status" : "online", "status" : "online",
"agent" : MQTT_CLIENT, "agent" : MQTT_CLIENT,
"jid" : XMPP_JID, "jid" : XMPP_JID,
@@ -60,6 +101,15 @@ def start_mqtt():
_mqtt_pub.publish(_status_topic, _online_payload, retain=True) _mqtt_pub.publish(_status_topic, _online_payload, retain=True)
register_to_agent1() register_to_agent1()
# Souscription au topic de contrôle
_ctrl_sub = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2,
client_id=MQTT_CLIENT + "_ctrl")
_ctrl_sub.on_message = on_control_message
_ctrl_sub.connect(MQTT_HOST, MQTT_PORT)
_ctrl_sub.subscribe("agents/{}/control".format(MQTT_CLIENT), qos=1)
_ctrl_sub.loop_start()
print("[MQTT] Écoute sur agents/{}/control".format(MQTT_CLIENT))
def register_to_agent1(): def register_to_agent1():
"""Publie une déclaration de mise en ligne sur agents/register.""" """Publie une déclaration de mise en ligne sur agents/register."""
import json as _json import json as _json
@@ -317,6 +367,9 @@ class DeployBot(ClientXMPP):
return return
if text == "!deploy": if text == "!deploy":
if paused:
self.reply(jid, "[PAUSE] Agent2_Deploy est en pause. Envoyez !agentON agent2_deploy à agent1 pour reprendre.")
return
self.reply(jid, start_session(jid)) self.reply(jid, start_session(jid))
return return
@@ -342,6 +395,8 @@ class DeployBot(ClientXMPP):
self.reply(jid, "Déploiement en cours... Cela peut prendre plusieurs minutes.") self.reply(jid, "Déploiement en cours... Cela peut prendre plusieurs minutes.")
def run_deploy(): def run_deploy():
deploy_stats["total"] += 1
def progress(msg): def progress(msg):
self.reply(jid, "[{}]".format(msg)) self.reply(jid, "[{}]".format(msg))
@@ -360,6 +415,7 @@ class DeployBot(ClientXMPP):
) )
if success: if success:
deploy_stats["success"] += 1
speciality = catalog[session["agent_type"]]["description"] speciality = catalog[session["agent_type"]]["description"]
notify_agent1( notify_agent1(
agent_name = session["agent_name"], agent_name = session["agent_name"],
@@ -370,6 +426,7 @@ class DeployBot(ClientXMPP):
) )
self.reply(jid, result) self.reply(jid, result)
else: else:
deploy_stats["errors"] += 1
self.reply(jid, "Déploiement échoué : " + result) self.reply(jid, "Déploiement échoué : " + result)
if jid in sessions: if jid in sessions: