""" File d'attente SQLite FIFO pour tâches MQTT. Chaque agent instancie un TaskQueue avec : - db_path : chemin vers queue.db - worker_fn : function(task: str) -> str (ex: ask_llm) - publish_fn : function(topic: str, message: str) - outbox : topic MQTT de réponse """ import sqlite3 import threading import time from pathlib import Path from datetime import datetime class TaskQueue: def __init__(self, db_path: Path, worker_fn, publish_fn, outbox: str): self.db_path = db_path self.worker_fn = worker_fn self.publish_fn = publish_fn self.outbox = outbox self.paused = False self._event = threading.Event() self._init_db() def _init_db(self): with sqlite3.connect(self.db_path) as conn: conn.execute(""" CREATE TABLE IF NOT EXISTS tasks_queue ( id INTEGER PRIMARY KEY AUTOINCREMENT, received_at TEXT NOT NULL, started_at TEXT, completed_at TEXT, task TEXT NOT NULL, status TEXT NOT NULL DEFAULT 'pending', result TEXT, duration_s REAL ) """) conn.commit() def enqueue(self, task: str): """Ajoute une tâche en base et réveille le worker.""" with sqlite3.connect(self.db_path) as conn: conn.execute( "INSERT INTO tasks_queue (received_at, task, status) VALUES (?, ?, 'pending')", (datetime.now().isoformat(timespec='seconds'), task) ) conn.commit() self._event.set() def pause(self): self.paused = True print("[QUEUE] Mise en pause — tâches en attente conservées.") def resume(self): self.paused = False print("[QUEUE] Reprise.") self._event.set() def start_worker(self): t = threading.Thread(target=self._worker_loop, daemon=True) t.start() def _worker_loop(self): while True: self._event.wait() self._event.clear() while not self.paused: row = self._get_next_pending() if not row: break self._process(row) def _get_next_pending(self): with sqlite3.connect(self.db_path) as conn: return conn.execute( "SELECT id, task FROM tasks_queue WHERE status='pending' ORDER BY id LIMIT 1" ).fetchone() def _process(self, row): task_id, task = row started_at = datetime.now().isoformat(timespec='seconds') with sqlite3.connect(self.db_path) as conn: conn.execute( "UPDATE tasks_queue SET status='running', started_at=? WHERE id=?", (started_at, task_id) ) conn.commit() start = time.time() try: result = self.worker_fn(task) status = 'success' except Exception as e: result = str(e) status = 'error' duration = round(time.time() - start, 2) completed_at = datetime.now().isoformat(timespec='seconds') with sqlite3.connect(self.db_path) as conn: conn.execute( "UPDATE tasks_queue SET status=?, completed_at=?, result=?, duration_s=? WHERE id=?", (status, completed_at, str(result)[:2000], duration, task_id) ) conn.commit() self.publish_fn(self.outbox, str(result)) print("[QUEUE] Tâche #{} {} ({:.1f}s) : {}".format(task_id, status, duration, task[:60])) def get_stats(self) -> dict: """Stats du jour (minuit → maintenant).""" today = datetime.now().strftime('%Y-%m-%d') with sqlite3.connect(self.db_path) as conn: row = conn.execute(""" SELECT COUNT(*) AS total, SUM(CASE WHEN status='success' THEN 1 ELSE 0 END) AS success, SUM(CASE WHEN status='error' THEN 1 ELSE 0 END) AS errors, AVG(duration_s) AS avg_dur, MAX(CASE WHEN status='error' THEN result ELSE NULL END) AS last_error, SUM(CASE WHEN status='pending' OR status='running' THEN 1 ELSE 0 END) AS pending FROM tasks_queue WHERE received_at LIKE ? """, (today + '%',)).fetchone() return { 'tasks_today' : row[0] or 0, 'success' : row[1] or 0, 'errors' : row[2] or 0, 'avg_duration_s': round(row[3] or 0, 2), 'last_error' : row[4], 'pending' : row[5] or 0, }