dendrux
v0.2.0a1 · alphaGet started

The pluggable side-channel hook for observing a run as it happens, with fail-open semantics so a broken notifier never kills a run.

Notifier

A notifier is an object you pass to agent.run(...) that gets called at every point where the loop mutates conversation history, completes an LLM call, finishes a tool, or emits a governance event. It is the official extension point for terminal output, metrics, logs, Slack/webhook fanout, real-time dashboards, and anything else that wants to watch a run without owning its durability.

Two rules apply everywhere: a notifier never blocks the run, and a notifier that raises never kills the run. Both are enforced by the framework, not by convention.

Where notifier fits next to recorder

Dendrux has two observers on the same four hooks: the recorder and the notifier. They look similar but they do different jobs and have opposite failure policies.

PropertyLoopRecorderLoopNotifier
PurposeDurable audit trail in the DBSide-channel for humans and external systems
PluggableNo, framework-owned (PersistenceRecorder)Yes, any implementation
Where you pass itNot a user parameteragent.run(..., notifier=...), per call
Failure policyFail-closed for authoritative writesFail-open, exceptions swallowed
What readers use it forReplay, SSE, audit queriesLive output, metrics, alerts

See Recorder for the other side of this pair.

The protocol

From dendrux/loops/base.py:

@runtime_checkable
class LoopNotifier(Protocol):
    """Notifier for loop events — best-effort notification hook.
 
    The loop fires these callbacks at the exact points where history mutates
    and provider.complete() returns. Implementations decide what to do:
    log, emit metrics, stream to SSE, print to console, etc.
 
    Failure policy: notifiers should not raise. If they do, the loop logs
    a warning and continues execution. Notification failures must not
    kill agent runs.
    """
 
    async def on_message_appended(self, message, iteration): ...
    async def on_llm_call_completed(self, response, iteration, *, ...): ...
    async def on_tool_completed(self, tool_call, tool_result, iteration): ...
    async def on_governance_event(self, event_type, iteration, data, correlation_id): ...

Four methods. Same shape as the recorder's protocol. A class that implements all four satisfies the LoopNotifier check automatically thanks to runtime_checkable.

Plugging one in

Unlike the recorder, the notifier is a per-call argument, not a constructor keyword. You pass it each time you call agent.run() or a resume method:

from dendrux import Agent
from dendrux.llm.anthropic import AnthropicProvider
from dendrux.notifiers import ConsoleNotifier
 
async with Agent(
    provider=AnthropicProvider(model="claude-haiku-4-5"),
    prompt="You are a calculator. Use the add tool.",
    tools=[add],
    database_url="sqlite+aiosqlite:///demo.db",
) as agent:
    await agent.run("What is 15 + 27?", notifier=ConsoleNotifier())

Per-call makes sense: a batch run wants metrics, a dev-loop invocation wants terminal output, and a production webhook wants Slack fanout. The same Agent can serve all three with different notifiers on different calls.

All submit_* and resume methods accept the same notifier= argument. If you pass a notifier to run() and a different one to submit_approval(), both are applied on their respective turns; nothing is carried over between calls.

A minimal custom notifier

The simplest useful notifier records every callback it receives. Here is the full implementation:

from dendrux.loops.base import LoopNotifier
 
class CaptureNotifier(LoopNotifier):
    """Minimal notifier that records every callback into a list."""
 
    def __init__(self):
        self.log = []
 
    async def on_message_appended(self, message, iteration):
        self.log.append(f"on_message_appended  iter={iteration} role={message.role.value}")
 
    async def on_llm_call_completed(self, response, iteration, **kwargs):
        u = response.usage
        self.log.append(f"on_llm_call_completed  iter={iteration} input={u.input_tokens}")
 
    async def on_tool_completed(self, tool_call, tool_result, iteration):
        self.log.append(f"on_tool_completed      iter={iteration} tool={tool_call.name!r}")
 
    async def on_governance_event(self, event_type, iteration, data, correlation_id=None):
        self.log.append(f"on_governance_event    iter={iteration} event={event_type!r}")

Running a simple ReAct query ("What is 15 + 27?" with an add tool) and printing capture.log afterwards:

