Skip to main content

Extending WASP

This guide covers how to extend WASP with new scheduler jobs, memory types, integration connectors, and other components.

Adding a Scheduler Job

1. Create the Job Class

# src/scheduler/my_job.py
import structlog
from .scheduler import BaseJob

logger = structlog.get_logger()

class MyCustomJob(BaseJob):
    """Example scheduler job; replace the placeholder call in ``run`` with real logic."""

    def __init__(self, redis_url: str, notify_chat_id: str = ""):
        """Store the job's configuration.

        Args:
            redis_url: Redis connection URL made available to the job logic.
            notify_chat_id: Chat to notify; defaults to an empty string.
        """
        self.redis_url = redis_url
        self.notify_chat_id = notify_chat_id

    async def run(self) -> None:
        """Execute one run of the job, logging start, completion, and failure."""
        logger.info("my_custom_job.starting")
        try:
            # Your job logic here (``do_something`` is a placeholder).
            result = await do_something()
        except Exception as e:
            # The failure is logged rather than re-raised, as in the original
            # example. ``exception`` also records the traceback, which a plain
            # ``error=str(e)`` would lose.
            logger.exception("my_custom_job.failed", error=str(e))
        else:
            logger.info("my_custom_job.complete", result=result)

2. Register in main.py

# src/main.py
from .scheduler.my_job import MyCustomJob

# Inside the scheduler registration block:
# Build the job instance first, then register it under its name.
my_custom_job = MyCustomJob(
    redis_url=settings.redis_url,
    notify_chat_id=settings.scheduler_notify_chat_id,
)
scheduler.register("my_custom_job", 3600, my_custom_job)  # 3600 = interval in seconds

3. Rebuild

cd /home/agent && docker compose build agent-core && docker compose up -d agent-core

Adding a Memory Type

1. Create the Memory Module

# src/memory/my_memory.py
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

from ..db.models import MyMemoryTable # Create model first

async def store(session: AsyncSession, key: str, value: dict) -> None:
    """Insert a new ``MyMemoryTable`` row for ``key`` and commit it.

    Every call adds a row; an existing row with the same key is not updated.
    """
    entry = MyMemoryTable(key=key, value=value)
    session.add(entry)
    await session.commit()

async def retrieve(session: AsyncSession, key: str) -> dict | None:
    """Return the most recently stored value for ``key``, or ``None`` if absent.

    ``store`` inserts a new row on every call and ``key`` is not unique, so
    several rows may share a key. Selecting the newest row avoids the
    ``MultipleResultsFound`` error that ``scalar_one_or_none()`` raises in
    that case.
    """
    result = await session.execute(
        select(MyMemoryTable)
        .where(MyMemoryTable.key == key)
        .order_by(MyMemoryTable.created_at.desc())
        .limit(1)
    )
    entry = result.scalars().first()
    return entry.value if entry else None

async def get_recent(session: AsyncSession, limit: int = 5) -> list[MyMemoryTable]:
    """Return up to ``limit`` entries, newest first (ordered by ``created_at``)."""
    result = await session.execute(
        select(MyMemoryTable)
        .order_by(MyMemoryTable.created_at.desc())
        .limit(limit)
    )
    return list(result.scalars().all())


async def format_for_context(session: AsyncSession) -> str:
    """Format relevant data for injection into system prompt.

    Returns an empty string when there are no entries, otherwise a
    ``[MY MEMORY]`` header followed by one bullet line per entry.
    """
    entries = await get_recent(session, limit=5)
    if not entries:
        return ""
    lines = [f"• {e.key}: {e.value}" for e in entries]
    return "[MY MEMORY]\n" + "\n".join(lines)

2. Add Database Model

# src/db/models.py
class MyMemoryTable(Base):
    """ORM model backing the ``my_memory`` table."""

    __tablename__ = "my_memory"

    # Primary key; ``uuid4`` supplies a value when none is given.
    id = Column(UUID, primary_key=True, default=uuid4)
    # Lookup key; indexed, but not declared unique.
    key = Column(String(255), index=True)
    # Payload stored as JSONB.
    value = Column(JSONB)
    # Timezone-aware timestamp with a database-side ``now()`` default.
    created_at = Column(DateTime(timezone=True), server_default=func.now())

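SQLAlchemy create_all() runs at startup, so the new table is created automatically. Note that create_all() only creates tables that do not exist yet; it does not alter an existing table, so later changes to the model's columns require a manual migration.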

3. Inject into Context

# src/agent/context.py — add to build_context()
async def _my_memory() -> str:
    """Open a session and return the formatted memory block for the prompt."""
    async with async_session() as session:
        return await my_memory.format_for_context(session)

# Add to asyncio.gather() call
my_block, = await asyncio.gather(_my_memory())

# Add to system prompt assembly
system_parts.append(my_block)

Adding an Integration Connector

1. Create the Connector

# src/integrations/connectors/my_service.py
from .. import ConnectorBase, ConnectorDefinition, ConnectorResult, CapabilityLevel

class MyServiceConnector(ConnectorBase):
    """Example connector exposing ``send_message`` and ``list_items`` actions."""

    def definition(self) -> ConnectorDefinition:
        """Describe the connector: id, name, risk, credentials, and actions."""
        return ConnectorDefinition(
            id="my-service",
            name="My Service",
            description="Integrates with My Service API",
            risk_level="medium",
            capability_level=CapabilityLevel.CONTROLLED,
            required_credentials=["api_key"],
            actions=["send_message", "list_items"],
        )

    async def execute(
        self,
        action: str,
        params: dict,
        credentials: dict
    ) -> ConnectorResult:
        """Run ``action`` and report the outcome as a ``ConnectorResult``.

        Missing credentials or parameters, HTTP failures, and unknown actions
        are returned as ``ConnectorResult(error=...)`` instead of raising.
        """
        import httpx

        api_key = credentials.get("api_key")
        if not api_key:
            # Without this guard the request would be sent as "Bearer None".
            return ConnectorResult(error="Missing credential: api_key")

        try:
            if action == "send_message":
                message = params.get("message")
                if message is None:
                    return ConnectorResult(error="Missing parameter: message")
                # Your implementation
                response = await self._send(api_key, message)
                return ConnectorResult(output=f"Sent: {response}")

            if action == "list_items":
                items = await self._list(api_key)
                return ConnectorResult(output="\n".join(items))
        except httpx.HTTPError as exc:
            # Covers transport errors and the non-2xx statuses raised by
            # raise_for_status() in the helpers.
            return ConnectorResult(error=f"My Service request failed: {exc}")

        return ConnectorResult(error=f"Unknown action: {action}")

    async def _send(self, api_key: str, message: str) -> str | None:
        """POST ``message`` to the service and return the ``id`` from the reply."""
        import httpx
        async with httpx.AsyncClient() as client:
            r = await client.post(
                "https://api.myservice.com/send",
                headers={"Authorization": f"Bearer {api_key}"},
                json={"message": message}
            )
            r.raise_for_status()
            return r.json().get("id")

    async def _list(self, api_key: str) -> list[str]:
        """Fetch the item list from the service.

        NOTE(review): the endpoint and response shape (a JSON array) are
        placeholders mirroring ``_send`` — adjust them to the real API.
        """
        import httpx
        async with httpx.AsyncClient() as client:
            r = await client.get(
                "https://api.myservice.com/items",
                headers={"Authorization": f"Bearer {api_key}"},
            )
            r.raise_for_status()
            return [str(item) for item in r.json()]

2. Register in main.py

# src/main.py
from .integrations.connectors.my_service import MyServiceConnector

# In the connector registration block:
# Register every connector instance with the integration registry.
for _connector in [
    # ... existing connectors ...
    MyServiceConnector(),
]:
    integration_registry.register(_connector)

3. Configure via Dashboard

After rebuilding and restarting agent-core, the connector appears at /integrations. Configure its credentials via the dashboard:

  1. Go to /integrations
  2. Find "My Service"
  3. Enter api_key
  4. Set autonomy policy

Or via API:

curl -b cookies.txt -X POST https://agentwasp.com/api/integrations/my-service/configure \
-H "Content-Type: application/json" \
-d '{"credentials": {"api_key": "your-key"}, "autonomy_mode": "semi"}'

Adding a Dashboard Route

1. Create Route Handler

# src/dashboard/routes/my_page.py
from fastapi import APIRouter, Request
from fastapi.responses import HTMLResponse
from fastapi.templating import Jinja2Templates

# Router carrying this page's routes; app.py mounts it with include_router().
router = APIRouter()

# Template loader pointed at the dashboard's template directory (a relative path).
templates = Jinja2Templates(
    directory="src/dashboard/templates",
)

@router.get("/my-page", response_class=HTMLResponse)
async def my_page(request: Request):
    """Render ``my_page.html`` with the current request in the template context."""
    # NOTE(review): newer Starlette releases prefer
    # ``TemplateResponse(request, "my_page.html")`` — confirm against the
    # pinned version before switching.
    return templates.TemplateResponse("my_page.html", {"request": request})

2. Create Template

<!-- src/dashboard/templates/my_page.html -->
{% extends "base.html" %}
{% block content %}
<div class="container">
<h1>My Custom Page</h1>
<!-- Content here -->
</div>
{% endblock %}

3. Register Route

# src/dashboard/app.py
from .routes.my_page import router as my_page_router

# Mount the page's router on the dashboard app so its /my-page route is served.
app.include_router(my_page_router)

Adding a New LLM Provider

New providers are added via ModelManager:

# src/models/providers/openai_provider.py (extend for new provider)
# NOTE(review): presumably the model identifiers offered by the new provider;
# nothing else in this snippet references the list — confirm where it is used.
MY_SERVICE_MODELS = ["my-model-v1", "my-model-v2"]

# In auto_detect_providers():
if settings.my_service_api_key:
await self.register_provider(
"my_service",
settings.my_service_api_key,
base_url="https://api.myservice.com/v1"
)

Providers that are OpenAI-compatible require minimal code changes.