
Storage

Every column, every index, every key pattern. Amy uses three storage primitives: D1 (relational), R2 (blobs), and KV (caches / ephemeral), plus an on-disk SQLite copy on the CLI machine. The definitive source is the set of .sql migration files in cloud/migrations/; this page mirrors them column-for-column and adds the rationale, indexes, and query patterns.

There is no Workflow state today (TurnWorkflow hasn't shipped; see architecture.md → Migration sketch). When it does, step state will live in the Workflows runtime, covered briefly in Workflow state (future) below.


D1, amy-db

Binding: DB (declared in wrangler.toml lines 9-13). Database ID: f2c1dc51-6237-46b8-a86d-d6a52b988a42. Migrations dir: cloud/migrations/.

Three migrations to date:

  • 0001_init.sql: full initial schema.
  • 0002_observability.sql: adds trace_events.
  • 0003_lab_terra_upload_id.sql: adds the lab_uploads.terra_upload_id column and its index.

Identity

users

create table users (
  id text primary key,                          -- Clerk subject ("user_2abc...")
  email text not null default '',
  created_at text not null default (datetime('now'))
);

Notes:

  • id is the Clerk sub claim verbatim. We never generate user IDs ourselves.
  • Lazily upserted on every authed request in middleware/clerk.ts: insert into users (id, email) values (?, ?) on conflict(id) do nothing.
  • email is best-effort: the Amy JWT path doesn't carry it, so rows may hold '' if the user originally signed in via the CLI JWT flow. Because the upsert does nothing on conflict, a later request that does carry an email won't fill it in.

terra_connections

create table terra_connections (
  id text primary key,                        -- random hex16 / UUID
  user_id text not null references users(id) on delete cascade,
  terra_user_id text not null unique,         -- Terra's per-provider opaque id
  provider text not null,                     -- lowercased: "whoop", "oura", ...
  reference_id text not null,                 -- = user_id at widget creation
  scopes text,
  connected_at text not null default (datetime('now')),
  deactivated_at text
);
create index idx_terra_conn_user
  on terra_connections(user_id, provider)
  where deactivated_at is null;

Notes:

  • One row per (user, provider) connection. A user with both Whoop and Oura has two rows.
  • terra_user_id is globally UNIQUE. Terra assigns a fresh id every time a user re-auths the same provider; the auth_success normalizer's upsert clears deactivated_at and refreshes user_id.
  • The partial index where deactivated_at is null keeps the hot read path (/v1/me) on active rows only.

Audit / source of truth

raw_events

create table raw_events (
  id integer primary key autoincrement,
  received_at text not null default (datetime('now')),
  event_type text not null,                  -- "daily" | "sleep" | ... | "lab_report"
  terra_user_id text,
  reference_id text,                          -- = user_id for wearable; Terra's upload_id for lab_report
  provider text,
  payload text not null,                      -- raw JSON body, verbatim
  signature_verified integer not null,        -- 1 always (we reject unverified at the edge)
  processed_at text,
  process_error text,
  dedup_key text not null,                    -- sha256Hex(payload)
  unique (event_type, terra_user_id, dedup_key)
);
create index idx_raw_unprocessed
  on raw_events(processed_at)
  where processed_at is null;

Notes:

  • Every Terra webhook lands here, verbatim. This is the audit log; normalized tables are derived state. Replay of any event is just env.TERRA_EVENTS.send({ rawEventId: row.id }).
  • The UNIQUE (event_type, terra_user_id, dedup_key) constraint makes duplicate webhook deliveries a no-op: the catch branch in routes/webhook-terra.ts detects "UNIQUE constraint" in the error message and returns 200 { ok: true, duplicate: true }.
  • idx_raw_unprocessed is the partial index the cron drain query hits. It is tiny in steady state (~0 rows under healthy operation) and only grows during outages.
  • See data-pipeline.md → State machine for the lifecycle of processed_at / process_error.
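Here is a minimal sketch of that duplicate check, written as a pure function so it can run outside the Worker. The names isUniqueViolation and outcomeForInsertError are hypothetical. The 500 returned for any other insert failure is also an assumption. Only the 200 { ok: true, duplicate: true } branch comes from routes/webhook-terra.ts as described above.

```typescript
// Hypothetical sketch of the duplicate-delivery check in routes/webhook-terra.ts.
type InsertOutcome = {
  status: number;
  body: { ok: boolean; duplicate?: boolean; error?: string };
};

// SQLite reports a violated UNIQUE (event_type, terra_user_id, dedup_key) as
// "UNIQUE constraint failed: ..."; the handler keys off that substring.
function isUniqueViolation(err: unknown): boolean {
  const message = err instanceof Error ? err.message : String(err);
  return message.includes("UNIQUE constraint");
}

function outcomeForInsertError(err: unknown): InsertOutcome {
  if (isUniqueViolation(err)) {
    // A redelivery of an event already stored in raw_events: acknowledge as a no-op.
    return { status: 200, body: { ok: true, duplicate: true } };
  }
  // Assumption: any other failure surfaces as a 5xx so the sender can retry.
  return { status: 500, body: { ok: false, error: "insert_failed" } };
}
```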

Wearable normalized tables

daily_summary

The widest table. One row per (user, source, calendar date). Most agent queries hit this.

create table daily_summary (
  user_id text not null references users(id) on delete cascade,
  source text not null,                       -- "whoop" / "oura" / etc.
  datetime text not null,                     -- YYYY-MM-DD (local wake date)
  steps real,
  sleep_minutes real,
  bed_time text,                              -- ISO 8601 UTC
  wake_up_time text,                          -- ISO 8601 UTC
  resting_heart_rate real,
  heart_rate_variability real,
  active_zone_minutes real,
  fatburn_active_zone_minutes real,
  cardio_active_zone_minutes real,
  peak_active_zone_minutes real,
  deep_sleep_minutes real,
  rem_sleep_minutes real,
  light_sleep_minutes real,
  awake_minutes real,
  deep_sleep_percent real,
  rem_sleep_percent real,
  light_sleep_percent real,
  awake_percent real,
  stress_management_score real,
  sleep_score real,
  spo2 real,
  skin_temperature real,
  respiratory_rate real,
  strain real,                                -- Whoop 0–21
  recovery_score real,                        -- Whoop 0–100
  updated_at text not null default (datetime('now')),
  primary key (user_id, source, datetime)
);
create index idx_daily_user_updated
  on daily_summary(user_id, updated_at);

Notes:

  • The PK is (user_id, source, datetime), so the same physical day for one user on two devices produces two rows.
  • All numeric columns are declared real, including naturally integral ones like steps. SQLite's REAL affinity stores even whole numbers as floats, so cast at read time if you need integers.
  • bed_time and wake_up_time are full timestamps, not dates, so a sleep that crosses midnight keeps the correct point-in-time information.
  • Writes use the COALESCE upsert pattern from normalize/utils.ts:buildCoalesceUpsert so a sleep event can fill HRV after a daily event already wrote steps for the same date.
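Here is a rough sketch of the SQL a COALESCE upsert builder can produce. The signature is hypothetical, and the real normalize/utils.ts:buildCoalesceUpsert may differ. The sketch makes two assumptions:

  • Incoming non-null values win over stored ones.
  • Every update bumps updated_at. The /v1/sync query filters on updated_at, so an update needs this bump to be re-sent.

```typescript
// Hypothetical stand-in for normalize/utils.ts:buildCoalesceUpsert.
function buildCoalesceUpsert(
  table: string,
  keyCols: readonly string[],
  valueCols: readonly string[],
): string {
  const cols = [...keyCols, ...valueCols];
  const placeholders = cols.map(() => "?").join(", ");
  // A NULL in the incoming row (excluded.*) keeps whatever is already stored.
  const updates = valueCols.map((c) => `${c} = coalesce(excluded.${c}, ${table}.${c})`);
  // Bump updated_at so the next /v1/sync?since=... picks the row up again.
  updates.push("updated_at = datetime('now')");
  return [
    `insert into ${table} (${cols.join(", ")})`,
    `values (${placeholders})`,
    `on conflict (${keyCols.join(", ")}) do update set`,
    `  ${updates.join(",\n  ")}`,
  ].join("\n");
}
```

Bind the key values first, then one value per column, passing null for anything the event doesn't carry. For example, a sleep event that binds null for steps leaves the stored steps alone while it fills heart_rate_variability.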

activities

create table activities (
  user_id text not null references users(id) on delete cascade,
  source text not null,
  start_time text not null,                   -- ISO 8601 UTC
  end_time text,
  activity_name text,
  distance real,                              -- meters
  duration real,                              -- seconds
  elevation_gain real,                        -- meters (or proxy: floors_climbed)
  average_heart_rate real,                    -- bpm
  calories real,
  steps real,
  active_zone_minutes real,
  strain real,
  raw text,                                   -- full Terra payload as JSON string
  updated_at text not null default (datetime('now')),
  primary key (user_id, source, start_time)
);
create index idx_activities_user_updated
  on activities(user_id, updated_at);

Notes:

  • The raw column preserves the full Terra payload, so downstream agents can pull out exotic fields (power_data, position_data) without re-running the normalizer.
  • duration is in seconds in the cloud table, but the CLI sometimes treats it as minutes, so be careful when comparing across surfaces. The normalizer also derives active_zone_minutes = Math.round(durSec / 60) so there's a consistent minute view.

sleep_sessions

create table sleep_sessions (
  user_id text not null references users(id) on delete cascade,
  source text not null,
  start_time text not null,
  end_time text not null,
  raw text not null,                          -- full Terra sleep payload (JSON)
  updated_at text not null default (datetime('now')),
  primary key (user_id, source, start_time)
);
create index idx_sleep_user_updated
  on sleep_sessions(user_id, updated_at);

Notes:

  • One row per session, not per night. Naps produce their own rows.
  • The aggregate "best-of-the-night" data also lives in daily_summary keyed to the wake date (see the dual-write in normalize/sleep.ts).

Biomarkers

biomarkers_raw

The long-form storage. One row per (user, source, draw_date, marker code).

create table biomarkers_raw (
  user_id text not null references users(id) on delete cascade,
  source text not null,                       -- 'terra_lab' | 'whoop_body' | etc.
  draw_date text not null,                    -- YYYY-MM-DD
  code text not null,                         -- Terra biomarker enum, normalised
  value real,
  unit text,                                  -- UCUM where possible
  ref_low real,
  ref_high real,
  status text,                                -- 'optimal'|'borderline'|'out_of_range'
  raw text,
  updated_at text not null default (datetime('now')),
  primary key (user_id, source, draw_date, code)
);
create index idx_biomarkers_user_updated
  on biomarkers_raw(user_id, updated_at);

Notes:

  • The Terra biomarker enum is normalised through CODE_ALIASES before insert (e.g. cholesterol → total_cholesterol). Unmapped codes still land here at full fidelity but won't surface in biomarkers_wide.
  • source = 'terra_lab' for blood panels; wearables that emit BP/weight use <provider>_body (e.g. withings_body).
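Here is a sketch of that normalisation step, assuming CODE_ALIASES is a plain lookup applied after some string cleanup. Only the cholesterol → total_cholesterol alias comes from this page. The cleanup rules, the helper names, and the shortened WIDE_VIEW_CODES list are illustrative.

```typescript
// Hypothetical sketch: only the cholesterol -> total_cholesterol alias is documented.
const CODE_ALIASES = new Map<string, string>([["cholesterol", "total_cholesterol"]]);

// A few of the codes biomarkers_wide pivots out (shortened for brevity).
const WIDE_VIEW_CODES = new Set(["total_cholesterol", "hdl_cholesterol", "ldl_cholesterol", "glucose", "hba1c"]);

function normaliseCode(rawCode: string): string {
  const cleaned = rawCode
    .trim()
    .toLowerCase()
    .replace(/[^a-z0-9]+/g, "_") // spaces, dashes, etc. become a single underscore
    .replace(/^_+|_+$/g, ""); // no leading or trailing underscores
  // Unmapped codes pass through unchanged and still land in biomarkers_raw.
  return CODE_ALIASES.get(cleaned) ?? cleaned;
}

function surfacesInWideView(rawCode: string): boolean {
  return WIDE_VIEW_CODES.has(normaliseCode(rawCode));
}
```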

biomarkers_wide (view)

A wide projection that matches the Zod Biomarkers shape the agents expect, with 23 markers pivoted out as named columns:

create view biomarkers_wide as
select
  user_id, source, draw_date,
  max(case when code = 'total_cholesterol'  then value end) as total_cholesterol,
  max(case when code = 'hdl_cholesterol'    then value end) as hdl,
  max(case when code = 'ldl_cholesterol'    then value end) as ldl,
  max(case when code = 'triglycerides'      then value end) as triglycerides,
  max(case when code = 'glucose'            then value end) as glucose,
  max(case when code = 'hba1c'              then value end) as hba1c,
  max(case when code = 'insulin'            then value end) as insulin,
  max(case when code = 'creatinine'         then value end) as creatinine,
  max(case when code = 'sodium'             then value end) as sodium,
  max(case when code = 'potassium'          then value end) as potassium,
  max(case when code = 'alt'                then value end) as alt,
  max(case when code = 'ast'                then value end) as ast,
  max(case when code = 'crp'                then value end) as crp,
  max(case when code = 'white_blood_cell'   then value end) as white_blood_cell,
  max(case when code = 'red_blood_cell'     then value end) as red_blood_cell,
  max(case when code = 'platelet'           then value end) as platelet,
  max(case when code = 'hemoglobin'         then value end) as hemoglobin,
  max(case when code = 'hematocrit'         then value end) as hematocrit,
  max(case when code = 'mcv'                then value end) as mcv,
  max(case when code = 'total_testosterone' then value end) as total_testosterone,
  max(case when code = 'tsh'                then value end) as tsh,
  max(case when code = 'systolic_bp'        then value end) as systolic_bp,
  max(case when code = 'diastolic_bp'       then value end) as diastolic_bp,
  max(updated_at)                                           as updated_at
from biomarkers_raw
group by user_id, source, draw_date;

/v1/sync queries biomarkers_wide. That is how the agents get a panel-shaped view without knowing the long-form storage exists.
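For intuition, here is roughly what the view computes, written as an in-memory pivot. This is an illustration, not code that exists in the repo. pivotBiomarkers and the four-entry column map are hypothetical. The grouping, the hdl/ldl renames, and the max() semantics follow the view definition above.

```typescript
// Hypothetical in-memory mirror of the biomarkers_wide view.
interface RawBiomarker {
  user_id: string;
  source: string;
  draw_date: string;
  code: string;
  value: number | null;
  updated_at: string;
}

interface WideRow {
  user_id: string;
  source: string;
  draw_date: string;
  updated_at: string;
  markers: Record<string, number | null>; // view column name -> value
}

// A few entries of the view's code -> column mapping (note the hdl/ldl renames).
const COLUMN_FOR_CODE = new Map<string, string>([
  ["total_cholesterol", "total_cholesterol"],
  ["hdl_cholesterol", "hdl"],
  ["ldl_cholesterol", "ldl"],
  ["glucose", "glucose"],
]);

function pivotBiomarkers(rows: readonly RawBiomarker[]): WideRow[] {
  const groups = new Map<string, WideRow>();
  for (const r of rows) {
    // group by user_id, source, draw_date
    const key = JSON.stringify([r.user_id, r.source, r.draw_date]);
    let wide = groups.get(key);
    if (wide === undefined) {
      const markers: Record<string, number | null> = {};
      for (const column of COLUMN_FOR_CODE.values()) markers[column] = null; // absent marker -> NULL
      wide = { user_id: r.user_id, source: r.source, draw_date: r.draw_date, updated_at: r.updated_at, markers };
      groups.set(key, wide);
    }
    // max(updated_at) spans every row in the group, mapped or not.
    if (r.updated_at > wide.updated_at) wide.updated_at = r.updated_at;
    const column = COLUMN_FOR_CODE.get(r.code);
    if (column === undefined) continue; // unmapped codes never reach the wide view
    // max(case when code = ... then value end): SQL's max() ignores NULLs.
    const prev = wide.markers[column] ?? null;
    if (r.value !== null && (prev === null || r.value > prev)) wide.markers[column] = r.value;
  }
  return [...groups.values()];
}
```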

lab_uploads

create table lab_uploads (
  id text primary key,                        -- uploadId = crypto.randomUUID()
  user_id text not null references users(id) on delete cascade,
  storage_key text not null,                  -- "lab-uploads/<userId>/<uploadId>.<ext>"
  uploaded_at text not null default (datetime('now')),
  terra_status text,                          -- 'pending' | 'submitted' | 'parsed' | 'failed:<code>'
  terra_response text,                        -- raw JSON of Terra's 202 body
  parsed_at text
);
-- migration 0003:
alter table lab_uploads add column terra_upload_id text;
create index if not exists idx_lab_uploads_terra_upload
  on lab_uploads(terra_upload_id);

Notes:

  • terra_upload_id is the id Terra returns in its 202 response and the ONLY correlator on the async lab_report webhook. Without the index on it, the normalize path would do a full scan per lab webhook.
  • lab_uploads.id is our own UUID and is what the CLI exposes; users never see terra_upload_id.

Misc

profiles

create table profiles (
  user_id text primary key references users(id) on delete cascade,
  data text not null,                         -- JSON matching the CLI's Profile zod
  updated_at text not null default (datetime('now'))
);

Holds the user's static profile (age, sex, height, conditions, goal, ...). One row per user. The schema is opaque to D1; it's the CLI side that asserts shape via Zod.

sync_watermarks

create table sync_watermarks (
  user_id text not null,
  table_name text not null,
  last_synced_at text not null,
  primary key (user_id, table_name)
);

Note from the CLI Store.applyCloudMigrations:

sync_watermarks from Phase 5 is intentionally dropped here, the Phase 9 model has no watermarks.

The table is still in D1 (it was created in 0001_init.sql) but is no longer written by either side. Sync is driven by the ?since=<ISO> query param on /v1/sync, not by a server-side watermark. Safe to drop in a future migration, but harmless to leave.

Observability

trace_events

Added in 0002_observability.sql.

create table trace_events (
  id integer primary key autoincrement,
  ts text not null default (datetime('now')),

  request_id text,            -- UUID from webhook entry or CLI request
  user_id text,
  terra_user_id text,
  raw_event_id integer,       -- pointer into raw_events when relevant

  level text not null,        -- 'info' | 'warn' | 'error'
  event text not null,        -- 'webhook_received', 'normalize_complete', ...
  message text,
  data text,                  -- JSON blob with extra context
  duration_ms integer,        -- set on Logger.start().end() pairs

  error_name text,
  error_stack text
);

create index idx_trace_request on trace_events(request_id);
create index idx_trace_user on trace_events(user_id);
create index idx_trace_ts on trace_events(ts);
create index idx_trace_event on trace_events(event);
create index idx_trace_level on trace_events(level) where level != 'info';


Common queries with EXPLAIN

D1's query planner is SQLite's. Each example shows the index that matters.

/v1/sync daily_summary fetch:

select <…columns…> from daily_summary
 where user_id = ? and updated_at > ?
 order by datetime asc;

-- EXPLAIN QUERY PLAN:
-- SEARCH daily_summary USING INDEX idx_daily_user_updated (user_id=? AND updated_at>?)
-- USE TEMP B-TREE FOR ORDER BY

The composite index (user_id, updated_at) covers the WHERE; the ORDER BY datetime does a small in-memory sort over the matched rows (acceptable: a single user's worth of recent updates is ≤ a few hundred rows).

Cron drain query:

select id from raw_events
 where processed_at is null
   and received_at > datetime('now', '-24 hours')
   and (process_error is null or process_error not like 'skipped:%')
 order by id asc limit 50;

-- EXPLAIN QUERY PLAN:
-- SEARCH raw_events USING INDEX idx_raw_unprocessed (processed_at=NULL)
-- (then filter by received_at + process_error in the scan)

The partial index on processed_at IS NULL is exactly what makes this cron O(unprocessed-rows), not O(total-events).

Per-user activity check (admin/user/:userId):

select count(*) as n from activities where user_id = ?;

-- EXPLAIN QUERY PLAN:
-- SEARCH activities USING INDEX idx_activities_user_updated (user_id=?)

Same (user_id, updated_at) composite. The updated_at part is just along for the ride, but it doesn't hurt.

Looking up by terra_user_id:

select user_id from terra_connections where terra_user_id = ? limit 1;

-- EXPLAIN QUERY PLAN:
-- SEARCH terra_connections USING UNIQUE INDEX (terra_user_id=?)

The unique constraint creates an index for free; lookups are O(log n).

biomarkers_wide from /v1/sync:

select <…columns…> from biomarkers_wide
 where user_id = ? and updated_at > ?
 order by draw_date asc;

A view, not a table. SQLite inlines the GROUP BY query on every call. At ≤ ~100 biomarker rows per user this is fine; at 10k+ rows it would warrant materialisation. Not a near-term problem at beta scale.

Pagination patterns

Today's /v1/sync doesn't paginate: with 10 beta users, the response easily fits in one shot. The intended forward pattern matches the public API contract in api-reference.md:

GET /v1/<resource>?limit=20&cursor=eyJ0IjoxNzMy...

Cursor encodes the last seen ID (or ISO timestamp) in base64. Server returns:

{ "data": [...], "next_cursor": "...", "has_more": true }
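Here is a minimal sketch of cursor handling for that contract. It assumes the cursor is base64-encoded JSON holding the last row's timestamp plus a tiebreaker id. The eyJ0IjoxNzMy... prefix in the example above is base64 for {"t":1732, which fits a JSON object with a numeric timestamp. The id field and the helper names are assumptions. The limit + 1 fetch in toPage is one common way to compute has_more.

```typescript
// Hypothetical cursor payload: the last row's timestamp plus a tiebreaker id.
interface Cursor {
  t: number; // e.g. epoch milliseconds of the last row returned
  id: number;
}

interface Page<T> {
  data: T[];
  next_cursor: string | null;
  has_more: boolean;
}

function encodeCursor(c: Cursor): string {
  // btoa only accepts Latin-1 text; JSON of two numbers is plain ASCII.
  return btoa(JSON.stringify(c));
}

function decodeCursor(raw: string): Cursor | null {
  try {
    const parsed: unknown = JSON.parse(atob(raw));
    if (
      typeof parsed === "object" && parsed !== null &&
      typeof (parsed as Cursor).t === "number" &&
      typeof (parsed as Cursor).id === "number"
    ) {
      return parsed as Cursor;
    }
  } catch {
    // Malformed base64 or JSON: treat as no cursor (a route might return 400 instead).
  }
  return null;
}

// The caller fetches limit + 1 rows; the extra row only signals that more exist.
function toPage<T>(rows: readonly T[], limit: number, keyOf: (row: T) => Cursor): Page<T> {
  const has_more = rows.length > limit;
  const data = rows.slice(0, limit);
  const last = data[data.length - 1];
  return {
    data,
    has_more,
    next_cursor: has_more && last !== undefined ? encodeCursor(keyOf(last)) : null,
  };
}
```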

For /v1/sync specifically, the "cursor" is effectively since=<ISO>: the client passes back the previous response's now field as the next since. Strict monotonicity matters here. In SQLite, datetime('now') has only second precision (it yields YYYY-MM-DD HH:MM:SS), so two writes within the same second share an updated_at. If we ever hit that, switch to (updated_at, id) ordering and pass both in the cursor.
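Here is a sketch of that fallback, using rowid as the tiebreaker. daily_summary's primary key is a composite of text columns, so the table still has an implicit rowid.

The sketch also normalizes the incoming ISO since into the YYYY-MM-DD HH:MM:SS form that datetime('now') writes. Compared as text, '2024-11-19 09:00:00' sorts before '2024-11-19T08:30:00Z' because a space sorts before T. Any comparison of updated_at against an unnormalized ISO string would therefore skip rows written later that same day.

The query text and helper names are hypothetical, and the sketch assumes the client sends UTC ISO strings ending in Z.

```typescript
// Hypothetical keyset pagination over (updated_at, rowid) for /v1/sync.
interface SyncCursor {
  updatedAt: string; // 'YYYY-MM-DD HH:MM:SS', the format datetime('now') writes
  rowid: number;
}

// ?1 user_id, ?2 cursor updated_at, ?3 cursor rowid, ?4 page size (+1 to detect more).
// "updated_at >= ?2" keeps the (user_id, updated_at) index usable as a range scan.
const DAILY_PAGE_SQL = `
select rowid as row_id, *
  from daily_summary
 where user_id = ?1
   and updated_at >= ?2
   and (updated_at > ?2 or rowid > ?3)
 order by updated_at asc, rowid asc
 limit ?4`;

// Convert e.g. 2024-11-19T08:30:00.900Z to SQLite's text form. Fractional
// seconds are truncated, so the boundary second is re-sent rather than skipped.
function toSqliteDatetime(iso: string): string {
  const d = new Date(iso);
  if (Number.isNaN(d.getTime())) throw new RangeError(`invalid timestamp: ${iso}`);
  return d.toISOString().slice(0, 19).replace("T", " ");
}

function firstCursor(sinceIso: string): SyncCursor {
  // rowid 0 precedes every auto-assigned rowid, so rows stamped exactly at `since` are included.
  return { updatedAt: toSqliteDatetime(sinceIso), rowid: 0 };
}

function dailyPageParams(userId: string, after: SyncCursor, limit: number): unknown[] {
  return [userId, after.updatedAt, after.rowid, limit + 1];
}
```

To use it, start from firstCursor(since), fetch limit + 1 rows, and build the next cursor from the last row's updated_at and row_id. Rows at the boundary second may be sent twice, which is harmless as long as the client overwrites by key.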


R2, amy-lab-reports

Binding: LAB_REPORTS (from wrangler.toml lines 15-17).

Layout

lab-uploads/
  ├── <userId>/
  │     ├── <uploadId>.pdf
  │     ├── <uploadId>.png
  │     └── <uploadId>.jpg
  └── ...

Set by routes/labs.ts:60:

const ext = file.name.endsWith(".pdf")
  ? "pdf"
  : file.name.endsWith(".png")
    ? "png"
    : /\.jpe?g$/i.test(file.name)
      ? "jpg"
      : "pdf";
const uploadId = crypto.randomUUID();
const storageKey = `lab-uploads/${userId}/${uploadId}.${ext}`;
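Only the JPEG test above is case-insensitive. A file named SCAN.PNG falls through to the .pdf key, as does anything unrecognised. If that ever matters, a case-insensitive variant could look like the sketch below. labExtension and labStorageKey are hypothetical helpers, not functions in routes/labs.ts.

```typescript
// Hypothetical helper: same three outcomes as routes/labs.ts, but case-insensitive.
type LabExt = "pdf" | "png" | "jpg";

function labExtension(fileName: string): LabExt {
  const name = fileName.toLowerCase();
  if (name.endsWith(".png")) return "png";
  if (/\.jpe?g$/.test(name)) return "jpg";
  // .pdf and anything unrecognised keep the original "pdf" fallback.
  return "pdf";
}

function labStorageKey(userId: string, uploadId: string, fileName: string): string {
  return `lab-uploads/${userId}/${uploadId}.${labExtension(fileName)}`;
}
```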

Metadata

Key                            Value
httpMetadata.contentType       file.type, or application/pdf as a fallback
customMetadata.user_id         the Clerk userId
customMetadata.original_name   the upload's original filename
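Here is a sketch of building those put options. labPutOptions and the local interfaces are hypothetical stand-ins so the example runs outside Workers. The httpMetadata and customMetadata fields themselves are the real options of R2Bucket.put. Use || rather than ?? for the content type: a File's type is the empty string when the browser can't determine it, and ?? would keep that empty string.

```typescript
// Minimal local types; in the Worker these correspond to R2Bucket.put()'s options.
interface LabPutOptions {
  httpMetadata: { contentType: string };
  customMetadata: { user_id: string; original_name: string };
}

interface UploadedFileLike {
  name: string;
  type: string; // "" when the MIME type could not be determined
}

// Hypothetical helper mirroring the metadata table above.
function labPutOptions(file: UploadedFileLike, userId: string): LabPutOptions {
  return {
    httpMetadata: { contentType: file.type || "application/pdf" },
    customMetadata: { user_id: userId, original_name: file.name },
  };
}
```

In the route, this would be used roughly as await env.LAB_REPORTS.put(storageKey, await file.arrayBuffer(), labPutOptions(file, userId)).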

Lifecycle

No expiry / lifecycle rules today. Lab PDFs stay forever unless manually deleted. The /admin/user/:userId/wipe endpoint deletes the metadata row but does NOT touch R2. To fully purge a user's blobs:

# List + delete (no built-in admin route; one-off script)
for key in $(wrangler r2 object list amy-lab-reports --prefix "lab-uploads/$USERID/"); do
  wrangler r2 object delete "amy-lab-reports/$key"
done
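An alternative to shelling out is to purge from inside a Worker that has the LAB_REPORTS binding, for example behind a future admin route. The sketch below assumes such a route. purgeUserLabBlobs is hypothetical. R2BucketLike is a minimal slice of the binding's real list() and delete() methods: list returns up to 1,000 keys per page plus a cursor when truncated, and delete accepts up to 1,000 keys per call.

```typescript
// Minimal slice of the R2Bucket binding (list + delete exist on the real binding).
interface R2ListPage {
  objects: { key: string }[];
  truncated: boolean;
  cursor?: string;
}

interface R2BucketLike {
  list(options: { prefix: string; cursor?: string; limit?: number }): Promise<R2ListPage>;
  delete(keys: string[]): Promise<void>;
}

// Hypothetical purge helper: walk every page under the user's prefix and delete it.
async function purgeUserLabBlobs(bucket: R2BucketLike, userId: string): Promise<number> {
  const prefix = `lab-uploads/${userId}/`; // trailing slash so user_a never matches user_ab
  let cursor: string | undefined;
  let deleted = 0;
  do {
    const page = await bucket.list({ prefix, cursor, limit: 1000 });
    const keys = page.objects.map((o) => o.key);
    if (keys.length > 0) {
      await bucket.delete(keys); // one call handles up to 1,000 keys
      deleted += keys.length;
    }
    cursor = page.truncated ? page.cursor : undefined;
  } while (cursor !== undefined);
  return deleted;
}
```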

Terra's access

Terra never reads from R2 directly. The upload flow is:

  1. Worker stores bytes in R2 (durable copy).
  2. Worker also POSTs the same bytes to https://api.tryterra.co/v2/lab-reports in the SAME request handler.
  3. Terra holds onto the bytes for its OCR pass; once parsed, it webhooks the results back. The original R2 object is the canonical archive.

Throughput

Operation          Rate (today)                      Headroom
PUT (lab upload)   ≤ 1/day per user                  trivial
GET                almost never (admin debug only)   trivial

R2's Class A pricing is $4.50 / million writes; at single-digit users this is unmeasurably small. No bucket-level lifecycle / cors / public URL is configured today.


KV, CACHE

Binding: CACHE (from wrangler.toml lines 30-32). Namespace ID: b9b96afdc02c4d42b859d9a741f4959c.

Forward-compatible, not actively used. A grep of cloud/src/ finds no env.CACHE.get/put calls. The binding exists for two upcoming uses described in architecture.md (idempotency and SSE stream replay), plus two optional caches:

Planned key patterns

  • idempotency:{key}: cached response body for a write request carrying an Idempotency-Key header. Returned verbatim on retry; 409 on body mismatch. TTL: 24h.
  • stream:{turn_id}:{seq}: buffered SSE event for GET /v1/turns/:id/events polling. seq is a per-turn, monotonically increasing integer. TTL: 1h after turn completion.
  • cache:{userId}:me: cached /v1/me reconcile result (optional, to skip the Terra listSubscriptions round-trip on every call). TTL: 60s.
  • cache:terra:subscriptions: Worker-global cache of Terra's subscriptions list, used by the reconcile path in /v1/me. TTL: 30s.
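Here is a sketch of key builders for those patterns, so the prefixes live in one place. Everything in it is hypothetical, since nothing reads or writes CACHE yet. The TTL values simply restate the list above.

One design note: KV list() returns keys in lexicographic order, so stream:{turn_id}:10 sorts before stream:{turn_id}:2. That only matters if replay ever lists keys instead of reading them by seq.

```typescript
// Hypothetical TTLs (seconds) for the planned CACHE patterns, restating the list above.
const KV_TTL_SECONDS = {
  idempotency: 24 * 60 * 60,
  stream: 60 * 60, // the plan measures this from turn completion
  me: 60,
  terraSubscriptions: 30, // below KV's 60 s minimum; see Eviction
};

// Hypothetical key builders, one per planned pattern.
const kvKeys = {
  idempotency: (key: string): string => `idempotency:${key}`,
  streamEvent: (turnId: string, seq: number): string => {
    // seq is a per-turn, monotonically increasing integer
    if (!Number.isInteger(seq) || seq < 0) throw new RangeError(`invalid seq: ${seq}`);
    return `stream:${turnId}:${seq}`;
  },
  me: (userId: string): string => `cache:${userId}:me`,
  terraSubscriptions: (): string => "cache:terra:subscriptions",
};
```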

Eviction

KV's actual behaviour: values are evicted after their declared TTL, and reads are eventually consistent. A write can take up to 60 seconds, sometimes more, to become visible in other locations. This is fine for the planned uses, because idempotency and stream replay both read back their writes in the same region.

The TTL is set per put() via { expirationTtl: <seconds> }. Minimum TTL is 60s; for cache:terra:subscriptions (30s) we'd want to either bump it to 60s or use Worker memory cache for that one.
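Here is a sketch of both options from the previous paragraph, assuming nothing beyond the documented 60-second minimum. kvPutOptions clamps a requested TTL to that floor. MemoryTtlCache is a hypothetical module-scope cache for the 30-second Terra subscriptions list. Module-scope state only lives as long as the isolate, and each isolate keeps its own copy; that is acceptable for a 30-second cache.

```typescript
// KV rejects expirationTtl values below 60 seconds.
const KV_MIN_TTL_SECONDS = 60;

function kvPutOptions(ttlSeconds: number): { expirationTtl: number } {
  return { expirationTtl: Math.max(KV_MIN_TTL_SECONDS, Math.ceil(ttlSeconds)) };
}

// Hypothetical isolate-local cache for values that need a TTL under 60 s.
class MemoryTtlCache<T> {
  private entry: { value: T; expiresAt: number } | undefined = undefined;
  private readonly ttlMs: number;
  private readonly now: () => number;

  constructor(ttlMs: number, now: () => number = Date.now) {
    this.ttlMs = ttlMs;
    this.now = now; // injectable clock, handy for tests
  }

  get(): T | undefined {
    if (this.entry === undefined || this.now() >= this.entry.expiresAt) return undefined;
    return this.entry.value;
  }

  set(value: T): void {
    this.entry = { value, expiresAt: this.now() + this.ttlMs };
  }
}
```

To use it, declare the cache at module scope, for example const terraSubs = new MemoryTtlCache<unknown[]>(30_000). The /v1/me reconcile path would then call terraSubs.get() first, and terraSubs.set(list) after fetching from Terra.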


Local SQLite (CLI side)

The CLI's Data Science Agent reads its own SQLite, NOT D1 directly. The file lives at data/local/persona.sqlite on the user's machine and is populated by amy sync (which calls GET /v1/sync and replays into a fresh local DB).

Schema definition: src/data/store.ts lines 27-117. Three tables:

  • summary: the daily_summary columns (a subset of the cloud's set), keyed by datetime only, since each local DB holds a single source. Filled from daily_summary rows in GET /v1/sync.
  • activities: a subset of the cloud activities columns, PK (start_time, activity_name). Filled from activities rows in /v1/sync.
  • population: reference percentiles (age × sex × percentile → typical values). Static, shipped with the repo as bundled JSON.

This is read-only as far as the agent is concerned. amy sync re-pulls and overwrites; amy reset deletes the file entirely.

Why a separate local SQLite instead of querying D1 from the agent?

  • The DS Agent runs Python pandas (pd.read_sql_query over sqlite3) in a sandbox, so local SQLite is the obvious fit.
  • Avoids round-tripping every agent query through HTTP + D1 + JSON.
  • Keeps raw health data on the user's machine. The cloud only brokers sync.

Composite features (computed at read time)

_attach_composites(summary_df) in src/agents/data-science/sandbox.ts adds derived columns to the in-memory DataFrame before the agent's code runs:

  • cardio_fitness_index = steps / resting_heart_rate
  • hrv_rhr_ratio = heart_rate_variability / resting_heart_rate
  • Rolling 30-day statistics: sleep_duration_sd_30d, sleep_duration_cv_30d, bedtime_hour_sd_30d, wake_hour_mean_30d, rhr_sd_30d, rhr_mean_30d, steps_mean_30d, steps_cv_30d.

These are not stored anywhere. They're computed deterministically on every sandbox run, so the agent can reach for them by name in pandas.
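The composites are pandas code, but the arithmetic is easy to show in TypeScript. ratio and rollingStd are hypothetical names. The sketch assumes:

  • One row per calendar day in date order, so a 30-row window is a 30-day window.
  • The default semantics of pandas' rolling(window).std(): sample standard deviation (ddof = 1), null until the window is full and whenever it contains a missing value.

The real _attach_composites may use a time-based window or a different min_periods.

```typescript
type Num = number | null;

// Ratio column such as cardio_fitness_index = steps / resting_heart_rate.
// Returns null for a missing input or a zero denominator (pandas would give NaN / inf).
function ratio(numerator: Num, denominator: Num): Num {
  if (numerator === null || denominator === null || denominator === 0) return null;
  return numerator / denominator;
}

// Trailing-window sample standard deviation (ddof = 1), like pandas'
// series.rolling(window).std() with its default min_periods = window.
function rollingStd(values: readonly Num[], window: number): Num[] {
  return values.map((_, i) => {
    if (i + 1 < window) return null; // window not full yet
    const slice = values.slice(i + 1 - window, i + 1);
    if (slice.some((v) => v === null)) return null; // a gap leaves fewer than min_periods values
    const xs = slice as number[];
    const mean = xs.reduce((sum, x) => sum + x, 0) / window;
    const squares = xs.reduce((sum, x) => sum + (x - mean) ** 2, 0);
    return Math.sqrt(squares / (window - 1));
  });
}
```

Under those assumptions, rhr_sd_30d would be rollingStd(restingHeartRate, 30) and hrv_rhr_ratio would be ratio(hrv, rhr) per row.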


Workflow state (future)

When TurnWorkflow lands (see architecture.md → TurnWorkflow), step state will live in the Cloudflare Workflows runtime, not in D1. Each step.do(...) persists its output to managed storage; replay on a crash restarts at the failed step with all prior step outputs intact.

To inspect: Cloudflare dashboard → Workflows → TurnWorkflow → Instances. Each instance shows per-step input/output, errors, and sleep/wait state. There's no D1-side schema to design.

Today this section is forward-looking. None of cloud/src/ references Workflow types yet.


Migration story

Adding a column

D1 migrations are SQL files in cloud/migrations/ prefixed with a 4-digit ordinal. Example: 0003_lab_terra_upload_id.sql.

-- cloud/migrations/0004_my_new_column.sql
alter table daily_summary add column new_metric real;
create index if not exists idx_daily_new_metric on daily_summary(new_metric)
  where new_metric is not null;

Apply:

cd cloud
wrangler d1 migrations apply amy-db --local   # local dev DB
wrangler d1 migrations apply amy-db --remote  # production

Or use the package script: bun run db:migrate (remote) / bun run db:migrate:local.

Then update:

  1. cloud/src/schema.ts Zod (and src/data/schema.ts for the CLI side; see the duplication note in architecture.md → Migration sketch).
  2. The relevant cloud/src/normalize/*.ts to write the column.
  3. cloud/src/routes/sync.ts to include it in the select.
  4. The CLI's src/data/store.ts INSERT INTO summary to include the new column (with a backfill ALTER TABLE in applyCloudMigrations for existing local DBs).

Running a backfill on an existing column

Three options, depending on what you need:

  1. Recompute from raw_events by re-enqueueing a user's events (this example resets only the daily ones):

    wrangler d1 execute amy-db --remote \
      --command "update raw_events set processed_at = null
                  where reference_id = '<userId>'
                    and event_type = 'daily'"
    curl -X POST -H "x-admin-key: $AMY_ADMIN_KEY" \
      https://amy.heyamy.xyz/admin/drain

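    The COALESCE upsert is idempotent, so re-running normalize fills the new column without disturbing other fields. One caveat: if /admin/drain uses the same query as the cron drain above, its received_at > datetime('now', '-24 hours') filter means events older than a day won't be picked up.

  2. Backfill from Terra (server-side): call POST /v1/import with { days: 365 } for the affected user. Terra streams everything back, and it re-enters the normal ingest pipeline.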

  3. SQL-only backfill for derived data:

    wrangler d1 execute amy-db --remote --command \
      "update daily_summary set new_metric = compute_from_other_columns(...)"

    Watch the row-count limit; D1 may need a batch + LIMIT pattern.
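    One way to batch is to walk fixed rowid ranges. This has two advantages:

      • It always terminates, even if the computed value comes out NULL for some rows.
      • It avoids UPDATE ... LIMIT, which SQLite only supports when built with SQLITE_ENABLE_UPDATE_DELETE_LIMIT.

    backfillByRowidRange is a hypothetical helper. Get maxRowid first with select max(rowid) from daily_summary.

```typescript
// `run` executes one statement and resolves to the number of rows it changed.
// Against D1 that could be:
//   (sql, params) => env.DB.prepare(sql).bind(...params).run().then((r) => r.meta.changes)
type RunStatement = (sql: string, params: unknown[]) => Promise<number>;

// Hypothetical helper: apply `setClause` to `table` one rowid range at a time.
async function backfillByRowidRange(
  run: RunStatement,
  table: string,
  setClause: string, // interpolated into SQL: trusted, hand-written text only
  maxRowid: number,
  batchSize = 1000,
): Promise<number> {
  let changed = 0;
  for (let lo = 0; lo < maxRowid; lo += batchSize) {
    // Fixed ranges always terminate, even if some rows still end up NULL.
    changed += await run(
      `update ${table} set ${setClause} where rowid > ? and rowid <= ?`,
      [lo, lo + batchSize],
    );
  }
  return changed;
}
```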

Deprecating a field

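D1 is SQLite underneath. SQLite's ALTER TABLE DROP COLUMN fails whenever the column is indexed, is part of a PRIMARY KEY or UNIQUE constraint, or is referenced by a view, trigger, or foreign key, so it can't be relied on across these schemas. The pattern is: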

  1. Stop writing to the column (remove from normalize SQL).
  2. Stop reading from the column (remove from /v1/sync select; update the Zod schema; bump SDK).
  3. Leave the column in place. D1 storage is cheap, and a column rename / drop risks corrupting the DB on rollback.

If you absolutely must remove it: create a new table, INSERT ... SELECT the surviving columns, drop the old, rename. Test on a local DB first.
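Here is a sketch of that rebuild as an ordered list of statements. They could go into a numbered migration file, or be run with env.DB.batch(...), which D1 executes as a single transaction. rebuildTableStatements is hypothetical. Keep three SQLite behaviours in mind:

  • Dropping the old table drops its indexes, hence recreateIndexSql.
  • With foreign keys enforced, DROP TABLE first runs an implicit DELETE that fires ON DELETE CASCADE actions, so never point this at users.
  • Drop any view over the table, such as biomarkers_wide over biomarkers_raw, before the rename, and recreate it after.

```typescript
// Hypothetical generator for the rebuild sequence described above.
function rebuildTableStatements(opts: {
  table: string;
  createNewTableSql: string; // must create `${table}_new` with only the surviving columns
  keepColumns: readonly string[];
  recreateIndexSql: readonly string[]; // dropping the old table drops its indexes
}): string[] {
  const cols = opts.keepColumns.join(", ");
  return [
    opts.createNewTableSql,
    `insert into ${opts.table}_new (${cols}) select ${cols} from ${opts.table}`,
    `drop table ${opts.table}`,
    `alter table ${opts.table}_new rename to ${opts.table}`,
    ...opts.recreateIndexSql,
  ];
}
```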

Local-vs-remote D1 drift

wrangler dev uses a local SQLite file under .wrangler/state/v3/d1/<dbid>/ (gitignored) and tracks its own migration history independently of remote. To wipe and start fresh:

rm -rf .wrangler/state/v3/d1
wrangler d1 migrations apply amy-db --local

trace_events rotation

The observability table grows unbounded. There's no automatic rotation today. When it matters:

-- Drop everything older than 30 days
delete from trace_events where ts < datetime('now', '-30 days');

Stick this in a new cron tick when needed, e.g. 30 3 * * * (daily at 3:30 AM UTC, since Cron Triggers run on UTC). Deleted rows won't fragment the file long-term: D1 reclaims space on VACUUM (run via wrangler d1 execute amy-db --remote --command "VACUUM" once per quarter or as needed).
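Here is a sketch of that cron tick. The names are hypothetical. In practice the branch would go into the Worker's existing scheduled handler next to the cron drain, rather than into a new object, and the schedule would also need to be listed under [triggers] crons in wrangler.toml. controller.cron is the real ScheduledController field holding the expression that fired. Binding '-30 days' as a parameter to datetime('now', ?) is ordinary SQLite.

```typescript
// Minimal slices of the Workers types this sketch touches (ScheduledController, D1Database).
interface D1PreparedLike {
  bind(...values: unknown[]): D1PreparedLike;
  run(): Promise<{ meta: { changes: number } }>;
}

interface EnvLike {
  DB: { prepare(sql: string): D1PreparedLike };
}

const TRACE_ROTATION_CRON = "30 3 * * *"; // 03:30 UTC daily

// Hypothetical rotation step; idx_trace_ts serves the ts range scan.
async function rotateTraceEvents(env: EnvLike, keepDays = 30): Promise<number> {
  const result = await env.DB
    .prepare("delete from trace_events where ts < datetime('now', ?)")
    .bind(`-${keepDays} days`) // SQLite date modifier passed as a bound parameter
    .run();
  return result.meta.changes;
}

const worker = {
  async scheduled(controller: { cron: string }, env: EnvLike): Promise<void> {
    // One Worker can have several cron triggers; dispatch on the one that fired.
    if (controller.cron === TRACE_ROTATION_CRON) await rotateTraceEvents(env);
  },
};
```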

