The system in one diagram
┌──────────────────────────────────────┐
│  Next.js dashboard (port 3001)       │
│  /briefs     /workflows  /agents     │
│  /approvals  /runs       /alerts     │
└───────────────┬──────────────────────┘
                │ /api/dashboard/*
                ▼
┌─────────────────────────────────────────────┐
│  FastAPI (tiktok_army/main.py)              │
│                                             │
│  routers/                                   │
│  ├── briefs          POST/GET briefs        │
│  ├── workflows_api   runs + SSE stream      │
│  ├── approvals       approve / reject       │
│  ├── agents_catalog  /agents/catalog        │
│  ├── dashboard_api   aggregate views        │
│  ├── webhooks        tiktok / pubsub        │
│  └── copilot         chat-style ops         │
└─────────┬───────────────────┬───────────────┘
          │                   │
          ▼                   ▼
┌───────────────────────┐   ┌─────────────────────────┐
│ orchestrator/         │   │ agents/                 │
│ ├── definitions.py    │   │ ├── base.py (BaseAgent) │
│ ├── runner.py         │──▶│ ├── 15 agent classes    │
│ └── events.py (SSE)   │   │ └── _catalog.py         │
└───────────┬───────────┘   └────┬────────────────────┘
            │                    │
            │  each agent calls  │
            │     ▼    ▼         ▼    ▼
            ▼
┌────────────────────────┐  ┌────────────────────────┐
│ lib/                   │  │ providers/             │
│ ├── claude.py          │  │ ├── tiktok_research    │
│ ├── mock_claude.py     │  │ ├── tiktok_business    │
│ ├── db.py (RLS)        │  │ ├── tiktok_shop        │
│ ├── studio_client.py   │  │ ├── tiktok_ads         │
│ ├── spend_cap.py       │  │ ├── shopify_inventory  │
│ ├── tiktok_publisher   │  │ └── _mock_data.py      │
│ ├── transcoding.py     │  └────────────────────────┘
│ ├── tasks.py           │
│ └── audit.py           │
└─────────┬──────────────┘
          │
          ▼
┌─────────────────────────────────────────────────────┐
│  Postgres (Cloud SQL in prod, local docker in dev)  │
│                                                     │
│  tiktok_briefs ──┐                                  │
│  tiktok_workflows│                                  │
│  tiktok_workflow_runs ──┐                           │
│  tiktok_workflow_steps ─┴── tiktok_agent_runs       │
│                             └── tiktok_agent_steps  │
│  tiktok_accounts / posts / metrics / comments / ... │
│                                                     │
│  All tables: workspace_id + RLS + FORCE             │
└─────────────────────────────────────────────────────┘
          │
          │ Pub/Sub: audit-event, tiktok.posted, ...
          ▼
┌─────────────────────────────────────────────────────┐
│  BigQuery sink (audit-event subscriber)             │
└─────────────────────────────────────────────────────┘
Major modules
`tiktok_army/agents/`
The 15 agent classes plus `BaseAgent`. Every agent has a real implementation; the original `content_producer.py` and `comment_triage.py` are still the cleanest reference templates for the studio-consuming and llm-pure patterns, respectively. Every agent inherits from `BaseAgent` (`agents/base.py`), which handles the lifecycle around `_execute()`; see Agent Contract.
`agents/_catalog.py` is the single source of truth for what each agent does: inputs, options, outputs, human touchpoints, and which workflows include it. The dashboard's `/agents/catalog` page renders directly from this dict, and the brief intake form auto-generates input fields from it. All 15 agents have `AgentSpec` entries.
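As a rough illustration of how one catalog dict can drive both the catalog page and the intake form, here is a minimal sketch. Only the names `AgentSpec` and `SPECS` and the listed categories come from the description above; the field names, the `InputField` type, the `form_fields` helper, and the example entry are assumptions made for illustration.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class InputField:
    # Hypothetical shape of one declared input; the real spec may differ.
    name: str
    type: str = "string"
    required: bool = True
    description: str = ""


@dataclass(frozen=True)
class AgentSpec:
    # Field names are assumptions that mirror the categories listed above.
    name: str
    summary: str
    inputs: tuple[InputField, ...] = ()
    options: tuple[InputField, ...] = ()
    outputs: tuple[str, ...] = ()
    human_touchpoints: tuple[str, ...] = ()
    workflows: tuple[str, ...] = ()


# Illustrative entry only; not copied from agents/_catalog.py.
SPECS: dict[str, AgentSpec] = {
    "comment_triage": AgentSpec(
        name="comment_triage",
        summary="Example summary text.",
        inputs=(InputField("handle", description="TikTok handle"),),
        options=(InputField("max_comments", type="integer", required=False),),
        outputs=("triage_report",),
        workflows=("post_launch_loop",),
    ),
}


def form_fields(agent_name: str) -> list[dict]:
    """Flatten one spec into field descriptors a form renderer could consume."""
    spec = SPECS[agent_name]
    return [
        {
            "name": f.name,
            "type": f.type,
            "required": f.required,
            "description": f.description,
        }
        for f in (*spec.inputs, *spec.options)
    ]
```

Because the form is derived from the spec, adding an input to an agent's entry would surface it in the intake form without a separate frontend change, which is the point of keeping a single source of truth.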
`tiktok_army/orchestrator/`
Three files:
- `definitions.py`: `WorkflowDef` + `WorkflowStepDef` dataclasses, plus the three seeded workflows (Profile Audit, Campaign Launch, Post-Launch Loop).
- `runner.py`: `WorkflowRunner` class. Topo-sorts steps, resolves `input_map`s, calls agents (or the special `APPROVAL_NODE` / `SYNTHESIS_NODE` handlers), persists `tiktok_workflow_steps` rows, emits SSE events.
- `events.py`: in-process pub/sub for workflow lifecycle events with a 200-event ring buffer per run so late SSE subscribers can catch up.
See Workflow Contract for the data model.
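The ring-buffer behavior described for `events.py` can be sketched as follows. This is not the actual module: the class name `RunEventBus` and its method names are hypothetical, and the sketch assumes one `asyncio.Queue` per subscriber. It shows only the replay-then-live pattern that lets a late subscriber catch up.

```python
import asyncio
from collections import defaultdict, deque

RING_SIZE = 200  # per-run replay window, matching the size quoted above


class RunEventBus:
    """In-process pub/sub keyed by run id, with replay for late subscribers."""

    def __init__(self) -> None:
        self._history: dict[str, deque] = defaultdict(
            lambda: deque(maxlen=RING_SIZE)
        )
        self._subscribers: dict[str, list[asyncio.Queue]] = defaultdict(list)

    def publish(self, run_id: str, event: dict) -> None:
        # deque(maxlen=...) silently drops the oldest event once it is full.
        self._history[run_id].append(event)
        for queue in self._subscribers[run_id]:
            queue.put_nowait(event)

    def subscribe(self, run_id: str) -> asyncio.Queue:
        # Replay buffered events first so a late SSE client catches up,
        # then leave the queue registered for live events.
        queue: asyncio.Queue = asyncio.Queue()
        for event in self._history[run_id]:
            queue.put_nowait(event)
        self._subscribers[run_id].append(queue)
        return queue

    def unsubscribe(self, run_id: str, queue: asyncio.Queue) -> None:
        self._subscribers[run_id].remove(queue)
```

A subscriber that connects after more than 200 events have been published only sees the most recent 200, so a client that needs the full history would have to read it from the persisted step rows instead.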
`tiktok_army/providers/`
One file per external API surface:
- `tiktok_research.py`: read-only TikTok research (handles, posts, trends, audience).
- `tiktok_business.py`: Business account ops.
- `tiktok_shop.py`: Shop catalog + listings.
- `tiktok_ads.py`: Ads launch / campaign management.
- `shopify_inventory.py`: pulls from Shopify when the brand is on Shopify.
- `_mock_data.py`: fixture set for `TIKTOK_PROVIDER_MODE=mock` (the local dev default).
Providers are stubs in this repo. They short-circuit to `_mock_data` when `settings.is_provider_mock_mode` is true, and will call the real TikTok APIs once credentials are wired.
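A minimal sketch of that short-circuit is shown below. The `Settings` class, `load_settings`, `get_profile`, and the fixture contents are hypothetical stand-ins; only `is_provider_mock_mode` and `TIKTOK_PROVIDER_MODE` are taken from the text above.

```python
import os
from dataclasses import dataclass

# Stand-in for the fixtures in providers/_mock_data.py (contents are made up).
MOCK_PROFILES = {"@demo_brand": {"handle": "@demo_brand", "followers": 1200}}


@dataclass(frozen=True)
class Settings:
    tiktok_provider_mode: str = "mock"

    @property
    def is_provider_mock_mode(self) -> bool:
        return self.tiktok_provider_mode == "mock"


def load_settings() -> Settings:
    # Mock is the fallback here, mirroring the local dev default described above.
    return Settings(os.environ.get("TIKTOK_PROVIDER_MODE", "mock"))


def get_profile(handle: str, settings: Settings) -> dict:
    """Hypothetical provider call: fixtures in mock mode, real API otherwise."""
    if settings.is_provider_mock_mode:
        return MOCK_PROFILES[handle]
    # The real HTTP call is intentionally left out of this sketch.
    raise NotImplementedError("live TikTok credentials are not wired")
```

Checking the mode inside each provider function keeps call sites identical in mock and live modes, so agents do not need to know which one is active.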
`tiktok_army/routers/`
FastAPI routers. The ones that matter most for the dashboard:
- `briefs.py`: brief intake CRUD.
- `workflows_api.py`: workflow CRUD + run + the SSE stream endpoint. This is where the live DAG updates come from.
- `approvals.py`: approval gate approve/reject + pending queue.
- `agents_catalog.py`: `/agents/catalog` reads from `agents/_catalog.py:SPECS`.
- `dashboard_api.py`: aggregate views (recent runs, pending counts, etc.).
- `webhooks.py`: TikTok HMAC-signed webhooks + Pub/Sub OIDC push subscriptions.
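For readers unfamiliar with the SSE wire format behind the stream endpoint, the following sketch encodes one event as a `text/event-stream` frame. The helper name `sse_frame` and the payload shape are assumptions, and whether the real endpoint sends named events or `id:` fields is not stated in this document.

```python
import json
from typing import Optional


def sse_frame(event_type: str, payload: dict, event_id: Optional[int] = None) -> str:
    """Encode one event in the text/event-stream wire format."""
    lines = []
    if event_id is not None:
        # Browsers echo the last seen id back in Last-Event-ID on reconnect.
        lines.append(f"id: {event_id}")
    lines.append(f"event: {event_type}")
    # json.dumps emits a single line by default, so one data: field suffices.
    lines.append(f"data: {json.dumps(payload, separators=(',', ':'))}")
    # A blank line terminates the event.
    return "\n".join(lines) + "\n\n"
```

A streaming endpoint would yield one such frame per event pulled from the in-process bus.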
`tiktok_army/lib/`
Cross-cutting concerns. The big ones:
- `claude.py`: `call_claude_cached()`, the only legal way to call Anthropic. Handles prompt caching, mock-mode routing, and per-call trace insertion. See Trace Pipeline.
- `mock_claude.py`: fixture-based mock for `CLAUDE_MODE=mock`. Deterministic per-prompt.
- `_trace_context.py`: contextvars (`current_agent_run_id`, `current_workspace_id`, etc.) so the Claude wrapper knows which agent run to attribute its trace row to without every call site threading the `run_id` explicitly.
- `db.py`: async SQLAlchemy with `session_for_workspace(workspace_id)` setting `SET LOCAL app.workspace_id` for RLS. Mandatory for every DB op except cross-workspace admin work.
- `studio_client.py`: typed Studio API client. Service-to-service auth via Google-signed ID token whose `target_audience` is Studio's Cloud Run URL.
- `spend_cap.py`: wraps `axion_studio.lib.spend_cap.charge` for hard-stopping paid TikTok Ads launches before they exceed daily caps.
- `audit.py`: Pub/Sub publish for the `audit-event` topic (BigQuery sink) plus generic event publish.
- `tiktok_publisher.py`: uploads to TikTok feed / shop with HMAC-signed requests.
- `transcoding.py`: shells out to `ffmpeg` to produce 9:16 MP4s suitable for TikTok ingestion.
- `tasks.py`: Cloud Tasks client wrapper (queue creation, OIDC-bearer task enqueue).
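The contextvars approach used by `_trace_context.py` can be illustrated with a small sketch. The variable names `current_agent_run_id` and `current_workspace_id` come from the list above; `agent_run_scope`, `record_llm_call`, and the in-memory `TRACE_ROWS` list are hypothetical stand-ins for the real lifecycle and the `tiktok_agent_steps` inserts.

```python
import contextvars
from contextlib import contextmanager

current_agent_run_id: contextvars.ContextVar = contextvars.ContextVar(
    "current_agent_run_id", default=None
)
current_workspace_id: contextvars.ContextVar = contextvars.ContextVar(
    "current_workspace_id", default=None
)

# Stand-in for INSERTs into tiktok_agent_steps.
TRACE_ROWS: list[dict] = []


@contextmanager
def agent_run_scope(agent_run_id: str, workspace_id: str):
    """Bind the run identifiers for the duration of one agent run."""
    run_token = current_agent_run_id.set(agent_run_id)
    ws_token = current_workspace_id.set(workspace_id)
    try:
        yield
    finally:
        # reset() restores whatever value was bound before this scope.
        current_workspace_id.reset(ws_token)
        current_agent_run_id.reset(run_token)


def record_llm_call(prompt: str) -> None:
    # The wrapper reads attribution from context; callers pass no run id.
    TRACE_ROWS.append(
        {
            "agent_run_id": current_agent_run_id.get(),
            "workspace_id": current_workspace_id.get(),
            "kind": "llm_call",
            "prompt": prompt,
        }
    )
```

Context variables are copied into each asyncio task when it is created, so concurrent agent runs in separate tasks would not see each other's identifiers.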
`tiktok_army/models/`
Pydantic v2 models that mirror the DB tables. The migration files in ~/projects/tiktok-army/migration/ are the source of truth; the Pydantic models are convenience wrappers. This package also holds the Pub/Sub event payload models (`TikTokPostedEvent`, `TikTokCommentReceivedEvent`, etc.).
`dashboard/`
Next.js 14 app router. Routes:
- `/`: workspace overview.
- `/briefs`: brief intake form + list.
- `/workflows`: workflow definitions (read-only today, visual editor planned).
- `/workflows/run`: kick off a run; redirects to `/runs/[id]` with the live DAG.
- `/runs`: run history.
- `/runs/[id]`: single run page with live SSE-powered DAG, per-step trace drilldown, and the Markdown synthesis report.
- `/agents`: catalog (renders from `/api/dashboard/agents/catalog`).
- `/agents/[name]`: single agent detail.
- `/approvals`: pending approval queue.
The dashboard's API routes (dashboard/app/api/dashboard/*) are thin proxies to the FastAPI backend — the backend URL is process.env.TIKTOK_ARMY_API_URL.
Data flow: brief → run → step → trace → render
Concrete walkthrough of a Profile Audit run:
1. User submits brief. `POST /api/dashboard/briefs` → `routers/briefs.py:create_brief()` → INSERT into `tiktok_briefs` with status `pending`.
2. User clicks Run. `POST /api/dashboard/workflows/run` with `{brief_id, workflow_slug: "profile_audit"}` → `routers/workflows_api.py:run_workflow()`:
   - Loads the `WorkflowDef` (DB-stored if present, otherwise from `SEEDED_WORKFLOWS`).
   - Resolves the brief into a `brief: dict` (`handle`, `target_type`, `notes`, `brand_id`).
   - Materializes the workflow row via `_ensure_workflow_row` if missing.
   - INSERTs `tiktok_workflow_runs` with status `running`.
   - Updates the brief to status `dispatched`.
   - Spawns `asyncio.create_task(runner.execute())`.
   - Returns `{workflow_run_id, stream_url}` immediately.
3. Runner executes. `orchestrator/runner.py:WorkflowRunner.execute()`:
   - Topo-sorts steps via Kahn's algorithm.
   - For each step:
     - Resolves inputs via `_resolve_step_input` (brief fields + upstream outputs).
     - INSERTs a `tiktok_workflow_steps` row (status `pending`).
     - Publishes `workflow_step.started` to the in-process event bus.
     - Calls the agent (`agent_cls().run(...)`) OR runs the special `APPROVAL_NODE` / `SYNTHESIS_NODE` handler.
     - Updates the step row with output + cost + latency.
     - Publishes `workflow_step.succeeded` / `workflow_step.failed`.
4. Agent runs. `BaseAgent.run()`:
   - INSERTs `tiktok_agent_runs` with status `running`.
   - Publishes `agent_run.started` to the `audit-event` Pub/Sub topic (BigQuery sink).
   - Sets contextvars: `current_agent_run_id`, `current_workspace_id`, `current_agent_name`. Resets the step counter.
   - Calls `_execute(ctx)`. Inside, the agent calls `call_claude_cached(...)` and/or providers and/or DB ops. Each `call_claude_cached` writes a `tiktok_agent_steps` row with kind `llm_call`, full prompts, tokens, cost. Provider/skill/db writes can write rows with kind `provider_call` / `skill_call` / `db_write` via `lib.claude.trace_step()`.
   - On success, UPDATEs `tiktok_agent_runs` to `succeeded`, publishes `agent_run.succeeded`.
   - On failure, raises `AgentError` after UPDATE+publish to `failed`.
   - Resets contextvars.
5. Synthesis step. Last step in the DAG. `runner.py` handles `SYNTHESIS_NODE` directly: builds a synthesis prompt from upstream outputs, calls `call_claude_cached(is_synthesis=True, workflow_slug=...)`, writes the resulting Markdown to `tiktok_workflow_runs.report_md`.
6. Run finalization. UPDATE `tiktok_workflow_runs` with status `succeeded` (or `failed`), `total_cost_usd`, `total_latency_ms`, `completed_at`. Publish `workflow_run.completed`.
7. Dashboard renders. The browser opened an SSE connection to `/api/dashboard/workflows/runs/{id}/stream` after step 2. As events arrive, the React component updates the live DAG. When `workflow_run.completed` fires, the Markdown report is fetched via `GET /workflows/runs/{id}` and rendered.
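The topological ordering the runner relies on can be sketched with a generic version of Kahn's algorithm. This is not the code in `runner.py`: the `topo_sort` name, the `deps` input shape (step id mapped to the ids it depends on), and the step names in the usage example are assumptions.

```python
from collections import deque


def topo_sort(deps: dict[str, set[str]]) -> list[str]:
    """Kahn's algorithm. `deps` maps each step id to the ids it depends on."""
    indegree = {step: len(upstream) for step, upstream in deps.items()}
    dependents: dict[str, list[str]] = {step: [] for step in deps}
    for step, upstream in deps.items():
        for parent in upstream:
            # Raises KeyError if a step depends on an undeclared step.
            dependents[parent].append(step)

    # Start with every step that has no unmet dependencies.
    ready = deque(sorted(s for s, n in indegree.items() if n == 0))
    order: list[str] = []
    while ready:
        step = ready.popleft()
        order.append(step)
        # Sorting keeps the order deterministic across runs.
        for child in sorted(dependents[step]):
            indegree[child] -= 1
            if indegree[child] == 0:
                ready.append(child)

    if len(order) != len(deps):
        # Steps left over never reached indegree 0, so there is a cycle.
        raise ValueError("workflow contains a cycle")
    return order
```

For example, with hypothetical step ids:

```python
steps = {
    "fetch": set(),
    "analyze": {"fetch"},
    "audience": {"fetch"},
    "synthesis": {"analyze", "audience"},
}
print(topo_sort(steps))  # ['fetch', 'analyze', 'audience', 'synthesis']
```

The synthesis step lands last because it depends on every upstream output, which matches its position in the walkthrough.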
Key architectural decisions
- Inline runner today, Cloud Tasks fan-out later. The runner runs synchronously in one async task per workflow. Acceptable for tonight's demo; production will fan steps out via Cloud Tasks (one queue message per step), keying off the same persisted state. The `tiktok_workflow_steps` table is designed for this: each step is independently restartable.
- In-process SSE bus today, Pub/Sub later. `orchestrator/events.py` is in-process. For multi-replica Cloud Run, the same events route through Pub/Sub (already provisioned on the publish side), and the dashboard polls Postgres for terminal state.
- Mock mode is first-class. Both `CLAUDE_MODE=mock` and `TIKTOK_PROVIDER_MODE=mock` exist as runtime toggles, not test-only fixtures. The default in local dev is mock for both. This lets you run a full workflow end-to-end without any real API keys.
- RLS is mandatory. Every DB op uses `session_for_workspace(workspace_id)`. Cross-workspace work uses `session_unscoped()` and is deliberately rare (currently used only in cross-tenant admin paths).
- Trace fidelity is non-negotiable. Every Claude call writes the full system prompt + user prompt + response text + tokens + cost to `tiktok_agent_steps`. The dashboard's "show me everything that happened" view needs this to actually reverse-engineer a run.