
Add a new wearable adapter

Goal: make Amy understand a wearable it doesn't already speak to. Two paths: either Terra already supports it (config-only) or it doesn't (a small custom adapter conforming to Amy's normalizer contract). This recipe walks both, plus the testing and validation story.

The whole reason Amy can answer a single question across Whoop, Oura, Garmin, Fitbit, and Apple Watch is that every source normalizes into the same internal tables: daily_summary, sleep_sessions, activities, body_measurements, biomarkers. Once a new device's data hits those tables, the agents (Data Science, Domain Expert, Health Coach) work over it without any code change.

Adding an adapter is mostly about getting raw data into those tables. There are two paths.


STEP 1, Check if Terra already supports it

Terra brokers the OAuth + raw data for 30+ wearables. If your target vendor is in Terra's provider list, you don't write code. You add the provider key to the allowlist that backs the POST /v1/sources/terra/connect endpoint and the existing normalizer pipeline picks up the rest.

The change is two lines of config, conceptually:

// cloud/src/routes/connect.ts (sketch)
const SUPPORTED_PROVIDERS = new Set([
  "WHOOP", "OURA", "GARMIN", "FITBIT", "APPLE",
  "POLAR",   // ← new
]);

After that:

  1. POST /v1/sources/terra/connect accepts { provider: "POLAR" } (or whatever your client passes).
  2. Terra returns a widget URL that lets the user authorize Polar.
  3. Polar starts posting to Terra; Terra posts to POST /webhooks/terra.
  4. The existing normalizer (cloud/src/normalize/index.ts) dispatches on event_type (daily / sleep / activity / body) and writes to the same tables.

Item 4 is the magic: Terra has already normalized Polar's payload into Terra's standard event schema. The Amy normalizer doesn't care which vendor it came from; it sees provider: "polar" and writes a row.
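The dispatch in item 4 can be pictured as a lookup table keyed by event type. The sketch below is an illustration only: the TerraEvent shape, the stub handlers, and the dispatch function are hypothetical names, not the actual contents of cloud/src/normalize/index.ts.

```typescript
// Hypothetical sketch of event-type dispatch; all names are illustrative.
type EventType = "daily" | "sleep" | "activity" | "body";

interface TerraEvent {
  event_type: string;
  provider: string; // e.g. "polar"
  data: unknown;
}

interface NormalizeResult {
  ok: boolean;
  rows?: number;
  skipped?: string;
}

type Handler = (userId: string, event: TerraEvent) => NormalizeResult;

// A real handler would upsert into its table; these stubs only report one row.
const handlers: Record<EventType, Handler> = {
  daily: () => ({ ok: true, rows: 1 }),
  sleep: () => ({ ok: true, rows: 1 }),
  activity: () => ({ ok: true, rows: 1 }),
  body: () => ({ ok: true, rows: 1 }),
};

function isEventType(value: string): value is EventType {
  return Object.prototype.hasOwnProperty.call(handlers, value);
}

// The vendor never appears in the routing decision: only event_type does.
function dispatch(userId: string, event: TerraEvent): NormalizeResult {
  if (!isEventType(event.event_type)) {
    return { ok: true, skipped: "unsupported_event_type" };
  }
  return handlers[event.event_type](userId, event);
}
```

Because routing depends only on event_type, adding "POLAR" to the allowlist needs no new branch here.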

End-to-end test for a Terra-supported provider: run the connect-a-wearable flow with the new provider, wait for a daily webhook (check the Worker logs), and verify a row appears in daily_summary via GET /v1/data/summaries/:today. No code paths changed.


STEP 2, If Terra doesn't support it: write a custom adapter

For a vendor Terra doesn't broker yet (a niche scale, a continuous glucose monitor, a manual CSV import), you need a custom adapter.

The job of an adapter is exactly three things:

  1. Authenticate with the vendor (OAuth, API key, whatever).
  2. Intake raw data, either by polling or by accepting webhook callbacks.
  3. Normalize each payload into the same row shape the Terra path uses, and upsert into the same tables.

The internal normalizer contract is the seam.

The DataAdapter contract

Today's code has Terra-specific normalizers at cloud/src/normalize/ (one file per event type: daily.ts, sleep.ts, activity.ts, body.ts, lab.ts). They share a common shape, which is what an adapter exposes to the queue consumer:

// packages/contracts/src/adapter.ts (the contract you'd want)
export interface DataAdapter<RawPayload = unknown> {
  /** Short stable name, e.g. "polar", "dexcom", "csv". */
  readonly id: string;

  /** Which event types this adapter emits. Subset of:
   *  "daily" | "sleep" | "activity" | "body" | "lab" | "auth". */
  readonly eventTypes: readonly string[];

  /** Optional: a webhook handler. Returns true if the request was for this adapter. */
  handleWebhook?(req: Request, env: Env): Promise<Response | null>;

  /** Optional: a polling tick (cron). Returns the number of raw events ingested. */
  poll?(env: Env, userId: string): Promise<number>;

  /** Convert one raw payload into one or more "normalized" upserts. */
  normalize(
    env: Env,
    userId: string,
    eventType: string,
    payload: RawPayload,
  ): Promise<NormalizeResult>;
}

export interface NormalizeResult {
  ok: boolean;
  rows?: number;       // how many table rows were written
  skipped?: string;    // reason for skip (e.g. "no_user")
  error?: string;
}

This is the contract you'd build to. It mirrors the existing NormalizeResult in cloud/src/normalize/index.ts; the new pieces are handleWebhook and poll so the adapter owns its own intake.
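One way a queue consumer could route messages to adapters built against this contract is a small registry keyed by adapter id. This is a sketch under assumptions: registerAdapter, adapterFor, and the trimmed MiniAdapter interface are hypothetical and exist only to keep the example self-contained.

```typescript
// Hypothetical registry sketch; MiniAdapter is a trimmed stand-in for DataAdapter.
interface NormalizeResult {
  ok: boolean;
  rows?: number;
  skipped?: string;
  error?: string;
}

interface MiniAdapter {
  readonly id: string;
  readonly eventTypes: readonly string[];
  normalize(userId: string, eventType: string, payload: unknown): Promise<NormalizeResult>;
}

const registry = new Map<string, MiniAdapter>();

function registerAdapter(adapter: MiniAdapter): void {
  // Ids double as the `source` column value, so they must be unique.
  if (registry.has(adapter.id)) {
    throw new Error(`duplicate adapter id: ${adapter.id}`);
  }
  registry.set(adapter.id, adapter);
}

/** Returns the adapter that should handle a queued message, or undefined. */
function adapterFor(source: string, eventType: string): MiniAdapter | undefined {
  const adapter = registry.get(source);
  return adapter && adapter.eventTypes.includes(eventType) ? adapter : undefined;
}

registerAdapter({
  id: "myscale",
  eventTypes: ["body"],
  normalize: async () => ({ ok: true, rows: 1 }),
});
```

With this layout the consumer stays generic: it looks up the adapter by the message's source and event_type, and treats a miss as a skip rather than an error.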

What normalize actually has to do

Map your vendor's payload to columns on the canonical tables. The existing daily/sleep normalizers are the templates. They use a small helper, buildCoalesceUpsert, that emits an INSERT … ON CONFLICT … DO UPDATE statement in which a null from your adapter never overwrites an existing value. That's how a sleep event can backfill HRV after a daily event already wrote steps for the same date.
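To make the "null never overwrites" behavior concrete, here is a minimal sketch of how such a statement could be generated. It is not the real buildCoalesceUpsert: the function name coalesceUpsertSketch, its placeholder order (key columns first), and the hrv column are assumptions for illustration.

```typescript
// Hypothetical sketch of a COALESCE upsert builder for SQLite-style SQL.
// Table and column names must be trusted constants, never user input.
function coalesceUpsertSketch(table: string, keyCols: string[], valueCols: string[]): string {
  const allCols = [...keyCols, ...valueCols];
  const placeholders = allCols.map(() => "?").join(", ");
  // excluded.<col> is the incoming value; if it is NULL, keep the stored one.
  const updates = valueCols
    .map((c) => `${c} = COALESCE(excluded.${c}, ${table}.${c})`)
    .join(", ");
  return (
    `INSERT INTO ${table} (${allCols.join(", ")}) VALUES (${placeholders}) ` +
    `ON CONFLICT (${keyCols.join(", ")}) DO UPDATE SET ${updates}`
  );
}

const sql = coalesceUpsertSketch(
  "daily_summary",
  ["user_id", "source", "datetime"],
  ["steps", "hrv"],
);
```

Binding (user, source, date, null, 48) against this statement after an earlier (user, source, date, 9000, null) write would leave both steps and hrv populated on the same row.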

The canonical tables (the shape your adapter writes into):

Table             | Primary key                      | What lives here
daily_summary     | (user_id, source, datetime)      | One row per day per source: sleep score, HRV, RHR, strain, steps, recovery, …
sleep_sessions    | (user_id, source, start_time)    | One row per sleep session: durations, stages, raw payload
activities        | (user_id, source, start_time)    | One row per workout
body_measurements | (user_id, source, measured_at)   | Weight, body-fat %, etc.
biomarkers        | (user_id, lab_id, name)          | Lab results: cholesterol, glucose, …

A minimal normalizer for a fictional "MyScale" body-comp device:

// cloud/src/adapters/myscale/normalize.ts
import type { Env } from "../../types";
import { buildCoalesceUpsert, utcDate } from "../../normalize/utils";

const BODY_COLS = ["weight_kg", "body_fat_percent", "muscle_mass_kg"];
const BODY_SQL = buildCoalesceUpsert(
  "body_measurements",
  ["user_id", "source", "measured_at"],
  BODY_COLS,
);

interface MyScalePayload {
  ts: string;             // ISO timestamp
  weight_kg: number;
  body_fat_pct?: number;
  muscle_kg?: number;
}

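export async function normalize(env: Env, userId: string, payload: MyScalePayload) {
  // Reject unparseable timestamps up front so a garbage payload never becomes a row.
  if (Number.isNaN(Date.parse(payload.ts))) {
    return { ok: false, error: "invalid_timestamp" };
  }

  // Bind order: conflict-key columns first, then BODY_COLS (assumed to match
  // the placeholder order buildCoalesceUpsert emits).
  await env.DB.prepare(BODY_SQL)
    .bind(
      userId,
      "myscale",
      payload.ts,
      payload.weight_kg,
      payload.body_fat_pct ?? null, // vendor field -> body_fat_percent
      payload.muscle_kg ?? null,    // vendor field -> muscle_mass_kg
    )
    .run();
  return { ok: true, rows: 1 };
}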

That's it. The Data Science agent's pandas code already knows how to SELECT * FROM body_measurements WHERE user_id = ?; it doesn't care that the source string is "myscale" instead of "whoop".

Wiring intake (webhook or poll)

Webhook-driven (the vendor pushes to you):

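// cloud/src/adapters/myscale/webhook.ts
import type { Env } from "../../types";

// verifyHmac and lookupUserByMyscaleId are helpers you supply; they are not shown here.
export async function handleWebhook(req: Request, env: Env) {
  // Compare the path only, so a query string doesn't defeat the match.
  if (new URL(req.url).pathname !== "/webhooks/myscale") return null;

  // Read the body once: the signature covers the raw text, so verify before parsing.
  const raw = await req.text();
  const sig = req.headers.get("x-myscale-signature");
  // await is harmless if verifyHmac is synchronous and required if it returns a Promise.
  if (!sig || !(await verifyHmac(sig, raw, env.MYSCALE_SECRET))) {
    return new Response("bad signature", { status: 401 });
  }

  const body = JSON.parse(raw);
  // Resolve userId from your own mapping (you set this up at OAuth time).
  const userId = await lookupUserByMyscaleId(env, body.device_id);
  if (!userId) return new Response("ok", { status: 200 }); // unknown device, no-op

  // Enqueue for the same queue consumer the Terra path uses.
  await env.RAW_EVENTS.send({
    source: "myscale",
    user_id: userId,
    event_type: "body",
    payload: body,
  });
  return new Response("ok", { status: 200 });
}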

Poll-driven (the vendor doesn't push, so you pull on a cron):

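// cloud/src/adapters/myscale/poll.ts
import type { Env } from "../../types";

// getAccessToken, getLastCursor, and setLastCursor are helpers you supply.
export async function poll(env: Env, userId: string) {
  const token = await getAccessToken(env, userId);
  const since = await getLastCursor(env, userId, "myscale"); // assumed to be a string

  // Encode the cursor so reserved characters survive the query string.
  const url = `https://api.myscale.example/v1/measurements?since=${encodeURIComponent(since)}`;
  const res = await fetch(url, {
    headers: { Authorization: `Bearer ${token}` },
  });
  // Fail loudly on a non-2xx response instead of parsing an error body as data.
  if (!res.ok) throw new Error(`myscale poll failed: ${res.status}`);
  const { measurements, next_cursor } = await res.json();

  for (const m of measurements) {
    await env.RAW_EVENTS.send({ source: "myscale", user_id: userId, event_type: "body", payload: m });
  }
  // Advance the cursor only after every event is enqueued, so a failed run is retried.
  await setLastCursor(env, userId, "myscale", next_cursor);
  return measurements.length;
}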

Hook into the existing cron in cloud/src/cron.ts by iterating connected adapters and calling each one's poll on its preferred cadence (15 min for fitness, 1 hour for body, daily for labs is a reasonable default).
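The cadence rule above can be sketched as a pure function that the cron handler calls on each tick. The names here (Modality, PollTarget, dueForPoll) are hypothetical, and the sketch assumes you store a last-polled timestamp per connected adapter somewhere.

```typescript
// Hypothetical cadence check; intervals follow the defaults suggested above.
type Modality = "fitness" | "body" | "lab";

const CADENCE_MS: Record<Modality, number> = {
  fitness: 15 * 60 * 1000,      // 15 minutes
  body: 60 * 60 * 1000,         // 1 hour
  lab: 24 * 60 * 60 * 1000,     // daily
};

interface PollTarget {
  adapterId: string;
  modality: Modality;
  lastPolledAt: number | null; // epoch ms; null means never polled
}

/** Returns the targets whose cadence has elapsed as of `now` (epoch ms). */
function dueForPoll(targets: PollTarget[], now: number): PollTarget[] {
  return targets.filter(
    (t) => t.lastPolledAt === null || now - t.lastPolledAt >= CADENCE_MS[t.modality],
  );
}
```

Keeping the decision separate from the fetch makes it testable with a fixed clock, and lets the cron fire more often than any single adapter's cadence.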


STEP 3, Test the adapter offline with replay fixtures

You don't want to bind your test loop to a vendor's sandbox. Capture a few real payloads as JSON fixtures and replay them against the normalizer directly.

// packages/agents/test/adapters/myscale.test.ts
import { describe, it, expect } from "vitest";
import { normalize } from "../../../../cloud/src/adapters/myscale/normalize";
import { mockEnv } from "../mocks/env";
import fixture from "../fixtures/myscale/weigh-in.json";

describe("myscale adapter", () => {
  it("normalizes a weigh-in payload to body_measurements", async () => {
    const env = mockEnv();
    const result = await normalize(env, "user_test", fixture);

    expect(result.ok).toBe(true);
    expect(result.rows).toBe(1);

    const row = await env.DB.prepare(
      "select weight_kg, body_fat_percent from body_measurements where user_id = ?",
    )
      .bind("user_test")
      .first();
    expect(row?.weight_kg).toBeCloseTo(82.4);
  });
});

A useful fixture layout:

test/fixtures/
└── myscale/
    ├── weigh-in.json          ← happy path
    ├── partial.json           ← weight only, no body-fat
    ├── invalid-timestamp.json ← garbage, expect ok:false
    └── duplicate.json         ← upsert idempotency

The same fixtures double as regression evals: when you change the normalizer later, replay the fixtures and diff the resulting D1 rows. Any drift is a behavior change worth confirming.
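A regression diff of this kind can be as small as comparing two snapshots of rows by primary key. The sketch below assumes you have already read the rows out of D1 into plain objects; diffRows and the Drift shape are hypothetical names.

```typescript
// Hypothetical row-level diff for fixture replays.
type Row = Record<string, string | number | null>;

interface Drift {
  key: string;
  column: string; // "*" marks a row that was added or removed
  before: unknown;
  after: unknown;
}

function diffRows(before: Row[], after: Row[], keyCols: string[]): Drift[] {
  const keyOf = (r: Row): string => keyCols.map((c) => String(r[c])).join("|");
  const prior = new Map<string, Row>();
  for (const r of before) prior.set(keyOf(r), r);

  const drift: Drift[] = [];
  for (const row of after) {
    const key = keyOf(row);
    const old = prior.get(key);
    prior.delete(key);
    if (!old) {
      drift.push({ key, column: "*", before: null, after: row });
      continue;
    }
    // Strict comparison: any change in a stored value counts as drift.
    for (const column of Object.keys(row)) {
      if (old[column] !== row[column]) {
        drift.push({ key, column, before: old[column], after: row[column] });
      }
    }
  }
  // Anything left in `prior` existed before but was not written this time.
  prior.forEach((row, key) => drift.push({ key, column: "*", before: row, after: null }));
  return drift;
}
```

An empty result means the normalizer change was behavior-preserving for those fixtures; anything else is a list of cells to review.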


STEP 4, Let the validator flag out-of-distribution data

Here's the subtlety. When a new source produces a value the Data Science agent has never seen at scale (a CGM dumping a value every 5 minutes; a chest-strap reporting RHR 10 bpm lower than the wrist band), the validator is what catches it.

The validator's gates are deterministic checks that run on every quantitative claim in the Fact Sheet: sample size, effect-vs-noise, bootstrap stability, tautology detection. A new source that produces unusual values won't poison the answers, because the gates reject claims that the data doesn't support:

  • band-flag rejects claims that fall outside the user's own historical ±1 SD band.
  • recent-vs-prior rejects "your X dropped" claims if the recent window's confidence interval overlaps the prior.
  • The Critic (an adversarial LLM pass) flags claims a domain expert would push back on, for example "a 7% dip in this signal isn't meaningful."
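As a rough picture of what a ±1 SD band test computes, here is a minimal sketch. It is not the validator's code: withinBand is a hypothetical name, and the use of the sample standard deviation and the two-point minimum are assumptions.

```typescript
// Hypothetical band test: is `value` within mean ± widthSd * SD of the history?
// Returns null when there is too little history to define a band.
function withinBand(history: number[], value: number, widthSd = 1): boolean | null {
  if (history.length < 2) return null;
  const mean = history.reduce((a, b) => a + b, 0) / history.length;
  // Sample variance (n - 1 denominator); an assumption, not a documented choice.
  const variance =
    history.reduce((a, b) => a + (b - mean) ** 2, 0) / (history.length - 1);
  const sd = Math.sqrt(variance);
  return Math.abs(value - mean) <= widthSd * sd;
}
```

For a history of 50, 52, 54, 56, 58 the mean is 54 and the sample SD is about 3.16, so 55 is inside the band and 60 is outside it.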

So when you add a new adapter, you don't have to also teach every agent about it. The validator's job is to refuse to talk about numbers it can't defend. If the new source's data is bad, the user gets "we don't have enough data to say", not a wrong answer.

That said: you should still eyeball the Fact Sheet for the first few real turns after a new adapter lands. If you see lots of REJECTED gates, either:

  • the normalizer is bucketing data into the wrong column (look for unit mismatches: bpm vs ms, kg vs lbs), or
  • the source genuinely produces values the validator should adapt to (in which case, file an issue against packages/agents/; the validator thresholds may need tuning for the new modality).

Common mistakes

Inventing a new column on daily_summary

If your new source produces a metric no other source has (say, "hydration_score"), the temptation is to add a column to daily_summary. Don't, at least not first. Instead:

  • If it's roughly equivalent to an existing column (sleep score is sleep score, even if the vendor's algorithm differs), write to the existing column.
  • If it's genuinely new, put it in a generic side table (e.g. custom_metrics(user_id, source, datetime, name, value, unit)). Adding a column to daily_summary triggers a Data Science agent schema refresh and a Fact Sheet migration, which is a much bigger blast radius.
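The side-table option can be sketched as a function that fans one vendor payload out into one row per metric. The column list follows the custom_metrics example above; the function name toCustomMetricRows and the input shape are hypothetical.

```typescript
// Hypothetical mapping from vendor-specific metrics to custom_metrics rows.
interface CustomMetricRow {
  user_id: string;
  source: string;
  datetime: string;
  name: string;
  value: number;
  unit: string;
}

function toCustomMetricRows(
  userId: string,
  source: string,
  datetime: string,
  metrics: Record<string, { value: number; unit: string }>,
): CustomMetricRow[] {
  // One row per metric name, so new metrics never require a schema change.
  return Object.keys(metrics).map((name) => ({
    user_id: userId,
    source,
    datetime,
    name,
    value: metrics[name].value,
    unit: metrics[name].unit,
  }));
}
```

A second vendor-only metric then becomes another row with a different name, not another column.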

Forgetting source in the primary key

Every canonical table that wearables write to has source in its primary key for a reason (biomarkers is the exception; it keys on lab_id): the same user can have a Whoop and an Oura, and both will write a row for the same datetime. If you drop source from your upsert's conflict target, you'll silently overwrite the other source's row.

Skipping HMAC verification on webhooks

The Terra webhook handler at POST /webhooks/terra verifies terra-signature with HMAC-SHA256 (see Webhooks) and rejects timestamps older than 5 minutes. Your new adapter's webhook should do the same. An unverified webhook is a write path that anyone on the internet can hit.
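A custom adapter's check could look like the sketch below. Everything vendor-specific in it is an assumption: the signed string format (timestamp, a dot, then the raw body), the hex encoding, and the function name verifySignedWebhook are illustrative, and neither Terra's scheme nor any real vendor's is being described. It uses Node's crypto module, which on Cloudflare Workers assumes Node compatibility is enabled; a Web Crypto version would follow the same steps.

```typescript
import { createHmac, timingSafeEqual } from "node:crypto";

const MAX_AGE_SECONDS = 5 * 60;

// Hypothetical verifier: HMAC-SHA256 over `${timestamp}.${rawBody}`, hex-encoded.
function verifySignedWebhook(
  rawBody: string,
  timestamp: string,     // epoch seconds, as sent by the vendor
  signatureHex: string,
  secret: string,
  nowSeconds: number,
): boolean {
  // Freshness first: a stale timestamp is rejected even with a valid signature.
  const sentAt = Number(timestamp);
  if (!Number.isFinite(sentAt) || nowSeconds - sentAt > MAX_AGE_SECONDS) return false;

  const expected = createHmac("sha256", secret).update(`${timestamp}.${rawBody}`).digest();
  const received = Buffer.from(signatureHex, "hex");
  // timingSafeEqual throws on length mismatch, so check lengths before comparing.
  return received.length === expected.length && timingSafeEqual(received, expected);
}
```

Signing the timestamp together with the body is what makes the freshness check meaningful: an attacker can't replay an old body with a new timestamp.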

Confusing provider and source

The provider field in the connect API is the vendor name you hand to Terra (WHOOP, OURA, POLAR). The source column on the data tables is the lowercased, normalized version (whoop, oura, polar). canonicalProvider() in cloud/src/normalize/utils.ts is the translator; use it everywhere on the way in.

Treating the adapter as the only place data validation lives

The adapter is responsible for shape, types, units, and idempotency. It's not responsible for meaning, that is, whether the value is plausible for this user at this time. That's the validator's job, and the validator only runs at turn time, not at ingest time. If you try to filter out "weird" values in the adapter, you'll silently drop real anomalies (illness, training spikes) that the validator would have handled correctly.
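Here is a sketch of where that line falls in practice: the parser checks types, parses the timestamp, and converts units, but applies no plausibility bounds. The raw payload shape (ts, weight, unit) and the name parseWeighIn are hypothetical.

```typescript
// Hypothetical shape-and-units check for a weigh-in payload.
interface BodyReading {
  measured_at: string; // ISO 8601, UTC
  weight_kg: number;
}

type ParseResult =
  | { ok: true; reading: BodyReading }
  | { ok: false; error: string };

const KG_PER_LB = 0.45359237; // exact by definition of the international pound

function parseWeighIn(raw: { ts?: unknown; weight?: unknown; unit?: unknown }): ParseResult {
  if (typeof raw.ts !== "string" || Number.isNaN(Date.parse(raw.ts))) {
    return { ok: false, error: "invalid_timestamp" };
  }
  if (typeof raw.weight !== "number" || !Number.isFinite(raw.weight)) {
    return { ok: false, error: "invalid_weight" };
  }
  if (raw.unit !== "kg" && raw.unit !== "lb") {
    return { ok: false, error: "unknown_unit" };
  }
  // Units are the adapter's job; whether 40 kg is plausible for this user is not.
  const weight_kg = raw.unit === "lb" ? raw.weight * KG_PER_LB : raw.weight;
  return { ok: true, reading: { measured_at: new Date(raw.ts).toISOString(), weight_kg } };
}
```

Note what is absent: there is no "reject if weight changed by more than N kg" rule. A sudden drop passes through untouched and is left for the validator to judge at turn time.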

