
Stream events from a turn

Goal: render a live UI from a turn's SSE event stream: spinners, agent names, validator verdicts, and the answer streaming in token by token. It brings the "watch Amy think" feel of the CLI to any client.


This recipe goes deeper than Ask a question. It covers the full event catalog, the right code for browsers / Node / RN, the reconnect protocol, graceful cancellation, and a worked CLI-style trace renderer.


STEP 1, Know which event drives which UI element

The SSE stream is the source of truth for everything that happens during a turn. There are eight event types, each with a different job in the UI.

| Event | Field you read | Drives in the UI |
|---|---|---|
| turn.started | turn_id, at | Open the chat bubble. Start the elapsed-time counter. |
| agent.started | agent | Header label: "Data Science is running…". Spinner on. |
| agent.thought | agent, delta | The token stream. Append delta to the visible answer. |
| agent.completed | agent, output_summary | Header label: "Data Science done". Optionally collapse a sub-trace. |
| validator.gate | gate, verdict, evidence | A small inline pill: VALIDATED / REJECTED / CONDITIONAL. |
| validator.critic | verdict, reasoning | The "Critic" line in the trace. |
| turn.completed | turn_id, result | Close the bubble. Spinner off. Render result.answer as the final text. |
| turn.failed | turn_id, error | Red banner with error.message. Stop the spinner. |

A simple rule of thumb: agent.* drives the "who's working" and the streaming text; validator.* drives the trust indicators; turn.* drives the lifecycle.

Full payload shapes: SDK: TypeScript / Event types.
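One way to turn the table above into code is a pure reducer that folds each event into a UI state object. This is a minimal sketch, not the SDK's API: the TurnEvent union lists only the fields named in the table (the types of at and evidence are guesses), UiState is a hypothetical shape, and the header uses the raw agent id rather than a display name.

```typescript
// Assumed event shapes: only the fields listed in the table above.
// The SDK's real types may carry more fields.
type TurnEvent =
  | { type: "turn.started"; turn_id: string; at: string }
  | { type: "agent.started"; agent: string }
  | { type: "agent.thought"; agent: string; delta: string }
  | { type: "agent.completed"; agent: string; output_summary: string }
  | { type: "validator.gate"; gate: string; verdict: string; evidence: unknown }
  | { type: "validator.critic"; verdict: string; reasoning: string }
  | { type: "turn.completed"; turn_id: string; result: { answer: string } }
  | { type: "turn.failed"; turn_id: string; error: { message: string } };

// Hypothetical UI state; rename the fields to fit your components.
interface UiState {
  phase: "idle" | "running" | "done" | "failed";
  header: string;          // "who's working" label
  spinner: boolean;
  answer: string;          // streamed text, replaced by the final answer
  pills: string[];         // validator gate verdicts
  critic: string | null;   // the "Critic" line
  banner: string | null;   // error banner text
}

const initialState: UiState = {
  phase: "idle",
  header: "",
  spinner: false,
  answer: "",
  pills: [],
  critic: null,
  banner: null,
};

function reduce(state: UiState, event: TurnEvent): UiState {
  switch (event.type) {
    // turn.* drives the lifecycle
    case "turn.started":
      return { ...initialState, phase: "running" };
    case "turn.completed":
      return { ...state, phase: "done", spinner: false, answer: event.result.answer };
    case "turn.failed":
      return { ...state, phase: "failed", spinner: false, banner: event.error.message };
    // agent.* drives "who's working" and the streaming text
    case "agent.started":
      return { ...state, header: `${event.agent} is running…`, spinner: true };
    case "agent.thought":
      return { ...state, answer: state.answer + event.delta };
    case "agent.completed":
      return { ...state, header: `${event.agent} done` };
    // validator.* drives the trust indicators
    case "validator.gate":
      return { ...state, pills: [...state.pills, `${event.gate}: ${event.verdict}`] };
    case "validator.critic":
      return { ...state, critic: `${event.verdict}: ${event.reasoning}` };
  }
}
```

Because reduce is pure, the same function can back a React useReducer, a store, or a plain loop that re-renders after each event.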


STEP 2, Subscribe in your runtime

The SDK works in Node, Bun, browsers, and React Native. Pick the row that matches your runtime.

| Runtime | What to use | Headers? |
|---|---|---|
| Node / Bun | The SDK's async iterator (amy.turns.stream(id)) | Handled by the SDK |
| Browser | The SDK's async iterator, or raw EventSource (no auth header support, see workaround) | EventSource can't set headers; the SDK uses fetch under the hood for this |
| React Native | react-native-event-source + amy.turns.eventSourceInit(id) | Passed via the constructor options |

Node / Bun

import { Amy } from "@amy/sdk";

const amy = new Amy({
  apiKey: process.env.AMY_API_KEY!,
  baseUrl: process.env.AMY_BASE_URL!,
});

const turn = await amy.turns.create({
  messages: [{ role: "user", content: "How's my recovery?" }],
});

for await (const event of amy.turns.stream(turn.id)) {
  switch (event.type) {
    case "turn.started":     console.log("→ turn started"); break;
    case "agent.started":    console.log(`→ ${event.agent} running…`); break;
    case "agent.thought":    process.stdout.write(event.delta); break;
    case "agent.completed":  console.log(`\n  ${event.agent} done`); break;
    case "validator.gate":   console.log(`  gate ${event.gate}: ${event.verdict}`); break;
    case "validator.critic": console.log(`  critic: ${event.verdict}`); break;
    case "turn.completed":   console.log("\n✓", event.result.answer); break;
    case "turn.failed":      console.error("✗", event.error.message); break;
  }
}

Browser (EventSource)

EventSource doesn't accept custom headers, so you can't pass the Authorization: Bearer … header directly. The SDK works around this internally by using fetch with a ReadableStream. If you want raw EventSource, you can include the API key in a one-shot session cookie (see Browser notes), but the SDK is the easier path:

import { Amy } from "@amy/sdk";

const amy = new Amy({
  apiKey: localStorage.getItem("amy_api_key")!,
  baseUrl: "https://amy.heyamy.xyz",
});

const turn = await amy.turns.create({
  messages: [{ role: "user", content: "How's my recovery?" }],
});

const $bubble = document.querySelector("#answer")!;

for await (const event of amy.turns.stream(turn.id)) {
  if (event.type === "agent.thought") {
    $bubble.textContent += event.delta;
  }
  if (event.type === "turn.completed") {
    $bubble.textContent = event.result.answer; // canonical final text
  }
}
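If you ever need to read the stream with fetch yourself, the work is mostly splitting the response text into SSE frames. The sketch below shows one way to do that, assuming text chunks that have already been decoded (for example with TextDecoder). SseParser and SseFrame are hypothetical names, not SDK exports, and the SDK's internal parser may well differ.

```typescript
// One parsed SSE frame. `event` defaults to "message" per the SSE format.
interface SseFrame {
  id: string | undefined;
  event: string;
  data: string;
}

// Parse one blank-line-delimited block into a frame, or null if it has no data.
function parseBlock(block: string): SseFrame | null {
  let id: string | undefined;
  let event = "message";
  const data: string[] = [];
  for (const line of block.split(/\r?\n/)) {
    if (line === "" || line.startsWith(":")) continue; // comment or heartbeat
    const colon = line.indexOf(":");
    const field = colon === -1 ? line : line.slice(0, colon);
    let value = colon === -1 ? "" : line.slice(colon + 1);
    if (value.startsWith(" ")) value = value.slice(1); // strip one leading space
    if (field === "id") id = value;
    else if (field === "event") event = value;
    else if (field === "data") data.push(value);
  }
  if (data.length === 0) return null; // frames without data are not dispatched
  return { id, event, data: data.join("\n") };
}

// Incremental parser: feed it chunks as they arrive, get complete frames back.
class SseParser {
  private buffer = "";

  feed(chunk: string): SseFrame[] {
    this.buffer += chunk;
    const blocks = this.buffer.split(/\r?\n\r?\n/);
    // The last piece is an incomplete frame (or ""); keep it for the next chunk.
    this.buffer = blocks.pop() ?? "";
    const frames: SseFrame[] = [];
    for (const block of blocks) {
      const frame = parseBlock(block);
      if (frame !== null) frames.push(frame);
    }
    return frames;
  }
}
```

Each frame's event field carries the event type (the same names the React Native listeners subscribe to), and data is the JSON payload to pass to JSON.parse.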

React Native

import RNEventSource from "react-native-event-source";
import { Amy } from "@amy/sdk";

const amy = new Amy({
  apiKey,
  baseUrl,
  EventSourceCtor: RNEventSource, // tell the SDK to use the RN polyfill
});

// …or, if you want to manage EventSource yourself:
const { url, headers } = amy.turns.eventSourceInit(turn.id);
const es = new RNEventSource(url, { headers });

es.addEventListener("agent.thought", (evt) => {
  const { delta } = JSON.parse(evt.data);
  setAnswer((prev) => prev + delta);
});

es.addEventListener("turn.completed", (evt) => {
  const { result } = JSON.parse(evt.data);
  setAnswer(result.answer);
  es.close();
});

STEP 3, Handle reconnects with Last-Event-Id

Streams break. Networks idle out, phones go to sleep, server processes restart. The SSE protocol has a built-in resume mechanism: the server sends an id: line with every event, and the client passes the last one it saw back via the Last-Event-Id header on reconnect.

The server then replays every event after that ID. Replays are available for 1 hour after turn completion.

With the SDK (transparent)

The SDK reconnects automatically with exponential backoff. To observe it for debugging:

for await (const event of amy.turns.stream(turn.id, {
  onReconnect: (lastId) => console.log(`reconnecting from event ${lastId}`),
})) {
  // your handler
}
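If you reconnect by hand, a delay schedule along these lines is a common choice. It is a sketch of exponential backoff with full jitter; the base delay, cap, and jitter strategy are assumptions, since the SDK's actual values are not stated here, and backoffDelayMs is a hypothetical helper.

```typescript
// Delay before reconnect attempt `attempt` (0-based).
// The exponential ceiling is baseMs * 2^attempt, capped at capMs;
// the returned delay is drawn uniformly from [0, ceiling) ("full jitter").
// `random` is injectable so the function can be tested deterministically.
function backoffDelayMs(
  attempt: number,
  baseMs = 500,      // assumed base delay
  capMs = 30000,     // assumed maximum delay
  random: () => number = Math.random,
): number {
  const ceiling = Math.min(capMs, baseMs * 2 ** attempt);
  return Math.floor(random() * ceiling);
}
```

The jitter spreads reconnects out so that many clients dropped by the same server restart do not all return at the same instant.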

Manually

# Initial subscribe
curl -N -H "Authorization: Bearer $AMY_API_KEY" \
  "$AMY_BASE_URL/v1/turns/$TURN_ID/events"

# …network blip, last id seen was 42 …

# Resume after event 42 (the last id seen)
curl -N -H "Authorization: Bearer $AMY_API_KEY" \
  -H "Last-Event-Id: 42" \
  "$AMY_BASE_URL/v1/turns/$TURN_ID/events"

The server replays from event 43 onward.
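In client code, the same bookkeeping as the curl example can be kept in a small helper that remembers the last id seen and builds the headers for the next connection attempt. ResumeTracker is a hypothetical name used for illustration, and the Accept header is an assumption; only the Authorization and Last-Event-Id headers come from the commands above.

```typescript
// Remembers the most recent event id and builds reconnect headers from it.
class ResumeTracker {
  private lastId: string | null = null;

  // Call for every parsed event; frames without an id leave the cursor unchanged.
  observe(frame: { id?: string | undefined }): void {
    if (frame.id !== undefined && frame.id !== "") this.lastId = frame.id;
  }

  // Headers for the next connection attempt.
  headers(apiKey: string): Record<string, string> {
    const headers: Record<string, string> = {
      Authorization: `Bearer ${apiKey}`,
      Accept: "text/event-stream", // assumption: conventional for SSE requests
    };
    // Only present after at least one event was seen, so the first
    // connection starts from the beginning of the stream.
    if (this.lastId !== null) headers["Last-Event-Id"] = this.lastId;
    return headers;
  }
}
```

Call observe on every event, not only on the ones you render, so the resume point never lags behind what the client has already processed.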


STEP 4, Cancel a stream gracefully

You'll need this when the user closes the chat, navigates away, or hits a stop button. The SDK's iterator accepts an AbortSignal:

const controller = new AbortController();

// Wire to your UI's "stop" button
stopButton.onclick = () => controller.abort();

try {
  for await (const event of amy.turns.stream(turn.id, { signal: controller.signal })) {
    handleEvent(event);
  }
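} catch (err) {
  // err is `unknown` under strict TypeScript, so narrow it before reading .name
  if (err instanceof Error && err.name === "AbortError") {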
    console.log("user cancelled the stream");
  } else {
    throw err;
  }
}

Aborting only closes the SSE connection on the client. The turn keeps running on the server; its result is still saved in D1 and retrievable later via GET /v1/turns/:id. There is no POST /v1/turns/:id/cancel endpoint in v1. If you want to ignore the result, just don't read it.
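Picking the result up later could look like the sketch below. It assumes the GET endpoint accepts the same Bearer header as the events endpoint and returns JSON; the response shape is not specified here, so it is left as unknown. turnRequest, getTurn, and Fetcher are hypothetical helpers, and the fetch function is injected so the code can run in any runtime.

```typescript
// Minimal structural type that the global fetch satisfies.
type Fetcher = (
  url: string,
  init: { headers: Record<string, string> },
) => Promise<{ ok: boolean; status: number; json(): Promise<unknown> }>;

// Build the URL and headers for GET /v1/turns/:id.
function turnRequest(baseUrl: string, apiKey: string, turnId: string) {
  return {
    url: `${baseUrl.replace(/\/+$/, "")}/v1/turns/${encodeURIComponent(turnId)}`,
    headers: { Authorization: `Bearer ${apiKey}` }, // assumed auth scheme
  };
}

// Fetch the stored turn; throws on a non-2xx response.
async function getTurn(
  fetcher: Fetcher,
  baseUrl: string,
  apiKey: string,
  turnId: string,
): Promise<unknown> {
  const { url, headers } = turnRequest(baseUrl, apiKey, turnId);
  const res = await fetcher(url, { headers });
  if (!res.ok) throw new Error(`GET ${url} failed with status ${res.status}`);
  return res.json();
}
```

In a browser or Node 18+, passing the global fetch as the first argument should work; if the SDK exposes its own method for retrieving a turn, prefer that.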


Worked example, CLI-style trace renderer

Below is a roughly 70-line script that produces output close to the trace in the README's real-turn snippets: agent names, validator gates, the turn's cost, and the final answer streamed in.

// trace.ts
import { Amy } from "@amy/sdk";

const amy = new Amy({
  apiKey: process.env.AMY_API_KEY!,
  baseUrl: process.env.AMY_BASE_URL!,
});

const turn = await amy.turns.create({
  messages: [{ role: "user", content: process.argv[2] ?? "How's my recovery?" }],
});

const start = Date.now();
const ts = () => {
  const ms = Date.now() - start;
  const s = String(Math.floor(ms / 1000)).padStart(3, " ");
  return `[+${s}s]`;
};
const ANSI = { dim: "\x1b[2m", reset: "\x1b[0m", green: "\x1b[32m", red: "\x1b[31m", yellow: "\x1b[33m" };
const verdictColor = (v: string) =>
  v === "VALIDATED" ? ANSI.green : v === "REJECTED" ? ANSI.red : ANSI.yellow;

let currentAgent: string | null = null;
let answer = "";

for await (const event of amy.turns.stream(turn.id)) {
  switch (event.type) {
    case "turn.started":
      console.log(`${ANSI.dim}${ts()} orchestrator       → turn started${ANSI.reset}`);
      break;

    case "agent.started":
      currentAgent = event.agent;
      console.log(`${ANSI.dim}${ts()} ${event.agent.padEnd(18)} → running${ANSI.reset}`);
      break;

    case "agent.thought":
      // For the final synthesis agent, mirror tokens to stdout.
      if (event.agent === "synthesis") {
        process.stdout.write(event.delta);
        answer += event.delta;
      }
      break;

    case "agent.completed":
      console.log(`${ANSI.dim}${ts()} ${event.agent.padEnd(18)} ✓ done${ANSI.reset}`);
      break;

    case "validator.gate":
      console.log(
        `${ANSI.dim}${ts()} validator          ${verdictColor(event.verdict)}■ ${event.verdict.padEnd(11)}${ANSI.reset}${ANSI.dim} ${event.gate}${ANSI.reset}`,
      );
      break;

    case "validator.critic":
      console.log(`${ANSI.dim}${ts()} validator          → critic: ${event.verdict}${ANSI.reset}`);
      break;

    case "turn.completed":
      console.log(`\n${ANSI.dim}${ts()} orchestrator       ✓ turn complete ($${event.result.cost_usd.toFixed(4)})${ANSI.reset}`);
      // The streamed synthesis tokens are best-effort; result.answer is
      // authoritative. Print it here only if no tokens were streamed.
      if (!answer) console.log("\namy ", event.result.answer);
      break;

    case "turn.failed":
      console.error(`\n${ANSI.red}✗ turn failed: ${event.error.message}${ANSI.reset}`);
      process.exit(1);
  }
}

Run it:

bun trace.ts "is my -7.3% sleep score drop clinically meaningful?"

You'll get output like:

[+  0s] orchestrator       → turn started
[+  1s] data_science       → running
[+ 64s] data_science       ✓ done
[+ 64s] validator          ■ VALIDATED   ds-avg-rhr-all-data
[+ 91s] validator          → critic: accept
[+138s] domain_expert      → running
[+201s] domain_expert      ✓ done
[+201s] synthesis          → running
Short answer: no — by the most defensible read of your own data…
[+222s] synthesis          ✓ done
[+222s] orchestrator       ✓ turn complete ($0.1288)

Common mistakes

Rendering agent.thought for every agent

Most agents do their reasoning silently; only the synthesis agent streams the user-facing answer. If you render every agent.thought, the user sees a wall of internal monologue from the investigator and data scientist. Filter on event.agent === "synthesis" for the visible answer, and use the other agents' thoughts only for an opt-in "verbose" mode.
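The filter can be a small routing function that decides where each thought goes. This is a sketch: the "synthesis" agent name comes from this recipe, while Route, routeThought, and the verbose flag are hypothetical names for illustration.

```typescript
// Only the fields this function needs from an agent.thought event.
type AgentThought = { type: "agent.thought"; agent: string; delta: string };

// Where a thought should be rendered.
type Route = "answer" | "trace" | "hidden";

function routeThought(event: AgentThought, verbose = false): Route {
  // The synthesis agent streams the user-facing answer.
  if (event.agent === "synthesis") return "answer";
  // Everything else is internal reasoning: show it only in verbose mode.
  return verbose ? "trace" : "hidden";
}
```

Appending event.delta to the chat bubble only when the route is "answer" keeps the default view clean while leaving the full trace one toggle away.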

Treating the stream's last agent.thought chunks as the final answer

The agent.thought stream is a best-effort token feed. The authoritative answer is turn.completed.result.answer: that's what was committed to D1 and what GET /v1/turns/:id returns. If you display the streamed tokens, swap them out for result.answer on turn.completed to handle any dropped chunks.

Forgetting that EventSource can't set Authorization

The browser's built-in EventSource API has no way to set request headers. If you try to use it directly with Authorization: Bearer, it silently sends no header and the server returns 401 (which EventSource reports as a generic onerror). Either use the SDK (which uses fetch + ReadableStream internally) or pass the API key via a one-shot signed URL.

Reconnecting from event id 0 after a disconnect

If you reconnect without Last-Event-Id, the server replays the entire stream from the start, and you'll re-render every event, re-stream every token, and confuse your UI. Track the last id you saw on every event and pass it on reconnect. (The SDK does this for you.)

Holding the connection open after turn.completed

The server closes the stream after turn.completed or turn.failed. If you keep your client's iterator alive, you'll eventually hit "stream closed" / EOF on the next read. Break out of the loop on turn.completed/turn.failed:

for await (const event of amy.turns.stream(turn.id)) {
  handleEvent(event);
  if (event.type === "turn.completed" || event.type === "turn.failed") break;
}

(The SDK's iterator does this automatically; break is only needed if you're parsing the raw SSE yourself.)

