Execution Pipeline

The execution pipeline processes skill calls from LLM responses through safety checks, parallel coordination, and result aggregation.

Before invoking the LLM, the Decision Layer (a pre-LLM heuristic classifier) routes each request to one of five strategies: DIRECT_RESPONSE, GOAL, SCHEDULED_TASK, SUB_AGENT, or SCRIPT. Thirteen fast-paths handle common patterns directly; only when no fast-path matches does the full LLM skill loop run.

SkillExecutor (src/skills/executor.py)

The SkillExecutor orchestrates all skill execution:

```python
executor = SkillExecutor(
    skill_registry,
    model_manager=model_manager,  # for anticipatory simulation
    redis_url=settings.redis_url,
)
```

execute_batch(skill_calls) → list[SkillResult]

Main execution method. Handles both sequential and parallel execution:

```python
async def execute_batch(skill_calls: list[SkillCall]) -> list[SkillResult]:
    # Group by parallel_group
    groups = group_by_parallel(skill_calls)

    results = []
    for group in groups:
        if len(group) == 1:
            # Sequential execution
            result = await execute_one(group[0])
            results.append(result)
        else:
            # Parallel execution
            group_results = await asyncio.gather(*[execute_one(c) for c in group])
            results.extend(group_results)

    return results
```

execute_one(skill_call) → SkillResult

Single skill execution with full safety pipeline:

```python
async def execute_one(call: SkillCall) -> SkillResult:
    # 1. Check the skill exists and is enabled
    skill = registry.get(call.name)

    # 2. Get the capability level
    level = capability_registry.get_level(call.name)

    # 3. Anticipatory simulation (RESTRICTED/PRIVILEGED only)
    if level in (RESTRICTED, PRIVILEGED):
        simulation = await anticipate(call, context)
        # Appended to the result for agent self-reflection

    # 4. Execute the skill
    try:
        result = await skill.execute(**call.arguments)
    except Exception as e:
        result = SkillResult(error=str(e)[:300])

    # 5. Audit log (CONTROLLED and above)
    if policy.requires_audit:
        await write_audit_log(call, result)

    # 6. Redact secrets from output
    result.output = redact(result.output or "")

    return result
```

Skill Call Parsing

LLM responses are parsed for skill calls using parse_skill_calls():

# Pattern: <skill>name(param="value", param2=123)</skill>
```python
# Pattern: <skill>name(param="value", param2=123)</skill>
_SKILL_CALL_RE = re.compile(
    r"<skill>(\w+)\(([^)]*)\)</skill>",
    re.DOTALL,
)
```

Arguments are parsed as Python literals; arbitrary code is never eval()'d:

```python
call = SkillCall(
    name="web_search",
    arguments={"query": "BTC price today", "max_results": 5},
)
```
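Putting the regex and the literal parsing together, a minimal `parse_skill_calls` could look like the following sketch. The use of `ast.literal_eval` for keyword arguments is an assumption about how the safe parsing is implemented:

```python
import ast
import re

_SKILL_CALL_RE = re.compile(r"<skill>(\w+)\(([^)]*)\)</skill>", re.DOTALL)

def parse_skill_calls(response: str) -> list[dict]:
    """Extract skill calls from an LLM response (keyword arguments only)."""
    calls = []
    for name, arg_str in _SKILL_CALL_RE.findall(response):
        # Wrap the argument list in a dummy call so ast can parse the keywords,
        # then evaluate each value as a literal -- never eval() arbitrary code.
        tree = ast.parse(f"_({arg_str})", mode="eval")
        arguments = {kw.arg: ast.literal_eval(kw.value) for kw in tree.body.keywords}
        calls.append({"name": name, "arguments": arguments})
    return calls
```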

Parallel Execution

Skills in <parallel> blocks run concurrently:

```xml
<parallel>
  <skill>web_search(query="BTC price")</skill>
  <skill>web_search(query="ETH price")</skill>
</parallel>
```

All skills in a parallel group share the same parallel_group ID. execute_batch() processes them with asyncio.gather().
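A `group_by_parallel` helper consistent with that description might look like this sketch; the `parallel_group` attribute name comes from the text above, everything else is an assumption:

```python
from itertools import groupby

def group_by_parallel(calls: list) -> list[list]:
    """Group consecutive calls that share a parallel_group ID.

    Calls with parallel_group=None run alone; consecutive calls with the
    same non-None ID form one group for asyncio.gather().
    """
    groups: list[list] = []
    for key, members in groupby(calls, key=lambda c: c.parallel_group):
        members = list(members)
        if key is None:
            # Each sequential call becomes its own single-element group
            groups.extend([m] for m in members)
        else:
            groups.append(members)
    return groups
```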

Anticipatory Simulation

Before executing RESTRICTED or PRIVILEGED skills, the simulation runs:

```python
async def anticipate(call: SkillCall, context: str) -> str:
    prompt = f"""
    About to execute: {call.name}({call.arguments})
    Context: {context[:500]}

    Predict the outcome and any risks in 2-3 sentences.
    """
    simulation = await model_manager.generate(prompt, max_tokens=300)
    return f"[ANTICIPATORY SIMULATION]: {simulation}"
```

The simulation result is appended to the skill output, allowing the LLM to course-correct on the next round if the predicted outcome is problematic.

Simulations are cached in Redis for 5 minutes (same call + context → same prediction).
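The cache key must be deterministic so that the same call plus context maps to the same entry. One way to build it (the `skill:sim:` prefix and the hashing scheme are assumptions, not the actual key format):

```python
import hashlib
import json

SIM_TTL_SECONDS = 300  # simulations are cached for 5 minutes

def simulation_cache_key(call_name: str, arguments: dict, context: str) -> str:
    """Deterministic key: same call + context hashes to the same entry."""
    payload = json.dumps(
        {"name": call_name, "args": arguments, "ctx": context[:500]},
        sort_keys=True,
    )
    digest = hashlib.sha256(payload.encode()).hexdigest()[:16]
    return f"skill:sim:{digest}"

# Usage with an async Redis client (sketch):
#   cached = await redis.get(key)
#   if cached is None:
#       cached = await anticipate(call, context)
#       await redis.set(key, cached, ex=SIM_TTL_SECONDS)
```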

Audit Log

Every CONTROLLED, RESTRICTED, and PRIVILEGED skill call writes to audit_log:

Column            Description
skill_name        The skill that was called
input_summary     Arguments (secrets redacted)
output_summary    Result (secrets redacted)
capability_level  CONTROLLED / RESTRICTED / PRIVILEGED
risk_level        From RiskAssessor (RESTRICTED+ only)
duration_ms       Execution time
chat_id           Who triggered it
created_at        Timestamp

Query recent audit entries:

```bash
docker exec agent-postgres psql -U agent -d agent -c "
  SELECT skill_name, input_summary, output_summary, created_at
  FROM audit_log
  ORDER BY created_at DESC
  LIMIT 20;
"
```

Secret Redaction

Before writing to audit log, all outputs are passed through redact():

```python
from src.utils.redaction import redact

redacted_output = redact(raw_output)
```

Patterns that are redacted:

  • OpenAI keys: sk-[a-zA-Z0-9]{20,}
  • Anthropic keys: sk-ant-[a-zA-Z0-9-]{20,}
  • Google keys: AIza[a-zA-Z0-9-_]{35}
  • AWS keys: AKIA[A-Z0-9]{16}
  • Stripe keys: sk_live_[a-zA-Z0-9]{24}
  • Bearer tokens, passwords in key=value pairs
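A `redact()` implementation matching the patterns above could look like this sketch; the exact Bearer-token and key=value password regexes are assumptions:

```python
import re

# Patterns mirror the list above; more specific prefixes are checked first.
_SECRET_PATTERNS = [
    re.compile(r"sk-ant-[a-zA-Z0-9-]{20,}"),   # Anthropic
    re.compile(r"sk_live_[a-zA-Z0-9]{24}"),    # Stripe
    re.compile(r"sk-[a-zA-Z0-9]{20,}"),        # OpenAI
    re.compile(r"AIza[a-zA-Z0-9_-]{35}"),      # Google
    re.compile(r"AKIA[A-Z0-9]{16}"),           # AWS
    re.compile(r"Bearer\s+[A-Za-z0-9._~+/-]+=*"),              # assumed form
    re.compile(r"(?:password|passwd|pwd)\s*=\s*\S+", re.I),    # assumed form
]

def redact(text: str) -> str:
    """Replace anything matching a secret pattern with a placeholder."""
    for pattern in _SECRET_PATTERNS:
        text = pattern.sub("[REDACTED]", text)
    return text
```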

Error Handling

Skill failures return structured errors:

```python
SkillResult(
    output=None,
    error="TimeoutError: browser skill exceeded 60s timeout",
    metadata={"duration_ms": 60000},
)
```

The LLM receives the error and can:

  • Try an alternative approach
  • Use a different skill
  • Report the error to the user

Rate Limiting

PRIVILEGED skills have a hard rate limit of 20 calls per hour. Other levels are currently unlimited but logged for analysis.

Rate limit state is tracked in Redis:

```
skill:rate:{skill_name}:{hour_bucket}  →  count
```