#!/usr/bin/env python3 # -*- coding: utf-8 -*- import asyncio import sys import threading import requests import json from pathlib import Path from slixmpp import ClientXMPP import paho.mqtt.client as mqtt BASE_DIR = Path(__file__).parent.resolve() sys.path.insert(0, str(BASE_DIR)) from skills.loader import load_skills, run_skills # ── CONFIG ─────────────────────────────────────────────────────────────── CONFIG_DIR = BASE_DIR / "config" CONFIG_FILE = CONFIG_DIR / "config.json" PROMPT_FILE = CONFIG_DIR / "system_prompt.txt" def load_config(): with open(CONFIG_FILE, "r", encoding="utf-8") as f: return json.load(f) def load_system_prompt(): with open(PROMPT_FILE, "r", encoding="utf-8") as f: return f.read() cfg = load_config() OLLAMA_URL = cfg["ollama_url"] MODEL = cfg["model"] XMPP_JID = cfg["xmpp_jid"] XMPP_PASS = cfg["xmpp_pass"] ADMIN_JID = cfg["admin_jid"] MQTT_HOST = cfg["mqtt_host"] MQTT_PORT = int(cfg["mqtt_port"]) MQTT_CLIENT = cfg["mqtt_client_id"] MQTT_INBOX = cfg["mqtt_inbox"] MQTT_OUTBOX = cfg["mqtt_outbox"] SYSTEM_PROMPT = load_system_prompt() load_skills() conversation_history = [] # ── LLM ────────────────────────────────────────────────────────────────── def call_ollama(messages: list) -> str: payload = { "model" : MODEL, "messages": messages, "stream" : False, "options" : {"temperature": 0.3} } response = requests.post(OLLAMA_URL, json=payload, timeout=180) return response.json()["message"]["content"] def ask_llm(user_message: str, history: list = None) -> str: if history is None: history = conversation_history history.append({"role": "user", "content": user_message}) messages = [{"role": "system", "content": SYSTEM_PROMPT}] + history try: MAX_STEPS = 5 for _ in range(MAX_STEPS): reply = call_ollama(messages) skill_triggered, result = run_skills(reply) if not skill_triggered: history.append({"role": "assistant", "content": reply}) return reply messages.append({"role": "assistant", "content": reply}) messages.append({"role": "user", "content": "[Résultat skill]\n" + result}) reply = call_ollama(messages) history.append({"role": "assistant", "content": reply}) return reply except Exception as e: err = "Erreur : " + str(e) history.append({"role": "assistant", "content": err}) return err # ── MQTT LISTENER ───────────────────────────────────────────────────────── mqtt_publish_client = None def mqtt_publish(topic: str, message: str): global mqtt_publish_client if mqtt_publish_client: mqtt_publish_client.publish(topic, message) def register_to_agent1(): """Publie une déclaration de mise en ligne sur agents/register.""" import json as _json payload = _json.dumps({ "agent" : MQTT_CLIENT, "jid" : XMPP_JID, "mqtt_inbox": MQTT_INBOX, "speciality": "Automatisation infrastructure via Ansible : playbooks, commandes ad-hoc, déploiement multi-hôtes, gestion de configuration sur le réseau local", }) mqtt_publish("agents/register", payload) print("[REGISTER] Déclaration envoyée à agent1.") def on_mqtt_message(client, userdata, msg): task = msg.payload.decode(errors="replace") print(f"[MQTT] Tâche reçue d'agent1 : {task[:100]}") # Historique isolé par tâche MQTT (pas mélangé avec XMPP) mqtt_history = [] reply = ask_llm(task, history=mqtt_history) print(f"[MQTT] Réponse envoyée : {reply[:100]}") mqtt_publish(MQTT_OUTBOX, reply) def start_mqtt_listener(): global mqtt_publish_client # Client dédié à la publication import json as _json _status_topic = "agents/status/{}".format(MQTT_CLIENT) _offline_payload = _json.dumps({"status": "offline", "agent": MQTT_CLIENT}) _online_payload = _json.dumps({ "status" : "online", "agent" : MQTT_CLIENT, "jid" : XMPP_JID, "mqtt_inbox": MQTT_INBOX, }) mqtt_publish_client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2, client_id=MQTT_CLIENT + "_pub") mqtt_publish_client.will_set(_status_topic, _offline_payload, retain=True) mqtt_publish_client.connect(MQTT_HOST, MQTT_PORT) mqtt_publish_client.loop_start() mqtt_publish_client.publish(_status_topic, _online_payload, retain=True) register_to_agent1() # Client dédié à la souscription sub_client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2, client_id=MQTT_CLIENT + "_sub") sub_client.on_message = on_mqtt_message sub_client.connect(MQTT_HOST, MQTT_PORT) sub_client.subscribe(MQTT_INBOX) print(f"[MQTT] Écoute sur {MQTT_INBOX}") sub_client.loop_forever() # ── BOT XMPP ───────────────────────────────────────────────────────────── class AgentBot(ClientXMPP): def __init__(self): ClientXMPP.__init__(self, XMPP_JID, XMPP_PASS) self.add_event_handler("session_start", self.session_start) self.add_event_handler("message", self.message) self.register_plugin('xep_0030') self.register_plugin('xep_0199') async def session_start(self, event): self.send_presence() await self.get_roster() self.send_message(mto=ADMIN_JID, mbody="Agent2_Ansible en ligne !", mtype='chat') async def message(self, msg): if msg['type'] not in ('chat', 'normal'): return if str(msg['from']).split('/')[0] != ADMIN_JID: return user_input = msg['body'].strip() if user_input == "!reset": conversation_history.clear() self.send_message(mto=ADMIN_JID, mbody="Conversation reinitialisee.", mtype='chat') return loop = asyncio.get_event_loop() reply = await loop.run_in_executor(None, ask_llm, user_input) self.send_message(mto=ADMIN_JID, mbody=reply, mtype='chat') # ── MAIN ───────────────────────────────────────────────────────────────── if __name__ == "__main__": # Lancer le listener MQTT dans un thread séparé mqtt_thread = threading.Thread(target=start_mqtt_listener, daemon=True) mqtt_thread.start() # Lancer le bot XMPP bot = AgentBot() bot.connect() bot.loop.run_forever()