Compare commits

...

8 Commits

Author SHA1 Message Date
sylvain 0fe1ece68d Support agents distants (SSH) pour !agentUPDATE/UPGRADE
- agent_update.py : _run_ssh() via sshpass, dispatche local ou SSH selon ssh_host
- agent1.py : _get_agent_git_info() transmet ssh_host/ssh_user depuis le registre
- agents_registry.json : agent2_test → ssh_host: 192.168.7.13, ssh_user: root

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-08 16:11:43 +00:00
sylvain 1565b145dc !agentUPDATE/UPGRADE : fallback automatique sur /opt/<nom> si install_path absent
_get_agent_git_info déduit le chemin depuis le nom de l'agent par convention
(/opt/<nom>, service <nom>) au lieu de bloquer avec une erreur.
!agentsUPDATE/UPGRADE couvre tous les agents du registre sans filtre.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-08 16:06:14 +00:00
sylvain 3575b391b6 Ajout !agentUPDATE/UPGRADE : mises à jour agents depuis git
- skills/agent_update.py : check_update (git fetch + log) et do_upgrade (git pull + systemctl restart)
- agent1.py : commandes !agentUPDATE <nom>, !agentsUPDATE, !agentUPGRADE <nom>, !agentsUPGRADE
  - _handle_agent_command retourne (handled, reply) pour gérer le self-upgrade agent1
  - !agentUPGRADE agent1 : envoie la réponse XMPP avant systemctl restart
  - !agentsUPGRADE : met à jour tous les agents puis agent1 en dernier
- agents_registry.json : ajout install_path, service_name, git_branch + entrée agent1
- README.md : documentation des nouvelles commandes
- TODO.md : tâches marquées comme terminées

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-08 15:55:31 +00:00
sylvain 1c951f46f1 TODO : ajout gestion mises à jour agents (git update/upgrade) 2026-03-08 15:49:21 +00:00
sylvain 37e881bc39 README : documentation complète de toutes les commandes 2026-03-08 15:45:08 +00:00
sylvain d765a8457a TODO.md : marquer les tâches batch 1-3 comme terminées 2026-03-08 15:42:48 +00:00
sylvain 60a216d565 Batch 3 : commandes !agentON/OFF, mode veille, rapports journaliers
agent1.py :
  - !agentOFF/ON <nom> : pause/resume d'un agent via MQTT control
  - !agentsOFF/ON : mode veille agent1 + pause/resume tous les agents
  - Confirmation en attente pour modif config (PENDING_CONFIG)
  - !reports / !tasks / !blackout : afficher les configs
  - APScheduler : sollicitation rapports + rapport journalier automatique
  - Souscription agents/daily_report : stockage des rapports reçus
  - on_mqtt_register : préserve work_hours lors des mises à jour registre

skills/daily_report.py :
  - DAILY_REPORT: [agent] : compile les rapports journaliers reçus
  - Formatage uptime, stats, taux de succès

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-08 15:41:54 +00:00
sylvain 9e3aa29d74 Batch 2 : file d'attente SQLite + pause/resume + stats par agent
- task_queue.py : module FIFO SQLite (queue.db) pour agent2_debian13 et agent2_ansible
- agent2_debian13.py : intégration queue, topic agents/agent2_debian13/control,
  commandes pause/resume/report, QoS 1 + clean_session=False
- agent2_ansible.py : idem

