Extending WASP
This guide covers how to extend WASP with new scheduler jobs, memory types, integration connectors, and other components.
Adding a Scheduler Job
1. Create the Job Class
# src/scheduler/my_job.py
import structlog
from .scheduler import BaseJob
logger = structlog.get_logger()
class MyCustomJob(BaseJob):
    """Skeleton for a custom scheduler job.

    Holds the Redis URL and an optional chat id for notifications; the job
    logic lives in ``run()``.
    """

    def __init__(self, redis_url: str, notify_chat_id: str = "") -> None:
        """Store the job configuration.

        Args:
            redis_url: Connection URL for Redis.
            notify_chat_id: Chat id to notify; defaults to an empty string.
        """
        self.redis_url = redis_url
        self.notify_chat_id = notify_chat_id

    async def run(self) -> None:
        """Execute the job once, logging start, completion, and failure.

        Exceptions raised by the job logic are logged and not re-raised.
        """
        logger.info("my_custom_job.starting")
        try:
            # Your job logic here
            result = await do_something()
            logger.info("my_custom_job.complete", result=result)
        except Exception as exc:
            # Catch-all boundary for the job: log with the full traceback
            # (not just the message) so failures can be diagnosed.
            logger.exception("my_custom_job.failed", error=str(exc))
2. Register in main.py
# src/main.py
from .scheduler.my_job import MyCustomJob
# Inside the scheduler registration block:
# Register the job under the name "my_custom_job" with its run interval
# and a configured job instance.
scheduler.register(
    "my_custom_job",
    3600,  # Interval in seconds (3600 s = 1 hour)
    MyCustomJob(
        redis_url=settings.redis_url,
        notify_chat_id=settings.scheduler_notify_chat_id,
    ),
)
3. Rebuild
cd /home/agent && docker compose build agent-core && docker compose up -d agent-core
Adding a Memory Type
1. Create the Memory Module
# src/memory/my_memory.py
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

from ..db.models import MyMemoryTable # Create model first
async def store(session: AsyncSession, key: str, value: dict) -> None:
    """Insert a new ``MyMemoryTable`` row for ``key`` and commit the session."""
    session.add(MyMemoryTable(key=key, value=value))
    await session.commit()
async def retrieve(session: AsyncSession, key: str) -> dict | None:
    """Return the most recently stored value for ``key``, or ``None``.

    ``store()`` inserts a new row on every call and the ``key`` column is
    indexed but not unique, so several rows may share a key. Selecting the
    newest row (by ``created_at``) avoids the ``MultipleResultsFound`` error
    that a one-or-none lookup raises as soon as a key has been stored twice.
    """
    result = await session.execute(
        select(MyMemoryTable)
        .where(MyMemoryTable.key == key)
        .order_by(MyMemoryTable.created_at.desc())
        .limit(1)
    )
    entry = result.scalars().first()
    return entry.value if entry is not None else None
async def get_recent(
    session: AsyncSession, limit: int = 5
) -> list[MyMemoryTable]:
    """Return up to ``limit`` entries, newest first by ``created_at``."""
    result = await session.execute(
        select(MyMemoryTable)
        .order_by(MyMemoryTable.created_at.desc())
        .limit(limit)
    )
    return list(result.scalars().all())


async def format_for_context(session: AsyncSession) -> str:
    """Format relevant data for injection into system prompt.

    Returns an empty string when there are no entries; otherwise a
    ``[MY MEMORY]`` header followed by one bullet line per entry for the
    five most recent entries.
    """
    entries = await get_recent(session, limit=5)
    if not entries:
        return ""
    lines = [f"• {e.key}: {e.value}" for e in entries]
    return "[MY MEMORY]\n" + "\n".join(lines)
2. Add Database Model
# src/db/models.py
class MyMemoryTable(Base):
    """ORM model for rows of the ``my_memory`` table."""

    __tablename__ = "my_memory"

    # Primary key; ``uuid4`` is called to produce the default value.
    id = Column(UUID, default=uuid4, primary_key=True)
    # Lookup key, at most 255 characters; indexed but not declared unique.
    key = Column(String(255), index=True)
    # Stored payload (JSONB column).
    value = Column(JSONB)
    # Timezone-aware timestamp filled in by the database via ``now()``.
    created_at = Column(DateTime(timezone=True), server_default=func.now())
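SQLAlchemy create_all() runs at startup, so the new table is created automatically. Note that create_all() only creates tables that do not exist yet; it does not alter existing tables, so later changes to the model's columns require a manual migration.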
3. Inject into Context
# src/agent/context.py — add to build_context()
async def _my_memory():
async with async_session() as session:
return await my_memory.format_for_context(session)
# Add to asyncio.gather() call
my_block, = await asyncio.gather(_my_memory())
# Add to system prompt assembly
system_parts.append(my_block)
Adding an Integration Connector
1. Create the Connector
# src/integrations/connectors/my_service.py
from .. import ConnectorBase, ConnectorDefinition, ConnectorResult, CapabilityLevel
class MyServiceConnector(ConnectorBase):
    """Example connector for the My Service API.

    Supports two actions, ``send_message`` and ``list_items``, both
    authenticated with the ``api_key`` credential.
    """

    def definition(self) -> ConnectorDefinition:
        """Return the connector's metadata: id, risk, credentials, actions."""
        return ConnectorDefinition(
            id="my-service",
            name="My Service",
            description="Integrates with My Service API",
            risk_level="medium",
            capability_level=CapabilityLevel.CONTROLLED,
            required_credentials=["api_key"],
            actions=["send_message", "list_items"],
        )

    async def execute(
        self,
        action: str,
        params: dict,
        credentials: dict,
    ) -> ConnectorResult:
        """Run ``action`` and wrap the outcome in a ``ConnectorResult``.

        Args:
            action: ``"send_message"`` or ``"list_items"``.
            params: Action parameters; ``send_message`` needs ``"message"``.
            credentials: Must contain ``"api_key"``.

        Returns:
            ``ConnectorResult(output=...)`` on success, or
            ``ConnectorResult(error=...)`` for an unknown action, a missing
            credential or parameter, or a failed HTTP request.
        """
        import httpx

        if action not in ("send_message", "list_items"):
            return ConnectorResult(error=f"Unknown action: {action}")

        api_key = credentials.get("api_key")
        if not api_key:
            # Without this guard the request would be sent as "Bearer None".
            return ConnectorResult(error="Missing credential: api_key")

        try:
            if action == "send_message":
                message = params.get("message")
                if message is None:
                    return ConnectorResult(error="Missing parameter: message")
                # Your implementation
                response = await self._send(api_key, message)
                return ConnectorResult(output=f"Sent: {response}")

            items = await self._list(api_key)
            return ConnectorResult(output="\n".join(items))
        except httpx.HTTPError as exc:
            # Covers transport failures and the non-2xx statuses raised by
            # raise_for_status() in the helpers below.
            return ConnectorResult(error=f"My Service request failed: {exc}")

    async def _send(self, api_key: str, message: str) -> str | None:
        """POST ``message`` and return the ``id`` field of the JSON reply.

        Returns ``None`` when the reply has no ``id``. Raises
        ``httpx.HTTPStatusError`` on a 4xx/5xx response.
        """
        import httpx

        async with httpx.AsyncClient() as client:
            r = await client.post(
                "https://api.myservice.com/send",
                headers={"Authorization": f"Bearer {api_key}"},
                json={"message": message},
            )
        r.raise_for_status()
        return r.json().get("id")

    async def _list(self, api_key: str) -> list[str]:
        """Fetch the items and return them as a list of strings.

        NOTE(review): the endpoint URL and the assumption that the reply is
        a JSON array are placeholders; adapt both to the real API.
        Raises ``httpx.HTTPStatusError`` on a 4xx/5xx response.
        """
        import httpx

        async with httpx.AsyncClient() as client:
            r = await client.get(
                "https://api.myservice.com/items",
                headers={"Authorization": f"Bearer {api_key}"},
            )
        r.raise_for_status()
        return [str(item) for item in r.json()]
2. Register in main.py
# src/main.py
from .integrations.connectors.my_service import MyServiceConnector
# In the connector registration block:
# Each connector instance in this list is passed to
# integration_registry.register(); append the new connector to the list.
for _connector in [
    # ... existing connectors ...
    MyServiceConnector(),
]:
    integration_registry.register(_connector)
3. Configure via Dashboard
After rebuild, the connector appears in /integrations. Configure credentials via the dashboard:
- Go to /integrations
- Find "My Service"
- Enter api_key
- Set autonomy policy
Or via API:
curl -b cookies.txt -X POST https://agentwasp.com/api/integrations/my-service/configure \
-H "Content-Type: application/json" \
-d '{"credentials": {"api_key": "your-key"}, "autonomy_mode": "semi"}'
Adding a Dashboard Route
1. Create Route Handler
# src/dashboard/routes/my_page.py
from fastapi import APIRouter, Request
from fastapi.responses import HTMLResponse
from fastapi.templating import Jinja2Templates
# Router that collects this page's endpoints.
router = APIRouter()
# Template loader pointed at the dashboard template directory.
templates = Jinja2Templates(directory="src/dashboard/templates")


@router.get("/my-page", response_class=HTMLResponse)
async def my_page(request: Request):
    """Render ``my_page.html`` with the incoming request in its context."""
    context = {"request": request}
    return templates.TemplateResponse("my_page.html", context)
2. Create Template
<!-- src/dashboard/templates/my_page.html -->
{% extends "base.html" %}
{% block content %}
<div class="container">
<h1>My Custom Page</h1>
<!-- Content here -->
</div>
{% endblock %}
3. Register Route
# src/dashboard/app.py
from .routes.my_page import router as my_page_router
# Attach the new page's router to the dashboard app so /my-page is served.
app.include_router(my_page_router)
Adding a New LLM Provider
New providers are added via ModelManager:
# src/models/providers/openai_provider.py (extend for new provider)
# Model identifiers for the new provider.
# NOTE(review): this list is not referenced by the registration call shown
# below; confirm where it is consumed.
MY_SERVICE_MODELS = ["my-model-v1", "my-model-v2"]
# In auto_detect_providers():
# Register the provider only when an API key is configured in settings.
if settings.my_service_api_key:
    await self.register_provider(
        "my_service",
        settings.my_service_api_key,
        base_url="https://api.myservice.com/v1"
    )
Providers that are OpenAI-compatible require minimal code changes.