File d'attente SQLite FIFO + pause/resume + rapport journalier

- task_queue.py : module FIFO persistant (queue.db), QoS 1
- agent2_ansible.py : intégration queue, topic agents/agent2_ansible/control
  commandes : pause / resume / report (stats envoyées sur agents/daily_report)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-03-08 15:39:01 +00:00
parent 2c7b9faad2
commit 59c49060aa
2 changed files with 202 additions and 26 deletions
+67 -26
View File
@@ -7,17 +7,20 @@ import threading
import requests
import json
from pathlib import Path
from datetime import datetime
from slixmpp import ClientXMPP
import paho.mqtt.client as mqtt
BASE_DIR = Path(__file__).parent.resolve()
sys.path.insert(0, str(BASE_DIR))
from skills.loader import load_skills, run_skills
from task_queue import TaskQueue
# ── CONFIG ───────────────────────────────────────────────────────────────
CONFIG_DIR = BASE_DIR / "config"
CONFIG_FILE = CONFIG_DIR / "config.json"
PROMPT_FILE = CONFIG_DIR / "system_prompt.txt"
QUEUE_DB = BASE_DIR / "queue.db"
def load_config():
with open(CONFIG_FILE, "r", encoding="utf-8") as f:
@@ -43,6 +46,7 @@ SYSTEM_PROMPT = load_system_prompt()
load_skills()
conversation_history = []
start_time = datetime.now()
# ── LLM ──────────────────────────────────────────────────────────────────
def call_ollama(messages: list) -> str:
@@ -78,18 +82,57 @@ def ask_llm(user_message: str, history: list = None) -> str:
history.append({"role": "assistant", "content": err})
return err
# ── MQTT LISTENER ─────────────────────────────────────────────────────────
# ── MQTT ──────────────────────────────────────────────────────────────────
mqtt_publish_client = None
task_queue: TaskQueue = None
def mqtt_publish(topic: str, message: str):
global mqtt_publish_client
if mqtt_publish_client:
mqtt_publish_client.publish(topic, message)
def _process_task(task: str) -> str:
"""Worker appelé par la TaskQueue pour chaque tâche."""
mqtt_history = []
return ask_llm(task, history=mqtt_history)
def on_mqtt_message(client, userdata, msg):
"""Enqueue la tâche — le worker la traitera en FIFO."""
task = msg.payload.decode(errors="replace")
print(f"[MQTT] Tâche reçue, mise en queue : {task[:100]}")
task_queue.enqueue(task)
def on_control_message(client, userdata, msg):
"""Gère les commandes de contrôle : pause / resume / report."""
try:
data = json.loads(msg.payload.decode(errors="replace"))
command = data.get("command", "")
print(f"[CONTROL] Commande reçue : {command}")
if command == "pause":
task_queue.pause()
elif command == "resume":
task_queue.resume()
elif command == "report":
stats = task_queue.get_stats()
uptime_s = int((datetime.now() - start_time).total_seconds())
payload = json.dumps({
"agent" : MQTT_CLIENT,
"timestamp" : datetime.now().isoformat(timespec='seconds'),
"uptime_s" : uptime_s,
"paused" : task_queue.paused,
**stats
}, ensure_ascii=False)
mqtt_publish("agents/daily_report", payload)
print(f"[CONTROL] Rapport envoyé sur agents/daily_report")
except Exception as e:
print(f"[CONTROL] Erreur : {e}")
def register_to_agent1():
"""Publie une déclaration de mise en ligne sur agents/register."""
import json as _json
payload = _json.dumps({
payload = json.dumps({
"agent" : MQTT_CLIENT,
"jid" : XMPP_JID,
"mqtt_inbox": MQTT_INBOX,
@@ -98,23 +141,12 @@ def register_to_agent1():
mqtt_publish("agents/register", payload)
print("[REGISTER] Déclaration envoyée à agent1.")
def on_mqtt_message(client, userdata, msg):
task = msg.payload.decode(errors="replace")
print(f"[MQTT] Tâche reçue d'agent1 : {task[:100]}")
# Historique isolé par tâche MQTT (pas mélangé avec XMPP)
mqtt_history = []
reply = ask_llm(task, history=mqtt_history)
print(f"[MQTT] Réponse envoyée : {reply[:100]}")
mqtt_publish(MQTT_OUTBOX, reply)
def start_mqtt_listener():
global mqtt_publish_client
global mqtt_publish_client, task_queue
# Client dédié à la publication
import json as _json
_status_topic = "agents/status/{}".format(MQTT_CLIENT)
_offline_payload = _json.dumps({"status": "offline", "agent": MQTT_CLIENT})
_online_payload = _json.dumps({
_status_topic = "agents/status/{}".format(MQTT_CLIENT)
_offline_payload = json.dumps({"status": "offline", "agent": MQTT_CLIENT})
_online_payload = json.dumps({
"status" : "online",
"agent" : MQTT_CLIENT,
"jid" : XMPP_JID,
@@ -122,20 +154,31 @@ def start_mqtt_listener():
})
mqtt_publish_client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2,
client_id=MQTT_CLIENT + "_pub")
client_id=MQTT_CLIENT + "_pub",
clean_session=False)
mqtt_publish_client.will_set(_status_topic, _offline_payload, retain=True)
mqtt_publish_client.connect(MQTT_HOST, MQTT_PORT)
mqtt_publish_client.loop_start()
mqtt_publish_client.publish(_status_topic, _online_payload, retain=True)
register_to_agent1()
# Client dédié à la souscription
# Initialiser et démarrer la queue
task_queue = TaskQueue(QUEUE_DB, _process_task, mqtt_publish, MQTT_OUTBOX)
task_queue.start_worker()
# Client souscription (QoS 1 + session persistante)
sub_client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2,
client_id=MQTT_CLIENT + "_sub")
client_id=MQTT_CLIENT + "_sub",
clean_session=False)
sub_client.message_callback_add("agents/{}/control".format(MQTT_CLIENT),
on_control_message)
sub_client.on_message = on_mqtt_message
sub_client.connect(MQTT_HOST, MQTT_PORT)
sub_client.subscribe(MQTT_INBOX)
print(f"[MQTT] Écoute sur {MQTT_INBOX}")
sub_client.subscribe([
(MQTT_INBOX, 1),
("agents/{}/control".format(MQTT_CLIENT), 1),
])
print(f"[MQTT] Écoute sur {MQTT_INBOX} + agents/{MQTT_CLIENT}/control")
sub_client.loop_forever()
# ── BOT XMPP ─────────────────────────────────────────────────────────────
@@ -175,11 +218,9 @@ class AgentBot(ClientXMPP):
# ── MAIN ─────────────────────────────────────────────────────────────────
if __name__ == "__main__":
# Lancer le listener MQTT dans un thread séparé
mqtt_thread = threading.Thread(target=start_mqtt_listener, daemon=True)
mqtt_thread.start()
# Lancer le bot XMPP
bot = AgentBot()
bot.connect()
bot.loop.run_forever()
+135
View File
@@ -0,0 +1,135 @@
"""
File d'attente SQLite FIFO pour tâches MQTT.
Chaque agent instancie un TaskQueue avec :
- db_path : chemin vers queue.db
- worker_fn : function(task: str) -> str (ex: ask_llm)
- publish_fn : function(topic: str, message: str)
- outbox : topic MQTT de réponse
"""
import sqlite3
import threading
import time
from pathlib import Path
from datetime import datetime
class TaskQueue:
def __init__(self, db_path: Path, worker_fn, publish_fn, outbox: str):
self.db_path = db_path
self.worker_fn = worker_fn
self.publish_fn = publish_fn
self.outbox = outbox
self.paused = False
self._event = threading.Event()
self._init_db()
def _init_db(self):
with sqlite3.connect(self.db_path) as conn:
conn.execute("""
CREATE TABLE IF NOT EXISTS tasks_queue (
id INTEGER PRIMARY KEY AUTOINCREMENT,
received_at TEXT NOT NULL,
started_at TEXT,
completed_at TEXT,
task TEXT NOT NULL,
status TEXT NOT NULL DEFAULT 'pending',
result TEXT,
duration_s REAL
)
""")
conn.commit()
def enqueue(self, task: str):
"""Ajoute une tâche en base et réveille le worker."""
with sqlite3.connect(self.db_path) as conn:
conn.execute(
"INSERT INTO tasks_queue (received_at, task, status) VALUES (?, ?, 'pending')",
(datetime.now().isoformat(timespec='seconds'), task)
)
conn.commit()
self._event.set()
def pause(self):
self.paused = True
print("[QUEUE] Mise en pause — tâches en attente conservées.")
def resume(self):
self.paused = False
print("[QUEUE] Reprise.")
self._event.set()
def start_worker(self):
t = threading.Thread(target=self._worker_loop, daemon=True)
t.start()
def _worker_loop(self):
while True:
self._event.wait()
self._event.clear()
while not self.paused:
row = self._get_next_pending()
if not row:
break
self._process(row)
def _get_next_pending(self):
with sqlite3.connect(self.db_path) as conn:
return conn.execute(
"SELECT id, task FROM tasks_queue WHERE status='pending' ORDER BY id LIMIT 1"
).fetchone()
def _process(self, row):
task_id, task = row
started_at = datetime.now().isoformat(timespec='seconds')
with sqlite3.connect(self.db_path) as conn:
conn.execute(
"UPDATE tasks_queue SET status='running', started_at=? WHERE id=?",
(started_at, task_id)
)
conn.commit()
start = time.time()
try:
result = self.worker_fn(task)
status = 'success'
except Exception as e:
result = str(e)
status = 'error'
duration = round(time.time() - start, 2)
completed_at = datetime.now().isoformat(timespec='seconds')
with sqlite3.connect(self.db_path) as conn:
conn.execute(
"UPDATE tasks_queue SET status=?, completed_at=?, result=?, duration_s=? WHERE id=?",
(status, completed_at, str(result)[:2000], duration, task_id)
)
conn.commit()
self.publish_fn(self.outbox, str(result))
print("[QUEUE] Tâche #{} {} ({:.1f}s) : {}".format(task_id, status, duration, task[:60]))
def get_stats(self) -> dict:
"""Stats du jour (minuit → maintenant)."""
today = datetime.now().strftime('%Y-%m-%d')
with sqlite3.connect(self.db_path) as conn:
row = conn.execute("""
SELECT
COUNT(*) AS total,
SUM(CASE WHEN status='success' THEN 1 ELSE 0 END) AS success,
SUM(CASE WHEN status='error' THEN 1 ELSE 0 END) AS errors,
AVG(duration_s) AS avg_dur,
MAX(CASE WHEN status='error' THEN result ELSE NULL END) AS last_error,
SUM(CASE WHEN status='pending' OR status='running'
THEN 1 ELSE 0 END) AS pending
FROM tasks_queue
WHERE received_at LIKE ?
""", (today + '%',)).fetchone()
return {
'tasks_today' : row[0] or 0,
'success' : row[1] or 0,
'errors' : row[2] or 0,
'avg_duration_s': round(row[3] or 0, 2),
'last_error' : row[4],
'pending' : row[5] or 0,
}