on_message_appended    iter=0 role=user       content='What is 15 + 27?'
on_llm_call_completed  iter=1 input=585 output=69
on_message_appended    iter=1 role=assistant  content=''
on_tool_completed      iter=1 tool='add' success=True
on_message_appended    iter=1 role=tool       content='42'
on_llm_call_completed  iter=2 input=667 output=13
on_message_appended    iter=2 role=assistant  content='15 + 27 = **42**'
 
total callbacks received: 7

Seven callbacks for a two-iteration run. The ordering is deterministic: messages are announced as the loop appends them, and on_llm_call_completed fires between the assistant message and any tool execution it triggered. Everything a human dashboard needs to render a live transcript is here.

Note that the assistant message on iter=1 has empty text content because the LLM's response was a pure tool call (no prose). The response.tool_calls field on on_llm_call_completed carries that shape if you want to display it.

ConsoleNotifier: the built-in for terminal output

Dendrux ships two notifiers in dendrux.notifiers: ConsoleNotifier and CompositeNotifier. ConsoleNotifier uses rich to render the same run as a terminal panel with per-iteration steps:

╭──────────────────────────────────────────────────────╮
│ What is 15 + 27?                                     │
╰──────────────────────────────────────────────────────╯
    llm 654 tokens in 2.3s
 
  Step 1
    calling add a=15, b=27
    done    add 0.0s
    llm 676 tokens in 0.8s
 
  Step 2

Good for local dev. The class exposes three constructor options (show_llm_text, max_text_length, show_params) so you can tune what it prints.

CompositeNotifier is the other one, and its only job is to fan a single set of callbacks out to a list of inner notifiers. If you want both terminal output and a metrics sink, wrap them in CompositeNotifier([console, metrics]) and pass that.

Fail-open semantics

The loop does not call your notifier directly. It calls a thin wrapper in dendrux/loops/_helpers.py that swallows exceptions:

async def notify_message(notifier, message, iteration, warnings=None):
    """Notify notifier of a message append, swallowing exceptions."""
    if notifier is None:
        return
    try:
        await notifier.on_message_appended(message, iteration)
    except Exception:
        logger.warning("Notifier.on_message_appended failed", exc_info=True)
        if warnings is not None:
            warnings.append(f"on_message_appended failed at iteration {iteration}")

Four things to pull out of that wrapper:

  1. None is fine. Passing no notifier is the common path. The wrapper short-circuits.
  2. Exceptions do not propagate. If your Slack webhook times out, the run carries on.
  3. The warning is logged. You will see the traceback in your log stream at warning level, so the bug is not silent, it is just not fatal.
  4. Warnings are collected on the run. A text label is appended to a per-run warnings list, which surfaces on the final RunResult.meta["notifier_warnings"] and, when persisted by the runner, on run_events. The run still succeeds, and the operator can see which callback failed where.

The same pattern applies to notify_llm, notify_tool, and notify_governance. Every notifier hook is wrapped the same way.

Why a side-channel at all

You might ask: if the recorder already writes run_events and an SSE client can read them back in order, do you need a notifier?

Yes, for three reasons.

  1. Latency. run_events is a DB round-trip, then an SSE poll interval, then a client render. A notifier runs in the same event loop as the LLM call, and sees the event within a coroutine await. For terminal output, live metrics, or anything that wants sub-millisecond reaction, the notifier is the right channel.
  2. Richer payloads. A notifier receives the full Message, LLMResponse, ToolCall, and ToolResult objects. The DB event log stores a condensed projection (token counts, tool name, iteration, a correlation id). If you want the entire prompt, the entire response, or the full tool result, you get it in-memory in the callback. Persisting all of that would bloat the DB; putting it in the notifier avoids the tradeoff.
  3. Out-of-band destinations. Slack, Datadog, a custom websocket, a tqdm progress bar: none of these are storage. They do not want SSE. A notifier lets them hook in without pretending to be a durability layer.

The recorder is the run's canonical record. The notifier is its live broadcast. They coexist because their jobs are different, and the failure policies match those jobs: the canonical record refuses to drop rows, the live broadcast refuses to block the source.