dendrux
v0.2.0a1 · alpha

This page covers every HTTP endpoint dendrux exposes through make_read_router, every SSE event type a client can receive, and every agent method you wrap in your own write routes: all parameters, all payloads, all exceptions.

HTTP API surface

Dendrux is a library. It does not run a server for you. You import make_read_router, pass it to app.include_router(...), and you own the rest (auth, prefix, CORS, TLS, deploy). For write-side actions (submit tool results, submit input, submit approval, cancel) you wrap Agent methods in your own routes.

This page is the full inventory: read endpoints, the SSE stream, the agent write methods, the event types that land on the stream, and the stable exceptions to catch. Everything below is lifted from the code in dendrux/http/read_router.py, dendrux/store/__init__.py, dendrux/agent.py, dendrux/errors.py, and dendrux/types.py.

Mounting the read router

from fastapi import FastAPI, HTTPException, Request
from dendrux.http import make_read_router
from dendrux.store import RunStore

EXPECTED_TOKEN = "change-me"  # however your app sources its secret

app = FastAPI()

async def authorize(request: Request) -> None:
    token = request.headers.get("authorization")
    if token != f"Bearer {EXPECTED_TOKEN}":
        raise HTTPException(status_code=401, detail="unauthorized")

store = RunStore.from_database_url("postgresql+asyncpg://...")
router = make_read_router(store=store, authorize=authorize)
app.include_router(router, prefix="/dendrux")

authorize runs on every route except /health. Raise HTTPException inside to deny; return any value (ignored) to allow. The factory does not interpret your return value or impose an auth shape.

Read endpoints

All routes are read-only. All pagination uses exclusive cursors or limit/offset. The router returns JSON bodies serialized from frozen dataclasses defined in dendrux/store/__init__.py.

GET /health

Unauthenticated. For Kubernetes probes and load balancers.

Response: {"status": "ok"}

GET /runs

List runs with SQL-side filtering and pagination.

Query parameters:

Name            Type                    Default  Meaning
status          list[str], repeatable   all      Filter by RunStatus values. Repeat for OR: ?status=running&status=waiting_approval.
agent_name      str                     all      Exact-match agent name.
parent_run_id   str                     all      Return only child runs of this parent.
tenant_id       str                     all      Multi-tenant isolation filter.
started_after   ISO-8601 datetime       none     Inclusive lower bound on created_at.
started_before  ISO-8601 datetime       none     Exclusive upper bound on created_at.
limit           int, 1-1000             50       Page size.
offset          int, >= 0               0        Rows to skip.

Response:

{
  "items": [
    {
      "run_id": "01HXYZ...",
      "agent_name": "calculator",
      "status": "success",
      "created_at": "2026-04-18T12:00:00Z",
      "updated_at": "2026-04-18T12:00:04Z",
      "iteration_count": 2,
      "total_input_tokens": 120,
      "total_output_tokens": 48,
      "total_cache_read_tokens": 7706,
      "total_cache_creation_tokens": 0,
      "total_cost_usd": 0.0012,
      "model": "claude-haiku-4-5",
      "parent_run_id": null,
      "delegation_level": 0
    }
  ],
  "total": 1,
  "limit": 50,
  "offset": 0
}
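The repeatable status parameter is easiest to build with standard URL encoding. A small sketch; the helper name and signature are illustrative, not part of dendrux:

```python
from urllib.parse import urlencode

def runs_query(statuses=(), agent_name=None, limit=50, offset=0):
    """Build a /runs query string. `status` is repeated once per value,
    which the router interprets as OR."""
    params = [("status", s) for s in statuses]
    if agent_name is not None:
        params.append(("agent_name", agent_name))
    params += [("limit", str(limit)), ("offset", str(offset))]
    return urlencode(params)
```

For example, runs_query(["running", "waiting_approval"]) produces status=running&status=waiting_approval&limit=50&offset=0.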

GET /runs/{run_id}

Single run detail. Returns the same summary fields plus strategy, input_data, answer, error, failure_reason.

Errors: 404 if not found.

GET /runs/{run_id}/events

Paginated durable event log (polling variant).

Query parameters:

Name   Type         Default  Meaning
after  int          none     Exclusive cursor. Pass the previous batch's last sequence_index.
limit  int, 1-1000  100      Page size.

Response:

{
  "items": [
    {
      "sequence_index": 4,
      "iteration_index": 1,
      "event_type": "llm.completed",
      "correlation_id": null,
      "data": {"input_tokens": 9, "output_tokens": 18, "model": "claude-haiku-4-5", "has_tool_calls": true},
      "created_at": "2026-04-18T12:00:01Z"
    }
  ],
  "next_cursor": 4
}

Empty-batch rule: next_cursor echoes your input after when the page is empty. That means "nothing new yet," not "end of stream." Do not reset the cursor to null on empty batches; you will miss future events.
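The empty-batch rule can be encoded in a small polling loop. This is a sketch, not dendrux code: fetch stands in for however you call the endpoint (httpx, requests), and backoff between polls is left to the caller:

```python
TERMINAL = {"run.completed", "run.error", "run.cancelled"}

def drain_events(fetch, after=None):
    """Poll GET /runs/{run_id}/events until a terminal event arrives.
    `fetch(after)` is any callable returning the endpoint's parsed JSON body."""
    collected = []
    cursor = after
    while True:
        batch = fetch(cursor)
        for item in batch["items"]:
            collected.append(item)
            if item["event_type"] in TERMINAL:
                return collected
        # Empty-batch rule: next_cursor echoes `after` on an empty page.
        # Adopt it unconditionally and never reset to None.
        cursor = batch["next_cursor"]
```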

GET /runs/{run_id}/events/stream

Server-Sent Events. Reads from run_events in a per-connection polling loop. No shared queue, no fan-out manager. Each connection is an independent async iterator.

Cursor resolution (first match wins):

  1. Last-Event-ID header (SSE reconnect convention).
  2. ?after=N query param.
  3. None (stream from the beginning of the log).

A malformed Last-Event-ID falls back silently to ?after=N. Reverse proxies sometimes inject garbage on reconnect; treating that as a 500 would break reconnection.

Response headers:

Content-Type: text/event-stream
Cache-Control: no-cache
X-Accel-Buffering: no

Frame format:

id: 5
event: message
data: {"sequence_index": 5, "iteration_index": 1, "event_type": "run.paused", "correlation_id": null, "timestamp": "2026-04-18T12:00:02Z", "data": {...}}
 

Every frame uses event: message. The dendrux event type lives inside the JSON under event_type, so a generic EventSource client does not need to pre-enroll every dendrux event name.

Heartbeat: : keepalive\n\n every 15s of idleness. Prevents nginx (default 60s proxy_read_timeout) from closing the connection.

Poll cadence: 0.5s per connection when idle. That is ~2 DB queries per second per open SSE connection. Size your connection pool accordingly until Postgres LISTEN/NOTIFY ships.

Termination: the stream stays open until the client disconnects. Terminal events (run.completed, run.error, run.cancelled) arrive as normal frames; the client decides when to close. If the client drops, the generator ends on the next asyncio.sleep and the connection releases.
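For non-browser clients without an SSE library, the frame format above is easy to parse by hand. A minimal sketch over already-buffered text; a production client would parse incrementally from the socket:

```python
import json

def parse_frames(text):
    """Parse raw SSE text into (last_event_id, payload) pairs.
    Handles only the id/event/data lines this endpoint emits and
    skips `:` comment lines (the keepalive heartbeat)."""
    frames = []
    for block in text.split("\n\n"):
        event_id, data = None, []
        for line in block.split("\n"):
            if line.startswith("id:"):
                event_id = line[3:].strip()
            elif line.startswith("data:"):
                data.append(line[5:].strip())
            elif line.startswith(":"):
                continue  # heartbeat comment, carries no data
        if data:
            frames.append((event_id, json.loads("\n".join(data))))
    return frames
```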

Reconnect pattern from a browser:

const es = new EventSource("/dendrux/runs/01HXYZ/events/stream?after=5", {
  withCredentials: true
});
es.addEventListener("message", (e) => {
  const frame = JSON.parse(e.data);
  // e.lastEventId is frame.sequence_index, automatically sent back on reconnect
  handle(frame);
});

GET /runs/{run_id}/llm-calls

LLM round-trips recorded during the run.

Query parameters:

Name       Type         Default  Meaning
iteration  int          all      Filter to one iteration index.
limit      int, 1-1000  100      Page size.
offset     int, >= 0    0        Rows to skip.

Response items carry iteration, provider, model, input_tokens, output_tokens, total_tokens, cache_read_input_tokens, cache_creation_input_tokens, cost_usd, duration_ms, provider_request, provider_response, created_at. provider_request and provider_response are raw adapter-boundary JSON; treat as opaque.

GET /runs/{run_id}/tool-calls

Server tools plus submitted client and approval tools. Denied tools are not in this table (see Access control).

Query parameters: same shape as /llm-calls.

Response items: iteration, tool_name, tool_call_id, provider_tool_call_id, target (server / client), params, result, success, error, duration_ms, created_at.

GET /runs/{run_id}/traces

Full conversation trace (user, assistant, tool messages) in order_index order.

Query parameters: limit, offset.

Response items: role, content, order_index, meta, created_at. content is always the raw stored value — DB is ground truth. PII guardrails redact at the LLM-call boundary only; replay against agent_runs.pii_mapping to render the LLM's-eye view.

GET /runs/{run_id}/pauses

Pause/resume cycles derived from run_events. Never reads AgentRun.pause_data (that column is execution state — it can include a pii_mapping with raw values for continuity across pause/resume — and is cleared on finalize).

Response items:

{
  "pause_sequence_index": 5,
  "pause_at": "2026-04-18T12:00:02Z",
  "resume_sequence_index": 8,
  "resume_at": "2026-04-18T12:00:30Z",
  "reason": "waiting_client_tool",
  "pending_tool_calls": [{"id": "...", "name": "read_file", "target": "client", "params": {...}}],
  "submitted_results": [{"name": "read_file", "call_id": "...", "payload": "...", "success": true}],
  "user_input": null
}

An unpaired run.paused at the end of the log represents an active pause (resume_sequence_index is null).

SSE event inventory

Every event_type that can land on the stream. Lifecycle events come from the runner; governance events come from the loop and guardrail engine. Field shapes are extracted from dendrux/runtime/runner.py, dendrux/runtime/persistence.py, and dendrux/loops/react.py.

Lifecycle events

  • run.started: first event on any run. Data: {"agent_name": str, "system_prompt": str}.
  • run.paused: the runner finalizes the current iteration into a pause. Data: {"status": "waiting_client_tool" | "waiting_human_input" | "waiting_approval", "pending_tool_calls": [{"id", "name", "target", "params"}]}.
  • run.resumed: a submit method claims a paused run. Data: {"submitted_results": [...]} or {"user_input": str}.
  • run.completed: terminal success or max_iterations. Data: empty {}.
  • run.cancelled: paused-run CAS cancel, or an in-flight run observes the flag. Data: {"reason": "cancel_requested"}.
  • run.error: terminal failure. Data: {"error": str} (truncated to 500 chars).
  • llm.completed: provider.complete() returns. Data: {"input_tokens", "output_tokens", "cache_read_input_tokens", "cache_creation_input_tokens", "cost_usd", "model", "has_tool_calls"}.
  • tool.completed: a server tool or submitted client/approval tool finishes. Data: {"tool_name", "target", "success", "duration_ms"}; correlation_id = tool call id.

Governance events

From dendrux.types.GovernanceEventType. All thirteen values are deliverable through on_governance_event on notifiers and /events/stream.

  • policy.denied: the LLM called a tool in the agent's deny= set. Data: {"tool_name", "call_id", "reason": "denied_by_policy"}; correlation_id = tool call id.
  • approval.requested: a tool in the require_approval= set reached the executor. Data: {"tool_name", "call_id", "reason": "requires_approval"}; correlation_id = tool call id.
  • approval.decided: a human submitted an approval decision. Data: {"decision": "approved" | "rejected", "run_id": str}.
  • budget.threshold: usage crossed a warn_at fraction. Data: {"fraction": float, "used": int, "max": int, "reason": "threshold_crossed"}.
  • budget.exceeded: usage reached max_tokens. Data: {"used": int, "max": int, "reason": "budget_exceeded"}.
  • guardrail.detected: the guardrail engine found entities. Data: {"direction": "incoming" | "outgoing", "findings_count": int, "entities": list[str]}.
  • guardrail.redacted: an incoming scan replaced entities with placeholders before the LLM call. Data: {"direction": "incoming", "entities": list[str]}.
  • guardrail.blocked: a guardrail blocked a request or response. Data: {"direction": "incoming" | "outgoing", "error": str}.
  • skill.registered: a skill loaded at run start. Data: {"skill_name": str, "description": str}.
  • skill.denied: a skill was filtered by deny_skills=. Data: {"skill_name": str, "reason": "denied_by_policy"}.
  • skill.invoked: the LLM called use_skill(name=...). Data: {"skill_name": str}.
  • mcp.connected: MCP source discovery completed. Data: {"source_name": str, "tool_count": int, "tool_names": list[str]}.
  • mcp.error: MCP discovery failed. Data: {"error": str} (truncated to 500 chars).

Each budget.threshold fraction fires at most once per run. budget.exceeded fires at most once per run. Pause/resume persists the fired set, so resumed runs do not re-fire.

Write-side agent methods

Dendrux does not ship HTTP write routes. You wrap these four methods in your own endpoints so auth, validation, and transport stay yours.

agent.submit_tool_results(run_id, results, *, notifier=None) -> RunResult

Submit results for a run paused in WAITING_CLIENT_TOOL. Race-safe: persist-first handoff + CAS claim in one atomic step. Blocks until the next pause or terminal state.

results is list[ToolResult]. Every pending call id must be covered; unknown or missing ids raise InvalidToolResultError and nothing is claimed. ToolResult fields:

@dataclass(frozen=True)
class ToolResult:
    name: str           # tool name
    call_id: str        # dendrux-owned ULID matching ToolCall.id
    payload: str        # JSON string — serialize once on your side
    success: bool = True
    error: str | None = None
    duration_ms: int = 0

Raises: PersistenceNotConfiguredError, RunNotFoundError, RunNotPausedError, PauseStatusMismatchError, RunAlreadyClaimedError, RunAlreadyTerminalError, InvalidToolResultError.
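A common pitfall is double-encoding payload (it is a JSON string, serialized exactly once). A sketch that maps raw tool outputs onto the pending calls of a paused run and mirrors the server-side coverage check; the helper name is ours, and each returned dict is meant to be passed through ToolResult(**d) before calling agent.submit_tool_results:

```python
import json

def to_results(pending_calls, outputs):
    """pending_calls: the pending_tool_calls list from a run.paused event.
    outputs: {call_id: python_value}. Every pending id must be covered,
    mirroring the server's InvalidToolResultError check."""
    missing = {c["id"] for c in pending_calls} - outputs.keys()
    if missing:
        raise ValueError(f"uncovered call ids: {sorted(missing)}")
    return [
        {
            "name": c["name"],
            "call_id": c["id"],
            # payload is a JSON string: serialize exactly once here.
            "payload": json.dumps(outputs[c["id"]]),
            "success": True,
        }
        for c in pending_calls
    ]
```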

agent.submit_input(run_id, text, *, notifier=None) -> RunResult

Submit a clarification for a run paused in WAITING_HUMAN_INPUT. Same race-safety as submit_tool_results. Blocks until next pause or terminal.

Raises: PersistenceNotConfiguredError, RunNotFoundError, RunNotPausedError, PauseStatusMismatchError, RunAlreadyClaimedError, RunAlreadyTerminalError.

agent.submit_approval(run_id, *, approved, rejection_reason=None, notifier=None) -> RunResult

Submit an approval decision for a run paused in WAITING_APPROVAL.

  • approved=True: CAS-claims, executes the pending tools server-side, feeds results back to the LLM.
  • approved=False: builds one synthetic failed ToolResult per pending call, each carrying rejection_reason (default: "User declined to run this tool."). The LLM decides what to do next.

Per-tool approve/reject within a batch is not supported. Pre-split upstream if you need finer granularity.

Raises: same set as the other submit methods.

agent.cancel_run(run_id) -> RunResult

Cooperative cancellation with atomic finalize on the paused path.

Behavior by current state:

  • Paused (any process): atomic CAS finalize to CANCELLED in one round-trip. Emits run.cancelled with {"reason": "cancel_requested"} (sequenced after any prior paused event so SSE clients reading by after_sequence_index do not miss it).
  • Running (any process, persisted): sets cancel_requested=True. The runner observes the flag at the top of the next iteration and at the pre-pause checkpoint. The current iteration's LLM and tool calls complete; cancel is observed between steps, not mid-step.
  • In-process submit/resume task on this agent instance: the asyncio task is also cancelled synchronously.
  • Terminal: no-op. Returns current persisted state. Does not raise.
  • Non-persisted: unsupported. Raises PersistenceNotConfiguredError.

Raises: PersistenceNotConfiguredError, RunNotFoundError.

Minimal write-route wrappers

A complete set of write endpoints on top of FastAPI:

from fastapi import APIRouter, Depends, HTTPException
from pydantic import BaseModel
from dendrux.errors import (
    InvalidToolResultError,
    PauseStatusMismatchError,
    PersistenceNotConfiguredError,
    RunAlreadyClaimedError,
    RunAlreadyTerminalError,
    RunNotFoundError,
    RunNotPausedError,
)
from dendrux.types import ToolResult
 
# `agent` below is your configured dendrux Agent instance; `authorize`
# and `app` come from the mounting example above.
writes = APIRouter(dependencies=[Depends(authorize)])
 
class ToolResultBody(BaseModel):
    name: str
    call_id: str
    payload: str
    success: bool = True
    error: str | None = None
    duration_ms: int = 0
 
@writes.post("/runs/{run_id}/tool-results")
async def submit_tool_results(run_id: str, results: list[ToolResultBody]):
    try:
        result = await agent.submit_tool_results(
            run_id, [ToolResult(**r.model_dump()) for r in results]
        )
    except RunNotFoundError:
        raise HTTPException(404, "run not found")
    except InvalidToolResultError as e:
        raise HTTPException(400, str(e))
    except (RunNotPausedError, PauseStatusMismatchError,
            RunAlreadyClaimedError, RunAlreadyTerminalError) as e:
        raise HTTPException(409, str(e))
    except PersistenceNotConfiguredError:
        raise HTTPException(500, "persistence not configured")
    return {"status": result.status.value, "answer": result.answer}
 
@writes.post("/runs/{run_id}/input")
async def submit_input(run_id: str, body: dict):
    # (same try/except shape)
    result = await agent.submit_input(run_id, body["text"])
    return {"status": result.status.value, "answer": result.answer}
 
@writes.post("/runs/{run_id}/approval")
async def submit_approval(run_id: str, body: dict):
    result = await agent.submit_approval(
        run_id,
        approved=body["approved"],
        rejection_reason=body.get("rejection_reason"),
    )
    return {"status": result.status.value, "answer": result.answer}
 
@writes.post("/runs/{run_id}/cancel")
async def cancel_run(run_id: str):
    try:
        result = await agent.cancel_run(run_id)
    except RunNotFoundError:
        raise HTTPException(404, "run not found")
    except PersistenceNotConfiguredError:
        raise HTTPException(500, "persistence not configured")
    return {"status": result.status.value}
 
app.include_router(writes, prefix="/dendrux")

That is the full write surface. No hidden endpoints, no middleware magic.

Exceptions

From dendrux.errors. Suggested HTTP mappings in the docstring there; use whatever fits your API.

  • RunNotFoundError (LookupError; typically 404): run_id does not exist.
  • RunNotPausedError (RuntimeError; 409): the run is not in any paused state.
  • PauseStatusMismatchError (RuntimeError; 409): the run is paused, but for a different reason than this method expects.
  • RunAlreadyClaimedError (RuntimeError; 409): a concurrent submit won the CAS. Poll for completion; do not retry the submit.
  • RunAlreadyTerminalError (RuntimeError; 409): the run is already terminal and cannot resume. cancel_run does not raise this; terminal cancel is a silent no-op.
  • InvalidToolResultError (ValueError; 400): submitted results don't match the pending calls (unknown, duplicate, or missing ids). No partial claim.
  • PersistenceNotConfiguredError (RuntimeError; 500): the agent has no DB. A configuration bug, not a runtime condition.

Read-side types

From dendrux.store:

  • RunSummary - list view row.
  • RunDetail - single run detail.
  • StoredEvent - one run_events row.
  • LLMCall - one llm_interactions row.
  • ToolInvocation - one tool_calls row.
  • TraceEntry - one react_traces row.
  • PausePair - derived pause/resume pair.

All frozen dataclasses. Top-level fields are immutable; nested dict/list values are plain JSON (mutating them only touches your in-memory copy, never the DB).

RunStore construction:

# Store-owned engine (typical)
async with RunStore.from_database_url("postgresql+asyncpg://...") as store:
    ...
 
# Caller-owned engine (when your app already has one)
store = RunStore.from_engine(engine)

Streaming via the Python API (if you do not want SSE):

from dendrux.store import RunStore, RunNotFoundError
 
async with RunStore.from_database_url("...") as store:
    async for event in store.stream_events(run_id, after_sequence_index=5):
        if event.event_type in {"run.completed", "run.error", "run.cancelled"}:
            break

stream_events raises RunNotFoundError before the first yield if the run does not exist. Default poll interval is 0.5s, matching the SSE route. Caller decides when to stop (break out of the loop or call aclose() on the generator).

Where this fits

  • Read endpoints and SSE: dendrux/http/read_router.py, mounted via make_read_router(store=, authorize=).
  • Read types: dendrux/store/__init__.py (RunStore, RunSummary, RunDetail, StoredEvent, LLMCall, ToolInvocation, TraceEntry, PausePair).
  • Write-side agent methods: dendrux/agent.py (submit_tool_results, submit_input, submit_approval, cancel_run).
  • Exceptions: dendrux/errors.py.
  • Event type enum: dendrux/types.py (GovernanceEventType, RunStatus).
  • Lifecycle event strings: dendrux/runtime/runner.py and dendrux/runtime/persistence.py.