From 5d9a507b920947d745c6777bfbded49349de993c Mon Sep 17 00:00:00 2001 From: sylvain Date: Sun, 8 Mar 2026 15:38:57 +0000 Subject: [PATCH] File d'attente SQLite FIFO + pause/resume + rapport journalier MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - task_queue.py : module FIFO persistant (queue.db), QoS 1 - agent2_debian13.py : intégration queue, topic agents/agent2_debian13/control commandes : pause / resume / report (stats envoyées sur agents/daily_report) Co-Authored-By: Claude Sonnet 4.6 --- agent2_debian13.py | 93 ++++++++++++++++++++++--------- task_queue.py | 135 +++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 202 insertions(+), 26 deletions(-) create mode 100644 task_queue.py diff --git a/agent2_debian13.py b/agent2_debian13.py index 0a5dbf8..8aa98d2 100644 --- a/agent2_debian13.py +++ b/agent2_debian13.py @@ -7,17 +7,20 @@ import threading import requests import json from pathlib import Path +from datetime import datetime from slixmpp import ClientXMPP import paho.mqtt.client as mqtt BASE_DIR = Path(__file__).parent.resolve() sys.path.insert(0, str(BASE_DIR)) from skills.loader import load_skills, run_skills +from task_queue import TaskQueue # ── CONFIG ─────────────────────────────────────────────────────────────── CONFIG_DIR = BASE_DIR / "config" CONFIG_FILE = CONFIG_DIR / "config.json" PROMPT_FILE = CONFIG_DIR / "system_prompt.txt" +QUEUE_DB = BASE_DIR / "queue.db" def load_config(): with open(CONFIG_FILE, "r", encoding="utf-8") as f: @@ -43,6 +46,7 @@ SYSTEM_PROMPT = load_system_prompt() load_skills() conversation_history = [] +start_time = datetime.now() # ── LLM ────────────────────────────────────────────────────────────────── def call_ollama(messages: list) -> str: @@ -78,18 +82,57 @@ def ask_llm(user_message: str, history: list = None) -> str: history.append({"role": "assistant", "content": err}) return err -# ── MQTT LISTENER ───────────────────────────────────────────────────────── +# ── MQTT ────────────────────────────────────────────────────────────────── mqtt_publish_client = None +task_queue: TaskQueue = None def mqtt_publish(topic: str, message: str): global mqtt_publish_client if mqtt_publish_client: mqtt_publish_client.publish(topic, message) +def _process_task(task: str) -> str: + """Worker appelé par la TaskQueue pour chaque tâche.""" + mqtt_history = [] + return ask_llm(task, history=mqtt_history) + +def on_mqtt_message(client, userdata, msg): + """Enqueue la tâche — le worker la traitera en FIFO.""" + task = msg.payload.decode(errors="replace") + print(f"[MQTT] Tâche reçue, mise en queue : {task[:100]}") + task_queue.enqueue(task) + +def on_control_message(client, userdata, msg): + """Gère les commandes de contrôle : pause / resume / report.""" + try: + data = json.loads(msg.payload.decode(errors="replace")) + command = data.get("command", "") + print(f"[CONTROL] Commande reçue : {command}") + + if command == "pause": + task_queue.pause() + + elif command == "resume": + task_queue.resume() + + elif command == "report": + stats = task_queue.get_stats() + uptime_s = int((datetime.now() - start_time).total_seconds()) + payload = json.dumps({ + "agent" : MQTT_CLIENT, + "timestamp" : datetime.now().isoformat(timespec='seconds'), + "uptime_s" : uptime_s, + "paused" : task_queue.paused, + **stats + }, ensure_ascii=False) + mqtt_publish("agents/daily_report", payload) + print(f"[CONTROL] Rapport envoyé sur agents/daily_report") + + except Exception as e: + print(f"[CONTROL] Erreur : {e}") + def register_to_agent1(): - """Publie une déclaration de mise en ligne sur agents/register.""" - import json as _json - payload = _json.dumps({ + payload = json.dumps({ "agent" : MQTT_CLIENT, "jid" : XMPP_JID, "mqtt_inbox": MQTT_INBOX, @@ -98,23 +141,12 @@ def register_to_agent1(): mqtt_publish("agents/register", payload) print("[REGISTER] Déclaration envoyée à agent1.") -def on_mqtt_message(client, userdata, msg): - task = msg.payload.decode(errors="replace") - print(f"[MQTT] Tâche reçue d'agent1 : {task[:100]}") - # Historique isolé par tâche MQTT (pas mélangé avec XMPP) - mqtt_history = [] - reply = ask_llm(task, history=mqtt_history) - print(f"[MQTT] Réponse envoyée : {reply[:100]}") - mqtt_publish(MQTT_OUTBOX, reply) - def start_mqtt_listener(): - global mqtt_publish_client + global mqtt_publish_client, task_queue - # Client dédié à la publication - import json as _json - _status_topic = "agents/status/{}".format(MQTT_CLIENT) - _offline_payload = _json.dumps({"status": "offline", "agent": MQTT_CLIENT}) - _online_payload = _json.dumps({ + _status_topic = "agents/status/{}".format(MQTT_CLIENT) + _offline_payload = json.dumps({"status": "offline", "agent": MQTT_CLIENT}) + _online_payload = json.dumps({ "status" : "online", "agent" : MQTT_CLIENT, "jid" : XMPP_JID, @@ -122,20 +154,31 @@ def start_mqtt_listener(): }) mqtt_publish_client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2, - client_id=MQTT_CLIENT + "_pub") + client_id=MQTT_CLIENT + "_pub", + clean_session=False) mqtt_publish_client.will_set(_status_topic, _offline_payload, retain=True) mqtt_publish_client.connect(MQTT_HOST, MQTT_PORT) mqtt_publish_client.loop_start() mqtt_publish_client.publish(_status_topic, _online_payload, retain=True) register_to_agent1() - # Client dédié à la souscription + # Initialiser et démarrer la queue + task_queue = TaskQueue(QUEUE_DB, _process_task, mqtt_publish, MQTT_OUTBOX) + task_queue.start_worker() + + # Client souscription (QoS 1 + session persistante) sub_client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2, - client_id=MQTT_CLIENT + "_sub") + client_id=MQTT_CLIENT + "_sub", + clean_session=False) + sub_client.message_callback_add("agents/{}/control".format(MQTT_CLIENT), + on_control_message) sub_client.on_message = on_mqtt_message sub_client.connect(MQTT_HOST, MQTT_PORT) - sub_client.subscribe(MQTT_INBOX) - print(f"[MQTT] Écoute sur {MQTT_INBOX}") + sub_client.subscribe([ + (MQTT_INBOX, 1), + ("agents/{}/control".format(MQTT_CLIENT), 1), + ]) + print(f"[MQTT] Écoute sur {MQTT_INBOX} + agents/{MQTT_CLIENT}/control") sub_client.loop_forever() # ── BOT XMPP ───────────────────────────────────────────────────────────── @@ -175,11 +218,9 @@ class AgentBot(ClientXMPP): # ── MAIN ───────────────────────────────────────────────────────────────── if __name__ == "__main__": - # Lancer le listener MQTT dans un thread séparé mqtt_thread = threading.Thread(target=start_mqtt_listener, daemon=True) mqtt_thread.start() - # Lancer le bot XMPP bot = AgentBot() bot.connect() bot.loop.run_forever() diff --git a/task_queue.py b/task_queue.py new file mode 100644 index 0000000..f05ec8e --- /dev/null +++ b/task_queue.py @@ -0,0 +1,135 @@ +""" +File d'attente SQLite FIFO pour tâches MQTT. + +Chaque agent instancie un TaskQueue avec : + - db_path : chemin vers queue.db + - worker_fn : function(task: str) -> str (ex: ask_llm) + - publish_fn : function(topic: str, message: str) + - outbox : topic MQTT de réponse +""" +import sqlite3 +import threading +import time +from pathlib import Path +from datetime import datetime + + +class TaskQueue: + def __init__(self, db_path: Path, worker_fn, publish_fn, outbox: str): + self.db_path = db_path + self.worker_fn = worker_fn + self.publish_fn = publish_fn + self.outbox = outbox + self.paused = False + self._event = threading.Event() + self._init_db() + + def _init_db(self): + with sqlite3.connect(self.db_path) as conn: + conn.execute(""" + CREATE TABLE IF NOT EXISTS tasks_queue ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + received_at TEXT NOT NULL, + started_at TEXT, + completed_at TEXT, + task TEXT NOT NULL, + status TEXT NOT NULL DEFAULT 'pending', + result TEXT, + duration_s REAL + ) + """) + conn.commit() + + def enqueue(self, task: str): + """Ajoute une tâche en base et réveille le worker.""" + with sqlite3.connect(self.db_path) as conn: + conn.execute( + "INSERT INTO tasks_queue (received_at, task, status) VALUES (?, ?, 'pending')", + (datetime.now().isoformat(timespec='seconds'), task) + ) + conn.commit() + self._event.set() + + def pause(self): + self.paused = True + print("[QUEUE] Mise en pause — tâches en attente conservées.") + + def resume(self): + self.paused = False + print("[QUEUE] Reprise.") + self._event.set() + + def start_worker(self): + t = threading.Thread(target=self._worker_loop, daemon=True) + t.start() + + def _worker_loop(self): + while True: + self._event.wait() + self._event.clear() + while not self.paused: + row = self._get_next_pending() + if not row: + break + self._process(row) + + def _get_next_pending(self): + with sqlite3.connect(self.db_path) as conn: + return conn.execute( + "SELECT id, task FROM tasks_queue WHERE status='pending' ORDER BY id LIMIT 1" + ).fetchone() + + def _process(self, row): + task_id, task = row + started_at = datetime.now().isoformat(timespec='seconds') + with sqlite3.connect(self.db_path) as conn: + conn.execute( + "UPDATE tasks_queue SET status='running', started_at=? WHERE id=?", + (started_at, task_id) + ) + conn.commit() + + start = time.time() + try: + result = self.worker_fn(task) + status = 'success' + except Exception as e: + result = str(e) + status = 'error' + duration = round(time.time() - start, 2) + + completed_at = datetime.now().isoformat(timespec='seconds') + with sqlite3.connect(self.db_path) as conn: + conn.execute( + "UPDATE tasks_queue SET status=?, completed_at=?, result=?, duration_s=? WHERE id=?", + (status, completed_at, str(result)[:2000], duration, task_id) + ) + conn.commit() + + self.publish_fn(self.outbox, str(result)) + print("[QUEUE] Tâche #{} {} ({:.1f}s) : {}".format(task_id, status, duration, task[:60])) + + def get_stats(self) -> dict: + """Stats du jour (minuit → maintenant).""" + today = datetime.now().strftime('%Y-%m-%d') + with sqlite3.connect(self.db_path) as conn: + row = conn.execute(""" + SELECT + COUNT(*) AS total, + SUM(CASE WHEN status='success' THEN 1 ELSE 0 END) AS success, + SUM(CASE WHEN status='error' THEN 1 ELSE 0 END) AS errors, + AVG(duration_s) AS avg_dur, + MAX(CASE WHEN status='error' THEN result ELSE NULL END) AS last_error, + SUM(CASE WHEN status='pending' OR status='running' + THEN 1 ELSE 0 END) AS pending + FROM tasks_queue + WHERE received_at LIKE ? + """, (today + '%',)).fetchone() + return { + 'tasks_today' : row[0] or 0, + 'success' : row[1] or 0, + 'errors' : row[2] or 0, + 'avg_duration_s': round(row[3] or 0, 2), + 'last_error' : row[4], + 'pending' : row[5] or 0, + }