Files
sylvain 59c49060aa File d'attente SQLite FIFO + pause/resume + rapport journalier
- task_queue.py : module FIFO persistant (queue.db), QoS 1
- agent2_ansible.py : intégration queue, topic agents/agent2_ansible/control
  commandes : pause / resume / report (stats envoyées sur agents/daily_report)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-08 15:39:01 +00:00

136 lines
4.9 KiB
Python

"""
File d'attente SQLite FIFO pour tâches MQTT.
Chaque agent instancie un TaskQueue avec :
- db_path : chemin vers queue.db
- worker_fn : function(task: str) -> str (ex: ask_llm)
- publish_fn : function(topic: str, message: str)
- outbox : topic MQTT de réponse
"""
import sqlite3
import threading
import time
from pathlib import Path
from datetime import datetime
class TaskQueue:
    """Persistent FIFO task queue backed by a SQLite table.

    Tasks are stored in the ``tasks_queue`` table of the database at
    ``db_path`` and handled one at a time, in ascending ``id`` order, by a
    background thread started with :meth:`start_worker`.

    Args:
        db_path: Path of the SQLite database file.
        worker_fn: Callable taking the task text and returning its result;
            any exception it raises is recorded as an ``'error'`` outcome.
        publish_fn: Callable ``(topic, message)`` used to send each result.
        outbox: Topic passed to ``publish_fn`` for every result.
    """

    def __init__(self, db_path: Path, worker_fn, publish_fn, outbox: str):
        self.db_path = db_path
        self.worker_fn = worker_fn
        self.publish_fn = publish_fn
        self.outbox = outbox
        # While True the worker stops picking up new tasks; pending rows
        # stay in the table untouched.
        self.paused = False
        # Set whenever there may be work to do (new task or resume).
        self._event = threading.Event()
        self._init_db()

    def _run(self, sql: str, params=()):
        """Execute one statement on a fresh connection and return its first row.

        The transaction is committed on success and rolled back on error.
        The connection is always closed explicitly: using a connection as a
        context manager only ends the transaction, it does not close it.

        Returns:
            The first result row as a tuple, or ``None`` when the statement
            produces no rows.
        """
        conn = sqlite3.connect(self.db_path)
        try:
            with conn:
                return conn.execute(sql, params).fetchone()
        finally:
            conn.close()

    def _init_db(self) -> None:
        """Create the ``tasks_queue`` table if it does not exist yet."""
        self._run("""
            CREATE TABLE IF NOT EXISTS tasks_queue (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                received_at TEXT NOT NULL,
                started_at TEXT,
                completed_at TEXT,
                task TEXT NOT NULL,
                status TEXT NOT NULL DEFAULT 'pending',
                result TEXT,
                duration_s REAL
            )
        """)

    def enqueue(self, task: str) -> None:
        """Store a task as ``'pending'`` and wake the worker up."""
        self._run(
            "INSERT INTO tasks_queue (received_at, task, status) VALUES (?, ?, 'pending')",
            (datetime.now().isoformat(timespec='seconds'), task),
        )
        # Signal only after the row is committed so the worker can see it.
        self._event.set()

    def pause(self) -> None:
        """Stop taking new tasks; a task already being processed still finishes."""
        self.paused = True
        print("[QUEUE] Mise en pause — tâches en attente conservées.")

    def resume(self) -> None:
        """Leave the paused state and wake the worker to drain pending tasks."""
        self.paused = False
        print("[QUEUE] Reprise.")
        self._event.set()

    def start_worker(self) -> None:
        """Start the background daemon thread that processes the queue."""
        t = threading.Thread(target=self._worker_loop, daemon=True)
        t.start()

    def _worker_loop(self) -> None:
        """Wait for a wake-up, then process pending tasks until empty or paused."""
        while True:
            self._event.wait()
            # Clear before draining: a task enqueued during the drain either
            # gets picked up by the inner loop or sets the event again.
            self._event.clear()
            while not self.paused:
                row = self._get_next_pending()
                if not row:
                    break
                self._process(row)

    def _get_next_pending(self):
        """Return ``(id, task)`` of the oldest pending task, or ``None``."""
        return self._run(
            "SELECT id, task FROM tasks_queue WHERE status='pending' ORDER BY id LIMIT 1"
        )

    def _process(self, row) -> None:
        """Run one task, persist its outcome, and publish the result.

        Args:
            row: ``(id, task)`` tuple as returned by ``_get_next_pending``.
        """
        task_id, task = row
        started_at = datetime.now().isoformat(timespec='seconds')
        self._run(
            "UPDATE tasks_queue SET status='running', started_at=? WHERE id=?",
            (started_at, task_id),
        )

        # Monotonic clock: the measured duration is immune to system clock
        # adjustments made while the task runs.
        start = time.monotonic()
        try:
            result = self.worker_fn(task)
            status = 'success'
        except Exception as e:
            # A failing task is an expected outcome: record it and carry on.
            result = str(e)
            status = 'error'
        duration = round(time.monotonic() - start, 2)

        completed_at = datetime.now().isoformat(timespec='seconds')
        self._run(
            "UPDATE tasks_queue SET status=?, completed_at=?, result=?, duration_s=? WHERE id=?",
            # Only the first 2000 characters of the result are stored.
            (status, completed_at, str(result)[:2000], duration, task_id),
        )

        # The outcome is already persisted; a publish failure must not
        # propagate and terminate the worker thread.
        try:
            self.publish_fn(self.outbox, str(result))
        except Exception as e:
            print("[QUEUE] Échec de publication pour la tâche #{} : {}".format(task_id, e))
        print("[QUEUE] Tâche #{} {} ({:.1f}s) : {}".format(task_id, status, duration, task[:60]))

    def get_stats(self) -> dict:
        """Return statistics for tasks received today (local date).

        Returns:
            A dict with the keys ``tasks_today``, ``success``, ``errors``,
            ``avg_duration_s`` (mean over tasks that have a recorded
            duration), ``last_error`` (result text of the most recently
            queued task that ended in error, or ``None``) and ``pending``
            (tasks still pending or running).
        """
        today = datetime.now().strftime('%Y-%m-%d')
        row = self._run("""
            SELECT
                COUNT(*) AS total,
                SUM(CASE WHEN status='success' THEN 1 ELSE 0 END) AS success,
                SUM(CASE WHEN status='error' THEN 1 ELSE 0 END) AS errors,
                AVG(duration_s) AS avg_dur,
                (SELECT result FROM tasks_queue
                  WHERE status='error' AND received_at LIKE :day
                  ORDER BY id DESC LIMIT 1) AS last_error,
                SUM(CASE WHEN status='pending' OR status='running'
                    THEN 1 ELSE 0 END) AS pending
            FROM tasks_queue
            WHERE received_at LIKE :day
        """, {'day': today + '%'})
        # SUM/AVG yield NULL when no row matches, hence the `or 0` fallbacks.
        return {
            'tasks_today': row[0] or 0,
            'success': row[1] or 0,
            'errors': row[2] or 0,
            'avg_duration_s': round(row[3] or 0, 2),
            'last_error': row[4],
            'pending': row[5] or 0,
        }