The contract in one sentence
Every agent inherits from `BaseAgent` (~/projects/tiktok-army/tiktok_army/agents/base.py:72), sets a `name` class attribute, and implements `async _execute(self, ctx: AgentContext) -> AgentResult`. Everything else — the `tiktok_agent_runs` row, audit events, contextvar setup, error handling, cost rollup — is handled by `BaseAgent.run()`.
What `BaseAgent.run()` does for you
The public entry point is run() (base.py:82). Don't override it. It does:
- INSERT a `tiktok_agent_runs` row with status `running`, `started_at`, `input_jsonb`, and your `agent_name` (`_insert_run_row`, base.py:210).
- Publish `agent_run.started` to the `audit-event` Pub/Sub topic (which has a BigQuery sink subscriber).
- Set tracing contextvars (`current_agent_run_id`, `current_workspace_id`, `current_agent_name`) and reset the per-run step counter. The Claude wrapper and provider wrappers read these to attribute their `tiktok_agent_steps` rows to your run without you threading the run_id.
- Call `_execute(ctx)` wrapped in a try/except. Any exception becomes an `AgentError`.
- On success: UPDATE the run row with status `succeeded`, output, `cost_usd`, `latency_ms`, `model_used`. Publish `agent_run.succeeded`.
- On failure: UPDATE with status `failed`, full traceback in `error_message`, and publish `agent_run.failed`.
- Reset contextvars in a `finally` block so sibling/parent agent runs in the same task don't leak context.
You stamp results with AgentResult(output=..., cost_usd=..., model_used=...). BaseAgent.run() overwrites result.run_id with the freshly-created run's UUID before returning it, so the orchestrator can link the result back to its tiktok_agent_runs row + tiktok_agent_steps traces.
`AgentContext` and `AgentResult`
Defined in ~/projects/tiktok-army/tiktok_army/agents/base.py:42 and :54:
```python
@dataclass
class AgentContext:
    run_id: UUID
    workspace_id: UUID
    brand_id: UUID | None
    trigger_type: AgentTriggerType
    trigger_event_id: str | None
    input: dict[str, Any]


@dataclass
class AgentResult:
    output: dict[str, Any]
    cost_usd: float = 0.0
    model_used: str | None = None
    run_id: UUID | None = None  # populated by BaseAgent.run()
```
`ctx.input` is whatever the orchestrator (or direct caller) passed in — for a workflow step, this is the merged dict of brief fields + resolved upstream outputs (see Workflow Contract).
result.output should be a flat-ish JSON-serializable dict matching the agent's AgentSpec.outputs declaration in _catalog.py. The dashboard renders it.
`result.cost_usd` is the sum of the `cost_usd` values returned by your `call_claude_cached(...)` calls, plus any other costs you incurred. You're responsible for summing them; the framework doesn't introspect.
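The summing responsibility looks like this in practice. Here `call_claude_cached` is a local stub standing in for the real wrapper in lib/claude.py (whose return shape may differ); only the agent-side accumulation pattern is the point.

```python
# Sketch of the agent-side cost rollup; call_claude_cached is a stub.
import asyncio


async def call_claude_cached(system_stable: str, user_message: str) -> dict:
    # stub: the real wrapper returns, among other things, a cost figure
    return {"text": f"classified:{user_message}", "cost_usd": 0.0004}


async def classify_rows(rows: list[str]) -> tuple[list[str], float]:
    total_cost = 0.0
    labels: list[str] = []
    for row in rows:
        resp = await call_claude_cached(system_stable="brand prompt",
                                        user_message=row)
        total_cost += resp["cost_usd"]  # the agent sums; nothing introspects
        labels.append(resp["text"])
    return labels, round(total_cost, 6)


labels, cost = asyncio.run(classify_rows(["a", "b", "c"]))
```

The rounded total would then go into `AgentResult(cost_usd=cost, ...)`.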
Don't do these
- Don't override `run()`. It exists to enforce the lifecycle. If you find yourself wanting to, you probably want to add functionality to `_execute` or to a helper.
- Don't INSERT `tiktok_agent_runs` manually. `BaseAgent` owns the row.
- Don't call `AsyncAnthropic` directly. Use `call_claude_cached` (lib/claude.py:134). Calling directly loses prompt caching, misses the trace insert, and breaks the cost rollup.
- Don't open DB sessions without `session_for_workspace`. RLS won't be set and queries will silently return nothing or (worse) leak cross-tenant.
- Don't put per-request data in the system prompt. The cache requires byte-identical system prompts. Per-request data goes in `user_message`.
Contextvars: how tracing finds your run
~/projects/tiktok-army/tiktok_army/lib/_trace_context.py defines four contextvars:
- `current_agent_run_id: ContextVar[UUID | None]`
- `current_workspace_id: ContextVar[UUID | None]`
- `current_agent_name: ContextVar[str | None]`
- `_step_counter: ContextVar[int]` — monotonically incremented per agent run.
BaseAgent.run() sets these at the top and resets via tokens at the bottom. The Claude wrapper reads them in _persist_llm_step_trace (lib/claude.py:263) to know which agent_run_id to attach the LLM step row to. Outside an agent run (one-off CLI use, a router that calls Claude directly), the vars are unset and the trace insert is skipped silently.
Side effect: this means any Claude call made anywhere down the call stack from _execute will be attributed to your run. If your agent calls a helper function in tiktok_army/skills/ that itself calls Claude, the trace lands on your row automatically.
next_step_idx() returns the next monotonically-increasing 0-based step index per run. The Claude wrapper bumps this for every tiktok_agent_steps insert, so steps render in chronological order in the trace UI.
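A plausible shape for the counter, sketched with stdlib `contextvars` (the real helper lives in lib/_trace_context.py and may differ in detail; `reset_step_counter` is a hypothetical name for whatever `BaseAgent.run()` calls at the top of a run):

```python
# Sketch of a per-run, monotonically increasing 0-based step counter.
import contextvars

_step_counter: contextvars.ContextVar[int] = contextvars.ContextVar(
    "_step_counter", default=0
)


def next_step_idx() -> int:
    """Return the current 0-based step index and bump the counter."""
    idx = _step_counter.get()
    _step_counter.set(idx + 1)
    return idx


def reset_step_counter() -> None:
    """What the run entry point does at the top of each run (sketch)."""
    _step_counter.set(0)


reset_step_counter()
indices = [next_step_idx() for _ in range(3)]  # [0, 1, 2]
```

Because the counter is a contextvar, concurrent agent runs in separate task contexts each see their own sequence.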
The three agent patterns
The 15 agents split into three patterns. Each pattern has a reference template you should copy from when filling in skeletons.
Pattern 1: `studio_consuming` — Studio + LLM + TikTok publish
The agent briefs Studio for a generated asset, waits for human approval, transcodes, publishes to TikTok.
Reference template: ~/projects/tiktok-army/tiktok_army/agents/content_producer.py.
Flow:
- Load brand profile from DB.
- Plan content via Claude (`call_claude_cached` with cached system prompt = brand voice).
- `lib.spend_cap.charge(...)` BEFORE Studio generation.
- `studio_client.request_generation(...)` with a brief that explicitly says "no logo, no on-screen text".
- `studio_client.wait_for_approval(...)` — blocks until a human approves the rendered asset.
- `studio_client.fetch_asset_to_local(...)` to get the file.
- `lib.transcoding.to_tiktok_vertical(...)` — shells out to ffmpeg for 9:16.
- `lib.tiktok_publisher.publish(...)` — posts to TikTok.
- INSERT `tiktok_posts` row.
- Publish `tiktok.posted` Pub/Sub event.
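The "charge BEFORE generation" ordering deserves emphasis. The sketch below is a guess at the shape of a spend cap: `charge`, `SpendCapExceeded`, and the ledger dict are assumptions, not `lib.spend_cap`'s real API; the point is only that the cap check raises before any Studio request is made.

```python
# Hypothetical spend-cap guard (names and semantics are assumptions).
class SpendCapExceeded(Exception):
    pass


def charge(ledger: dict[str, float], workspace: str,
           amount: float, cap: float) -> None:
    """Record spend, raising BEFORE the caller triggers generation."""
    spent = ledger.get(workspace, 0.0)
    if spent + amount > cap:
        raise SpendCapExceeded(
            f"{workspace}: {spent + amount:.2f} exceeds cap {cap:.2f}")
    ledger[workspace] = spent + amount


ledger: dict[str, float] = {}
charge(ledger, "ws-1", amount=3.0, cap=10.0)  # ok, generation may proceed
charge(ledger, "ws-1", amount=5.0, cap=10.0)  # ok, 8.0 total
try:
    charge(ledger, "ws-1", amount=4.0, cap=10.0)  # would exceed the cap
    blocked = False
except SpendCapExceeded:
    blocked = True  # Studio generation is never requested
```

Charging first means a failed generation may over-count spend, but the reverse ordering could blow past the cap, which is the worse failure mode.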
Agents using this pattern:
- `content_producer` (full template)
- `catalog_sync` (skeleton — Studio for product photo regeneration when SKUs change)
- `live_stream_ops` (skeleton — Studio for branded thumbnails + b-roll)
- `ad_campaign_director` (skeleton — Studio for variant ad creatives)
Pattern 2: `llm_pure` — LLM only, no Studio
The agent loads context, calls Claude (cached), parses JSON, persists to DB, emits Pub/Sub events.
Reference template: ~/projects/tiktok-army/tiktok_army/agents/comment_triage.py.
Flow:
- Load inputs from DB (rows to classify, brand profile).
- Build cached system prompt (deterministic per brand).
- Per-row: `call_claude_cached(system_stable=brand_prompt, user_message=row_data, model="claude-haiku-4-5-20251001")`.
- Parse the JSON response (with tolerance for Claude's markdown-fence quirks — see `comment_triage._parse_response`).
- UPDATE the row with classification results.
- Publish per-row Pub/Sub events.
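The fence-tolerant parse can be sketched like this. This is in the spirit of `comment_triage._parse_response`, whose actual behavior may differ; the parser here is a self-contained stand-in.

```python
# Sketch: parse a JSON object from an LLM reply, tolerating an optional
# ```json ... ``` markdown fence around the payload.
import json
import re


def parse_llm_json(text: str) -> dict:
    text = text.strip()
    fenced = re.match(r"^```[a-zA-Z]*\s*\n(.*?)\n?```$", text, re.DOTALL)
    if fenced:
        text = fenced.group(1)
    return json.loads(text)


bare = parse_llm_json('{"action": "hide", "confidence": 0.92}')
wrapped = parse_llm_json('```json\n{"action": "hide", "confidence": 0.92}\n```')
```

Both calls return the same dict, so the per-row UPDATE logic never has to care which shape the model produced.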
Agents using this pattern:
- `comment_triage` (full template)
- `listing_optimizer`
- `audience_mapper`
- `creator_outreach`
- `compliance`
- `trend_watcher`
- `shadowban_sentinel` (data-monitoring at its core, with an optional Haiku synthesis call when the per-signal evidence trips the confidence threshold — see Pattern 3 note)
Pattern 3: `data_monitoring` — data computation, light or no LLM
The agent reads from providers and DB, computes scores or detects changes, writes summary outputs. May or may not call Claude — when it does, it's typically Haiku for a quick summarization at the end.
Reference: ~/projects/tiktok-army/tiktok_army/agents/account_health.py (skeleton with significant scaffolding).
Flow:
- Read posts/metrics/health signals from providers (mock or real).
- Compute scores in Python (no LLM needed — this is the cheap fast path).
- Optionally call Claude (Haiku) to produce a one-sentence diagnosis or recommended-action list.
- Return structured output. Optionally INSERT alert rows or emit a Pub/Sub event if a threshold tripped.
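The cheap Python path can be sketched as a weighted score plus an alert threshold. Signal names, weights, and the 0.7 threshold below are made up for illustration; the real scoring lives in the agents themselves.

```python
# Sketch of the no-LLM fast path: weighted score + threshold check.
def health_score(signals: dict[str, float],
                 weights: dict[str, float]) -> float:
    """Weighted average of 0..1 signals; pure Python, no LLM."""
    total_w = sum(weights.values())
    return sum(signals[k] * w for k, w in weights.items()) / total_w


signals = {"view_velocity": 0.9, "follower_trend": 0.5, "report_rate": 0.2}
weights = {"view_velocity": 0.5, "follower_trend": 0.3, "report_rate": 0.2}

score = health_score(signals, weights)  # roughly 0.64
alert = score < 0.7  # threshold tripped: INSERT alert row / publish event
```

Only when `alert` trips would the optional Haiku call run to phrase a diagnosis, keeping the common case free of LLM cost.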
Agents using this pattern:
- `account_health`
- `performance_feedback`
- `inventory_sync`
- `ldr_compliance` — pulls TikTok Shop's rolling LDR %, computes at-risk orders, dedupes alerts. Pure Python, no LLM. Mirrors `account_health.py`'s shape.
- `shadowban_sentinel` — also fits here when run without the optional summary; it's listed under Pattern 2 because the synthesis hop is its distinguishing feature, but the four signal computations are all data-monitoring.
- (`catalog_sync` straddles patterns 1 and 3 depending on whether it triggers regeneration.)
The `AGENT_REGISTRY`
~/projects/tiktok-army/tiktok_army/agents/__init__.py:45 exposes:
```python
AGENT_REGISTRY: dict[str, type[BaseAgent]] = {
    AccountHealthAgent.name: AccountHealthAgent,
    ...
}
```
This is the orchestrator's lookup table. When a `WorkflowStepDef.agent_name` is `"comment_triage"`, the runner does `AGENT_REGISTRY["comment_triage"]()` to instantiate. New agents must be added here or the runner will fail with `unknown agent: <name>`.
The registry is also what dynamic dispatch from Cloud Tasks uses (a queue message says "run agent X with input Y" and the worker looks up the class by name).
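The dispatch shape is simple enough to sketch with dummy classes (the real registry maps each agent's `name` to its `BaseAgent` subclass; the classes and the `instantiate` helper here are stand-ins):

```python
# Sketch of registry-based dispatch with dummy agent classes.
class CommentTriageAgent:
    name = "comment_triage"


class AccountHealthAgent:
    name = "account_health"


AGENT_REGISTRY: dict[str, type] = {
    CommentTriageAgent.name: CommentTriageAgent,
    AccountHealthAgent.name: AccountHealthAgent,
}


def instantiate(agent_name: str):
    cls = AGENT_REGISTRY.get(agent_name)
    if cls is None:
        raise ValueError(f"unknown agent: {agent_name}")
    return cls()


agent = instantiate("comment_triage")  # a fresh CommentTriageAgent
```

Keying the dict on the class's own `name` attribute (rather than a string literal) means a renamed agent can't drift out of sync with its registry entry.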
The `AgentSpec` catalog
~/projects/tiktok-army/tiktok_army/agents/_catalog.py holds an AgentSpec for every agent. The spec describes:
- `name`, `display_name`, `purpose` — display strings.
- `pattern` — `studio_consuming` / `llm_pure` / `data_monitoring`.
- `typical_model` — default Claude model.
- `inputs`, `options`, `outputs` — typed `AgentField` lists. Used by the dashboard to auto-render forms.
- `human_touchpoints` — `HumanTouchpoint` entries describing where approvals show up.
- `workflows` — list of seeded workflow slugs that include this agent.
The dashboard's /agents/catalog page renders directly from SPECS. The brief intake form auto-generates input fields. Adding a new agent without a spec means it won't show up in either UI.
For now, the spec is read-only descriptive metadata — agents don't introspect their spec at runtime to validate inputs. That's a future refinement. They just use the ctx.input dict.
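For orientation, a spec entry might look like the sketch below. The dataclass and every field value here are stand-ins built from the field list above; the real `AgentSpec`, `AgentField`, and `HumanTouchpoint` definitions live in _catalog.py and will differ.

```python
# Illustrative stand-in for an AgentSpec catalog entry (not the real types).
from dataclasses import dataclass, field


@dataclass(frozen=True)
class AgentSpec:
    name: str
    display_name: str
    purpose: str
    pattern: str          # studio_consuming / llm_pure / data_monitoring
    typical_model: str
    inputs: list = field(default_factory=list)   # AgentField list in the real spec
    options: list = field(default_factory=list)
    outputs: list = field(default_factory=list)
    human_touchpoints: list = field(default_factory=list)
    workflows: list = field(default_factory=list)


SPEC = AgentSpec(
    name="comment_triage",
    display_name="Comment Triage",
    purpose="Classify inbound comments and decide reply/hide/escalate.",
    pattern="llm_pure",
    typical_model="claude-haiku-4-5-20251001",
)
```

Since specs are descriptive metadata only, nothing at runtime enforces that `ctx.input` matches `inputs`; the dashboard form is the sole consumer today.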
Adding a new agent
The full step-by-step is in Adding a New Agent. Short version: subclass BaseAgent, set name, implement _execute, register in AGENT_REGISTRY, write an AgentSpec in _catalog.py, add a fixture in mock_claude._FIXTURES, optionally add to a workflow's seed.