File d'attente SQLite FIFO + pause/resume + rapport journalier

- task_queue.py : module FIFO persistant (queue.db), QoS 1
- agent2_debian13.py : intégration queue, topic agents/agent2_debian13/control
  commandes : pause / resume / report (stats envoyées sur agents/daily_report)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-03-08 15:38:57 +00:00
parent e31f429c17
commit 5d9a507b92
2 changed files with 202 additions and 26 deletions
+67 -26
View File
@@ -7,17 +7,20 @@ import threading
import requests import requests
import json import json
from pathlib import Path from pathlib import Path
from datetime import datetime
from slixmpp import ClientXMPP from slixmpp import ClientXMPP
import paho.mqtt.client as mqtt import paho.mqtt.client as mqtt
BASE_DIR = Path(__file__).parent.resolve() BASE_DIR = Path(__file__).parent.resolve()
sys.path.insert(0, str(BASE_DIR)) sys.path.insert(0, str(BASE_DIR))
from skills.loader import load_skills, run_skills from skills.loader import load_skills, run_skills
from task_queue import TaskQueue
# ── CONFIG ─────────────────────────────────────────────────────────────── # ── CONFIG ───────────────────────────────────────────────────────────────
CONFIG_DIR = BASE_DIR / "config" CONFIG_DIR = BASE_DIR / "config"
CONFIG_FILE = CONFIG_DIR / "config.json" CONFIG_FILE = CONFIG_DIR / "config.json"
PROMPT_FILE = CONFIG_DIR / "system_prompt.txt" PROMPT_FILE = CONFIG_DIR / "system_prompt.txt"
QUEUE_DB = BASE_DIR / "queue.db"
def load_config(): def load_config():
with open(CONFIG_FILE, "r", encoding="utf-8") as f: with open(CONFIG_FILE, "r", encoding="utf-8") as f:
@@ -43,6 +46,7 @@ SYSTEM_PROMPT = load_system_prompt()
load_skills() load_skills()
conversation_history = [] conversation_history = []
start_time = datetime.now()
# ── LLM ────────────────────────────────────────────────────────────────── # ── LLM ──────────────────────────────────────────────────────────────────
def call_ollama(messages: list) -> str: def call_ollama(messages: list) -> str:
@@ -78,18 +82,57 @@ def ask_llm(user_message: str, history: list = None) -> str:
history.append({"role": "assistant", "content": err}) history.append({"role": "assistant", "content": err})
return err return err
# ── MQTT LISTENER ───────────────────────────────────────────────────────── # ── MQTT ──────────────────────────────────────────────────────────────────
mqtt_publish_client = None mqtt_publish_client = None
task_queue: TaskQueue = None
def mqtt_publish(topic: str, message: str): def mqtt_publish(topic: str, message: str):
global mqtt_publish_client global mqtt_publish_client
if mqtt_publish_client: if mqtt_publish_client:
mqtt_publish_client.publish(topic, message) mqtt_publish_client.publish(topic, message)
def _process_task(task: str) -> str:
"""Worker appelé par la TaskQueue pour chaque tâche."""
mqtt_history = []
return ask_llm(task, history=mqtt_history)
def on_mqtt_message(client, userdata, msg):
"""Enqueue la tâche — le worker la traitera en FIFO."""
task = msg.payload.decode(errors="replace")
print(f"[MQTT] Tâche reçue, mise en queue : {task[:100]}")
task_queue.enqueue(task)
def on_control_message(client, userdata, msg):
"""Gère les commandes de contrôle : pause / resume / report."""
try:
data = json.loads(msg.payload.decode(errors="replace"))
command = data.get("command", "")
print(f"[CONTROL] Commande reçue : {command}")
if command == "pause":
task_queue.pause()
elif command == "resume":
task_queue.resume()
elif command == "report":
stats = task_queue.get_stats()
uptime_s = int((datetime.now() - start_time).total_seconds())
payload = json.dumps({
"agent" : MQTT_CLIENT,
"timestamp" : datetime.now().isoformat(timespec='seconds'),
"uptime_s" : uptime_s,
"paused" : task_queue.paused,
**stats
}, ensure_ascii=False)
mqtt_publish("agents/daily_report", payload)
print(f"[CONTROL] Rapport envoyé sur agents/daily_report")
except Exception as e:
print(f"[CONTROL] Erreur : {e}")
def register_to_agent1(): def register_to_agent1():
"""Publie une déclaration de mise en ligne sur agents/register.""" payload = json.dumps({
import json as _json
payload = _json.dumps({
"agent" : MQTT_CLIENT, "agent" : MQTT_CLIENT,
"jid" : XMPP_JID, "jid" : XMPP_JID,
"mqtt_inbox": MQTT_INBOX, "mqtt_inbox": MQTT_INBOX,
@@ -98,23 +141,12 @@ def register_to_agent1():
mqtt_publish("agents/register", payload) mqtt_publish("agents/register", payload)
print("[REGISTER] Déclaration envoyée à agent1.") print("[REGISTER] Déclaration envoyée à agent1.")
def on_mqtt_message(client, userdata, msg):
task = msg.payload.decode(errors="replace")
print(f"[MQTT] Tâche reçue d'agent1 : {task[:100]}")
# Historique isolé par tâche MQTT (pas mélangé avec XMPP)
mqtt_history = []
reply = ask_llm(task, history=mqtt_history)
print(f"[MQTT] Réponse envoyée : {reply[:100]}")
mqtt_publish(MQTT_OUTBOX, reply)
def start_mqtt_listener(): def start_mqtt_listener():
global mqtt_publish_client global mqtt_publish_client, task_queue
# Client dédié à la publication _status_topic = "agents/status/{}".format(MQTT_CLIENT)
import json as _json _offline_payload = json.dumps({"status": "offline", "agent": MQTT_CLIENT})
_status_topic = "agents/status/{}".format(MQTT_CLIENT) _online_payload = json.dumps({
_offline_payload = _json.dumps({"status": "offline", "agent": MQTT_CLIENT})
_online_payload = _json.dumps({
"status" : "online", "status" : "online",
"agent" : MQTT_CLIENT, "agent" : MQTT_CLIENT,
"jid" : XMPP_JID, "jid" : XMPP_JID,
@@ -122,20 +154,31 @@ def start_mqtt_listener():
}) })
mqtt_publish_client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2, mqtt_publish_client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2,
client_id=MQTT_CLIENT + "_pub") client_id=MQTT_CLIENT + "_pub",
clean_session=False)
mqtt_publish_client.will_set(_status_topic, _offline_payload, retain=True) mqtt_publish_client.will_set(_status_topic, _offline_payload, retain=True)
mqtt_publish_client.connect(MQTT_HOST, MQTT_PORT) mqtt_publish_client.connect(MQTT_HOST, MQTT_PORT)
mqtt_publish_client.loop_start() mqtt_publish_client.loop_start()
mqtt_publish_client.publish(_status_topic, _online_payload, retain=True) mqtt_publish_client.publish(_status_topic, _online_payload, retain=True)
register_to_agent1() register_to_agent1()
# Client dédié à la souscription # Initialiser et démarrer la queue
task_queue = TaskQueue(QUEUE_DB, _process_task, mqtt_publish, MQTT_OUTBOX)
task_queue.start_worker()
# Client souscription (QoS 1 + session persistante)
sub_client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2, sub_client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2,
client_id=MQTT_CLIENT + "_sub") client_id=MQTT_CLIENT + "_sub",
clean_session=False)
sub_client.message_callback_add("agents/{}/control".format(MQTT_CLIENT),
on_control_message)
sub_client.on_message = on_mqtt_message sub_client.on_message = on_mqtt_message
sub_client.connect(MQTT_HOST, MQTT_PORT) sub_client.connect(MQTT_HOST, MQTT_PORT)
sub_client.subscribe(MQTT_INBOX) sub_client.subscribe([
print(f"[MQTT] Écoute sur {MQTT_INBOX}") (MQTT_INBOX, 1),
("agents/{}/control".format(MQTT_CLIENT), 1),
])
print(f"[MQTT] Écoute sur {MQTT_INBOX} + agents/{MQTT_CLIENT}/control")
sub_client.loop_forever() sub_client.loop_forever()
# ── BOT XMPP ───────────────────────────────────────────────────────────── # ── BOT XMPP ─────────────────────────────────────────────────────────────
@@ -175,11 +218,9 @@ class AgentBot(ClientXMPP):
# ── MAIN ───────────────────────────────────────────────────────────────── # ── MAIN ─────────────────────────────────────────────────────────────────
if __name__ == "__main__": if __name__ == "__main__":
# Lancer le listener MQTT dans un thread séparé
mqtt_thread = threading.Thread(target=start_mqtt_listener, daemon=True) mqtt_thread = threading.Thread(target=start_mqtt_listener, daemon=True)
mqtt_thread.start() mqtt_thread.start()
# Lancer le bot XMPP
bot = AgentBot() bot = AgentBot()
bot.connect() bot.connect()
bot.loop.run_forever() bot.loop.run_forever()
+135
View File
@@ -0,0 +1,135 @@
"""
File d'attente SQLite FIFO pour tâches MQTT.
Chaque agent instancie un TaskQueue avec :
- db_path : chemin vers queue.db
- worker_fn : function(task: str) -> str (ex: ask_llm)
- publish_fn : function(topic: str, message: str)
- outbox : topic MQTT de réponse
"""
import sqlite3
import threading
import time
from pathlib import Path
from datetime import datetime
class TaskQueue:
def __init__(self, db_path: Path, worker_fn, publish_fn, outbox: str):
self.db_path = db_path
self.worker_fn = worker_fn
self.publish_fn = publish_fn
self.outbox = outbox
self.paused = False
self._event = threading.Event()
self._init_db()
def _init_db(self):
with sqlite3.connect(self.db_path) as conn:
conn.execute("""
CREATE TABLE IF NOT EXISTS tasks_queue (
id INTEGER PRIMARY KEY AUTOINCREMENT,
received_at TEXT NOT NULL,
started_at TEXT,
completed_at TEXT,
task TEXT NOT NULL,
status TEXT NOT NULL DEFAULT 'pending',
result TEXT,
duration_s REAL
)
""")
conn.commit()
def enqueue(self, task: str):
"""Ajoute une tâche en base et réveille le worker."""
with sqlite3.connect(self.db_path) as conn:
conn.execute(
"INSERT INTO tasks_queue (received_at, task, status) VALUES (?, ?, 'pending')",
(datetime.now().isoformat(timespec='seconds'), task)
)
conn.commit()
self._event.set()
def pause(self):
self.paused = True
print("[QUEUE] Mise en pause — tâches en attente conservées.")
def resume(self):
self.paused = False
print("[QUEUE] Reprise.")
self._event.set()
def start_worker(self):
t = threading.Thread(target=self._worker_loop, daemon=True)
t.start()
def _worker_loop(self):
while True:
self._event.wait()
self._event.clear()
while not self.paused:
row = self._get_next_pending()
if not row:
break
self._process(row)
def _get_next_pending(self):
with sqlite3.connect(self.db_path) as conn:
return conn.execute(
"SELECT id, task FROM tasks_queue WHERE status='pending' ORDER BY id LIMIT 1"
).fetchone()
def _process(self, row):
task_id, task = row
started_at = datetime.now().isoformat(timespec='seconds')
with sqlite3.connect(self.db_path) as conn:
conn.execute(
"UPDATE tasks_queue SET status='running', started_at=? WHERE id=?",
(started_at, task_id)
)
conn.commit()
start = time.time()
try:
result = self.worker_fn(task)
status = 'success'
except Exception as e:
result = str(e)
status = 'error'
duration = round(time.time() - start, 2)
completed_at = datetime.now().isoformat(timespec='seconds')
with sqlite3.connect(self.db_path) as conn:
conn.execute(
"UPDATE tasks_queue SET status=?, completed_at=?, result=?, duration_s=? WHERE id=?",
(status, completed_at, str(result)[:2000], duration, task_id)
)
conn.commit()
self.publish_fn(self.outbox, str(result))
print("[QUEUE] Tâche #{} {} ({:.1f}s) : {}".format(task_id, status, duration, task[:60]))
def get_stats(self) -> dict:
"""Stats du jour (minuit → maintenant)."""
today = datetime.now().strftime('%Y-%m-%d')
with sqlite3.connect(self.db_path) as conn:
row = conn.execute("""
SELECT
COUNT(*) AS total,
SUM(CASE WHEN status='success' THEN 1 ELSE 0 END) AS success,
SUM(CASE WHEN status='error' THEN 1 ELSE 0 END) AS errors,
AVG(duration_s) AS avg_dur,
MAX(CASE WHEN status='error' THEN result ELSE NULL END) AS last_error,
SUM(CASE WHEN status='pending' OR status='running'
THEN 1 ELSE 0 END) AS pending
FROM tasks_queue
WHERE received_at LIKE ?
""", (today + '%',)).fetchone()
return {
'tasks_today' : row[0] or 0,
'success' : row[1] or 0,
'errors' : row[2] or 0,
'avg_duration_s': round(row[3] or 0, 2),
'last_error' : row[4],
'pending' : row[5] or 0,
}