Les tâches MQTT sont maintenant persistées avant exécution.
Le worker traite en FIFO, les tâches en pause s'accumulent sans être perdues.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-08 15:34:45 +00:00
7 changed files with 1067 additions and 144 deletions
+207 -54
View File
@@ -2,38 +2,221 @@
Agent principal du réseau. Il reçoit les instructions de l'utilisateur (via XMPP ou CLI MQTT), les analyse avec un LLM, et délègue les tâches aux agents spécialisés.
## Rôle
---
- Point d'entrée unique pour l'utilisateur (`sylvain@xmpp.ovh`)
- Délègue aux agents spécialisés via MQTT (`DELEGATE: <agent> | <tâche>`)
- Planifie des tâches récurrentes (`SCHEDULE:`)
- Centralise les rapports et erreurs des agents
- Maintient un registre des agents disponibles
## Commandes XMPP (sylvain@xmpp.ovh → agent1)
## Skills disponibles
### Contrôle des agents
| Trigger | Description |
| Commande | Effet |
|---|---|
| `SEARCH: <requête>` | Recherche DuckDuckGo |
| `READ: <url>` | Lecture de page web |
| `REMEMBER: <clé> \| <valeur>` | Mémoire SQLite |
| `RECALL: <clé>` | Récupération mémoire |
| `MQTT_PUBLISH: <topic> \| <msg>` | Publication MQTT |
| `DELEGATE: <agent> \| <tâche>` | Délégation à un agent spécialisé |
| `PLAN: <agent>\|<tâche> ;; <agent>\|<tâche>` | Plan multi-étapes |
| `SCHEDULE: <cron> \| <agent> \| <tâche>` | Planification |
| `PLAN_LIST:` | Lister les tâches planifiées |
| `PLAN_CANCEL: <id>` | Annuler une planification |
| `REPORT:` | Rapport d'exécutions |
| `REPORT_ERRORS:` | Rapport des erreurs |
| `!agentsOFF` | Agent1 en veille + pause de tous les agents |
| `!agentsON` | Agent1 actif + reprise de tous les agents |
| `!agentOFF <nom>` | Pause d'un agent spécifique (ex: `!agentOFF agent2_debian13`) |
| `!agentON <nom>` | Reprise d'un agent spécifique |
| `!agentOFF agent1` | Agent1 en veille uniquement (les autres agents continuent) |
| `!agentON agent1` | Agent1 sort de veille |
> En mode veille, agent1 reste connecté XMPP et répond uniquement aux commandes `!agentON`.
### Mises à jour git
| Commande | Effet |
|---|---|
| `!agentUPDATE <nom>` | Vérifie si une mise à jour est disponible sur le dépôt git de l'agent |
| `!agentsUPDATE` | Vérifie les dépôts de tous les agents enregistrés |
| `!agentUPGRADE <nom>` | `git pull` + `systemctl restart` de l'agent |
| `!agentsUPGRADE` | `git pull` + restart de tous les agents (agent1 en dernier) |
> `!agentUPGRADE agent1` redémarre agent1 lui-même via systemd. La réponse XMPP est envoyée avant le redémarrage.
### Affichage des configurations
| Commande | Effet |
|---|---|
| `!reports` | Afficher `reports_schedule.json` (horaires des rapports) |
| `!tasks` | Afficher `tasks_schedule.json` (tâches planifiées) |
| `!blackout` | Afficher `blackout_hours.json` (plages d'inactivité) |
### Divers
| Commande | Effet |
|---|---|
| `!reset` | Réinitialiser l'historique de conversation |
---
## Skills LLM (utilisables en XMPP ou CLI)
Ces commandes sont générées par le LLM en réponse à une instruction en langage naturel.
### Délégation et planification
| Trigger | Syntaxe | Description |
|---|---|---|
| `DELEGATE:` | `DELEGATE: <agent> \| <tâche>` | Délègue une tâche à un agent (avec vérif. plage horaire + blackout) |
| `PLAN:` | `PLAN: <agent>\|<tâche> ;; <agent>\|<tâche>` | Exécute plusieurs tâches en séquence |
| `SCHEDULE:` | `SCHEDULE: <cron> \| <agent> \| <tâche>` | Planifie une tâche récurrente |
| `PLAN_LIST:` | `PLAN_LIST:` | Lister les tâches planifiées |
| `PLAN_CANCEL:` | `PLAN_CANCEL: <id>` | Annuler une planification |
**Expressions cron supportées :**
```
daily 03:00 → tous les jours à 03h00
every 6h → toutes les 6 heures
every 30min → toutes les 30 minutes
weekly lun 08:00 → chaque lundi à 08h00
```
### Rapports
| Trigger | Syntaxe | Description |
|---|---|---|
| `REPORT:` | `REPORT: [agent]` | Rapport des 20 dernières exécutions |
| `REPORT_ERRORS:` | `REPORT_ERRORS: [agent]` | Rapport des 20 dernières erreurs |
| `DAILY_REPORT:` | `DAILY_REPORT: [agent]` | Rapport journalier compilé (stats de tous les agents) |
### Mémoire et recherche
| Trigger | Syntaxe | Description |
|---|---|---|
| `SEARCH:` | `SEARCH: <requête>` | Recherche DuckDuckGo |
| `READ:` | `READ: <url>` | Lecture de page web |
| `REMEMBER:` | `REMEMBER: <clé> \| <valeur>` | Mémoriser une information (SQLite) |
| `RECALL:` | `RECALL: <clé>` | Récupérer une information mémorisée |
### MQTT direct
| Trigger | Syntaxe | Description |
|---|---|---|
| `MQTT_PUBLISH:` | `MQTT_PUBLISH: <topic> \| <message>` | Publier un message MQTT |
| `MQTT_SUBSCRIBE:` | `MQTT_SUBSCRIBE: <topic>` | Écouter un topic MQTT |
---
## CLI (`cli.py`)
```bash
python3 /opt/agent/cli.py # parler à agent1
python3 /opt/agent/cli.py agent2_debian13 # accès direct à un agent
python3 /opt/agent/cli.py --plans # voir les planifications
```
**Commandes dans la CLI :**
| Commande | Effet |
|---|---|
| `/reset` | Réinitialiser la conversation |
| `/plans` | Lister les tâches planifiées |
| `/report` | Rapport des dernières exécutions |
| `/errors` | Rapport des erreurs |
| `/agent <nom>` | Basculer vers un autre agent |
| `/quit` | Quitter la CLI |
---
## Fichiers de configuration
| Fichier | Rôle | Modifié par |
|---|---|---|
| `config/config.json` | Paramètres XMPP, MQTT, Ollama | Utilisateur |
| `config/agents_registry.json` | Registre des agents (auto-mis à jour) | Agent1 au démarrage des agents |
| `config/blackout_hours.json` | Plages horaires d'inactivité totale | **Utilisateur directement** |
| `config/reports_schedule.json` | Horaires de sollicitation des rapports | Agent1 (avec confirmation) |
| `config/tasks_schedule.json` | Tâches planifiées par agent | Agent1 (avec confirmation) |
### `blackout_hours.json`
Aucun agent ne travaille pendant ces plages, quelle que soit la planification :
```json
[
{ "start": "02:00", "end": "05:00", "label": "maintenance nuit", "enabled": true }
]
```
### `reports_schedule.json`
```json
{
"agents": {
"agent2_debian13": { "report_time": "22:00", "enabled": true },
"agent2_ansible": { "report_time": "22:05", "enabled": true },
"agent2_deploy": { "report_time": "22:10", "enabled": true }
},
"daily_report_time": "22:30",
"daily_report_enabled": true
}
```
### `tasks_schedule.json`
```json
{
"tasks": [
{
"agent": "agent2_debian13",
"task": "apt update && apt upgrade -y",
"cron": "weekly lun 03:00",
"enabled": true
}
]
}
```
### `agents_registry.json` — champ `work_hours`
Plage horaire de travail par agent (vérifiée avant chaque DELEGATE) :
```json
"work_hours": {
"start": "07:00",
"end": "23:00",
"days": ["mon","tue","wed","thu","fri","sat","sun"],
"enabled": true
}
```
---
## Topics MQTT
| Topic | Direction | Rôle |
|---|---|---|
| `agents/agent1/inbox` | CLI / agents → agent1 | Tâches pour agent1 |
| `agents/<name>/inbox` | Agent1 → agent2_* | Tâches pour les agents |
| `agents/<name>/control` | Agent1 → agent2_* | Pause / Resume / Report |
| `agents/daily_report` | Agent2_* → agent1 | Rapports journaliers |
| `agents/errors` | Agent2_* → agent1 | Erreurs (notif XMPP) |
| `agents/register` | Agent2_* → agent1 | Enregistrement au démarrage |
| `agents/status/<name>` | Agent2_* → agent1 | Statut en ligne / hors ligne (LWT) |
| `agents/cli/outbox` | Agent1 → CLI | Réponses vers la CLI |
| `agents/scheduler/notifications` | Scheduler → agent1 | Notifications planificateur |
---
## Lancer les agents
```bash
# Agent1
python3 /opt/agent/agent1.py > /tmp/agent1.log 2>&1 &
# Agents spécialisés
python3 /opt/agent2_debian13/agent2_debian13.py > /tmp/agent2_debian13.log 2>&1 &
python3 /opt/agent2_ansible/agent2_ansible.py > /tmp/agent2_ansible.log 2>&1 &
python3 /opt/agent2_deploy/agent2_deploy.py > /tmp/agent2_deploy.log 2>&1 &
# Via systemd
systemctl start agent agent2_debian13 agent2_ansible agent2_deploy
```
## Logs
```bash
journalctl -u agent -f
journalctl -u agent2_debian13 -f
```
---
## Déploiement manuel
```bash
# Prérequis
apt-get install -y python3 python3-pip python3-venv git mosquitto
# Cloner et installer
git clone https://git.piaf.im/sylvain/agent1.git /opt/agent
cd /opt/agent
python3 -m venv venv
@@ -44,35 +227,5 @@ cp config/config.json.example config/config.json # adapter les valeurs
# Service systemd
cp agent.service /etc/systemd/system/
systemctl daemon-reload
systemctl enable agent
systemctl start agent
```
## Configuration (`config/config.json`)
```json
{
"ollama_url" : "http://<ip_ollama>:11434/api/chat",
"model" : "qwen3:8b",
"xmpp_jid" : "agent1@xmpp.ovh",
"xmpp_pass" : "<mot_de_passe>",
"admin_jid" : "sylvain@xmpp.ovh",
"mqtt_host" : "localhost",
"mqtt_port" : 1883
}
```
## CLI
```bash
python3 /opt/agent/cli.py # parler à agent1
python3 /opt/agent/cli.py agent2_debian13 # accès direct à un agent
python3 /opt/agent/cli.py --plans # voir les planifications
```
## Logs
```bash
journalctl -u agent -f
systemctl daemon-reload && systemctl enable agent && systemctl start agent
```
+82 -50
View File
@@ -3,81 +3,61 @@
## 1. Fichiers de configuration (dans `/opt/agent/config/`)
### `reports_schedule.json` _(géré par agent1 avec confirmation utilisateur)_
- [ ] Créer le fichier avec horaires de sollicitation des rapports par agent
- [ ] Agent1 : commande XMPP pour modifier ce fichier (confirmation avant écriture)
- [ ] Agent1 : planifier les demandes de rapport selon ce fichier (APScheduler)
- [ ] Chaque agent2_* : répondre à la demande de rapport avec ses stats locales
- [x] Créer le fichier avec horaires de sollicitation des rapports par agent
- [x] Agent1 : commande !reports pour afficher, PENDING_CONFIG pour confirmation modification
- [x] Agent1 : planifier les demandes de rapport selon ce fichier (APScheduler)
- [x] Chaque agent2_* : répondre à la demande de rapport avec ses stats locales
### `tasks_schedule.json` _(géré par agent1 avec confirmation utilisateur)_
- [ ] Créer le fichier avec les tâches planifiées par agent
- [ ] Agent1 : commande XMPP pour ajouter/modifier/supprimer une tâche (confirmation avant écriture)
- [ ] Agent1 : charger ce fichier au démarrage et planifier les tâches via APScheduler
- [ ] Remplacer / compléter l'actuel `SCHEDULE:` skill
- [x] Créer le fichier avec les tâches planifiées par agent
- [x] Agent1 : !tasks pour afficher, PENDING_CONFIG pour modification avec confirmation
- [x] Agent1 : charger ce fichier au démarrage et planifier les tâches via APScheduler
### `blackout_hours.json` _(édité directement par l'utilisateur)_
- [ ] Créer le fichier avec plage(s) horaire(s) de blackout (aucun agent ne travaille)
- [ ] Format : `[{"start": "02:00", "end": "05:00", "label": "maintenance"}]`
- [ ] Agent1 : vérifier ce fichier avant chaque délégation de tâche
- [ ] Agent1 : vérifier ce fichier avant chaque tâche planifiée
- [x] Créer le fichier avec plage(s) horaire(s) de blackout (aucun agent ne travaille)
- [x] Agent1 : vérifier ce fichier avant chaque délégation de tâche (delegate.py)
---
## 2. Plages horaires par agent (`work_hours` dans `agents_registry.json`)
- [ ] Ajouter `work_hours: {start, end, days}` pour chaque agent dans `agents_registry.json`
- [ ] Modifier `skills/delegate.py` : vérifier plage horaire avant d'envoyer la tâche
- [ ] Si hors plage → retourner message d'indisponibilité (pas d'exécution)
- [ ] Prendre en compte le blackout_hours.json également dans delegate.py
- [x] Ajouter `work_hours: {start, end, days}` pour chaque agent dans `agents_registry.json`
- [x] Modifier `skills/delegate.py` : vérifier plage horaire avant d'envoyer la tâche
- [x] Si hors plage → retourner message d'indisponibilité (pas d'exécution)
- [x] Prendre en compte le blackout_hours.json également dans delegate.py
---
## 3. File d'attente locale par agent (SQLite)
### Pour chaque agent2_* (`/opt/agent2_*/queue.db`)
- [ ] Créer table `tasks_queue(id, received_at, started_at, completed_at, task, status, result, duration_s)`
- [ ] `on_mqtt_message()` : sauvegarder immédiatement le message en base (status: `pending`)
- [ ] Worker FIFO dans un thread séparé : traiter les tâches une par une
- [ ] Si paused : worker s'arrête, tâches s'accumulent en base
- [ ] Au resume : worker reprend depuis les tâches `pending`
- [ ] MQTT : passer à `clean_session=False` + `QoS=1` pour ne pas perdre les messages offline
- [x] Créer table `tasks_queue(id, received_at, started_at, completed_at, task, status, result, duration_s)`
- [x] `on_mqtt_message()` : sauvegarder immédiatement le message en base (status: `pending`)
- [x] Worker FIFO dans un thread séparé : traiter les tâches une par une
- [x] Si paused : worker s'arrête, tâches s'accumulent en base
- [x] Au resume : worker reprend depuis les tâches `pending`
- [x] MQTT : passer à `clean_session=False` + `QoS=1` pour ne pas perdre les messages offline
---
## 4. Mode pause / veille par agent
### Nouveau topic MQTT : `agents/<name>/control`
- [ ] Chaque agent2_* : s'abonner à `agents/<name>/control`
- [ ] Payload `{"command": "pause"}` → flag `self.paused = True`, stopper le worker
- [ ] Payload `{"command": "resume"}` → flag `self.paused = False`, relancer le worker
- [ ] Agent1 en veille : rester connecté XMPP, n'accepter que `!agentsON` / `!agentON agent1`
### Topic MQTT : `agents/<name>/control`
- [x] Chaque agent2_* : s'abonner à `agents/<name>/control`
- [x] pause → worker stoppé, resume → worker relancé
- [x] Agent1 en veille : n'accepte que `!agentsON` / `!agentON agent1`
### Commandes XMPP (sylvain → agent1)
- [ ] `!agentOFF <nom>` → envoyer pause à l'agent ciblé
- [ ] `!agentON <nom>` → envoyer resume à l'agent ciblé
- [ ] `!agentsOFF` → agent1 en veille + pause à tous les agent2_*
- [ ] `!agentsON` → agent1 sort de veille + resume à tous les agent2_*
- [ ] `!agentOFF agent1` → agent1 en veille uniquement
- [ ] Mettre à jour `agents_online.json` à chaque changement d'état
- [x] `!agentOFF <nom>` / `!agentON <nom>`
- [x] `!agentsOFF` / `!agentsON`
- [x] `!agentOFF agent1` / `!agentON agent1` (veille agent1 uniquement)
---
## 5. Rapports journaliers
### Chaque agent2_* (stats locales)
- [ ] Tracker en mémoire : `tasks_total`, `tasks_success`, `tasks_error`, `avg_duration_s`, `uptime_s`, `last_error`
- [ ] Alimenter ces stats depuis la `queue.db`
- [ ] Répondre à la demande de rapport d'agent1 via MQTT (`agents/<name>/control` payload `{"command": "report"}`)
- [ ] Publier le rapport sur `agents/daily_report` avec toutes les stats
### Agent1 (compilation)
- [ ] S'abonner à `agents/daily_report`
- [ ] Stocker les rapports reçus en mémoire + SQLite (`daily_reports` table dans `executions.db`)
- [ ] Nouveau skill `skills/daily_report.py` : trigger `DAILY_REPORT:`
- [ ] Compiler les rapports de tous les agents
- [ ] Formater un résumé lisible
- [ ] Envoyer à `sylvain@xmpp.ovh` via XMPP
- [ ] Stocker pour historique
- [ ] Planifier `DAILY_REPORT:` dans `tasks_schedule.json` (ex: 22:30)
- [ ] Disponible aussi à la demande : `DAILY_REPORT:` depuis CLI
- [x] Chaque agent2_* : stats depuis queue.db, envoi sur `agents/daily_report`
- [x] Agent1 : souscription `agents/daily_report`, stockage en mémoire
- [x] `skills/daily_report.py` : DAILY_REPORT: [agent]
- [x] APScheduler : sollicitation agents à 22:00/22:05/22:10, rapport journalier à 22:30
---
@@ -92,6 +72,58 @@
---
---
## 6. Gestion des mises à jour depuis les dépôts git
### Commandes XMPP (sylvain → agent1)
| Commande | Effet |
|---|---|
| `!agentUPDATE <nom>` | Vérifie si une mise à jour est disponible sur le dépôt git de l'agent (git fetch + comparaison) |
| `!agentsUPDATE` | Vérifie les dépôts de tous les agents enregistrés |
| `!agentUPGRADE <nom>` | Tire les modifications (git pull) + redémarre le service systemd de l'agent |
| `!agentsUPGRADE` | Git pull + restart de tous les agents |
### Comportement attendu
- `!agentUPDATE` : affiche le résultat de `git fetch` + `git log HEAD..origin/main --oneline`
- Réponse : "Mise à jour disponible : X commit(s)" ou "Déjà à jour"
- `!agentUPGRADE` :
1. `git pull origin main` dans le répertoire de l'agent
2. `systemctl restart <service_name>` pour relancer le service
3. Confirmation ou erreur envoyée via XMPP
4. Attendre que le service soit de nouveau EN LIGNE (statut MQTT) avant de confirmer
### Informations nécessaires par agent
Ajouter dans `agents_registry.json` :
```json
"agent2_debian13": {
"install_path" : "/opt/agent2_debian13",
"service_name" : "agent2_debian13",
"git_branch" : "main"
}
```
### Fichiers à modifier / créer
| Fichier | Action |
|---|---|
| `config/agents_registry.json` | Ajouter `install_path`, `service_name`, `git_branch` par agent |
| `agent1.py` | Gérer `!agentUPDATE/UPGRADE` et `!agentsUPDATE/UPGRADE` |
| `skills/agent_update.py` | **Créer** — logique git fetch/pull + systemctl restart |
### Points d'attention
- [x] Exécuter `git` et `systemctl` via subprocess
- [x] Timeout sur les commandes git/systemctl
- [x] `!agentUPGRADE agent1` : git pull + réponse XMPP envoyée avant `systemctl restart agent`
- [x] Gérer le cas où `install_path` n'est pas dans le registre
- [x] `!agentUPDATE` suggère `!agentUPGRADE` si commits disponibles
---
## Fichiers impactés
| Fichier | Action |
+422 -35
View File
@@ -6,9 +6,12 @@ import sys
import threading
import requests
import json
import time
from pathlib import Path
from datetime import datetime
from slixmpp import ClientXMPP
import paho.mqtt.client as mqtt
from apscheduler.schedulers.background import BackgroundScheduler
sys.path.insert(0, "/opt/agent")
from skills.loader import load_skills, run_skills
@@ -40,14 +43,30 @@ SYSTEM_PROMPT = load_system_prompt()
load_skills()
conversation_history = []
xmpp_bot = None # référence globale pour répondre via XMPP depuis MQTT
AGENTS_ONLINE = {} # {agent_name: {status, jid, mqtt_inbox, last_seen}}
xmpp_bot = None
AGENTS_ONLINE = {}
REGISTRY_FILE = CONFIG_DIR / "agents_registry.json"
REGISTRY_FILE = CONFIG_DIR / "agents_registry.json"
AGENTS_ONLINE_FILE = CONFIG_DIR / "agents_online.json"
REPORTS_FILE = CONFIG_DIR / "reports_schedule.json"
TASKS_FILE = CONFIG_DIR / "tasks_schedule.json"
BLACKOUT_FILE = CONFIG_DIR / "blackout_hours.json"
# ── MODE VEILLE ───────────────────────────────────────────────────────────
SLEEP_MODE = False
# ── CONFIRMATION CONFIG EN ATTENTE ────────────────────────────────────────
# {"file": "reports_schedule" | "tasks_schedule", "content": dict, "description": str}
PENDING_CONFIG = None
# ── RAPPORTS JOURNALIERS (reçus des agents) ───────────────────────────────
daily_reports = {} # {agent_name: payload_dict}
# ── SCHEDULER ─────────────────────────────────────────────────────────────
scheduler = BackgroundScheduler()
# ── AGENTS CONTEXT ────────────────────────────────────────────────────────
def _get_agents_context() -> str:
"""Construit dynamiquement la liste des agents (registre + statut en ligne)."""
try:
registry = json.loads(REGISTRY_FILE.read_text(encoding="utf-8"))
except Exception:
@@ -96,21 +115,345 @@ def ask_llm(user_message: str, history: list = None) -> str:
history.append({"role": "assistant", "content": err})
return err
# ── MQTT LISTENER (pour CLI) ──────────────────────────────────────────────
# ── MQTT ──────────────────────────────────────────────────────────────────
mqtt_pub_client = None
def mqtt_publish(topic: str, message: str):
if mqtt_pub_client:
mqtt_pub_client.publish(topic, message)
def _send_control(agent_name: str, command: str):
"""Envoie une commande de contrôle à un agent via MQTT."""
payload = json.dumps({"command": command})
mqtt_publish("agents/{}/control".format(agent_name), payload)
print("[CONTROL] {}{}".format(agent_name, command))
def _get_all_agents() -> list:
"""Retourne la liste de tous les agents connus (registre)."""
try:
registry = json.loads(REGISTRY_FILE.read_text(encoding="utf-8"))
return [n for n in registry if n != "agent1"]
except Exception:
return []
# ── COMMANDES !agent ──────────────────────────────────────────────────────
def _handle_agent_command(text: str) -> tuple:
"""
Gère toutes les commandes !agent*.
Retourne (handled: bool, reply: str|None).
reply=None signifie que la réponse XMPP a déjà été envoyée (ex: self-upgrade).
"""
global SLEEP_MODE
t = text.strip()
# !agentsOFF — veille agent1 + pause tous les agents
if t == "!agentsOFF":
agents = _get_all_agents()
for a in agents:
_send_control(a, "pause")
SLEEP_MODE = True
return True, "[VEILLE] Agent1 en veille. {} agent(s) mis en pause.\nEnvoyez !agentsON ou !agentON agent1 pour reprendre.".format(len(agents))
# !agentsON — sortie veille agent1 + resume tous les agents
if t == "!agentsON":
agents = _get_all_agents()
for a in agents:
_send_control(a, "resume")
SLEEP_MODE = False
return True, "[ACTIF] Agent1 actif. {} agent(s) relancés.".format(len(agents))
# !agentOFF <nom>
if t.startswith("!agentOFF "):
name = t[len("!agentOFF "):].strip()
if name == "agent1":
SLEEP_MODE = True
return True, "[VEILLE] Agent1 en veille. Envoyez !agentON agent1 pour reprendre."
_send_control(name, "pause")
return True, "[PAUSE] Commande pause envoyée à {}.".format(name)
# !agentON <nom>
if t.startswith("!agentON "):
name = t[len("!agentON "):].strip()
if name == "agent1":
SLEEP_MODE = False
return True, "[ACTIF] Agent1 actif."
_send_control(name, "resume")
return True, "[ACTIF] Commande resume envoyée à {}.".format(name)
# !agentsUPDATE — vérifier les mises à jour de tous les agents
if t == "!agentsUPDATE":
return True, _handle_update_all()
# !agentUPDATE <nom>
if t.startswith("!agentUPDATE "):
name = t[len("!agentUPDATE "):].strip()
return True, _handle_update_one(name)
# !agentsUPGRADE — mettre à jour tous les agents
if t == "!agentsUPGRADE":
return True, _handle_upgrade_all()
# !agentUPGRADE <nom>
if t.startswith("!agentUPGRADE "):
name = t[len("!agentUPGRADE "):].strip()
return True, _handle_upgrade_one(name)
return False, None
# ── MISE À JOUR DEPUIS GIT ───────────────────────────────────────────────
def _get_agent_git_info(name: str) -> dict:
"""
Retourne {install_path, service_name, git_branch} pour un agent.
Priorité : registre → convention /opt/<nom> (fallback automatique).
"""
try:
registry = json.loads(REGISTRY_FILE.read_text(encoding="utf-8"))
agent = registry.get(name, {})
except Exception:
agent = {}
# Fallback : déduire depuis le nom de l'agent (convention /opt/<nom>)
# agent1 est un cas particulier : /opt/agent, service "agent"
if name == "agent1":
default_path = "/opt/agent"
default_service = "agent"
else:
default_path = "/opt/{}".format(name)
default_service = name
return {
"install_path": agent.get("install_path", default_path),
"service_name": agent.get("service_name", default_service),
"git_branch" : agent.get("git_branch", "main"),
"ssh_host" : agent.get("ssh_host"),
"ssh_user" : agent.get("ssh_user", "root"),
}
def _handle_update_one(name: str) -> str:
from skills.agent_update import check_update
info = _get_agent_git_info(name)
return check_update(name, info["install_path"], info["git_branch"],
info["ssh_host"], info["ssh_user"])
def _handle_update_all() -> str:
from skills.agent_update import check_update
try:
registry = json.loads(REGISTRY_FILE.read_text(encoding="utf-8"))
except Exception:
return "Erreur lecture registre."
results = []
for name in registry:
info = _get_agent_git_info(name)
results.append(check_update(name, info["install_path"], info["git_branch"],
info["ssh_host"], info["ssh_user"]))
return "\n\n".join(results) if results else "Aucun agent dans le registre."
def _handle_upgrade_one(name: str) -> str:
from skills.agent_update import do_upgrade
info = _get_agent_git_info(name)
self_upgrade = (name == "agent1")
msg = do_upgrade(name, info["install_path"], info["service_name"],
info["git_branch"], self_upgrade=self_upgrade,
ssh_host=info["ssh_host"], ssh_user=info["ssh_user"])
if self_upgrade and "Redémarrage en cours" in msg:
# Envoyer le message XMPP avant le restart
if xmpp_bot:
xmpp_bot.send_message(mto=ADMIN_JID, mbody=msg, mtype='chat')
import subprocess
subprocess.Popen(["systemctl", "restart", info["service_name"]])
return None # Réponse déjà envoyée manuellement
return msg
def _handle_upgrade_all() -> str:
from skills.agent_update import do_upgrade
try:
registry = json.loads(REGISTRY_FILE.read_text(encoding="utf-8"))
except Exception:
return "Erreur lecture registre."
results = []
agent1_info = None
for name in registry:
info = _get_agent_git_info(name)
if name == "agent1":
agent1_info = (name, info) # traiter en dernier
continue
msg = do_upgrade(name, info["install_path"],
info["service_name"], info["git_branch"],
ssh_host=info["ssh_host"], ssh_user=info["ssh_user"])
results.append(msg)
summary = "\n\n".join(results) if results else "Aucun agent mis à jour."
if agent1_info:
name, info = agent1_info
pull_msg = do_upgrade(name, info["install_path"],
info["service_name"], info["git_branch"],
self_upgrade=True)
summary += "\n\n" + pull_msg
if xmpp_bot:
xmpp_bot.send_message(mto=ADMIN_JID, mbody=summary, mtype='chat')
import subprocess
subprocess.Popen(["systemctl", "restart", info.get("service_name", "agent")])
return None # Réponse déjà envoyée
return summary
# ── GESTION CONFIGS AVEC CONFIRMATION ────────────────────────────────────
def _handle_config_command(text: str) -> str | None:
"""Affiche ou propose une modification des fichiers de config."""
global PENDING_CONFIG
t = text.strip()
if t == "!reports":
try:
data = json.loads(REPORTS_FILE.read_text())
return "=== reports_schedule.json ===\n" + json.dumps(data, indent=2, ensure_ascii=False)
except Exception as e:
return "Erreur lecture : {}".format(e)
if t == "!tasks":
try:
data = json.loads(TASKS_FILE.read_text())
return "=== tasks_schedule.json ===\n" + json.dumps(data, indent=2, ensure_ascii=False)
except Exception as e:
return "Erreur lecture : {}".format(e)
if t == "!blackout":
try:
data = json.loads(BLACKOUT_FILE.read_text())
return "=== blackout_hours.json ===\n" + json.dumps(data, indent=2, ensure_ascii=False)
except Exception as e:
return "Erreur lecture : {}".format(e)
return None
def _apply_pending_config() -> str:
global PENDING_CONFIG
if not PENDING_CONFIG:
return "Aucune modification en attente."
try:
file_key = PENDING_CONFIG["file"]
content = PENDING_CONFIG["content"]
desc = PENDING_CONFIG["description"]
if file_key == "reports_schedule":
REPORTS_FILE.write_text(json.dumps(content, indent=2, ensure_ascii=False))
_reload_report_scheduler()
elif file_key == "tasks_schedule":
TASKS_FILE.write_text(json.dumps(content, indent=2, ensure_ascii=False))
_reload_task_scheduler()
PENDING_CONFIG = None
return "Modification appliquée : {}".format(desc)
except Exception as e:
PENDING_CONFIG = None
return "Erreur lors de l'application : {}".format(e)
# ── RAPPORT JOURNALIER ────────────────────────────────────────────────────
def _solicit_report(agent_name: str):
"""Demande le rapport à un agent via MQTT control."""
print("[REPORT] Sollicitation rapport de {}".format(agent_name))
_send_control(agent_name, "report")
def _compile_and_send_daily_report():
"""Compile tous les rapports reçus et envoie à Sylvain."""
from skills.daily_report import compile_report
report_text = compile_report(daily_reports)
print("[REPORT] Rapport journalier compilé, envoi XMPP.")
if xmpp_bot:
xmpp_bot.send_message(mto=ADMIN_JID, mbody=report_text, mtype='chat')
def _reload_report_scheduler():
"""(Re)planifie les sollicitations de rapports selon reports_schedule.json."""
try:
data = json.loads(REPORTS_FILE.read_text())
agents = data.get("agents", {})
for name, conf in agents.items():
if not conf.get("enabled", True):
continue
t = conf["report_time"] # "HH:MM"
hour, minute = map(int, t.split(":"))
job_id = "report_{}".format(name)
try:
scheduler.remove_job(job_id)
except Exception:
pass
scheduler.add_job(_solicit_report, "cron", args=[name],
hour=hour, minute=minute, id=job_id,
replace_existing=True)
print("[SCHEDULER] Rapport {} planifié à {}".format(name, t))
# Rapport journalier compilé
daily_t = data.get("daily_report_time", "22:30")
if data.get("daily_report_enabled", True):
dh, dm = map(int, daily_t.split(":"))
try:
scheduler.remove_job("daily_report")
except Exception:
pass
scheduler.add_job(_compile_and_send_daily_report, "cron",
hour=dh, minute=dm, id="daily_report",
replace_existing=True)
print("[SCHEDULER] Rapport journalier planifié à {}".format(daily_t))
except Exception as e:
print("[SCHEDULER] Erreur reload report scheduler : {}".format(e))
def _reload_task_scheduler():
"""(Re)planifie les tâches selon tasks_schedule.json."""
try:
data = json.loads(TASKS_FILE.read_text())
tasks = data.get("tasks", [])
# Supprimer anciens jobs tasks_*
for job in scheduler.get_jobs():
if job.id.startswith("task_"):
scheduler.remove_job(job.id)
for i, t in enumerate(tasks):
if not t.get("enabled", True):
continue
agent = t["agent"]
task = t["task"]
cron = t["cron"] # ex: "daily 03:00" ou "every 6h"
job_id = "task_{}_{}".format(agent, i)
_schedule_task_job(job_id, agent, task, cron)
print("[SCHEDULER] Tâche planifiée [{} @ {}] → {}".format(agent, cron, task[:50]))
except Exception as e:
print("[SCHEDULER] Erreur reload task scheduler : {}".format(e))
def _schedule_task_job(job_id: str, agent: str, task: str, cron_expr: str):
"""Planifie une tâche via APScheduler selon l'expression cron."""
from skills.delegate import execute as delegate_exec
def run():
print("[SCHEDULER] Exécution tâche {}{}".format(agent, task[:60]))
delegate_exec("{} | {}".format(agent, task))
expr = cron_expr.strip()
if expr.startswith("daily "):
t = expr[6:].strip()
h, m = map(int, t.split(":"))
scheduler.add_job(run, "cron", hour=h, minute=m, id=job_id, replace_existing=True)
elif expr.startswith("every ") and expr.endswith("h"):
hours = int(expr[6:-1])
scheduler.add_job(run, "interval", hours=hours, id=job_id, replace_existing=True)
elif expr.startswith("every ") and expr.endswith("min"):
mins = int(expr[6:-3])
scheduler.add_job(run, "interval", minutes=mins, id=job_id, replace_existing=True)
elif expr.startswith("weekly "):
parts = expr[7:].split()
day_map = {"lun":"mon","mar":"tue","mer":"wed","jeu":"thu","ven":"fri","sam":"sat","dim":"sun"}
day = day_map.get(parts[0], parts[0])
h, m = map(int, parts[1].split(":"))
scheduler.add_job(run, "cron", day_of_week=day, hour=h, minute=m,
id=job_id, replace_existing=True)
# ── MQTT HANDLERS ─────────────────────────────────────────────────────────
def on_mqtt_message(client, userdata, msg):
raw = msg.payload.decode(errors="replace")
# Support JSON avec reply_to optionnel
reply_to = "agents/cli/outbox"
task = raw
try:
data = json.loads(raw)
data = json.loads(raw)
task = data.get("task", raw)
reply_to = data.get("reply_to", reply_to)
except json.JSONDecodeError:
@@ -120,17 +463,15 @@ def on_mqtt_message(client, userdata, msg):
mqtt_history = []
reply = ask_llm(task, history=mqtt_history)
mqtt_publish(reply_to, reply)
print("[MQTT] Réponse envoyée sur {}".format(reply_to))
def on_mqtt_error(client, userdata, msg):
"""Reçoit les erreurs des agents et notifie l'utilisateur via XMPP."""
try:
data = json.loads(msg.payload.decode(errors="replace"))
data = json.loads(msg.payload.decode(errors="replace"))
agent = data.get("agent", "?")
task = data.get("task", "?")
error = data.get("error", "?")
source = data.get("source", "?")
notif = "[ERREUR][{}] Agent : {}\nTâche : {}\nErreur : {}".format(
notif = "[ERREUR][{}] Agent : {}\nTâche : {}\nErreur : {}".format(
source.upper(), agent, task[:100], error[:300])
print(notif)
if xmpp_bot:
@@ -139,14 +480,12 @@ def on_mqtt_error(client, userdata, msg):
print("[MQTT] Erreur parsing notification : {}".format(e))
def on_mqtt_notification(client, userdata, msg):
"""Reçoit les notifications du scheduler."""
try:
data = json.loads(msg.payload.decode(errors="replace"))
status = data.get("status", "?")
agent = data.get("agent", "?")
task = data.get("task", "?")[:80]
ts = data.get("timestamp", "?")
# Notifier XMPP seulement en cas d'erreur ou de succès important
if status == "error" and xmpp_bot:
notif = "[PLANIF ERREUR] {} | {}{}\nStatut : {}".format(ts, agent, task, status)
xmpp_bot.send_message(mto=ADMIN_JID, mbody=notif, mtype='chat')
@@ -154,8 +493,6 @@ def on_mqtt_notification(client, userdata, msg):
print("[MQTT] Erreur parsing notification scheduler : {}".format(e))
def on_mqtt_status(client, userdata, msg):
"""Suit le statut en ligne/hors-ligne des agents (LWT + retain)."""
import time
try:
data = json.loads(msg.payload.decode(errors="replace"))
agent = data.get("agent", "?")
@@ -164,15 +501,13 @@ def on_mqtt_status(client, userdata, msg):
was_online = AGENTS_ONLINE.get(agent, {}).get("status") == "online"
AGENTS_ONLINE[agent] = {**data, "last_seen": time.time()}
# Sauvegarder pour la skill agents_online
AGENTS_ONLINE_FILE.write_text(
json.dumps(AGENTS_ONLINE, indent=2, ensure_ascii=False), encoding="utf-8")
# Notifier sylvain uniquement si le statut change
is_online = status == "online"
if is_online == was_online:
return
emoji = "[EN LIGNE]" if is_online else "[HORS LIGNE]"
emoji = "[EN LIGNE]" if is_online else "[HORS LIGNE]"
print("[STATUS] {}{}".format(agent, status))
if xmpp_bot:
xmpp_bot.send_message(mto=ADMIN_JID,
@@ -182,7 +517,6 @@ def on_mqtt_status(client, userdata, msg):
print("[MQTT] Erreur parsing status : {}".format(e))
def on_mqtt_register(client, userdata, msg):
"""Reçoit les déclarations de mise en ligne des agents et met à jour le registre."""
try:
data = json.loads(msg.payload.decode(errors="replace"))
agent = data.get("agent", "?")
@@ -191,38 +525,48 @@ def on_mqtt_register(client, userdata, msg):
speciality = data.get("speciality", "")
print("[REGISTER] {} en ligne (JID: {}, inbox: {})".format(agent, jid, mqtt_inbox))
# Mettre à jour agents_registry.json
registry_file = CONFIG_DIR / "agents_registry.json"
try:
registry = json.loads(registry_file.read_text(encoding="utf-8"))
except Exception:
registry = {}
is_new = agent not in registry
registry[agent] = {
# Préserver les champs existants (work_hours etc.) lors d'une mise à jour
existing = registry.get(agent, {})
existing.update({
"jid" : jid,
"mqtt_inbox" : mqtt_inbox,
"mqtt_outbox": "agents/agent1/inbox",
"speciality" : speciality,
}
})
registry[agent] = existing
registry_file.write_text(json.dumps(registry, indent=2, ensure_ascii=False), encoding="utf-8")
if is_new:
print("[REGISTER] {} ajouté au registre.".format(agent))
# Notifier sylvain via XMPP
if xmpp_bot:
status = "NOUVEAU" if is_new else "EN LIGNE"
notif = "[{}] {}\n JID : {}\n MQTT : {}".format(status, agent, jid, mqtt_inbox)
notif = "[{}] {}\n JID : {}\n MQTT : {}".format(status, agent, jid, mqtt_inbox)
if speciality:
notif += "\n Rôle : {}".format(speciality)
xmpp_bot.send_message(mto=ADMIN_JID, mbody=notif, mtype='chat')
except Exception as e:
print("[MQTT] Erreur parsing register : {}".format(e))
def on_mqtt_daily_report(client, userdata, msg):
"""Reçoit et stocke les rapports journaliers des agents."""
try:
data = json.loads(msg.payload.decode(errors="replace"))
agent = data.get("agent", "?")
daily_reports[agent] = data
print("[REPORT] Rapport reçu de {} : {} tâches, {} erreurs".format(
agent, data.get("tasks_today", 0), data.get("errors", 0)))
except Exception as e:
print("[MQTT] Erreur parsing daily_report : {}".format(e))
def start_mqtt_listener():
global mqtt_pub_client
mqtt_pub_client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2,
client_id="agent1_pub")
mqtt_pub_client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2, client_id="agent1_pub")
mqtt_pub_client.connect(MQTT_HOST, MQTT_PORT)
mqtt_pub_client.loop_start()
@@ -232,16 +576,18 @@ def start_mqtt_listener():
sub.message_callback_add("agents/scheduler/notifications", on_mqtt_notification)
sub.message_callback_add("agents/register", on_mqtt_register)
sub.message_callback_add("agents/status/+", on_mqtt_status)
sub.on_message = on_mqtt_message # fallback
sub.message_callback_add("agents/daily_report", on_mqtt_daily_report)
sub.on_message = on_mqtt_message
sub.connect(MQTT_HOST, MQTT_PORT)
sub.subscribe([
(MQTT_INBOX, 0),
("agents/errors", 0),
("agents/scheduler/notifications", 0),
("agents/register", 0),
("agents/status/+", 0),
(MQTT_INBOX, 0),
("agents/errors", 0),
("agents/scheduler/notifications", 0),
("agents/register", 0),
("agents/status/+", 0),
("agents/daily_report", 0),
])
print("[MQTT] Agent1 écoute sur {}, agents/errors, agents/status/+, agents/register".format(MQTT_INBOX))
print("[MQTT] Agent1 écoute sur {}, errors, status/+, register, daily_report".format(MQTT_INBOX))
sub.loop_forever()
# ── BOT XMPP ─────────────────────────────────────────────────────────────
@@ -259,6 +605,8 @@ class AgentBot(ClientXMPP):
self.send_message(mto=ADMIN_JID, mbody="Agent1 (orchestrateur) en ligne !", mtype='chat')
async def message(self, msg):
global SLEEP_MODE, PENDING_CONFIG
if msg['type'] not in ('chat', 'normal'):
return
if str(msg['from']).split('/')[0] != ADMIN_JID:
@@ -266,17 +614,56 @@ class AgentBot(ClientXMPP):
user_input = msg['body'].strip()
# ── Commandes !agent* (prioritaires, toujours traitées) ──────────
handled, agent_reply = _handle_agent_command(user_input)
if handled:
if agent_reply is not None:
self.send_message(mto=ADMIN_JID, mbody=agent_reply, mtype='chat')
return # None = réponse déjà envoyée manuellement (ex: self-upgrade)
# ── Mode veille : ignorer tout sauf commandes agent ───────────────
if SLEEP_MODE:
self.send_message(
mto=ADMIN_JID,
mbody="[VEILLE] Agent1 en veille. Envoyez !agentsON ou !agentON agent1 pour reprendre.",
mtype='chat')
return
# ── Commandes config ──────────────────────────────────────────────
config_reply = _handle_config_command(user_input)
if config_reply is not None:
self.send_message(mto=ADMIN_JID, mbody=config_reply, mtype='chat')
return
# ── Confirmation modification config en attente ───────────────────
if PENDING_CONFIG is not None:
if user_input.lower() in ("oui", "yes", "o", "confirme", "ok"):
reply = _apply_pending_config()
else:
PENDING_CONFIG = None
reply = "Modification annulée."
self.send_message(mto=ADMIN_JID, mbody=reply, mtype='chat')
return
# ── Reset conversation ─────────────────────────────────────────────
if user_input == "!reset":
conversation_history.clear()
self.send_message(mto=ADMIN_JID, mbody="Conversation reinitialisee.", mtype='chat')
return
# ── Traitement LLM normal ─────────────────────────────────────────
loop = asyncio.get_event_loop()
reply = await loop.run_in_executor(None, ask_llm, user_input)
self.send_message(mto=ADMIN_JID, mbody=reply, mtype='chat')
# ── MAIN ─────────────────────────────────────────────────────────────────
if __name__ == "__main__":
# Démarrer le scheduler
scheduler.start()
_reload_report_scheduler()
_reload_task_scheduler()
# Démarrer MQTT dans un thread séparé
mqtt_thread = threading.Thread(target=start_mqtt_listener, daemon=True)
mqtt_thread.start()
+30
View File
@@ -0,0 +1,30 @@
{
"agent2_deploy": {
"status": "online",
"agent": "agent2_deploy",
"jid": "agent2_deploy@xmpp.ovh",
"mqtt_inbox": "agents/agent2_deploy/inbox",
"last_seen": 1772919416.397681
},
"agent2_ansible": {
"status": "online",
"agent": "agent2_ansible",
"jid": "agent2_ansible@xmpp.ovh",
"mqtt_inbox": "agents/agent2_ansible/inbox",
"last_seen": 1772919417.011551
},
"agent2_debian13": {
"status": "online",
"agent": "agent2_debian13",
"jid": "agent2_debian13@xmpp.ovh",
"mqtt_inbox": "agents/agent2_debian13/inbox",
"last_seen": 1772919416.9976034
},
"agent2_test": {
"status": "online",
"agent": "agent2_test",
"jid": "agent2_test@xmpp.ovh",
"mqtt_inbox": "agents/agent2_test/inbox",
"last_seen": 1772919358.6175494
}
}
+65 -5
View File
@@ -4,26 +4,86 @@
"mqtt_inbox": "agents/agent2_debian13/inbox",
"mqtt_outbox": "agents/agent1/inbox",
"speciality": "Administration Debian : apt, dpkg, systemd, conteneurs LXC/Docker, KVM, réseau, sécurité système",
"work_hours": { "start": "07:00", "end": "23:00", "days": ["mon","tue","wed","thu","fri","sat","sun"], "enabled": true }
"work_hours": {
"start": "07:00",
"end": "23:00",
"days": [
"mon",
"tue",
"wed",
"thu",
"fri",
"sat",
"sun"
],
"enabled": true
},
"install_path": "/opt/agent2_debian13",
"service_name": "agent2_debian13",
"git_branch": "main"
},
"agent2_ansible": {
"jid": "agent2_ansible@xmpp.ovh",
"mqtt_inbox": "agents/agent2_ansible/inbox",
"mqtt_outbox": "agents/agent1/inbox",
"speciality": "Automatisation infrastructure via Ansible : playbooks, commandes ad-hoc, déploiement multi-hôtes, gestion de configuration sur le réseau local",
"work_hours": { "start": "07:00", "end": "23:00", "days": ["mon","tue","wed","thu","fri","sat","sun"], "enabled": true }
"work_hours": {
"start": "07:00",
"end": "23:00",
"days": [
"mon",
"tue",
"wed",
"thu",
"fri",
"sat",
"sun"
],
"enabled": true
},
"install_path": "/opt/agent2_ansible",
"service_name": "agent2_ansible",
"git_branch": "main"
},
"agent2_deploy": {
"jid": "agent2_deploy@xmpp.ovh",
"mqtt_inbox": "agents/agent2_deploy/inbox",
"mqtt_outbox": "agents/agent1/inbox",
"speciality": "Déploiement d'agents : installe et configure d'autres agents sur des machines distantes ou locales via SSH",
"work_hours": { "start": "08:00", "end": "20:00", "days": ["mon","tue","wed","thu","fri"], "enabled": true }
"work_hours": {
"start": "08:00",
"end": "20:00",
"days": [
"mon",
"tue",
"wed",
"thu",
"fri"
],
"enabled": true
},
"install_path": "/opt/agent2_deploy",
"service_name": "agent2_deploy",
"git_branch": "main"
},
"agent2_test": {
"jid": "agent2_test@xmpp.ovh",
"mqtt_inbox": "agents/agent2_test/inbox",
"mqtt_outbox": "agents/agent1/inbox",
"speciality": "Spécialiste Debian : apt, systemd, conteneurs LXC/Docker, KVM, réseau, sécurité"
"speciality": "Administration Debian : apt, dpkg, systemd, conteneurs LXC/Docker, KVM, réseau, sécurité système",
"install_path": "/opt/agent2_test",
"service_name": "agent2_test",
"git_branch": "main",
"ssh_host": "192.168.7.13",
"ssh_user": "root"
},
"agent1": {
"jid": "agent1@xmpp.ovh",
"mqtt_inbox": "agents/agent1/inbox",
"mqtt_outbox": "agents/agent1/inbox",
"speciality": "Orchestrateur principal",
"install_path": "/opt/agent",
"service_name": "agent",
"git_branch": "main"
}
}
}
+159
View File
@@ -0,0 +1,159 @@
"""
Utilitaire : vérification et application des mises à jour git pour les agents.
Fonctions appelées directement depuis agent1.py (pas de trigger LLM).
check_update(name, install_path, branch, ssh_host, ssh_user)
do_upgrade(name, install_path, service, branch, ssh_host, ssh_user, self_upgrade)
Si ssh_host est défini, les commandes sont exécutées via SSH sur la machine distante.
"""
import subprocess
import shlex
from pathlib import Path
def _run_local(cmd: str, cwd: str = None, timeout: int = 30) -> tuple:
"""Lance une commande locale, retourne (stdout, stderr, returncode)."""
try:
result = subprocess.run(
shlex.split(cmd),
cwd=cwd,
capture_output=True,
text=True,
timeout=timeout
)
return result.stdout.strip(), result.stderr.strip(), result.returncode
except subprocess.TimeoutExpired:
return "", "Timeout ({} s)".format(timeout), -1
except Exception as e:
return "", str(e), -1
def _run_ssh(cmd: str, ssh_host: str, ssh_user: str = "root",
cwd: str = None, timeout: int = 30) -> tuple:
"""Lance une commande via SSH, retourne (stdout, stderr, returncode)."""
if cwd:
cmd = "cd {} && {}".format(cwd, cmd)
ssh_cmd = "sshpass -p 'Matador3721' ssh -o StrictHostKeyChecking=no -o ConnectTimeout=10 {}@{} {}".format(
ssh_user, ssh_host, shlex.quote(cmd))
try:
result = subprocess.run(
shlex.split(ssh_cmd),
capture_output=True,
text=True,
timeout=timeout
)
return result.stdout.strip(), result.stderr.strip(), result.returncode
except subprocess.TimeoutExpired:
return "", "Timeout SSH ({} s)".format(timeout), -1
except Exception as e:
return "", str(e), -1
def _run(cmd: str, cwd: str = None, timeout: int = 30,
ssh_host: str = None, ssh_user: str = "root") -> tuple:
"""Dispatche local ou SSH selon ssh_host."""
if ssh_host:
return _run_ssh(cmd, ssh_host, ssh_user, cwd=cwd, timeout=timeout)
return _run_local(cmd, cwd=cwd, timeout=timeout)
def check_update(agent_name: str, install_path: str, branch: str = "main",
ssh_host: str = None, ssh_user: str = "root") -> str:
"""
Vérifie si une mise à jour est disponible sur le dépôt distant.
Effectue un git fetch puis compare HEAD avec origin/<branch>.
"""
remote = " [{}]".format(ssh_host) if ssh_host else ""
# Vérifier que le répertoire existe
if ssh_host:
out, _, rc = _run("test -d {}".format(install_path),
ssh_host=ssh_host, ssh_user=ssh_user)
if rc != 0:
return "[{}{}] Répertoire introuvable : {}".format(
agent_name, remote, install_path)
else:
if not Path(install_path).is_dir():
return "[{}] Répertoire introuvable : {}".format(agent_name, install_path)
# git fetch
out, err, rc = _run("git fetch origin {}".format(branch),
cwd=install_path, timeout=20,
ssh_host=ssh_host, ssh_user=ssh_user)
if rc != 0:
return "[{}{}] Erreur git fetch : {}".format(agent_name, remote, err or out)
# Compter les commits disponibles
out, err, rc = _run(
"git log HEAD..origin/{} --oneline".format(branch),
cwd=install_path, ssh_host=ssh_host, ssh_user=ssh_user)
if rc != 0:
return "[{}{}] Erreur git log : {}".format(agent_name, remote, err or out)
commits = [l for l in out.splitlines() if l.strip()]
if not commits:
return "[{}{}] Déjà à jour.".format(agent_name, remote)
lines = ["[{}{}] {} commit(s) disponible(s) :".format(
agent_name, remote, len(commits))]
for c in commits[:10]:
lines.append(" {}".format(c))
if len(commits) > 10:
lines.append(" ... et {} autre(s)".format(len(commits) - 10))
lines.append("Lancez !agentUPGRADE {} pour appliquer.".format(agent_name))
return "\n".join(lines)
def do_upgrade(agent_name: str, install_path: str,
service_name: str, branch: str = "main",
self_upgrade: bool = False,
ssh_host: str = None, ssh_user: str = "root") -> str:
"""
Applique la mise à jour : git pull + systemctl restart.
Pour agent1 (self_upgrade=True), retourne le message AVANT le restart.
"""
remote = " [{}]".format(ssh_host) if ssh_host else ""
# Vérifier que le répertoire existe
if ssh_host:
_, _, rc = _run("test -d {}".format(install_path),
ssh_host=ssh_host, ssh_user=ssh_user)
if rc != 0:
return "[{}{}] Répertoire introuvable : {}".format(
agent_name, remote, install_path)
else:
if not Path(install_path).is_dir():
return "[{}] Répertoire introuvable : {}".format(agent_name, install_path)
# git pull
out, err, rc = _run(
"git pull origin {}".format(branch),
cwd=install_path, timeout=60,
ssh_host=ssh_host, ssh_user=ssh_user)
if rc != 0:
return "[{}{}] Erreur git pull : {}".format(agent_name, remote, err or out)
pull_msg = out or "Déjà à jour."
if self_upgrade:
return "[{}] {}\nRedémarrage en cours...".format(agent_name, pull_msg)
# systemctl restart
_, err, rc = _run("systemctl restart {}".format(service_name),
timeout=15, ssh_host=ssh_host, ssh_user=ssh_user)
if rc != 0:
return "[{}{}] Pull OK mais restart échoué : {}".format(
agent_name, remote, err)
# Vérifier que le service est bien remonté
out, _, _ = _run("systemctl is-active {}".format(service_name),
timeout=5, ssh_host=ssh_host, ssh_user=ssh_user)
state = out.strip()
if state == "active":
return "[{}{}] Mise à jour appliquée. Service actif.\n{}".format(
agent_name, remote, pull_msg)
else:
return "[{}{}] Pull OK, service état : {}. Vérifiez les logs.".format(
agent_name, remote, state)
+102
View File
@@ -0,0 +1,102 @@
"""
Skill : DAILY_REPORT
Compile et formate le rapport journalier de tous les agents.
Commande :
DAILY_REPORT: → rapport du jour
DAILY_REPORT: <agent> → rapport d'un agent spécifique
"""
from datetime import datetime, timedelta
SKILL_NAME = "daily_report"
TRIGGER = "DAILY_REPORT:"
def _fmt_uptime(seconds: int) -> str:
if seconds < 60:
return "{}s".format(seconds)
elif seconds < 3600:
return "{}m{}s".format(seconds // 60, seconds % 60)
else:
h = seconds // 3600
m = (seconds % 3600) // 60
return "{}h{}m".format(h, m)
def compile_report(daily_reports: dict, agent_filter: str = None) -> str:
"""
Compile les rapports reçus des agents en un texte formaté.
daily_reports : {agent_name: {tasks_today, success, errors, avg_duration_s,
last_error, pending, uptime_s, paused, timestamp}}
"""
now = datetime.now().strftime("%Y-%m-%d %H:%M")
lines = ["=== RAPPORT JOURNALIER — {} ===".format(now), ""]
if not daily_reports:
lines.append("Aucun rapport reçu des agents.")
return "\n".join(lines)
agents = {k: v for k, v in daily_reports.items()
if agent_filter is None or k == agent_filter}
if not agents:
return "Aucun rapport pour l'agent '{}'.".format(agent_filter)
total_tasks = 0
total_errors = 0
for agent_name, data in sorted(agents.items()):
tasks = data.get("tasks_today", 0)
success = data.get("success", 0)
errors = data.get("errors", 0)
avg_dur = data.get("avg_duration_s", 0)
pending = data.get("pending", 0)
uptime = data.get("uptime_s", 0)
paused = data.get("paused", False)
last_err= data.get("last_error")
ts = data.get("timestamp", "?")
total_tasks += tasks
total_errors += errors
state = "[EN PAUSE]" if paused else "[ACTIF]"
lines.append("── {} {} ──".format(agent_name, state))
lines.append(" Rapport : {}".format(ts))
lines.append(" Uptime : {}".format(_fmt_uptime(uptime)))
lines.append(" Tâches : {} total, {} succès, {} erreurs".format(tasks, success, errors))
if pending:
lines.append(" En attente : {}".format(pending))
if avg_dur:
lines.append(" Durée moy : {:.1f}s".format(avg_dur))
if last_err:
lines.append(" Dernière erreur : {}".format(str(last_err)[:150]))
lines.append("")
if len(agents) > 1:
lines.append("── TOTAL ──")
lines.append(" Tâches : {} | Erreurs : {}".format(total_tasks, total_errors))
if total_tasks > 0:
rate = round((total_tasks - total_errors) / total_tasks * 100, 1)
lines.append(" Taux de succès : {}%".format(rate))
return "\n".join(lines)
def execute(args: str) -> str:
"""
Appelé par le LLM via DAILY_REPORT:.
Lit les rapports stockés dans agent1.daily_reports.
"""
import sys
# Accéder aux rapports stockés dans agent1 (module principal)
agent_filter = args.strip() or None
daily_reports = {}
try:
# agent1.py est le __main__, on récupère ses daily_reports via sys.modules
main_mod = sys.modules.get("__main__")
if main_mod and hasattr(main_mod, "daily_reports"):
daily_reports = main_mod.daily_reports
except Exception:
pass
return compile_report(daily_reports, agent_filter)