Execution Pipeline

The execution pipeline processes skill calls from LLM responses through safety checks, parallel coordination, and result aggregation.

SkillExecutor (src/skills/executor.py)

The SkillExecutor orchestrates all skill execution:

executor = SkillExecutor(
    skill_registry,
    model_manager=model_manager,  # for anticipatory simulation
    redis_url=settings.redis_url,
)

execute_batch(skill_calls) → list[SkillResult]

Main execution method. Handles both sequential and parallel execution:

async def execute_batch(skill_calls: list[SkillCall]) -> list[SkillResult]:
    # Group by parallel_group
    groups = group_by_parallel(skill_calls)

    results = []
    for group in groups:
        if len(group) == 1:
            # Sequential execution
            result = await execute_one(group[0])
            results.append(result)
        else:
            # Parallel execution
            group_results = await asyncio.gather(*(execute_one(c) for c in group))
            results.extend(group_results)

    return results

execute_one(skill_call) → SkillResult

Single skill execution with full safety pipeline:

async def execute_one(call: SkillCall) -> SkillResult:
    # 1. Check skill exists and is enabled
    skill = registry.get(call.name)

    # 2. Get capability level
    level = capability_registry.get_level(call.name)

    # 3. Anticipatory simulation (RESTRICTED/PRIVILEGED only)
    if level in (RESTRICTED, PRIVILEGED):
        simulation = await anticipate(call, context)
        # Appended to the result for agent self-reflection

    # 4. Execute skill
    try:
        result = await skill.execute(**call.arguments)
    except Exception as e:
        result = SkillResult(error=str(e)[:300])

    # 5. Audit log (CONTROLLED and above)
    if policy.requires_audit:
        await write_audit_log(call, result)

    # 6. Redact secrets from output
    result.output = redact(result.output or "")

    return result

Skill Call Parsing

LLM responses are parsed for skill calls using parse_skill_calls():

# Pattern: <skill>name(param="value", param2=123)</skill>
_SKILL_CALL_RE = re.compile(
    r"<skill>(\w+)\(([^)]*)\)</skill>",
    re.DOTALL,
)

Arguments are parsed as Python literals (safe literal evaluation, never arbitrary eval):

call = SkillCall(
    name="web_search",
    arguments={"query": "BTC price today", "max_results": 5},
)
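The parser itself is not shown in the source. A minimal sketch of parse_skill_calls(), assuming keyword arguments are restricted to Python literals (here decoded with the stdlib ast module, which is one safe-eval approach; the project's actual implementation may differ):

```python
import ast
import re

_SKILL_CALL_RE = re.compile(r"<skill>(\w+)\(([^)]*)\)</skill>", re.DOTALL)

def parse_skill_calls(text: str) -> list[dict]:
    """Extract <skill>name(kwargs)</skill> tags from an LLM response.
    Keyword values are parsed as literals only; arbitrary expressions raise."""
    calls = []
    for name, raw_args in _SKILL_CALL_RE.findall(text):
        # Parse the argument list as a dummy call to recover keywords safely
        tree = ast.parse(f"f({raw_args})", mode="eval")
        arguments = {
            kw.arg: ast.literal_eval(kw.value)  # literals only, never eval()
            for kw in tree.body.keywords
        }
        calls.append({"name": name, "arguments": arguments})
    return calls
```

Because ast.literal_eval rejects anything that is not a plain literal, a response containing `<skill>shell(cmd=__import__("os"))</skill>` fails to parse instead of executing code.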

Parallel Execution

Skills in <parallel> blocks run concurrently:

<parallel>
  <skill>web_search(query="BTC price")</skill>
  <skill>web_search(query="ETH price")</skill>
</parallel>

All skills in a parallel group share the same parallel_group ID. execute_batch() processes them with asyncio.gather().

Anticipatory Simulation

Before a RESTRICTED or PRIVILEGED skill executes, an anticipatory simulation runs:

async def anticipate(call: SkillCall, context: str) -> str:
    prompt = f"""
    About to execute: {call.name}({call.arguments})
    Context: {context[:500]}

    Predict the outcome and any risks in 2-3 sentences.
    """
    simulation = await model_manager.generate(prompt, max_tokens=300)
    return f"[ANTICIPATORY SIMULATION]: {simulation}"

The simulation result is appended to the skill output, allowing the LLM to course-correct on the next round if the predicted outcome is problematic.

Simulations are cached in Redis for 5 minutes (same call + context → same prediction).
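The caching layer is not shown in the source. A sketch of how the key derivation and 5-minute TTL could work, assuming an async Redis client with `get`/`set(ex=...)` (function and key names here are illustrative):

```python
import hashlib
import json

SIM_TTL_SECONDS = 300  # 5-minute cache window

def simulation_cache_key(name: str, arguments: dict, context: str) -> str:
    """Deterministic key: the same call + context hashes to the same entry."""
    payload = json.dumps(
        {"name": name, "args": arguments, "ctx": context[:500]},
        sort_keys=True,
    )
    digest = hashlib.sha256(payload.encode()).hexdigest()[:16]
    return f"skill:sim:{digest}"

async def cached_anticipate(redis, name, arguments, context, generate_fn):
    """Return a cached prediction if one exists; otherwise generate and cache it."""
    key = simulation_cache_key(name, arguments, context)
    cached = await redis.get(key)
    if cached is not None:
        return cached
    simulation = await generate_fn(name, arguments, context)
    await redis.set(key, simulation, ex=SIM_TTL_SECONDS)
    return simulation
```

Sorting the JSON keys before hashing ensures `{"a": 1, "b": 2}` and `{"b": 2, "a": 1}` map to the same cache entry.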

Audit Log

Every CONTROLLED, RESTRICTED, and PRIVILEGED skill call writes to audit_log:

Column            Description
skill_name        The skill that was called
input_summary     Arguments (secrets redacted)
output_summary    Result (secrets redacted)
capability_level  CONTROLLED / RESTRICTED / PRIVILEGED
risk_level        From RiskAssessor (RESTRICTED+ only)
duration_ms       Execution time
chat_id           Who triggered the call
created_at        Timestamp

Query recent audit entries:

docker exec agent-postgres psql -U agent -d agent -c "
SELECT skill_name, input_summary, output_summary, created_at
FROM audit_log
ORDER BY created_at DESC
LIMIT 20;
"

Secret Redaction

Before writing to audit log, all outputs are passed through redact():

from src.utils.redaction import redact

redacted_output = redact(raw_output)

Patterns that are redacted:

  • OpenAI keys: sk-[a-zA-Z0-9]{20,}
  • Anthropic keys: sk-ant-[a-zA-Z0-9-]{20,}
  • Google keys: AIza[a-zA-Z0-9-_]{35}
  • AWS keys: AKIA[A-Z0-9]{16}
  • Stripe keys: sk_live_[a-zA-Z0-9]{24}
  • Bearer tokens, passwords in key=value pairs
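The patterns above can be sketched as a redact() implementation; ordering matters, since the generic OpenAI prefix would otherwise clip Anthropic keys (this is an illustrative reconstruction, not the actual src.utils.redaction module):

```python
import re

# More specific prefixes first: sk-ant- and sk_live_ before the generic sk-.
_SECRET_PATTERNS = [
    re.compile(r"sk-ant-[a-zA-Z0-9-]{20,}"),           # Anthropic
    re.compile(r"sk_live_[a-zA-Z0-9]{24}"),            # Stripe
    re.compile(r"sk-[a-zA-Z0-9]{20,}"),                # OpenAI
    re.compile(r"AIza[a-zA-Z0-9_-]{35}"),              # Google
    re.compile(r"AKIA[A-Z0-9]{16}"),                   # AWS
    re.compile(r"(?i)bearer\s+[a-zA-Z0-9._-]+"),       # bearer tokens
    re.compile(r"(?i)(?:password|secret|api_key)=\S+") # key=value pairs
]

def redact(text: str) -> str:
    """Replace every matched secret with a fixed placeholder."""
    for pattern in _SECRET_PATTERNS:
        text = pattern.sub("[REDACTED]", text)
    return text
```

Replacing with a constant placeholder (rather than masking part of the key) keeps even key length out of the audit log.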

Error Handling

Skill failures return structured errors:

SkillResult(
    output=None,
    error="TimeoutError: browser skill exceeded 60s timeout",
    metadata={"duration_ms": 60000},
)

The LLM receives the error and can:

  • Try an alternative approach
  • Use a different skill
  • Report the error to the user

Rate Limiting

PRIVILEGED skills have a hard rate limit of 20 calls per hour. Other levels are currently unlimited but logged for analysis.

Rate limit state is tracked in Redis:

skill:rate:{skill_name}:{hour_bucket}  → count
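A limiter over that key scheme can be sketched with Redis INCR plus EXPIRE, assuming an async client (the helper name and the 20-call constant placement are illustrative):

```python
import time

PRIVILEGED_LIMIT = 20  # calls per hour, per skill

async def check_rate_limit(redis, skill_name: str) -> bool:
    """Increment the current hour's counter; allow the call if under the cap.
    EXPIRE on first increment lets the bucket clean itself up."""
    hour_bucket = int(time.time() // 3600)
    key = f"skill:rate:{skill_name}:{hour_bucket}"
    count = await redis.incr(key)
    if count == 1:
        await redis.expire(key, 3600)  # first hit in this bucket: set the TTL
    return count <= PRIVILEGED_LIMIT
```

Bucketing by wall-clock hour means the window resets on the hour boundary rather than sliding; a call burst straddling the boundary can briefly exceed 20/hour, which is a deliberate simplicity trade-off in this sketch.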