Compare commits
8 Commits
a95cb0127a
...
main
| Author | SHA1 | Date | |
|---|---|---|---|
| 0fe1ece68d | |||
| 1565b145dc | |||
| 3575b391b6 | |||
| 1c951f46f1 | |||
| 37e881bc39 | |||
| d765a8457a | |||
| 60a216d565 | |||
| 9e3aa29d74 |
@@ -2,38 +2,221 @@
|
||||
|
||||
Agent principal du réseau. Il reçoit les instructions de l'utilisateur (via XMPP ou CLI MQTT), les analyse avec un LLM, et délègue les tâches aux agents spécialisés.
|
||||
|
||||
## Rôle
|
||||
---
|
||||
|
||||
- Point d'entrée unique pour l'utilisateur (`sylvain@xmpp.ovh`)
|
||||
- Délègue aux agents spécialisés via MQTT (`DELEGATE: <agent> | <tâche>`)
|
||||
- Planifie des tâches récurrentes (`SCHEDULE:`)
|
||||
- Centralise les rapports et erreurs des agents
|
||||
- Maintient un registre des agents disponibles
|
||||
## Commandes XMPP (sylvain@xmpp.ovh → agent1)
|
||||
|
||||
## Skills disponibles
|
||||
### Contrôle des agents
|
||||
|
||||
| Trigger | Description |
|
||||
| Commande | Effet |
|
||||
|---|---|
|
||||
| `SEARCH: <requête>` | Recherche DuckDuckGo |
|
||||
| `READ: <url>` | Lecture de page web |
|
||||
| `REMEMBER: <clé> \| <valeur>` | Mémoire SQLite |
|
||||
| `RECALL: <clé>` | Récupération mémoire |
|
||||
| `MQTT_PUBLISH: <topic> \| <msg>` | Publication MQTT |
|
||||
| `DELEGATE: <agent> \| <tâche>` | Délégation à un agent spécialisé |
|
||||
| `PLAN: <agent>\|<tâche> ;; <agent>\|<tâche>` | Plan multi-étapes |
|
||||
| `SCHEDULE: <cron> \| <agent> \| <tâche>` | Planification |
|
||||
| `PLAN_LIST:` | Lister les tâches planifiées |
|
||||
| `PLAN_CANCEL: <id>` | Annuler une planification |
|
||||
| `REPORT:` | Rapport d'exécutions |
|
||||
| `REPORT_ERRORS:` | Rapport des erreurs |
|
||||
| `!agentsOFF` | Agent1 en veille + pause de tous les agents |
|
||||
| `!agentsON` | Agent1 actif + reprise de tous les agents |
|
||||
| `!agentOFF <nom>` | Pause d'un agent spécifique (ex: `!agentOFF agent2_debian13`) |
|
||||
| `!agentON <nom>` | Reprise d'un agent spécifique |
|
||||
| `!agentOFF agent1` | Agent1 en veille uniquement (les autres agents continuent) |
|
||||
| `!agentON agent1` | Agent1 sort de veille |
|
||||
|
||||
> En mode veille, agent1 reste connecté XMPP et répond uniquement aux commandes `!agentON`.
|
||||
|
||||
### Mises à jour git
|
||||
|
||||
| Commande | Effet |
|
||||
|---|---|
|
||||
| `!agentUPDATE <nom>` | Vérifie si une mise à jour est disponible sur le dépôt git de l'agent |
|
||||
| `!agentsUPDATE` | Vérifie les dépôts de tous les agents enregistrés |
|
||||
| `!agentUPGRADE <nom>` | `git pull` + `systemctl restart` de l'agent |
|
||||
| `!agentsUPGRADE` | `git pull` + restart de tous les agents (agent1 en dernier) |
|
||||
|
||||
> `!agentUPGRADE agent1` redémarre agent1 lui-même via systemd. La réponse XMPP est envoyée avant le redémarrage.
|
||||
|
||||
### Affichage des configurations
|
||||
|
||||
| Commande | Effet |
|
||||
|---|---|
|
||||
| `!reports` | Afficher `reports_schedule.json` (horaires des rapports) |
|
||||
| `!tasks` | Afficher `tasks_schedule.json` (tâches planifiées) |
|
||||
| `!blackout` | Afficher `blackout_hours.json` (plages d'inactivité) |
|
||||
|
||||
### Divers
|
||||
|
||||
| Commande | Effet |
|
||||
|---|---|
|
||||
| `!reset` | Réinitialiser l'historique de conversation |
|
||||
|
||||
---
|
||||
|
||||
## Skills LLM (utilisables en XMPP ou CLI)
|
||||
|
||||
Ces commandes sont générées par le LLM en réponse à une instruction en langage naturel.
|
||||
|
||||
### Délégation et planification
|
||||
|
||||
| Trigger | Syntaxe | Description |
|
||||
|---|---|---|
|
||||
| `DELEGATE:` | `DELEGATE: <agent> \| <tâche>` | Délègue une tâche à un agent (avec vérif. plage horaire + blackout) |
|
||||
| `PLAN:` | `PLAN: <agent>\|<tâche> ;; <agent>\|<tâche>` | Exécute plusieurs tâches en séquence |
|
||||
| `SCHEDULE:` | `SCHEDULE: <cron> \| <agent> \| <tâche>` | Planifie une tâche récurrente |
|
||||
| `PLAN_LIST:` | `PLAN_LIST:` | Lister les tâches planifiées |
|
||||
| `PLAN_CANCEL:` | `PLAN_CANCEL: <id>` | Annuler une planification |
|
||||
|
||||
**Expressions cron supportées :**
|
||||
```
|
||||
daily 03:00 → tous les jours à 03h00
|
||||
every 6h → toutes les 6 heures
|
||||
every 30min → toutes les 30 minutes
|
||||
weekly lun 08:00 → chaque lundi à 08h00
|
||||
```
|
||||
|
||||
### Rapports
|
||||
|
||||
| Trigger | Syntaxe | Description |
|
||||
|---|---|---|
|
||||
| `REPORT:` | `REPORT: [agent]` | Rapport des 20 dernières exécutions |
|
||||
| `REPORT_ERRORS:` | `REPORT_ERRORS: [agent]` | Rapport des 20 dernières erreurs |
|
||||
| `DAILY_REPORT:` | `DAILY_REPORT: [agent]` | Rapport journalier compilé (stats de tous les agents) |
|
||||
|
||||
### Mémoire et recherche
|
||||
|
||||
| Trigger | Syntaxe | Description |
|
||||
|---|---|---|
|
||||
| `SEARCH:` | `SEARCH: <requête>` | Recherche DuckDuckGo |
|
||||
| `READ:` | `READ: <url>` | Lecture de page web |
|
||||
| `REMEMBER:` | `REMEMBER: <clé> \| <valeur>` | Mémoriser une information (SQLite) |
|
||||
| `RECALL:` | `RECALL: <clé>` | Récupérer une information mémorisée |
|
||||
|
||||
### MQTT direct
|
||||
|
||||
| Trigger | Syntaxe | Description |
|
||||
|---|---|---|
|
||||
| `MQTT_PUBLISH:` | `MQTT_PUBLISH: <topic> \| <message>` | Publier un message MQTT |
|
||||
| `MQTT_SUBSCRIBE:` | `MQTT_SUBSCRIBE: <topic>` | Écouter un topic MQTT |
|
||||
|
||||
---
|
||||
|
||||
## CLI (`cli.py`)
|
||||
|
||||
```bash
|
||||
python3 /opt/agent/cli.py # parler à agent1
|
||||
python3 /opt/agent/cli.py agent2_debian13 # accès direct à un agent
|
||||
python3 /opt/agent/cli.py --plans # voir les planifications
|
||||
```
|
||||
|
||||
**Commandes dans la CLI :**
|
||||
|
||||
| Commande | Effet |
|
||||
|---|---|
|
||||
| `/reset` | Réinitialiser la conversation |
|
||||
| `/plans` | Lister les tâches planifiées |
|
||||
| `/report` | Rapport des dernières exécutions |
|
||||
| `/errors` | Rapport des erreurs |
|
||||
| `/agent <nom>` | Basculer vers un autre agent |
|
||||
| `/quit` | Quitter la CLI |
|
||||
|
||||
---
|
||||
|
||||
## Fichiers de configuration
|
||||
|
||||
| Fichier | Rôle | Modifié par |
|
||||
|---|---|---|
|
||||
| `config/config.json` | Paramètres XMPP, MQTT, Ollama | Utilisateur |
|
||||
| `config/agents_registry.json` | Registre des agents (auto-mis à jour) | Agent1 au démarrage des agents |
|
||||
| `config/blackout_hours.json` | Plages horaires d'inactivité totale | **Utilisateur directement** |
|
||||
| `config/reports_schedule.json` | Horaires de sollicitation des rapports | Agent1 (avec confirmation) |
|
||||
| `config/tasks_schedule.json` | Tâches planifiées par agent | Agent1 (avec confirmation) |
|
||||
|
||||
### `blackout_hours.json`
|
||||
Aucun agent ne travaille pendant ces plages, quelle que soit la planification :
|
||||
```json
|
||||
[
|
||||
{ "start": "02:00", "end": "05:00", "label": "maintenance nuit", "enabled": true }
|
||||
]
|
||||
```
|
||||
|
||||
### `reports_schedule.json`
|
||||
```json
|
||||
{
|
||||
"agents": {
|
||||
"agent2_debian13": { "report_time": "22:00", "enabled": true },
|
||||
"agent2_ansible": { "report_time": "22:05", "enabled": true },
|
||||
"agent2_deploy": { "report_time": "22:10", "enabled": true }
|
||||
},
|
||||
"daily_report_time": "22:30",
|
||||
"daily_report_enabled": true
|
||||
}
|
||||
```
|
||||
|
||||
### `tasks_schedule.json`
|
||||
```json
|
||||
{
|
||||
"tasks": [
|
||||
{
|
||||
"agent": "agent2_debian13",
|
||||
"task": "apt update && apt upgrade -y",
|
||||
"cron": "weekly lun 03:00",
|
||||
"enabled": true
|
||||
}
|
||||
]
|
||||
}
|
||||
```
|
||||
|
||||
### `agents_registry.json` — champ `work_hours`
|
||||
Plage horaire de travail par agent (vérifiée avant chaque DELEGATE) :
|
||||
```json
|
||||
"work_hours": {
|
||||
"start": "07:00",
|
||||
"end": "23:00",
|
||||
"days": ["mon","tue","wed","thu","fri","sat","sun"],
|
||||
"enabled": true
|
||||
}
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## Topics MQTT
|
||||
|
||||
| Topic | Direction | Rôle |
|
||||
|---|---|---|
|
||||
| `agents/agent1/inbox` | CLI / agents → agent1 | Tâches pour agent1 |
|
||||
| `agents/<name>/inbox` | Agent1 → agent2_* | Tâches pour les agents |
|
||||
| `agents/<name>/control` | Agent1 → agent2_* | Pause / Resume / Report |
|
||||
| `agents/daily_report` | Agent2_* → agent1 | Rapports journaliers |
|
||||
| `agents/errors` | Agent2_* → agent1 | Erreurs (notif XMPP) |
|
||||
| `agents/register` | Agent2_* → agent1 | Enregistrement au démarrage |
|
||||
| `agents/status/<name>` | Agent2_* → agent1 | Statut en ligne / hors ligne (LWT) |
|
||||
| `agents/cli/outbox` | Agent1 → CLI | Réponses vers la CLI |
|
||||
| `agents/scheduler/notifications` | Scheduler → agent1 | Notifications planificateur |
|
||||
|
||||
---
|
||||
|
||||
## Lancer les agents
|
||||
|
||||
```bash
|
||||
# Agent1
|
||||
python3 /opt/agent/agent1.py > /tmp/agent1.log 2>&1 &
|
||||
|
||||
# Agents spécialisés
|
||||
python3 /opt/agent2_debian13/agent2_debian13.py > /tmp/agent2_debian13.log 2>&1 &
|
||||
python3 /opt/agent2_ansible/agent2_ansible.py > /tmp/agent2_ansible.log 2>&1 &
|
||||
python3 /opt/agent2_deploy/agent2_deploy.py > /tmp/agent2_deploy.log 2>&1 &
|
||||
|
||||
# Via systemd
|
||||
systemctl start agent agent2_debian13 agent2_ansible agent2_deploy
|
||||
```
|
||||
|
||||
## Logs
|
||||
|
||||
```bash
|
||||
journalctl -u agent -f
|
||||
journalctl -u agent2_debian13 -f
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## Déploiement manuel
|
||||
|
||||
```bash
|
||||
# Prérequis
|
||||
apt-get install -y python3 python3-pip python3-venv git mosquitto
|
||||
|
||||
# Cloner et installer
|
||||
git clone https://git.piaf.im/sylvain/agent1.git /opt/agent
|
||||
cd /opt/agent
|
||||
python3 -m venv venv
|
||||
@@ -44,35 +227,5 @@ cp config/config.json.example config/config.json # adapter les valeurs
|
||||
|
||||
# Service systemd
|
||||
cp agent.service /etc/systemd/system/
|
||||
systemctl daemon-reload
|
||||
systemctl enable agent
|
||||
systemctl start agent
|
||||
```
|
||||
|
||||
## Configuration (`config/config.json`)
|
||||
|
||||
```json
|
||||
{
|
||||
"ollama_url" : "http://<ip_ollama>:11434/api/chat",
|
||||
"model" : "qwen3:8b",
|
||||
"xmpp_jid" : "agent1@xmpp.ovh",
|
||||
"xmpp_pass" : "<mot_de_passe>",
|
||||
"admin_jid" : "sylvain@xmpp.ovh",
|
||||
"mqtt_host" : "localhost",
|
||||
"mqtt_port" : 1883
|
||||
}
|
||||
```
|
||||
|
||||
## CLI
|
||||
|
||||
```bash
|
||||
python3 /opt/agent/cli.py # parler à agent1
|
||||
python3 /opt/agent/cli.py agent2_debian13 # accès direct à un agent
|
||||
python3 /opt/agent/cli.py --plans # voir les planifications
|
||||
```
|
||||
|
||||
## Logs
|
||||
|
||||
```bash
|
||||
journalctl -u agent -f
|
||||
systemctl daemon-reload && systemctl enable agent && systemctl start agent
|
||||
```
|
||||
|
||||
@@ -3,81 +3,61 @@
|
||||
## 1. Fichiers de configuration (dans `/opt/agent/config/`)
|
||||
|
||||
### `reports_schedule.json` _(géré par agent1 avec confirmation utilisateur)_
|
||||
- [ ] Créer le fichier avec horaires de sollicitation des rapports par agent
|
||||
- [ ] Agent1 : commande XMPP pour modifier ce fichier (confirmation avant écriture)
|
||||
- [ ] Agent1 : planifier les demandes de rapport selon ce fichier (APScheduler)
|
||||
- [ ] Chaque agent2_* : répondre à la demande de rapport avec ses stats locales
|
||||
- [x] Créer le fichier avec horaires de sollicitation des rapports par agent
|
||||
- [x] Agent1 : commande !reports pour afficher, PENDING_CONFIG pour confirmation modification
|
||||
- [x] Agent1 : planifier les demandes de rapport selon ce fichier (APScheduler)
|
||||
- [x] Chaque agent2_* : répondre à la demande de rapport avec ses stats locales
|
||||
|
||||
### `tasks_schedule.json` _(géré par agent1 avec confirmation utilisateur)_
|
||||
- [ ] Créer le fichier avec les tâches planifiées par agent
|
||||
- [ ] Agent1 : commande XMPP pour ajouter/modifier/supprimer une tâche (confirmation avant écriture)
|
||||
- [ ] Agent1 : charger ce fichier au démarrage et planifier les tâches via APScheduler
|
||||
- [ ] Remplacer / compléter l'actuel `SCHEDULE:` skill
|
||||
- [x] Créer le fichier avec les tâches planifiées par agent
|
||||
- [x] Agent1 : !tasks pour afficher, PENDING_CONFIG pour modification avec confirmation
|
||||
- [x] Agent1 : charger ce fichier au démarrage et planifier les tâches via APScheduler
|
||||
|
||||
### `blackout_hours.json` _(édité directement par l'utilisateur)_
|
||||
- [ ] Créer le fichier avec plage(s) horaire(s) de blackout (aucun agent ne travaille)
|
||||
- [ ] Format : `[{"start": "02:00", "end": "05:00", "label": "maintenance"}]`
|
||||
- [ ] Agent1 : vérifier ce fichier avant chaque délégation de tâche
|
||||
- [ ] Agent1 : vérifier ce fichier avant chaque tâche planifiée
|
||||
- [x] Créer le fichier avec plage(s) horaire(s) de blackout (aucun agent ne travaille)
|
||||
- [x] Agent1 : vérifier ce fichier avant chaque délégation de tâche (delegate.py)
|
||||
|
||||
---
|
||||
|
||||
## 2. Plages horaires par agent (`work_hours` dans `agents_registry.json`)
|
||||
- [ ] Ajouter `work_hours: {start, end, days}` pour chaque agent dans `agents_registry.json`
|
||||
- [ ] Modifier `skills/delegate.py` : vérifier plage horaire avant d'envoyer la tâche
|
||||
- [ ] Si hors plage → retourner message d'indisponibilité (pas d'exécution)
|
||||
- [ ] Prendre en compte le blackout_hours.json également dans delegate.py
|
||||
- [x] Ajouter `work_hours: {start, end, days}` pour chaque agent dans `agents_registry.json`
|
||||
- [x] Modifier `skills/delegate.py` : vérifier plage horaire avant d'envoyer la tâche
|
||||
- [x] Si hors plage → retourner message d'indisponibilité (pas d'exécution)
|
||||
- [x] Prendre en compte le blackout_hours.json également dans delegate.py
|
||||
|
||||
---
|
||||
|
||||
## 3. File d'attente locale par agent (SQLite)
|
||||
|
||||
### Pour chaque agent2_* (`/opt/agent2_*/queue.db`)
|
||||
- [ ] Créer table `tasks_queue(id, received_at, started_at, completed_at, task, status, result, duration_s)`
|
||||
- [ ] `on_mqtt_message()` : sauvegarder immédiatement le message en base (status: `pending`)
|
||||
- [ ] Worker FIFO dans un thread séparé : traiter les tâches une par une
|
||||
- [ ] Si paused : worker s'arrête, tâches s'accumulent en base
|
||||
- [ ] Au resume : worker reprend depuis les tâches `pending`
|
||||
- [ ] MQTT : passer à `clean_session=False` + `QoS=1` pour ne pas perdre les messages offline
|
||||
- [x] Créer table `tasks_queue(id, received_at, started_at, completed_at, task, status, result, duration_s)`
|
||||
- [x] `on_mqtt_message()` : sauvegarder immédiatement le message en base (status: `pending`)
|
||||
- [x] Worker FIFO dans un thread séparé : traiter les tâches une par une
|
||||
- [x] Si paused : worker s'arrête, tâches s'accumulent en base
|
||||
- [x] Au resume : worker reprend depuis les tâches `pending`
|
||||
- [x] MQTT : passer à `clean_session=False` + `QoS=1` pour ne pas perdre les messages offline
|
||||
|
||||
---
|
||||
|
||||
## 4. Mode pause / veille par agent
|
||||
|
||||
### Nouveau topic MQTT : `agents/<name>/control`
|
||||
- [ ] Chaque agent2_* : s'abonner à `agents/<name>/control`
|
||||
- [ ] Payload `{"command": "pause"}` → flag `self.paused = True`, stopper le worker
|
||||
- [ ] Payload `{"command": "resume"}` → flag `self.paused = False`, relancer le worker
|
||||
- [ ] Agent1 en veille : rester connecté XMPP, n'accepter que `!agentsON` / `!agentON agent1`
|
||||
### Topic MQTT : `agents/<name>/control`
|
||||
- [x] Chaque agent2_* : s'abonner à `agents/<name>/control`
|
||||
- [x] pause → worker stoppé, resume → worker relancé
|
||||
- [x] Agent1 en veille : n'accepte que `!agentsON` / `!agentON agent1`
|
||||
|
||||
### Commandes XMPP (sylvain → agent1)
|
||||
- [ ] `!agentOFF <nom>` → envoyer pause à l'agent ciblé
|
||||
- [ ] `!agentON <nom>` → envoyer resume à l'agent ciblé
|
||||
- [ ] `!agentsOFF` → agent1 en veille + pause à tous les agent2_*
|
||||
- [ ] `!agentsON` → agent1 sort de veille + resume à tous les agent2_*
|
||||
- [ ] `!agentOFF agent1` → agent1 en veille uniquement
|
||||
- [ ] Mettre à jour `agents_online.json` à chaque changement d'état
|
||||
- [x] `!agentOFF <nom>` / `!agentON <nom>`
|
||||
- [x] `!agentsOFF` / `!agentsON`
|
||||
- [x] `!agentOFF agent1` / `!agentON agent1` (veille agent1 uniquement)
|
||||
|
||||
---
|
||||
|
||||
## 5. Rapports journaliers
|
||||
|
||||
### Chaque agent2_* (stats locales)
|
||||
- [ ] Tracker en mémoire : `tasks_total`, `tasks_success`, `tasks_error`, `avg_duration_s`, `uptime_s`, `last_error`
|
||||
- [ ] Alimenter ces stats depuis la `queue.db`
|
||||
- [ ] Répondre à la demande de rapport d'agent1 via MQTT (`agents/<name>/control` payload `{"command": "report"}`)
|
||||
- [ ] Publier le rapport sur `agents/daily_report` avec toutes les stats
|
||||
|
||||
### Agent1 (compilation)
|
||||
- [ ] S'abonner à `agents/daily_report`
|
||||
- [ ] Stocker les rapports reçus en mémoire + SQLite (`daily_reports` table dans `executions.db`)
|
||||
- [ ] Nouveau skill `skills/daily_report.py` : trigger `DAILY_REPORT:`
|
||||
- [ ] Compiler les rapports de tous les agents
|
||||
- [ ] Formater un résumé lisible
|
||||
- [ ] Envoyer à `sylvain@xmpp.ovh` via XMPP
|
||||
- [ ] Stocker pour historique
|
||||
- [ ] Planifier `DAILY_REPORT:` dans `tasks_schedule.json` (ex: 22:30)
|
||||
- [ ] Disponible aussi à la demande : `DAILY_REPORT:` depuis CLI
|
||||
- [x] Chaque agent2_* : stats depuis queue.db, envoi sur `agents/daily_report`
|
||||
- [x] Agent1 : souscription `agents/daily_report`, stockage en mémoire
|
||||
- [x] `skills/daily_report.py` : DAILY_REPORT: [agent]
|
||||
- [x] APScheduler : sollicitation agents à 22:00/22:05/22:10, rapport journalier à 22:30
|
||||
|
||||
---
|
||||
|
||||
@@ -92,6 +72,58 @@
|
||||
|
||||
---
|
||||
|
||||
---
|
||||
|
||||
## 6. Gestion des mises à jour depuis les dépôts git
|
||||
|
||||
### Commandes XMPP (sylvain → agent1)
|
||||
|
||||
| Commande | Effet |
|
||||
|---|---|
|
||||
| `!agentUPDATE <nom>` | Vérifie si une mise à jour est disponible sur le dépôt git de l'agent (git fetch + comparaison) |
|
||||
| `!agentsUPDATE` | Vérifie les dépôts de tous les agents enregistrés |
|
||||
| `!agentUPGRADE <nom>` | Tire les modifications (git pull) + redémarre le service systemd de l'agent |
|
||||
| `!agentsUPGRADE` | Git pull + restart de tous les agents |
|
||||
|
||||
### Comportement attendu
|
||||
|
||||
- `!agentUPDATE` : affiche le résultat de `git fetch` + `git log HEAD..origin/main --oneline`
|
||||
- Réponse : "Mise à jour disponible : X commit(s)" ou "Déjà à jour"
|
||||
- `!agentUPGRADE` :
|
||||
1. `git pull origin main` dans le répertoire de l'agent
|
||||
2. `systemctl restart <service_name>` pour relancer le service
|
||||
3. Confirmation ou erreur envoyée via XMPP
|
||||
4. Attendre que le service soit de nouveau EN LIGNE (statut MQTT) avant de confirmer
|
||||
|
||||
### Informations nécessaires par agent
|
||||
|
||||
Ajouter dans `agents_registry.json` :
|
||||
```json
|
||||
"agent2_debian13": {
|
||||
"install_path" : "/opt/agent2_debian13",
|
||||
"service_name" : "agent2_debian13",
|
||||
"git_branch" : "main"
|
||||
}
|
||||
```
|
||||
|
||||
### Fichiers à modifier / créer
|
||||
|
||||
| Fichier | Action |
|
||||
|---|---|
|
||||
| `config/agents_registry.json` | Ajouter `install_path`, `service_name`, `git_branch` par agent |
|
||||
| `agent1.py` | Gérer `!agentUPDATE/UPGRADE` et `!agentsUPDATE/UPGRADE` |
|
||||
| `skills/agent_update.py` | **Créer** — logique git fetch/pull + systemctl restart |
|
||||
|
||||
### Points d'attention
|
||||
|
||||
- [x] Exécuter `git` et `systemctl` via subprocess
|
||||
- [x] Timeout sur les commandes git/systemctl
|
||||
- [x] `!agentUPGRADE agent1` : git pull + réponse XMPP envoyée avant `systemctl restart agent`
|
||||
- [x] Gérer le cas où `install_path` n'est pas dans le registre
|
||||
- [x] `!agentUPDATE` suggère `!agentUPGRADE` si commits disponibles
|
||||
|
||||
---
|
||||
|
||||
## Fichiers impactés
|
||||
|
||||
| Fichier | Action |
|
||||
|
||||
@@ -6,9 +6,12 @@ import sys
|
||||
import threading
|
||||
import requests
|
||||
import json
|
||||
import time
|
||||
from pathlib import Path
|
||||
from datetime import datetime
|
||||
from slixmpp import ClientXMPP
|
||||
import paho.mqtt.client as mqtt
|
||||
from apscheduler.schedulers.background import BackgroundScheduler
|
||||
|
||||
sys.path.insert(0, "/opt/agent")
|
||||
from skills.loader import load_skills, run_skills
|
||||
@@ -40,14 +43,30 @@ SYSTEM_PROMPT = load_system_prompt()
|
||||
load_skills()
|
||||
|
||||
conversation_history = []
|
||||
xmpp_bot = None # référence globale pour répondre via XMPP depuis MQTT
|
||||
AGENTS_ONLINE = {} # {agent_name: {status, jid, mqtt_inbox, last_seen}}
|
||||
xmpp_bot = None
|
||||
AGENTS_ONLINE = {}
|
||||
|
||||
REGISTRY_FILE = CONFIG_DIR / "agents_registry.json"
|
||||
AGENTS_ONLINE_FILE = CONFIG_DIR / "agents_online.json"
|
||||
REPORTS_FILE = CONFIG_DIR / "reports_schedule.json"
|
||||
TASKS_FILE = CONFIG_DIR / "tasks_schedule.json"
|
||||
BLACKOUT_FILE = CONFIG_DIR / "blackout_hours.json"
|
||||
|
||||
# ── MODE VEILLE ───────────────────────────────────────────────────────────
|
||||
SLEEP_MODE = False
|
||||
|
||||
# ── CONFIRMATION CONFIG EN ATTENTE ────────────────────────────────────────
|
||||
# {"file": "reports_schedule" | "tasks_schedule", "content": dict, "description": str}
|
||||
PENDING_CONFIG = None
|
||||
|
||||
# ── RAPPORTS JOURNALIERS (reçus des agents) ───────────────────────────────
|
||||
daily_reports = {} # {agent_name: payload_dict}
|
||||
|
||||
# ── SCHEDULER ─────────────────────────────────────────────────────────────
|
||||
scheduler = BackgroundScheduler()
|
||||
|
||||
# ── AGENTS CONTEXT ────────────────────────────────────────────────────────
|
||||
def _get_agents_context() -> str:
|
||||
"""Construit dynamiquement la liste des agents (registre + statut en ligne)."""
|
||||
try:
|
||||
registry = json.loads(REGISTRY_FILE.read_text(encoding="utf-8"))
|
||||
except Exception:
|
||||
@@ -96,17 +115,341 @@ def ask_llm(user_message: str, history: list = None) -> str:
|
||||
history.append({"role": "assistant", "content": err})
|
||||
return err
|
||||
|
||||
# ── MQTT LISTENER (pour CLI) ──────────────────────────────────────────────
|
||||
# ── MQTT ──────────────────────────────────────────────────────────────────
|
||||
mqtt_pub_client = None
|
||||
|
||||
def mqtt_publish(topic: str, message: str):
|
||||
if mqtt_pub_client:
|
||||
mqtt_pub_client.publish(topic, message)
|
||||
|
||||
def _send_control(agent_name: str, command: str):
|
||||
"""Envoie une commande de contrôle à un agent via MQTT."""
|
||||
payload = json.dumps({"command": command})
|
||||
mqtt_publish("agents/{}/control".format(agent_name), payload)
|
||||
print("[CONTROL] {} → {}".format(agent_name, command))
|
||||
|
||||
def _get_all_agents() -> list:
|
||||
"""Retourne la liste de tous les agents connus (registre)."""
|
||||
try:
|
||||
registry = json.loads(REGISTRY_FILE.read_text(encoding="utf-8"))
|
||||
return [n for n in registry if n != "agent1"]
|
||||
except Exception:
|
||||
return []
|
||||
|
||||
# ── COMMANDES !agent ──────────────────────────────────────────────────────
|
||||
def _handle_agent_command(text: str) -> tuple:
|
||||
"""
|
||||
Gère toutes les commandes !agent*.
|
||||
Retourne (handled: bool, reply: str|None).
|
||||
reply=None signifie que la réponse XMPP a déjà été envoyée (ex: self-upgrade).
|
||||
"""
|
||||
global SLEEP_MODE
|
||||
|
||||
t = text.strip()
|
||||
|
||||
# !agentsOFF — veille agent1 + pause tous les agents
|
||||
if t == "!agentsOFF":
|
||||
agents = _get_all_agents()
|
||||
for a in agents:
|
||||
_send_control(a, "pause")
|
||||
SLEEP_MODE = True
|
||||
return True, "[VEILLE] Agent1 en veille. {} agent(s) mis en pause.\nEnvoyez !agentsON ou !agentON agent1 pour reprendre.".format(len(agents))
|
||||
|
||||
# !agentsON — sortie veille agent1 + resume tous les agents
|
||||
if t == "!agentsON":
|
||||
agents = _get_all_agents()
|
||||
for a in agents:
|
||||
_send_control(a, "resume")
|
||||
SLEEP_MODE = False
|
||||
return True, "[ACTIF] Agent1 actif. {} agent(s) relancés.".format(len(agents))
|
||||
|
||||
# !agentOFF <nom>
|
||||
if t.startswith("!agentOFF "):
|
||||
name = t[len("!agentOFF "):].strip()
|
||||
if name == "agent1":
|
||||
SLEEP_MODE = True
|
||||
return True, "[VEILLE] Agent1 en veille. Envoyez !agentON agent1 pour reprendre."
|
||||
_send_control(name, "pause")
|
||||
return True, "[PAUSE] Commande pause envoyée à {}.".format(name)
|
||||
|
||||
# !agentON <nom>
|
||||
if t.startswith("!agentON "):
|
||||
name = t[len("!agentON "):].strip()
|
||||
if name == "agent1":
|
||||
SLEEP_MODE = False
|
||||
return True, "[ACTIF] Agent1 actif."
|
||||
_send_control(name, "resume")
|
||||
return True, "[ACTIF] Commande resume envoyée à {}.".format(name)
|
||||
|
||||
# !agentsUPDATE — vérifier les mises à jour de tous les agents
|
||||
if t == "!agentsUPDATE":
|
||||
return True, _handle_update_all()
|
||||
|
||||
# !agentUPDATE <nom>
|
||||
if t.startswith("!agentUPDATE "):
|
||||
name = t[len("!agentUPDATE "):].strip()
|
||||
return True, _handle_update_one(name)
|
||||
|
||||
# !agentsUPGRADE — mettre à jour tous les agents
|
||||
if t == "!agentsUPGRADE":
|
||||
return True, _handle_upgrade_all()
|
||||
|
||||
# !agentUPGRADE <nom>
|
||||
if t.startswith("!agentUPGRADE "):
|
||||
name = t[len("!agentUPGRADE "):].strip()
|
||||
return True, _handle_upgrade_one(name)
|
||||
|
||||
return False, None
|
||||
|
||||
# ── MISE À JOUR DEPUIS GIT ───────────────────────────────────────────────
|
||||
def _get_agent_git_info(name: str) -> dict:
|
||||
"""
|
||||
Retourne {install_path, service_name, git_branch} pour un agent.
|
||||
Priorité : registre → convention /opt/<nom> (fallback automatique).
|
||||
"""
|
||||
try:
|
||||
registry = json.loads(REGISTRY_FILE.read_text(encoding="utf-8"))
|
||||
agent = registry.get(name, {})
|
||||
except Exception:
|
||||
agent = {}
|
||||
|
||||
# Fallback : déduire depuis le nom de l'agent (convention /opt/<nom>)
|
||||
# agent1 est un cas particulier : /opt/agent, service "agent"
|
||||
if name == "agent1":
|
||||
default_path = "/opt/agent"
|
||||
default_service = "agent"
|
||||
else:
|
||||
default_path = "/opt/{}".format(name)
|
||||
default_service = name
|
||||
|
||||
return {
|
||||
"install_path": agent.get("install_path", default_path),
|
||||
"service_name": agent.get("service_name", default_service),
|
||||
"git_branch" : agent.get("git_branch", "main"),
|
||||
"ssh_host" : agent.get("ssh_host"),
|
||||
"ssh_user" : agent.get("ssh_user", "root"),
|
||||
}
|
||||
|
||||
def _handle_update_one(name: str) -> str:
|
||||
from skills.agent_update import check_update
|
||||
info = _get_agent_git_info(name)
|
||||
return check_update(name, info["install_path"], info["git_branch"],
|
||||
info["ssh_host"], info["ssh_user"])
|
||||
|
||||
def _handle_update_all() -> str:
|
||||
from skills.agent_update import check_update
|
||||
try:
|
||||
registry = json.loads(REGISTRY_FILE.read_text(encoding="utf-8"))
|
||||
except Exception:
|
||||
return "Erreur lecture registre."
|
||||
results = []
|
||||
for name in registry:
|
||||
info = _get_agent_git_info(name)
|
||||
results.append(check_update(name, info["install_path"], info["git_branch"],
|
||||
info["ssh_host"], info["ssh_user"]))
|
||||
return "\n\n".join(results) if results else "Aucun agent dans le registre."
|
||||
|
||||
def _handle_upgrade_one(name: str) -> str:
|
||||
from skills.agent_update import do_upgrade
|
||||
info = _get_agent_git_info(name)
|
||||
self_upgrade = (name == "agent1")
|
||||
msg = do_upgrade(name, info["install_path"], info["service_name"],
|
||||
info["git_branch"], self_upgrade=self_upgrade,
|
||||
ssh_host=info["ssh_host"], ssh_user=info["ssh_user"])
|
||||
if self_upgrade and "Redémarrage en cours" in msg:
|
||||
# Envoyer le message XMPP avant le restart
|
||||
if xmpp_bot:
|
||||
xmpp_bot.send_message(mto=ADMIN_JID, mbody=msg, mtype='chat')
|
||||
import subprocess
|
||||
subprocess.Popen(["systemctl", "restart", info["service_name"]])
|
||||
return None # Réponse déjà envoyée manuellement
|
||||
return msg
|
||||
|
||||
def _handle_upgrade_all() -> str:
|
||||
from skills.agent_update import do_upgrade
|
||||
try:
|
||||
registry = json.loads(REGISTRY_FILE.read_text(encoding="utf-8"))
|
||||
except Exception:
|
||||
return "Erreur lecture registre."
|
||||
|
||||
results = []
|
||||
agent1_info = None
|
||||
|
||||
for name in registry:
|
||||
info = _get_agent_git_info(name)
|
||||
if name == "agent1":
|
||||
agent1_info = (name, info) # traiter en dernier
|
||||
continue
|
||||
msg = do_upgrade(name, info["install_path"],
|
||||
info["service_name"], info["git_branch"],
|
||||
ssh_host=info["ssh_host"], ssh_user=info["ssh_user"])
|
||||
results.append(msg)
|
||||
|
||||
summary = "\n\n".join(results) if results else "Aucun agent mis à jour."
|
||||
|
||||
if agent1_info:
|
||||
name, info = agent1_info
|
||||
pull_msg = do_upgrade(name, info["install_path"],
|
||||
info["service_name"], info["git_branch"],
|
||||
self_upgrade=True)
|
||||
summary += "\n\n" + pull_msg
|
||||
if xmpp_bot:
|
||||
xmpp_bot.send_message(mto=ADMIN_JID, mbody=summary, mtype='chat')
|
||||
import subprocess
|
||||
subprocess.Popen(["systemctl", "restart", info.get("service_name", "agent")])
|
||||
return None # Réponse déjà envoyée
|
||||
|
||||
return summary
|
||||
|
||||
# ── GESTION CONFIGS AVEC CONFIRMATION ────────────────────────────────────
|
||||
def _handle_config_command(text: str) -> str | None:
|
||||
"""Affiche ou propose une modification des fichiers de config."""
|
||||
global PENDING_CONFIG
|
||||
t = text.strip()
|
||||
|
||||
if t == "!reports":
|
||||
try:
|
||||
data = json.loads(REPORTS_FILE.read_text())
|
||||
return "=== reports_schedule.json ===\n" + json.dumps(data, indent=2, ensure_ascii=False)
|
||||
except Exception as e:
|
||||
return "Erreur lecture : {}".format(e)
|
||||
|
||||
if t == "!tasks":
|
||||
try:
|
||||
data = json.loads(TASKS_FILE.read_text())
|
||||
return "=== tasks_schedule.json ===\n" + json.dumps(data, indent=2, ensure_ascii=False)
|
||||
except Exception as e:
|
||||
return "Erreur lecture : {}".format(e)
|
||||
|
||||
if t == "!blackout":
|
||||
try:
|
||||
data = json.loads(BLACKOUT_FILE.read_text())
|
||||
return "=== blackout_hours.json ===\n" + json.dumps(data, indent=2, ensure_ascii=False)
|
||||
except Exception as e:
|
||||
return "Erreur lecture : {}".format(e)
|
||||
|
||||
return None
|
||||
|
||||
def _apply_pending_config() -> str:
|
||||
global PENDING_CONFIG
|
||||
if not PENDING_CONFIG:
|
||||
return "Aucune modification en attente."
|
||||
try:
|
||||
file_key = PENDING_CONFIG["file"]
|
||||
content = PENDING_CONFIG["content"]
|
||||
desc = PENDING_CONFIG["description"]
|
||||
if file_key == "reports_schedule":
|
||||
REPORTS_FILE.write_text(json.dumps(content, indent=2, ensure_ascii=False))
|
||||
_reload_report_scheduler()
|
||||
elif file_key == "tasks_schedule":
|
||||
TASKS_FILE.write_text(json.dumps(content, indent=2, ensure_ascii=False))
|
||||
_reload_task_scheduler()
|
||||
PENDING_CONFIG = None
|
||||
return "Modification appliquée : {}".format(desc)
|
||||
except Exception as e:
|
||||
PENDING_CONFIG = None
|
||||
return "Erreur lors de l'application : {}".format(e)
|
||||
|
||||
# ── RAPPORT JOURNALIER ────────────────────────────────────────────────────
|
||||
def _solicit_report(agent_name: str):
|
||||
"""Demande le rapport à un agent via MQTT control."""
|
||||
print("[REPORT] Sollicitation rapport de {}".format(agent_name))
|
||||
_send_control(agent_name, "report")
|
||||
|
||||
def _compile_and_send_daily_report():
|
||||
"""Compile tous les rapports reçus et envoie à Sylvain."""
|
||||
from skills.daily_report import compile_report
|
||||
report_text = compile_report(daily_reports)
|
||||
print("[REPORT] Rapport journalier compilé, envoi XMPP.")
|
||||
if xmpp_bot:
|
||||
xmpp_bot.send_message(mto=ADMIN_JID, mbody=report_text, mtype='chat')
|
||||
|
||||
def _reload_report_scheduler():
|
||||
"""(Re)planifie les sollicitations de rapports selon reports_schedule.json."""
|
||||
try:
|
||||
data = json.loads(REPORTS_FILE.read_text())
|
||||
agents = data.get("agents", {})
|
||||
for name, conf in agents.items():
|
||||
if not conf.get("enabled", True):
|
||||
continue
|
||||
t = conf["report_time"] # "HH:MM"
|
||||
hour, minute = map(int, t.split(":"))
|
||||
job_id = "report_{}".format(name)
|
||||
try:
|
||||
scheduler.remove_job(job_id)
|
||||
except Exception:
|
||||
pass
|
||||
scheduler.add_job(_solicit_report, "cron", args=[name],
|
||||
hour=hour, minute=minute, id=job_id,
|
||||
replace_existing=True)
|
||||
print("[SCHEDULER] Rapport {} planifié à {}".format(name, t))
|
||||
|
||||
# Rapport journalier compilé
|
||||
daily_t = data.get("daily_report_time", "22:30")
|
||||
if data.get("daily_report_enabled", True):
|
||||
dh, dm = map(int, daily_t.split(":"))
|
||||
try:
|
||||
scheduler.remove_job("daily_report")
|
||||
except Exception:
|
||||
pass
|
||||
scheduler.add_job(_compile_and_send_daily_report, "cron",
|
||||
hour=dh, minute=dm, id="daily_report",
|
||||
replace_existing=True)
|
||||
print("[SCHEDULER] Rapport journalier planifié à {}".format(daily_t))
|
||||
except Exception as e:
|
||||
print("[SCHEDULER] Erreur reload report scheduler : {}".format(e))
|
||||
|
||||
def _reload_task_scheduler():
|
||||
"""(Re)planifie les tâches selon tasks_schedule.json."""
|
||||
try:
|
||||
data = json.loads(TASKS_FILE.read_text())
|
||||
tasks = data.get("tasks", [])
|
||||
# Supprimer anciens jobs tasks_*
|
||||
for job in scheduler.get_jobs():
|
||||
if job.id.startswith("task_"):
|
||||
scheduler.remove_job(job.id)
|
||||
for i, t in enumerate(tasks):
|
||||
if not t.get("enabled", True):
|
||||
continue
|
||||
agent = t["agent"]
|
||||
task = t["task"]
|
||||
cron = t["cron"] # ex: "daily 03:00" ou "every 6h"
|
||||
job_id = "task_{}_{}".format(agent, i)
|
||||
_schedule_task_job(job_id, agent, task, cron)
|
||||
print("[SCHEDULER] Tâche planifiée [{} @ {}] → {}".format(agent, cron, task[:50]))
|
||||
except Exception as e:
|
||||
print("[SCHEDULER] Erreur reload task scheduler : {}".format(e))
|
||||
|
||||
def _schedule_task_job(job_id: str, agent: str, task: str, cron_expr: str):
|
||||
"""Planifie une tâche via APScheduler selon l'expression cron."""
|
||||
from skills.delegate import execute as delegate_exec
|
||||
def run():
|
||||
print("[SCHEDULER] Exécution tâche {} → {}".format(agent, task[:60]))
|
||||
delegate_exec("{} | {}".format(agent, task))
|
||||
|
||||
expr = cron_expr.strip()
|
||||
if expr.startswith("daily "):
|
||||
t = expr[6:].strip()
|
||||
h, m = map(int, t.split(":"))
|
||||
scheduler.add_job(run, "cron", hour=h, minute=m, id=job_id, replace_existing=True)
|
||||
elif expr.startswith("every ") and expr.endswith("h"):
|
||||
hours = int(expr[6:-1])
|
||||
scheduler.add_job(run, "interval", hours=hours, id=job_id, replace_existing=True)
|
||||
elif expr.startswith("every ") and expr.endswith("min"):
|
||||
mins = int(expr[6:-3])
|
||||
scheduler.add_job(run, "interval", minutes=mins, id=job_id, replace_existing=True)
|
||||
elif expr.startswith("weekly "):
|
||||
parts = expr[7:].split()
|
||||
day_map = {"lun":"mon","mar":"tue","mer":"wed","jeu":"thu","ven":"fri","sam":"sat","dim":"sun"}
|
||||
day = day_map.get(parts[0], parts[0])
|
||||
h, m = map(int, parts[1].split(":"))
|
||||
scheduler.add_job(run, "cron", day_of_week=day, hour=h, minute=m,
|
||||
id=job_id, replace_existing=True)
|
||||
|
||||
# ── MQTT HANDLERS ─────────────────────────────────────────────────────────
|
||||
def on_mqtt_message(client, userdata, msg):
|
||||
raw = msg.payload.decode(errors="replace")
|
||||
|
||||
# Support JSON avec reply_to optionnel
|
||||
reply_to = "agents/cli/outbox"
|
||||
task = raw
|
||||
try:
|
||||
@@ -120,10 +463,8 @@ def on_mqtt_message(client, userdata, msg):
|
||||
mqtt_history = []
|
||||
reply = ask_llm(task, history=mqtt_history)
|
||||
mqtt_publish(reply_to, reply)
|
||||
print("[MQTT] Réponse envoyée sur {}".format(reply_to))
|
||||
|
||||
def on_mqtt_error(client, userdata, msg):
|
||||
"""Reçoit les erreurs des agents et notifie l'utilisateur via XMPP."""
|
||||
try:
|
||||
data = json.loads(msg.payload.decode(errors="replace"))
|
||||
agent = data.get("agent", "?")
|
||||
@@ -139,14 +480,12 @@ def on_mqtt_error(client, userdata, msg):
|
||||
print("[MQTT] Erreur parsing notification : {}".format(e))
|
||||
|
||||
def on_mqtt_notification(client, userdata, msg):
|
||||
"""Reçoit les notifications du scheduler."""
|
||||
try:
|
||||
data = json.loads(msg.payload.decode(errors="replace"))
|
||||
status = data.get("status", "?")
|
||||
agent = data.get("agent", "?")
|
||||
task = data.get("task", "?")[:80]
|
||||
ts = data.get("timestamp", "?")
|
||||
# Notifier XMPP seulement en cas d'erreur ou de succès important
|
||||
if status == "error" and xmpp_bot:
|
||||
notif = "[PLANIF ERREUR] {} | {} → {}\nStatut : {}".format(ts, agent, task, status)
|
||||
xmpp_bot.send_message(mto=ADMIN_JID, mbody=notif, mtype='chat')
|
||||
@@ -154,8 +493,6 @@ def on_mqtt_notification(client, userdata, msg):
|
||||
print("[MQTT] Erreur parsing notification scheduler : {}".format(e))
|
||||
|
||||
def on_mqtt_status(client, userdata, msg):
|
||||
"""Suit le statut en ligne/hors-ligne des agents (LWT + retain)."""
|
||||
import time
|
||||
try:
|
||||
data = json.loads(msg.payload.decode(errors="replace"))
|
||||
agent = data.get("agent", "?")
|
||||
@@ -164,11 +501,9 @@ def on_mqtt_status(client, userdata, msg):
|
||||
was_online = AGENTS_ONLINE.get(agent, {}).get("status") == "online"
|
||||
AGENTS_ONLINE[agent] = {**data, "last_seen": time.time()}
|
||||
|
||||
# Sauvegarder pour la skill agents_online
|
||||
AGENTS_ONLINE_FILE.write_text(
|
||||
json.dumps(AGENTS_ONLINE, indent=2, ensure_ascii=False), encoding="utf-8")
|
||||
|
||||
# Notifier sylvain uniquement si le statut change
|
||||
is_online = status == "online"
|
||||
if is_online == was_online:
|
||||
return
|
||||
@@ -182,7 +517,6 @@ def on_mqtt_status(client, userdata, msg):
|
||||
print("[MQTT] Erreur parsing status : {}".format(e))
|
||||
|
||||
def on_mqtt_register(client, userdata, msg):
|
||||
"""Reçoit les déclarations de mise en ligne des agents et met à jour le registre."""
|
||||
try:
|
||||
data = json.loads(msg.payload.decode(errors="replace"))
|
||||
agent = data.get("agent", "?")
|
||||
@@ -191,24 +525,24 @@ def on_mqtt_register(client, userdata, msg):
|
||||
speciality = data.get("speciality", "")
|
||||
print("[REGISTER] {} en ligne (JID: {}, inbox: {})".format(agent, jid, mqtt_inbox))
|
||||
|
||||
# Mettre à jour agents_registry.json
|
||||
registry_file = CONFIG_DIR / "agents_registry.json"
|
||||
try:
|
||||
registry = json.loads(registry_file.read_text(encoding="utf-8"))
|
||||
except Exception:
|
||||
registry = {}
|
||||
is_new = agent not in registry
|
||||
registry[agent] = {
|
||||
|
||||
# Préserver les champs existants (work_hours etc.) lors d'une mise à jour
|
||||
existing = registry.get(agent, {})
|
||||
existing.update({
|
||||
"jid" : jid,
|
||||
"mqtt_inbox" : mqtt_inbox,
|
||||
"mqtt_outbox": "agents/agent1/inbox",
|
||||
"speciality" : speciality,
|
||||
}
|
||||
})
|
||||
registry[agent] = existing
|
||||
registry_file.write_text(json.dumps(registry, indent=2, ensure_ascii=False), encoding="utf-8")
|
||||
if is_new:
|
||||
print("[REGISTER] {} ajouté au registre.".format(agent))
|
||||
|
||||
# Notifier sylvain via XMPP
|
||||
if xmpp_bot:
|
||||
status = "NOUVEAU" if is_new else "EN LIGNE"
|
||||
notif = "[{}] {}\n JID : {}\n MQTT : {}".format(status, agent, jid, mqtt_inbox)
|
||||
@@ -218,11 +552,21 @@ def on_mqtt_register(client, userdata, msg):
|
||||
except Exception as e:
|
||||
print("[MQTT] Erreur parsing register : {}".format(e))
|
||||
|
||||
def on_mqtt_daily_report(client, userdata, msg):
|
||||
"""Reçoit et stocke les rapports journaliers des agents."""
|
||||
try:
|
||||
data = json.loads(msg.payload.decode(errors="replace"))
|
||||
agent = data.get("agent", "?")
|
||||
daily_reports[agent] = data
|
||||
print("[REPORT] Rapport reçu de {} : {} tâches, {} erreurs".format(
|
||||
agent, data.get("tasks_today", 0), data.get("errors", 0)))
|
||||
except Exception as e:
|
||||
print("[MQTT] Erreur parsing daily_report : {}".format(e))
|
||||
|
||||
def start_mqtt_listener():
|
||||
global mqtt_pub_client
|
||||
|
||||
mqtt_pub_client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2,
|
||||
client_id="agent1_pub")
|
||||
mqtt_pub_client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2, client_id="agent1_pub")
|
||||
mqtt_pub_client.connect(MQTT_HOST, MQTT_PORT)
|
||||
mqtt_pub_client.loop_start()
|
||||
|
||||
@@ -232,7 +576,8 @@ def start_mqtt_listener():
|
||||
sub.message_callback_add("agents/scheduler/notifications", on_mqtt_notification)
|
||||
sub.message_callback_add("agents/register", on_mqtt_register)
|
||||
sub.message_callback_add("agents/status/+", on_mqtt_status)
|
||||
sub.on_message = on_mqtt_message # fallback
|
||||
sub.message_callback_add("agents/daily_report", on_mqtt_daily_report)
|
||||
sub.on_message = on_mqtt_message
|
||||
sub.connect(MQTT_HOST, MQTT_PORT)
|
||||
sub.subscribe([
|
||||
(MQTT_INBOX, 0),
|
||||
@@ -240,8 +585,9 @@ def start_mqtt_listener():
|
||||
("agents/scheduler/notifications", 0),
|
||||
("agents/register", 0),
|
||||
("agents/status/+", 0),
|
||||
("agents/daily_report", 0),
|
||||
])
|
||||
print("[MQTT] Agent1 écoute sur {}, agents/errors, agents/status/+, agents/register".format(MQTT_INBOX))
|
||||
print("[MQTT] Agent1 écoute sur {}, errors, status/+, register, daily_report".format(MQTT_INBOX))
|
||||
sub.loop_forever()
|
||||
|
||||
# ── BOT XMPP ─────────────────────────────────────────────────────────────
|
||||
@@ -259,6 +605,8 @@ class AgentBot(ClientXMPP):
|
||||
self.send_message(mto=ADMIN_JID, mbody="Agent1 (orchestrateur) en ligne !", mtype='chat')
|
||||
|
||||
async def message(self, msg):
|
||||
global SLEEP_MODE, PENDING_CONFIG
|
||||
|
||||
if msg['type'] not in ('chat', 'normal'):
|
||||
return
|
||||
if str(msg['from']).split('/')[0] != ADMIN_JID:
|
||||
@@ -266,17 +614,56 @@ class AgentBot(ClientXMPP):
|
||||
|
||||
user_input = msg['body'].strip()
|
||||
|
||||
# ── Commandes !agent* (prioritaires, toujours traitées) ──────────
|
||||
handled, agent_reply = _handle_agent_command(user_input)
|
||||
if handled:
|
||||
if agent_reply is not None:
|
||||
self.send_message(mto=ADMIN_JID, mbody=agent_reply, mtype='chat')
|
||||
return # None = réponse déjà envoyée manuellement (ex: self-upgrade)
|
||||
|
||||
# ── Mode veille : ignorer tout sauf commandes agent ───────────────
|
||||
if SLEEP_MODE:
|
||||
self.send_message(
|
||||
mto=ADMIN_JID,
|
||||
mbody="[VEILLE] Agent1 en veille. Envoyez !agentsON ou !agentON agent1 pour reprendre.",
|
||||
mtype='chat')
|
||||
return
|
||||
|
||||
# ── Commandes config ──────────────────────────────────────────────
|
||||
config_reply = _handle_config_command(user_input)
|
||||
if config_reply is not None:
|
||||
self.send_message(mto=ADMIN_JID, mbody=config_reply, mtype='chat')
|
||||
return
|
||||
|
||||
# ── Confirmation modification config en attente ───────────────────
|
||||
if PENDING_CONFIG is not None:
|
||||
if user_input.lower() in ("oui", "yes", "o", "confirme", "ok"):
|
||||
reply = _apply_pending_config()
|
||||
else:
|
||||
PENDING_CONFIG = None
|
||||
reply = "Modification annulée."
|
||||
self.send_message(mto=ADMIN_JID, mbody=reply, mtype='chat')
|
||||
return
|
||||
|
||||
# ── Reset conversation ─────────────────────────────────────────────
|
||||
if user_input == "!reset":
|
||||
conversation_history.clear()
|
||||
self.send_message(mto=ADMIN_JID, mbody="Conversation reinitialisee.", mtype='chat')
|
||||
return
|
||||
|
||||
# ── Traitement LLM normal ─────────────────────────────────────────
|
||||
loop = asyncio.get_event_loop()
|
||||
reply = await loop.run_in_executor(None, ask_llm, user_input)
|
||||
self.send_message(mto=ADMIN_JID, mbody=reply, mtype='chat')
|
||||
|
||||
# ── MAIN ─────────────────────────────────────────────────────────────────
|
||||
if __name__ == "__main__":
|
||||
# Démarrer le scheduler
|
||||
scheduler.start()
|
||||
_reload_report_scheduler()
|
||||
_reload_task_scheduler()
|
||||
|
||||
# Démarrer MQTT dans un thread séparé
|
||||
mqtt_thread = threading.Thread(target=start_mqtt_listener, daemon=True)
|
||||
mqtt_thread.start()
|
||||
|
||||
|
||||
@@ -0,0 +1,30 @@
|
||||
{
|
||||
"agent2_deploy": {
|
||||
"status": "online",
|
||||
"agent": "agent2_deploy",
|
||||
"jid": "agent2_deploy@xmpp.ovh",
|
||||
"mqtt_inbox": "agents/agent2_deploy/inbox",
|
||||
"last_seen": 1772919416.397681
|
||||
},
|
||||
"agent2_ansible": {
|
||||
"status": "online",
|
||||
"agent": "agent2_ansible",
|
||||
"jid": "agent2_ansible@xmpp.ovh",
|
||||
"mqtt_inbox": "agents/agent2_ansible/inbox",
|
||||
"last_seen": 1772919417.011551
|
||||
},
|
||||
"agent2_debian13": {
|
||||
"status": "online",
|
||||
"agent": "agent2_debian13",
|
||||
"jid": "agent2_debian13@xmpp.ovh",
|
||||
"mqtt_inbox": "agents/agent2_debian13/inbox",
|
||||
"last_seen": 1772919416.9976034
|
||||
},
|
||||
"agent2_test": {
|
||||
"status": "online",
|
||||
"agent": "agent2_test",
|
||||
"jid": "agent2_test@xmpp.ovh",
|
||||
"mqtt_inbox": "agents/agent2_test/inbox",
|
||||
"last_seen": 1772919358.6175494
|
||||
}
|
||||
}
|
||||
@@ -4,26 +4,86 @@
|
||||
"mqtt_inbox": "agents/agent2_debian13/inbox",
|
||||
"mqtt_outbox": "agents/agent1/inbox",
|
||||
"speciality": "Administration Debian : apt, dpkg, systemd, conteneurs LXC/Docker, KVM, réseau, sécurité système",
|
||||
"work_hours": { "start": "07:00", "end": "23:00", "days": ["mon","tue","wed","thu","fri","sat","sun"], "enabled": true }
|
||||
"work_hours": {
|
||||
"start": "07:00",
|
||||
"end": "23:00",
|
||||
"days": [
|
||||
"mon",
|
||||
"tue",
|
||||
"wed",
|
||||
"thu",
|
||||
"fri",
|
||||
"sat",
|
||||
"sun"
|
||||
],
|
||||
"enabled": true
|
||||
},
|
||||
"install_path": "/opt/agent2_debian13",
|
||||
"service_name": "agent2_debian13",
|
||||
"git_branch": "main"
|
||||
},
|
||||
"agent2_ansible": {
|
||||
"jid": "agent2_ansible@xmpp.ovh",
|
||||
"mqtt_inbox": "agents/agent2_ansible/inbox",
|
||||
"mqtt_outbox": "agents/agent1/inbox",
|
||||
"speciality": "Automatisation infrastructure via Ansible : playbooks, commandes ad-hoc, déploiement multi-hôtes, gestion de configuration sur le réseau local",
|
||||
"work_hours": { "start": "07:00", "end": "23:00", "days": ["mon","tue","wed","thu","fri","sat","sun"], "enabled": true }
|
||||
"work_hours": {
|
||||
"start": "07:00",
|
||||
"end": "23:00",
|
||||
"days": [
|
||||
"mon",
|
||||
"tue",
|
||||
"wed",
|
||||
"thu",
|
||||
"fri",
|
||||
"sat",
|
||||
"sun"
|
||||
],
|
||||
"enabled": true
|
||||
},
|
||||
"install_path": "/opt/agent2_ansible",
|
||||
"service_name": "agent2_ansible",
|
||||
"git_branch": "main"
|
||||
},
|
||||
"agent2_deploy": {
|
||||
"jid": "agent2_deploy@xmpp.ovh",
|
||||
"mqtt_inbox": "agents/agent2_deploy/inbox",
|
||||
"mqtt_outbox": "agents/agent1/inbox",
|
||||
"speciality": "Déploiement d'agents : installe et configure d'autres agents sur des machines distantes ou locales via SSH",
|
||||
"work_hours": { "start": "08:00", "end": "20:00", "days": ["mon","tue","wed","thu","fri"], "enabled": true }
|
||||
"work_hours": {
|
||||
"start": "08:00",
|
||||
"end": "20:00",
|
||||
"days": [
|
||||
"mon",
|
||||
"tue",
|
||||
"wed",
|
||||
"thu",
|
||||
"fri"
|
||||
],
|
||||
"enabled": true
|
||||
},
|
||||
"install_path": "/opt/agent2_deploy",
|
||||
"service_name": "agent2_deploy",
|
||||
"git_branch": "main"
|
||||
},
|
||||
"agent2_test": {
|
||||
"jid": "agent2_test@xmpp.ovh",
|
||||
"mqtt_inbox": "agents/agent2_test/inbox",
|
||||
"mqtt_outbox": "agents/agent1/inbox",
|
||||
"speciality": "Spécialiste Debian : apt, systemd, conteneurs LXC/Docker, KVM, réseau, sécurité"
|
||||
"speciality": "Administration Debian : apt, dpkg, systemd, conteneurs LXC/Docker, KVM, réseau, sécurité système",
|
||||
"install_path": "/opt/agent2_test",
|
||||
"service_name": "agent2_test",
|
||||
"git_branch": "main",
|
||||
"ssh_host": "192.168.7.13",
|
||||
"ssh_user": "root"
|
||||
},
|
||||
"agent1": {
|
||||
"jid": "agent1@xmpp.ovh",
|
||||
"mqtt_inbox": "agents/agent1/inbox",
|
||||
"mqtt_outbox": "agents/agent1/inbox",
|
||||
"speciality": "Orchestrateur principal",
|
||||
"install_path": "/opt/agent",
|
||||
"service_name": "agent",
|
||||
"git_branch": "main"
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,159 @@
|
||||
"""
|
||||
Utilitaire : vérification et application des mises à jour git pour les agents.
|
||||
|
||||
Fonctions appelées directement depuis agent1.py (pas de trigger LLM).
|
||||
|
||||
check_update(name, install_path, branch, ssh_host, ssh_user)
|
||||
do_upgrade(name, install_path, service, branch, ssh_host, ssh_user, self_upgrade)
|
||||
|
||||
Si ssh_host est défini, les commandes sont exécutées via SSH sur la machine distante.
|
||||
"""
|
||||
import subprocess
|
||||
import shlex
|
||||
from pathlib import Path
|
||||
|
||||
|
||||
def _run_local(cmd: str, cwd: str = None, timeout: int = 30) -> tuple:
|
||||
"""Lance une commande locale, retourne (stdout, stderr, returncode)."""
|
||||
try:
|
||||
result = subprocess.run(
|
||||
shlex.split(cmd),
|
||||
cwd=cwd,
|
||||
capture_output=True,
|
||||
text=True,
|
||||
timeout=timeout
|
||||
)
|
||||
return result.stdout.strip(), result.stderr.strip(), result.returncode
|
||||
except subprocess.TimeoutExpired:
|
||||
return "", "Timeout ({} s)".format(timeout), -1
|
||||
except Exception as e:
|
||||
return "", str(e), -1
|
||||
|
||||
|
||||
def _run_ssh(cmd: str, ssh_host: str, ssh_user: str = "root",
|
||||
cwd: str = None, timeout: int = 30) -> tuple:
|
||||
"""Lance une commande via SSH, retourne (stdout, stderr, returncode)."""
|
||||
if cwd:
|
||||
cmd = "cd {} && {}".format(cwd, cmd)
|
||||
ssh_cmd = "sshpass -p 'Matador3721' ssh -o StrictHostKeyChecking=no -o ConnectTimeout=10 {}@{} {}".format(
|
||||
ssh_user, ssh_host, shlex.quote(cmd))
|
||||
try:
|
||||
result = subprocess.run(
|
||||
shlex.split(ssh_cmd),
|
||||
capture_output=True,
|
||||
text=True,
|
||||
timeout=timeout
|
||||
)
|
||||
return result.stdout.strip(), result.stderr.strip(), result.returncode
|
||||
except subprocess.TimeoutExpired:
|
||||
return "", "Timeout SSH ({} s)".format(timeout), -1
|
||||
except Exception as e:
|
||||
return "", str(e), -1
|
||||
|
||||
|
||||
def _run(cmd: str, cwd: str = None, timeout: int = 30,
|
||||
ssh_host: str = None, ssh_user: str = "root") -> tuple:
|
||||
"""Dispatche local ou SSH selon ssh_host."""
|
||||
if ssh_host:
|
||||
return _run_ssh(cmd, ssh_host, ssh_user, cwd=cwd, timeout=timeout)
|
||||
return _run_local(cmd, cwd=cwd, timeout=timeout)
|
||||
|
||||
|
||||
def check_update(agent_name: str, install_path: str, branch: str = "main",
|
||||
ssh_host: str = None, ssh_user: str = "root") -> str:
|
||||
"""
|
||||
Vérifie si une mise à jour est disponible sur le dépôt distant.
|
||||
Effectue un git fetch puis compare HEAD avec origin/<branch>.
|
||||
"""
|
||||
remote = " [{}]".format(ssh_host) if ssh_host else ""
|
||||
|
||||
# Vérifier que le répertoire existe
|
||||
if ssh_host:
|
||||
out, _, rc = _run("test -d {}".format(install_path),
|
||||
ssh_host=ssh_host, ssh_user=ssh_user)
|
||||
if rc != 0:
|
||||
return "[{}{}] Répertoire introuvable : {}".format(
|
||||
agent_name, remote, install_path)
|
||||
else:
|
||||
if not Path(install_path).is_dir():
|
||||
return "[{}] Répertoire introuvable : {}".format(agent_name, install_path)
|
||||
|
||||
# git fetch
|
||||
out, err, rc = _run("git fetch origin {}".format(branch),
|
||||
cwd=install_path, timeout=20,
|
||||
ssh_host=ssh_host, ssh_user=ssh_user)
|
||||
if rc != 0:
|
||||
return "[{}{}] Erreur git fetch : {}".format(agent_name, remote, err or out)
|
||||
|
||||
# Compter les commits disponibles
|
||||
out, err, rc = _run(
|
||||
"git log HEAD..origin/{} --oneline".format(branch),
|
||||
cwd=install_path, ssh_host=ssh_host, ssh_user=ssh_user)
|
||||
if rc != 0:
|
||||
return "[{}{}] Erreur git log : {}".format(agent_name, remote, err or out)
|
||||
|
||||
commits = [l for l in out.splitlines() if l.strip()]
|
||||
if not commits:
|
||||
return "[{}{}] Déjà à jour.".format(agent_name, remote)
|
||||
|
||||
lines = ["[{}{}] {} commit(s) disponible(s) :".format(
|
||||
agent_name, remote, len(commits))]
|
||||
for c in commits[:10]:
|
||||
lines.append(" {}".format(c))
|
||||
if len(commits) > 10:
|
||||
lines.append(" ... et {} autre(s)".format(len(commits) - 10))
|
||||
lines.append("Lancez !agentUPGRADE {} pour appliquer.".format(agent_name))
|
||||
return "\n".join(lines)
|
||||
|
||||
|
||||
def do_upgrade(agent_name: str, install_path: str,
|
||||
service_name: str, branch: str = "main",
|
||||
self_upgrade: bool = False,
|
||||
ssh_host: str = None, ssh_user: str = "root") -> str:
|
||||
"""
|
||||
Applique la mise à jour : git pull + systemctl restart.
|
||||
Pour agent1 (self_upgrade=True), retourne le message AVANT le restart.
|
||||
"""
|
||||
remote = " [{}]".format(ssh_host) if ssh_host else ""
|
||||
|
||||
# Vérifier que le répertoire existe
|
||||
if ssh_host:
|
||||
_, _, rc = _run("test -d {}".format(install_path),
|
||||
ssh_host=ssh_host, ssh_user=ssh_user)
|
||||
if rc != 0:
|
||||
return "[{}{}] Répertoire introuvable : {}".format(
|
||||
agent_name, remote, install_path)
|
||||
else:
|
||||
if not Path(install_path).is_dir():
|
||||
return "[{}] Répertoire introuvable : {}".format(agent_name, install_path)
|
||||
|
||||
# git pull
|
||||
out, err, rc = _run(
|
||||
"git pull origin {}".format(branch),
|
||||
cwd=install_path, timeout=60,
|
||||
ssh_host=ssh_host, ssh_user=ssh_user)
|
||||
if rc != 0:
|
||||
return "[{}{}] Erreur git pull : {}".format(agent_name, remote, err or out)
|
||||
|
||||
pull_msg = out or "Déjà à jour."
|
||||
|
||||
if self_upgrade:
|
||||
return "[{}] {}\nRedémarrage en cours...".format(agent_name, pull_msg)
|
||||
|
||||
# systemctl restart
|
||||
_, err, rc = _run("systemctl restart {}".format(service_name),
|
||||
timeout=15, ssh_host=ssh_host, ssh_user=ssh_user)
|
||||
if rc != 0:
|
||||
return "[{}{}] Pull OK mais restart échoué : {}".format(
|
||||
agent_name, remote, err)
|
||||
|
||||
# Vérifier que le service est bien remonté
|
||||
out, _, _ = _run("systemctl is-active {}".format(service_name),
|
||||
timeout=5, ssh_host=ssh_host, ssh_user=ssh_user)
|
||||
state = out.strip()
|
||||
if state == "active":
|
||||
return "[{}{}] Mise à jour appliquée. Service actif.\n{}".format(
|
||||
agent_name, remote, pull_msg)
|
||||
else:
|
||||
return "[{}{}] Pull OK, service état : {}. Vérifiez les logs.".format(
|
||||
agent_name, remote, state)
|
||||
@@ -0,0 +1,102 @@
|
||||
"""
|
||||
Skill : DAILY_REPORT
|
||||
Compile et formate le rapport journalier de tous les agents.
|
||||
|
||||
Commande :
|
||||
DAILY_REPORT: → rapport du jour
|
||||
DAILY_REPORT: <agent> → rapport d'un agent spécifique
|
||||
"""
|
||||
from datetime import datetime, timedelta
|
||||
|
||||
SKILL_NAME = "daily_report"
|
||||
TRIGGER = "DAILY_REPORT:"
|
||||
|
||||
|
||||
def _fmt_uptime(seconds: int) -> str:
|
||||
if seconds < 60:
|
||||
return "{}s".format(seconds)
|
||||
elif seconds < 3600:
|
||||
return "{}m{}s".format(seconds // 60, seconds % 60)
|
||||
else:
|
||||
h = seconds // 3600
|
||||
m = (seconds % 3600) // 60
|
||||
return "{}h{}m".format(h, m)
|
||||
|
||||
|
||||
def compile_report(daily_reports: dict, agent_filter: str = None) -> str:
|
||||
"""
|
||||
Compile les rapports reçus des agents en un texte formaté.
|
||||
daily_reports : {agent_name: {tasks_today, success, errors, avg_duration_s,
|
||||
last_error, pending, uptime_s, paused, timestamp}}
|
||||
"""
|
||||
now = datetime.now().strftime("%Y-%m-%d %H:%M")
|
||||
lines = ["=== RAPPORT JOURNALIER — {} ===".format(now), ""]
|
||||
|
||||
if not daily_reports:
|
||||
lines.append("Aucun rapport reçu des agents.")
|
||||
return "\n".join(lines)
|
||||
|
||||
agents = {k: v for k, v in daily_reports.items()
|
||||
if agent_filter is None or k == agent_filter}
|
||||
|
||||
if not agents:
|
||||
return "Aucun rapport pour l'agent '{}'.".format(agent_filter)
|
||||
|
||||
total_tasks = 0
|
||||
total_errors = 0
|
||||
|
||||
for agent_name, data in sorted(agents.items()):
|
||||
tasks = data.get("tasks_today", 0)
|
||||
success = data.get("success", 0)
|
||||
errors = data.get("errors", 0)
|
||||
avg_dur = data.get("avg_duration_s", 0)
|
||||
pending = data.get("pending", 0)
|
||||
uptime = data.get("uptime_s", 0)
|
||||
paused = data.get("paused", False)
|
||||
last_err= data.get("last_error")
|
||||
ts = data.get("timestamp", "?")
|
||||
|
||||
total_tasks += tasks
|
||||
total_errors += errors
|
||||
|
||||
state = "[EN PAUSE]" if paused else "[ACTIF]"
|
||||
lines.append("── {} {} ──".format(agent_name, state))
|
||||
lines.append(" Rapport : {}".format(ts))
|
||||
lines.append(" Uptime : {}".format(_fmt_uptime(uptime)))
|
||||
lines.append(" Tâches : {} total, {} succès, {} erreurs".format(tasks, success, errors))
|
||||
if pending:
|
||||
lines.append(" En attente : {}".format(pending))
|
||||
if avg_dur:
|
||||
lines.append(" Durée moy : {:.1f}s".format(avg_dur))
|
||||
if last_err:
|
||||
lines.append(" Dernière erreur : {}".format(str(last_err)[:150]))
|
||||
lines.append("")
|
||||
|
||||
if len(agents) > 1:
|
||||
lines.append("── TOTAL ──")
|
||||
lines.append(" Tâches : {} | Erreurs : {}".format(total_tasks, total_errors))
|
||||
if total_tasks > 0:
|
||||
rate = round((total_tasks - total_errors) / total_tasks * 100, 1)
|
||||
lines.append(" Taux de succès : {}%".format(rate))
|
||||
|
||||
return "\n".join(lines)
|
||||
|
||||
|
||||
def execute(args: str) -> str:
|
||||
"""
|
||||
Appelé par le LLM via DAILY_REPORT:.
|
||||
Lit les rapports stockés dans agent1.daily_reports.
|
||||
"""
|
||||
import sys
|
||||
# Accéder aux rapports stockés dans agent1 (module principal)
|
||||
agent_filter = args.strip() or None
|
||||
daily_reports = {}
|
||||
try:
|
||||
# agent1.py est le __main__, on récupère ses daily_reports via sys.modules
|
||||
main_mod = sys.modules.get("__main__")
|
||||
if main_mod and hasattr(main_mod, "daily_reports"):
|
||||
daily_reports = main_mod.daily_reports
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
return compile_report(daily_reports, agent_filter)
|
||||
Reference in New Issue
Block a user