fix(delegate): make delegation synchronous — wait for agent reply
Previously delegate returned immediately after sending the task, causing the agent's result to be lost. Now subscribes to a results topic and waits up to 120s for the actual response before returning it to Nexus LLM. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
+34
-5
@@ -1,11 +1,17 @@
|
|||||||
"""
|
"""
|
||||||
Skill DELEGATE — déléguer une tâche à un agent spécialisé via MQTT.
|
Skill DELEGATE — déléguer une tâche à un agent spécialisé via MQTT.
|
||||||
|
Attend la réponse de l'agent (synchrone) avant de retourner.
|
||||||
|
|
||||||
Usage LLM : SKILL:delegate ARGS:<agent_id> | <tâche>
|
Usage LLM : SKILL:delegate ARGS:<agent_id> | <tâche>
|
||||||
"""
|
"""
|
||||||
DESCRIPTION = "Déléguer une tâche à un agent spécialisé"
|
import threading
|
||||||
|
import uuid
|
||||||
|
|
||||||
|
DESCRIPTION = "Déléguer une tâche à un agent spécialisé et retourner son résultat"
|
||||||
USAGE = "SKILL:delegate ARGS:<agent_id> | <tâche>"
|
USAGE = "SKILL:delegate ARGS:<agent_id> | <tâche>"
|
||||||
|
|
||||||
|
TIMEOUT = 120 # secondes max d'attente de la réponse
|
||||||
|
|
||||||
|
|
||||||
def run(args: str, context) -> str:
|
def run(args: str, context) -> str:
|
||||||
if "|" not in args:
|
if "|" not in args:
|
||||||
@@ -21,11 +27,34 @@ def run(args: str, context) -> str:
|
|||||||
known = [a.agent_id for a in context.registry.all_agents()]
|
known = [a.agent_id for a in context.registry.all_agents()]
|
||||||
return f"Agent '{agent_id}' inconnu. Agents connus : {', '.join(known)}"
|
return f"Agent '{agent_id}' inconnu. Agents connus : {', '.join(known)}"
|
||||||
|
|
||||||
# Envoyer la tâche via MQTT
|
# Préparer la réception de la réponse
|
||||||
sent = context.mqtt.send_to(
|
corr_id = str(uuid.uuid4())
|
||||||
|
reply_topic = context.mqtt.topic_results(corr_id)
|
||||||
|
|
||||||
|
result_event = threading.Event()
|
||||||
|
result_holder = []
|
||||||
|
|
||||||
|
def on_result(msg, topic):
|
||||||
|
payload = msg.payload if hasattr(msg, "payload") else str(msg)
|
||||||
|
result_holder.append(payload)
|
||||||
|
result_event.set()
|
||||||
|
|
||||||
|
# S'abonner avant d'envoyer pour ne pas manquer la réponse
|
||||||
|
context.mqtt.subscribe(reply_topic, on_result)
|
||||||
|
|
||||||
|
# Envoyer la tâche
|
||||||
|
context.mqtt.send_to(
|
||||||
recipient_id=agent_id,
|
recipient_id=agent_id,
|
||||||
payload=task,
|
payload=task,
|
||||||
reply_to=context.mqtt.topic_inbox(),
|
correlation_id=corr_id,
|
||||||
|
reply_to=reply_topic,
|
||||||
)
|
)
|
||||||
|
|
||||||
return f"Tâche déléguée à {agent_id} (id={sent.correlation_id[:8]}). Attente de la réponse..."
|
# Attendre la réponse
|
||||||
|
got_reply = result_event.wait(timeout=TIMEOUT)
|
||||||
|
context.mqtt.unsubscribe(reply_topic)
|
||||||
|
|
||||||
|
if not got_reply:
|
||||||
|
return f"⏱ Timeout : {agent_id} n'a pas répondu en {TIMEOUT}s."
|
||||||
|
|
||||||
|
return result_holder[0] if result_holder else "Réponse vide de l'agent."
|
||||||
|
|||||||
Reference in New Issue
Block a user