# Execution Pipeline
The execution pipeline processes skill calls from LLM responses through safety checks, parallel coordination, and result aggregation.
## SkillExecutor (`src/skills/executor.py`)
The SkillExecutor orchestrates all skill execution:
```python
executor = SkillExecutor(
    skill_registry,
    model_manager=model_manager,  # for anticipatory simulation
    redis_url=settings.redis_url,
)
```
### `execute_batch(skill_calls) → list[SkillResult]`
Main execution method. Handles both sequential and parallel execution:
```python
async def execute_batch(skill_calls: list[SkillCall]) -> list[SkillResult]:
    # Group by parallel_group
    groups = group_by_parallel(skill_calls)
    results = []
    for group in groups:
        if len(group) == 1:
            # Sequential execution
            result = await execute_one(group[0])
            results.append(result)
        else:
            # Parallel execution
            group_results = await asyncio.gather(*[execute_one(c) for c in group])
            results.extend(group_results)
    return results
```
### `execute_one(skill_call) → SkillResult`
Single skill execution with full safety pipeline:
```python
async def execute_one(call: SkillCall) -> SkillResult:
    # 1. Check the skill exists and is enabled
    skill = registry.get(call.name)

    # 2. Get capability level
    level = capability_registry.get_level(call.name)

    # 3. Anticipatory simulation (RESTRICTED/PRIVILEGED only)
    if level in (RESTRICTED, PRIVILEGED):
        simulation = await anticipate(call, context)
        # Appended to the result for agent self-reflection

    # 4. Execute the skill
    try:
        result = await skill.execute(**call.arguments)
    except Exception as e:
        result = SkillResult(error=str(e)[:300])

    # 5. Audit log (CONTROLLED and above)
    if policy.requires_audit:
        await write_audit_log(call, result)

    # 6. Redact secrets from the output
    result.output = redact(result.output or "")
    return result
```
## Skill Call Parsing
LLM responses are parsed for skill calls using parse_skill_calls():
```python
# Pattern: <skill>name(param="value", param2=123)</skill>
_SKILL_CALL_RE = re.compile(
    r"<skill>(\w+)\(([^)]*)\)</skill>",
    re.DOTALL,
)
```
Arguments are parsed as Python literals rather than evaluated as code:
```python
call = SkillCall(
    name="web_search",
    arguments={"query": "BTC price today", "max_results": 5},
)
```
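The body of `parse_skill_calls()` is not shown in this section; a minimal sketch of the parsing path, assuming a `SkillCall` dataclass with `name` and `arguments` fields (a hypothetical shape) and using `ast.literal_eval` so argument values are parsed as literals, never executed:

```python
import ast
import re
from dataclasses import dataclass, field

_SKILL_CALL_RE = re.compile(r"<skill>(\w+)\(([^)]*)\)</skill>", re.DOTALL)

@dataclass
class SkillCall:
    name: str
    arguments: dict = field(default_factory=dict)

def parse_skill_calls(text: str) -> list[SkillCall]:
    calls = []
    for name, arg_str in _SKILL_CALL_RE.findall(text):
        # Parse the argument list as an AST call expression, then turn
        # each keyword value into a Python literal -- nothing executes.
        tree = ast.parse(f"_({arg_str})", mode="eval")
        arguments = {
            kw.arg: ast.literal_eval(kw.value) for kw in tree.body.keywords
        }
        calls.append(SkillCall(name=name, arguments=arguments))
    return calls
```

Any argument value that is not a literal (a function call, a name) makes `ast.literal_eval` raise, which is the point of the "safe eval" guarantee.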
## Parallel Execution
Skills in <parallel> blocks run concurrently:
```xml
<parallel>
  <skill>web_search(query="BTC price")</skill>
  <skill>web_search(query="ETH price")</skill>
</parallel>
```
All skills in a parallel group share the same `parallel_group` ID; `execute_batch()` runs each group with `asyncio.gather()`.
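`group_by_parallel()` is referenced but not shown; a minimal sketch, assuming each `SkillCall` carries an optional `parallel_group` attribute that is `None` for standalone calls:

```python
from itertools import groupby

def group_by_parallel(calls):
    # Consecutive calls sharing a parallel_group ID form one batch;
    # ungrouped calls (parallel_group is None) each run on their own.
    groups = []
    for group_id, members in groupby(calls, key=lambda c: c.parallel_group):
        members = list(members)
        if group_id is None:
            groups.extend([m] for m in members)
        else:
            groups.append(members)
    return groups
```

Grouping only consecutive calls preserves the order the LLM emitted them in, so sequential steps before and after a `<parallel>` block still run in sequence.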
## Anticipatory Simulation
Before a RESTRICTED or PRIVILEGED skill executes, an anticipatory simulation runs:
```python
async def anticipate(call: SkillCall, context: str) -> str:
    prompt = f"""
    About to execute: {call.name}({call.arguments})
    Context: {context[:500]}
    Predict the outcome and any risks in 2-3 sentences.
    """
    simulation = await model_manager.generate(prompt, max_tokens=300)
    return f"[ANTICIPATORY SIMULATION]: {simulation}"
```
The simulation result is appended to the skill output, allowing the LLM to course-correct on the next round if the predicted outcome is problematic.
Simulations are cached in Redis for 5 minutes (same call + context → same prediction).
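The exact cache key scheme is not spelled out here; one plausible sketch that satisfies "same call + context → same prediction" hashes both into a short Redis key (the `skill:sim:` prefix and the SHA-256 digest are assumptions):

```python
import hashlib

SIMULATION_TTL_S = 300  # 5-minute cache window

def simulation_cache_key(call_name: str, arguments: dict, context: str) -> str:
    # Sort arguments so dict ordering cannot change the key; identical
    # call + context always hash to the same Redis key.
    payload = f"{call_name}|{sorted(arguments.items())}|{context}"
    digest = hashlib.sha256(payload.encode()).hexdigest()[:16]
    return f"skill:sim:{digest}"
```

The caller would then `GET` this key before simulating and `SETEX` the fresh prediction with a 300-second TTL on a miss.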
## Audit Log
Every CONTROLLED, RESTRICTED, and PRIVILEGED skill call writes to `audit_log`:

| Column | Description |
|---|---|
| `skill_name` | The skill that was called |
| `input_summary` | Arguments (secrets redacted) |
| `output_summary` | Result (secrets redacted) |
| `capability_level` | CONTROLLED / RESTRICTED / PRIVILEGED |
| `risk_level` | From RiskAssessor (RESTRICTED+ only) |
| `duration_ms` | Execution time |
| `chat_id` | Who triggered the call |
| `created_at` | Timestamp |
Query recent audit entries:
```bash
docker exec agent-postgres psql -U agent -d agent -c "
SELECT skill_name, input_summary, output_summary, created_at
FROM audit_log
ORDER BY created_at DESC
LIMIT 20;
"
```
## Secret Redaction
Before being written to the audit log, all outputs are passed through `redact()`:
```python
from src.utils.redaction import redact

redacted_output = redact(raw_output)
```
Patterns that are redacted:

- OpenAI keys: `sk-[a-zA-Z0-9]{20,}`
- Anthropic keys: `sk-ant-[a-zA-Z0-9-]{20,}`
- Google keys: `AIza[a-zA-Z0-9-_]{35}`
- AWS keys: `AKIA[A-Z0-9]{16}`
- Stripe keys: `sk_live_[a-zA-Z0-9]{24}`
- Bearer tokens and passwords in `key=value` pairs
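A minimal sketch of `redact()` built from the key patterns above (the `[REDACTED]` placeholder is an assumption; the real implementation lives in `src/utils/redaction.py` and also covers bearer tokens and `key=value` passwords):

```python
import re

_SECRET_PATTERNS = [
    re.compile(r"sk-ant-[a-zA-Z0-9-]{20,}"),  # Anthropic (more specific prefix first)
    re.compile(r"sk-[a-zA-Z0-9]{20,}"),       # OpenAI
    re.compile(r"AIza[a-zA-Z0-9-_]{35}"),     # Google
    re.compile(r"AKIA[A-Z0-9]{16}"),          # AWS
    re.compile(r"sk_live_[a-zA-Z0-9]{24}"),   # Stripe
]

def redact(text: str) -> str:
    # Replace every match of every pattern with a fixed placeholder.
    for pattern in _SECRET_PATTERNS:
        text = pattern.sub("[REDACTED]", text)
    return text
```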
## Error Handling
Skill failures return structured errors:
```python
SkillResult(
    output=None,
    error="TimeoutError: browser skill exceeded 60s timeout",
    metadata={"duration_ms": 60000},
)
```
The LLM receives the error and can:
- Try an alternative approach
- Use a different skill
- Report the error to the user
## Rate Limiting
PRIVILEGED skills have a hard rate limit of 20 calls per hour. Other levels are currently unlimited but logged for analysis.
Rate limit state is tracked in Redis:
```
skill:rate:{skill_name}:{hour_bucket} → count
```