
Trace Pipeline

The relationship between `tiktok_agent_runs`, `tiktok_agent_steps`, and `tiktok_workflow_steps`; what each captures; how `lib.claude.call_claude_cached` writes a trace row; how the SSE stream works; mock vs real mode.

Three tables, one drilldown

The dashboard's "show me everything that happened" view depends on three tables. They form a nested hierarchy.

tiktok_workflow_runs
   ↓ (1:N) workflow_run_id
tiktok_workflow_steps                  ← one row per workflow step (incl. approval/synthesis nodes)
   ↓ (1:1, nullable) agent_run_id
tiktok_agent_runs                      ← one row per BaseAgent.run() invocation
   ↓ (1:N) agent_run_id
tiktok_agent_steps                     ← one row per Claude call / provider call / tool call / db write

The dashboard drills: workflow run → step → agent run → agent steps. From a top-level run, you can click any step, see which agent ran, and see every Claude call (with full prompts and responses) that happened inside it.
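The drilldown amounts to a chain of joins along `workflow_run_id` and `agent_run_id`. The sketch below illustrates this with an in-memory SQLite database and a heavily reduced column set. The production tables live in Postgres with many more columns, and the agent name and labels used here are made up for the example.

```python
import sqlite3

# Reduced, illustrative schema: only the linking columns plus a few labels.
SCHEMA = """
CREATE TABLE tiktok_workflow_runs  (id TEXT PRIMARY KEY);
CREATE TABLE tiktok_agent_runs     (id TEXT PRIMARY KEY, agent_name TEXT);
CREATE TABLE tiktok_workflow_steps (
    id TEXT PRIMARY KEY,
    workflow_run_id TEXT NOT NULL REFERENCES tiktok_workflow_runs(id),
    idx INTEGER NOT NULL,
    label TEXT,
    agent_run_id TEXT REFERENCES tiktok_agent_runs(id)
);
CREATE TABLE tiktok_agent_steps (
    id TEXT PRIMARY KEY,
    agent_run_id TEXT NOT NULL REFERENCES tiktok_agent_runs(id),
    idx INTEGER NOT NULL,
    kind TEXT,
    label TEXT
);
"""

# LEFT JOINs keep approval/synthesis steps, whose agent_run_id is NULL.
DRILLDOWN = """
SELECT ws.idx, ws.label, ar.agent_name, s.idx, s.kind
FROM tiktok_workflow_steps ws
LEFT JOIN tiktok_agent_runs  ar ON ar.id = ws.agent_run_id
LEFT JOIN tiktok_agent_steps s  ON s.agent_run_id = ar.id
WHERE ws.workflow_run_id = ?
ORDER BY ws.idx, s.idx
"""


def drilldown(conn, workflow_run_id):
    """Return one row per (workflow step, agent step) pair for a run."""
    return conn.execute(DRILLDOWN, (workflow_run_id,)).fetchall()


def demo():
    conn = sqlite3.connect(":memory:")
    conn.executescript(SCHEMA)
    conn.execute("INSERT INTO tiktok_workflow_runs VALUES ('wr1')")
    conn.execute("INSERT INTO tiktok_agent_runs VALUES ('ar1', 'profile_auditor')")
    conn.executemany(
        "INSERT INTO tiktok_workflow_steps VALUES (?, ?, ?, ?, ?)",
        [("ws0", "wr1", 0, "audit", "ar1"), ("ws1", "wr1", 1, "approval", None)],
    )
    conn.executemany(
        "INSERT INTO tiktok_agent_steps VALUES (?, ?, ?, ?, ?)",
        [("s0", "ar1", 0, "provider_call", "fetch profile"),
         ("s1", "ar1", 1, "llm_call", "score profile")],
    )
    return drilldown(conn, "wr1")
```

Calling `demo()` returns two rows for the agent-backed step (one per agent step) and a single row of NULLs for the approval step, which is the shape the dashboard needs to render both kinds of node.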

`tiktok_agent_runs`

One row per BaseAgent.run() call. Created from migration 003. Captures:

  • id, workspace_id, brand_id
  • agent_name, trigger_type (manual / webhook / schedule / api)
  • status (pending / running / succeeded / failed / skipped)
  • input_jsonb, output_jsonb
  • started_at, completed_at
  • error_message, cost_usd, latency_ms, model_used

Inserted by BaseAgent._insert_run_row (base.py:210); updated by BaseAgent._update_run_row (base.py:237). The row is created before _execute runs (status running) and updated after (status succeeded or failed).
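The insert-before, update-after lifecycle can be sketched as follows. This is not the BaseAgent code: `RUNS` is an in-memory stand-in for the table, `run_with_trace` is a hypothetical name, and re-raising after marking the row failed is an assumption about error handling.

```python
import asyncio
import time
import uuid
from datetime import datetime, timezone

RUNS = {}  # in-memory stand-in for the tiktok_agent_runs table


def insert_run_row(agent_name, input_jsonb):
    """Create the row up front so a crash mid-run still leaves a record."""
    run_id = uuid.uuid4()
    RUNS[run_id] = {
        "agent_name": agent_name,
        "status": "running",
        "input_jsonb": input_jsonb,
        "output_jsonb": None,
        "error_message": None,
        "started_at": datetime.now(timezone.utc),
        "completed_at": None,
        "latency_ms": None,
    }
    return run_id


def update_run_row(run_id, **fields):
    """Apply the final fields and stamp completed_at."""
    RUNS[run_id].update(fields, completed_at=datetime.now(timezone.utc))


async def run_with_trace(agent_name, execute, input_jsonb):
    """Wrap an async `execute` callable with the running -> final transition."""
    run_id = insert_run_row(agent_name, input_jsonb)
    started = time.perf_counter()
    try:
        output = await execute(input_jsonb)
    except Exception as exc:
        latency_ms = int((time.perf_counter() - started) * 1000)
        update_run_row(run_id, status="failed",
                       error_message=str(exc), latency_ms=latency_ms)
        raise  # assumption: the caller decides what to do with the failure
    latency_ms = int((time.perf_counter() - started) * 1000)
    update_run_row(run_id, status="succeeded",
                   output_jsonb=output, latency_ms=latency_ms)
    return run_id, output
```

Writing the row before execution means a run that dies part-way is still visible with status running, rather than missing entirely.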

`tiktok_workflow_steps`

One row per step in a workflow run. Schema at ~/projects/tiktok-army/migration/009_tiktok_workflows.py:233. Per step:

  • workflow_run_id (FK to tiktok_workflow_runs)
  • idx — 0-based execution position. Unique per run.
  • agent_name, label, status
  • input_jsonb, output_jsonb — what was fed in, what came out.
  • agent_run_id (FK to tiktok_agent_runs, nullable) — populated only when the step ran a real agent. Approval and synthesis nodes don't go through BaseAgent.run(), so their agent_run_id is NULL.
  • cost_usd, latency_ms

Inserted by runner._insert_step_row (runner.py:198) before the agent runs; updated after.

The agent_run_id linkage is what lets the dashboard say "this workflow step → this agent run → these Claude calls."

`tiktok_agent_steps`

The fine-grained trace. One row per "interesting thing the agent did" inside a single tiktok_agent_runs row. Schema at ~/projects/tiktok-army/migration/010_tiktok_agent_steps.py:66. Five kind values:

Kind            What it is
llm_call        A Claude API request: system + user + response, tokens, cost
tool_call       An Anthropic tool-use turn (name, input, output)
provider_call   An outbound HTTP to a TikTok provider (research / business / shop / ads). Stores request payload + response.
skill_call      Invocation of a skill helper from tiktok_army/skills/
db_write        A notable write to a domain table (tiktok_posts insert, comment classification, alert raise)

Per row:

  • agent_run_id (FK), idx (0-based, unique per agent run, monotonically increasing)
  • kind, label
  • For llm_call: model, system_prompt, user_prompt, response_text, input_tokens, output_tokens, cache_read_tokens, cache_write_tokens, cost_usd
  • For non-LLM: input_jsonb, output_jsonb
  • started_at, completed_at, latency_ms, error_message

Full prompts are stored unredacted. This is a deliberate choice: the dashboard's reverse-engineering view shows exactly what was sent to Claude. RLS policies and workspace scoping are what keep these rows from being readable across tenants, so any query path that bypasses them would expose prompt contents.

How `call_claude_cached` writes a trace row

Implementation in ~/projects/tiktok-army/tiktok_army/lib/claude.py:134. The relevant flow:

  1. Pick the wrapper path. If settings.is_mock_mode, route to mock_claude.mock_call. Otherwise call AsyncAnthropic.messages.create with cache_control: ephemeral on the system prompt.
  2. Build a ClaudeResult with text + tokens + cost.
  3. Persist the trace via _persist_llm_step_trace (claude.py:263):

     • Read the contextvars current_agent_run_id and current_workspace_id.
     • If either is None, skip: we're not inside an agent run.
     • Otherwise, get next_step_idx() (the monotonically increasing per-run index from _trace_context.py).
     • INSERT into tiktok_agent_steps with kind='llm_call', full system + user + response, all token counts, computed cost_usd, latencies.

  4. Log a structured claude.call event for ops visibility (mode, latency, tokens, cost).

Errors during the trace insert are caught and logged but never raised. Instrumentation must not break the agent (claude.py:331).

For non-LLM steps, agents call lib.claude.trace_step(kind="provider_call", label=..., started_at=..., completed_at=..., input_jsonb=..., output_jsonb=...) — same no-op semantics outside an agent run, same RLS-scoped insert inside one (claude.py:338).
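The no-op-outside-a-run and never-raise behaviour described above can be illustrated with a small contextvars sketch. The names `current_agent_run_id`, `current_workspace_id`, and `next_step_idx` come from the text. Everything else is assumed for illustration: `start_trace`, `persist_step`, the in-memory `AGENT_STEPS` list standing in for the table, and the counter implementation.

```python
import contextvars
import logging
import uuid

log = logging.getLogger("trace_sketch")

current_agent_run_id = contextvars.ContextVar("current_agent_run_id", default=None)
current_workspace_id = contextvars.ContextVar("current_workspace_id", default=None)
# A one-element list, so increments are shared by every context copied from the run.
_step_counter = contextvars.ContextVar("step_counter", default=None)

AGENT_STEPS = []  # in-memory stand-in for the tiktok_agent_steps table


def start_trace(agent_run_id, workspace_id):
    """Hypothetical helper: mark the current context as being inside an agent run."""
    current_agent_run_id.set(agent_run_id)
    current_workspace_id.set(workspace_id)
    _step_counter.set([0])


def next_step_idx():
    """Return the next 0-based index for the current run."""
    counter = _step_counter.get()
    idx = counter[0]
    counter[0] += 1
    return idx


def persist_step(kind, label, insert=AGENT_STEPS.append, **fields):
    """Write one trace row; return True if a row was written, False otherwise."""
    run_id = current_agent_run_id.get()
    workspace_id = current_workspace_id.get()
    if run_id is None or workspace_id is None:
        return False  # not inside an agent run: silently do nothing
    try:
        insert({
            "agent_run_id": run_id,
            "workspace_id": workspace_id,
            "idx": next_step_idx(),
            "kind": kind,
            "label": label,
            **fields,
        })
        return True
    except Exception:
        # Instrumentation must not break the agent: log and swallow.
        log.exception("trace_step.persist_failed")
        return False
```

Because the run and workspace IDs travel in context variables, callers deep inside an agent do not need to thread them through every function signature.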

How the SSE stream works

The runner publishes events to an in-process event bus (orchestrator/events.py). The dashboard subscribes to that bus over Server-Sent Events.

Event types

Published by WorkflowRunner and friends:

  • workflow_run.started — at the top of execute().
  • workflow_step.started — when a step row is inserted.
  • workflow_step.succeeded — on agent or synthesis success.
  • workflow_step.failed — on agent or synthesis exception.
  • workflow_step.pending_approval — when an APPROVAL_NODE is hit.
  • workflow_step.skipped — reserved; not used by today's runner.
  • workflow_run.completed — when the runner finishes successfully.
  • workflow_run.failed — on any unrecovered error.
  • workflow_run.report_ready — emitted alongside completed when report_md is non-null.

Plus, after approve/reject:

  • workflow_step.succeeded from the approve handler.
  • workflow_run.failed from the reject handler.

The bus

_EventBus at events.py:57. Per-run subscriber lists in a dict[UUID, list[Queue]]. Plus a 200-event-per-run ring buffer (_buffer) so a late-attaching SSE subscriber can replay missed events on connect.

publish(event) appends to the buffer and pushes to every subscriber's queue.

subscribe(run_id) returns an async iterator: first yields buffered events, then yields live ones, exits when a terminal event (workflow_run.completed or workflow_run.failed) is consumed.
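The publish/subscribe behaviour described here (replay from a bounded buffer, then live events, stop on a terminal event) can be sketched with asyncio. This is an illustration of the described semantics, not the contents of events.py: the `Event` dataclass and the `EventBus` internals are assumptions.

```python
import asyncio
from collections import defaultdict, deque
from dataclasses import dataclass, field

TERMINAL = {"workflow_run.completed", "workflow_run.failed"}
BUFFER_SIZE = 200


@dataclass
class Event:
    run_id: str
    type: str
    data: dict = field(default_factory=dict)


class EventBus:
    def __init__(self):
        self._subscribers = defaultdict(list)  # run_id -> list of asyncio.Queue
        # deque(maxlen=...) drops the oldest event once the cap is reached.
        self._buffer = defaultdict(lambda: deque(maxlen=BUFFER_SIZE))

    def publish(self, event):
        self._buffer[event.run_id].append(event)
        for queue in self._subscribers[event.run_id]:
            queue.put_nowait(event)

    async def subscribe(self, run_id):
        queue = asyncio.Queue()
        # Snapshot and register with no await in between, so an event is
        # either in the replay list or in the queue, never both or neither.
        replay = list(self._buffer[run_id])
        self._subscribers[run_id].append(queue)
        try:
            for event in replay:
                yield event
                if event.type in TERMINAL:
                    return
            while True:
                event = await queue.get()
                yield event
                if event.type in TERMINAL:
                    return
        finally:
            self._subscribers[run_id].remove(queue)
```

A subscriber that attaches after the run has already finished still terminates, because the terminal event is in the replay list (as long as it has not been evicted from the buffer).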

The HTTP surface

GET /api/dashboard/workflows/runs/{id}/stream (routers/workflows_api.py:555):

async def stream_run(run_id: UUID, ...) -> StreamingResponse:
    async def gen():
        async for event in subscribe(run_id):
            yield event.to_sse()

    return StreamingResponse(
        gen(),
        media_type="text/event-stream",
        headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"},
    )

event.to_sse() formats as event: <type>\ndata: {<json>}\n\n — standard SSE.
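A minimal sketch of that wire format, written as a free function rather than the actual `to_sse()` method (whose body is not shown here); the compact JSON separators are an arbitrary choice for the example:

```python
import json


def to_sse(event_type: str, data: dict) -> str:
    """Format one Server-Sent Events frame: an event line, a data line, a blank line."""
    # json.dumps escapes newlines inside strings, so the payload stays on one data line.
    payload = json.dumps(data, separators=(",", ":"))
    return f"event: {event_type}\ndata: {payload}\n\n"
```

The trailing blank line is what tells the browser's EventSource that the frame is complete and can be dispatched.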

Why in-process today

For local dev and a single Cloud Run replica, in-process is fine. With multiple replicas in production it breaks: a replica serving the dashboard may subscribe to a run that is executing on a different replica, and because the bus is per-process, that SSE consumer would never see any events.

The plan is to route the same events through Pub/Sub (already provisioned for tiktok.posted etc.) and have the dashboard's API poll Postgres for terminal state. The migration is straightforward because the current event types are already shape-stable.

Mock vs real mode

Two independent toggles in ~/projects/tiktok-army/tiktok_army/config.py:

  • CLAUDE_MODE: mock (default) routes Claude calls through lib.mock_claude; real calls Anthropic.
  • TIKTOK_PROVIDER_MODE: mock (default) routes provider calls through _mock_data; real (when implemented) hits TikTok APIs.
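As a rough illustration of two independent toggles, the sketch below reads them from environment variables. That the settings come from the environment, the `Settings` dataclass, and `load_settings` are all assumptions for the example. Only the toggle names, their values, and the `is_mock_mode` / `is_provider_mock_mode` properties are taken from this page.

```python
import os
from dataclasses import dataclass

_VALID_MODES = {"mock", "real"}


@dataclass(frozen=True)
class Settings:
    claude_mode: str = "mock"
    tiktok_provider_mode: str = "mock"

    def __post_init__(self):
        # Fail fast on typos such as CLAUDE_MODE=rael.
        for value in (self.claude_mode, self.tiktok_provider_mode):
            if value not in _VALID_MODES:
                raise ValueError(f"unknown mode: {value!r}")

    @property
    def is_mock_mode(self) -> bool:
        return self.claude_mode == "mock"

    @property
    def is_provider_mock_mode(self) -> bool:
        return self.tiktok_provider_mode == "mock"


def load_settings(env=None) -> Settings:
    """Build Settings from a mapping (defaults to os.environ); both default to mock."""
    env = os.environ if env is None else env
    return Settings(
        claude_mode=env.get("CLAUDE_MODE", "mock").lower(),
        tiktok_provider_mode=env.get("TIKTOK_PROVIDER_MODE", "mock").lower(),
    )
```

Keeping the toggles separate allows, for example, real Claude calls against mock provider data.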

Mock mode for Claude

lib/mock_claude.py returns deterministic responses keyed by agent name + a hash of the user prompt:

  • _FIXTURES: dict[str, list[_MockResponse]] holds per-agent canned JSON responses. Hashing the prompt means the same input always yields the same output (replay-friendly), while different prompts for the same agent are spread across whichever variants are seeded.
  • _SYNTHESIS_FIXTURES: dict[str, _MockResponse] — per-workflow-slug Markdown synthesis responses (profile_audit, campaign_launch, post_launch_loop).
  • _DEFAULT_FIXTURE — fallback for agents without a fixture (returns a generic {"status": "ok", "note": "mock response — no fixture for this agent yet"}).

Latency is simulated as 80–600ms per call (mock_claude.py:459) so the live DAG view shows visible progression.

The mock returns a dict that the wrapper builds into a ClaudeResult with the same shape as a real call. Token counts are estimated from string lengths (4 chars ≈ 1 token).

Costs are simulated. The cost_usd rollup uses _PRICING and the estimated tokens, so you'll see realistic-looking dollar figures, but no money is spent.
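The estimate-then-price arithmetic looks roughly like this. The 4-characters-per-token ratio is from the text. The model key, the per-million-token prices, and the round-up behaviour are placeholders, and the real `_PRICING` table may be shaped differently.

```python
# Placeholder prices in USD per million tokens; not real rates.
_PRICING = {
    "example-model": {"input": 3.00, "output": 15.00},
}


def estimate_tokens(text: str) -> int:
    """Approximate token count at 4 characters per token, rounded up."""
    return -(-len(text) // 4)


def estimate_cost_usd(model: str, input_tokens: int, output_tokens: int) -> float:
    """Price estimated tokens with the per-million rates for `model`."""
    rates = _PRICING[model]
    total = input_tokens * rates["input"] + output_tokens * rates["output"]
    return total / 1_000_000


def simulate_call_cost(model: str, system: str, user: str, response: str) -> float:
    """Cost of one mock call: system + user count as input, response as output."""
    input_tokens = estimate_tokens(system) + estimate_tokens(user)
    return estimate_cost_usd(model, input_tokens, estimate_tokens(response))
```

With the placeholder rates, a 400-character prompt (100 estimated tokens) and a 200-character response (50 estimated tokens) come to (100 × 3.00 + 50 × 15.00) / 1,000,000 = 0.00105 USD.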

Mock mode for providers

providers/_mock_data.py provides fixture data for two seeded handles (@lakucosmetics for own-brand demos, @competitor_cosmetics for third-party audit demos), with deterministic generated fallbacks for any other handle (hash-of-handle as RNG seed).
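The hash-of-handle fallback can be sketched by seeding a private `random.Random` from a digest of the handle. The field names and value ranges below are invented for illustration and do not reflect what _mock_data.py actually generates.

```python
import hashlib
import random


def mock_profile(handle: str) -> dict:
    """Generate a stable fake profile: same handle in, same numbers out."""
    # Derive the seed from a stable digest so results match across processes.
    seed = int.from_bytes(hashlib.sha256(handle.encode("utf-8")).digest()[:8], "big")
    rng = random.Random(seed)  # private generator; global random state is untouched
    return {
        "handle": handle,
        "followers": rng.randint(1_000, 500_000),
        "avg_views": rng.randint(500, 200_000),
        "engagement_rate": round(rng.uniform(0.01, 0.12), 4),
    }
```

Using a dedicated `random.Random` instance rather than `random.seed()` keeps one handle's generation from disturbing any other code that draws random numbers.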

Providers check settings.is_provider_mock_mode and short-circuit. Real providers hit TikTok APIs (not yet implemented; we don't have credentials yet).

Why determinism matters

Two reasons.

  1. Replay tests. A workflow run in mock mode produces the same trace on every execution given the same inputs, so test assertions on output structure are stable.
  2. Reverse-engineering the dashboard. The dashboard's drilldown view shows exactly what each agent saw. If the same handle produced different mock data on two consecutive audits, the drilldown would be confusing.

How to read a trace

In the dashboard, open a run page, select an agent step, and switch to the "View trace" tab. You'll see:

  • The list of tiktok_agent_steps rows for this agent_run_id, ordered by idx.
  • Each row shows kind, label, latency, cost.
  • Clicking an llm_call row expands it to show the full system prompt, user prompt, and response text. Tokens and cost breakdown are visible in a side panel.
  • Clicking a provider_call row shows the request payload (input_jsonb) and response payload (output_jsonb).

The same data is available via API at GET /api/dashboard/agent-runs/{agent_run_id}/steps (routers/workflows_api.py:581).

Things to watch out for

  • Don't bypass call_claude_cached. Direct AsyncAnthropic calls won't write a trace row, won't apply prompt caching, and won't roll up into the cost total. The wrapper exists for these three reasons; using it is non-negotiable.
  • Prompt cache requires byte-identical system prompts. Putting a UUID, timestamp, or per-request data in the system prompt invalidates the cache. The wrapper enforces the discipline by name (system_stable argument) but doesn't programmatically verify it. Be careful in templating.
  • Trace inserts can fail silently. Exceptions are caught, logged, and swallowed in _persist_llm_step_trace and trace_step. If you notice missing trace rows in the dashboard, check the logs for claude.trace.persist_failed or trace_step.persist_failed.
  • Workflow events buffer is capped at 200 per run. If a workflow somehow emits more than 200 events (it shouldn't — typical runs emit 5–20), late-attaching subscribers will miss the earliest ones.
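To make the prompt-cache point concrete, here is a sketch of how a request can keep the stable system prompt separate from per-request data. It only builds the keyword arguments one would pass to the Anthropic Messages API and sends nothing. `build_request` and `system_fingerprint` are hypothetical helpers, the model string is a placeholder, and the `system_stable` parameter name mirrors the wrapper argument mentioned above.

```python
import hashlib


def build_request(model: str, system_stable: str, user_prompt: str,
                  max_tokens: int = 1024) -> dict:
    """Build Messages API kwargs with the system block marked cacheable."""
    return {
        "model": model,
        "max_tokens": max_tokens,
        # The system block must be byte-identical across calls to hit the cache.
        "system": [{
            "type": "text",
            "text": system_stable,
            "cache_control": {"type": "ephemeral"},
        }],
        # Anything that varies per request (handles, timestamps, IDs) goes here.
        "messages": [{"role": "user", "content": user_prompt}],
    }


def system_fingerprint(request: dict) -> str:
    """Hash the system text; equal fingerprints mean the cached prefix can match."""
    text = request["system"][0]["text"]
    return hashlib.sha256(text.encode("utf-8")).hexdigest()
```

Comparing fingerprints across two requests in a test is a cheap way to catch a template that accidentally interpolates a timestamp or UUID into the system prompt.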