The data classes
Defined in ~/projects/tiktok-army/tiktok_army/orchestrator/definitions.py:42:
```python
from dataclasses import dataclass, field
from typing import Any

@dataclass
class WorkflowStepDef:
    key: str                                # unique within the workflow
    agent_name: str                         # an entry in AGENT_REGISTRY, or APPROVAL_NODE / SYNTHESIS_NODE
    label: str                              # human-readable label for the dashboard
    depends_on: list[str] = field(default_factory=list)      # upstream step keys
    input_map: dict[str, str] = field(default_factory=dict)  # field_name → dotted path
    options: dict[str, Any] = field(default_factory=dict)    # per-step option overrides
    # Approval-only:
    target_output_field: str | None = None  # which upstream output to render in the Review card
    approval_required_by: str = "any_workspace_member"  # role hint (today: "any_workspace_member")

@dataclass
class WorkflowDef:
    slug: str
    name: str
    description: str
    steps: list[WorkflowStepDef]
    version: int = 1
```

`WorkflowDef.to_dag_jsonb()` and `from_dag_jsonb()` round-trip the structure into the `dag_jsonb` column of `tiktok_workflows`.
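The round-trip could plausibly be built on `dataclasses.asdict`. Here is a trimmed, self-contained approximation; the real serialization in definitions.py may differ in shape and field handling, and the dataclasses are re-declared here only so the block runs standalone:

```python
from dataclasses import asdict, dataclass, field
from typing import Any

@dataclass
class WorkflowStepDef:
    key: str
    agent_name: str
    label: str
    depends_on: list[str] = field(default_factory=list)
    input_map: dict[str, str] = field(default_factory=dict)
    options: dict[str, Any] = field(default_factory=dict)

@dataclass
class WorkflowDef:
    slug: str
    name: str
    description: str
    steps: list[WorkflowStepDef]
    version: int = 1

    def to_dag_jsonb(self) -> dict[str, Any]:
        # asdict recurses into the nested step dataclasses,
        # producing a JSON-serializable dict for the dag_jsonb column.
        return asdict(self)

    @classmethod
    def from_dag_jsonb(cls, data: dict[str, Any]) -> "WorkflowDef":
        steps = [WorkflowStepDef(**s) for s in data["steps"]]
        return cls(**{**data, "steps": steps})
```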
Special nodes
Two agent_name values are reserved sentinels — they don't correspond to entries in AGENT_REGISTRY. The runner handles them inline.
`APPROVAL_NODE` = `"approval_gate"`
Pauses the workflow until a human acts. When the runner hits one of these nodes (runner.py:381):
- UPDATE the `tiktok_workflow_steps` row to status `pending` (the enum doesn't have `pending_approval` yet — reusing `pending` for now; comment at `runner.py:390`).
- Publish `workflow_step.pending_approval` to the SSE bus, including the upstream output(s) so the dashboard can render the Review card.
- Return `None` from `execute()`. The inline runner halts. The workflow run row stays in status `running`.
- The dashboard surfaces the Review card. The user clicks Approve or Reject, which hits `POST /api/dashboard/approvals/{step_id}/{approve|reject}` (`routers/approvals.py`).
- On approve, the step is updated to `succeeded`. On reject, the step is `failed` and the run is `failed`.
Today's wrinkle. Approving doesn't re-enter the runner to continue downstream steps — routers/approvals.py:107 has a comment about this. In the seeded campaign_launch workflow this is intentional: the approval gates are effectively leaves in the DAG. Their only downstream node is the final synthesis, which the current orchestrator runs after the last non-approval step succeeds (the synthesis nominally depends on the approval gates, but the runner has already exited by then). For workflows where approval gates are not leaves, you'd need to re-enter the runner with the approval as additional input, which is a future refinement.
The approval node uses target_output_field to tell the dashboard which upstream output to render in the Review card. For example, target_output_field="variants" means render the variants field of the upstream Listing Optimizer output.
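The approve/reject semantics described above can be sketched as a plain function. `resolve_approval` is a hypothetical name; the real handler in routers/approvals.py additionally writes to the database and publishes SSE events:

```python
def resolve_approval(step: dict, run: dict, decision: str) -> None:
    """Apply an Approve/Reject decision to a pending approval-gate step."""
    if step["status"] != "pending":
        raise ValueError("step is not awaiting approval")
    if decision == "approve":
        step["status"] = "succeeded"
        step["output_jsonb"] = {"approved": True}
        # Note: this does NOT re-enter the runner to continue downstream
        # steps (the wrinkle described above).
    elif decision == "reject":
        # A rejection fails both the step and the whole run.
        step["status"] = "failed"
        step["output_jsonb"] = {"approved": False}
        run["status"] = "failed"
    else:
        raise ValueError(f"unknown decision: {decision}")
```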
`SYNTHESIS_NODE` = `"synthesis"`
Final step that takes all upstream outputs and asks Claude (Sonnet by default) to produce a Markdown deliverable. Implementation at runner.py:423:
- Build a synthesis user prompt from the brief + every upstream step's output (`_build_synthesis_user_prompt`, `runner.py:163`).
- Call `call_claude_cached(is_synthesis=True, workflow_slug=...)`. The `is_synthesis` flag tells `mock_claude.mock_call` to look up the workflow-specific synthesis fixture in `_SYNTHESIS_FIXTURES` rather than the generic agent fixtures.
- Save the resulting Markdown to `outputs[step.key] = {"report_md": ...}` and to the `tiktok_workflow_runs.report_md` column on run finalization.
The system prompt for synthesis is fixed (runner.py:152):
```python
_SYNTHESIS_SYSTEM = (
    "You are the synthesis step of a TikTok marketing workflow run. ..."
)
```

It's stable across calls so the cache hits.
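A minimal sketch of what the user-prompt builder might look like. The real `_build_synthesis_user_prompt` is at runner.py:163; the section headers and ordering here are assumptions:

```python
import json

def build_synthesis_user_prompt(
    workflow_name: str,
    brief: dict,
    outputs: dict[str, dict],
) -> str:
    # Concatenate the brief and every upstream step's output into one prompt.
    parts = [
        f"Workflow: {workflow_name}",
        f"Brief:\n{json.dumps(brief, indent=2)}",
    ]
    for step_key, output in outputs.items():
        parts.append(f"Step `{step_key}` output:\n{json.dumps(output, indent=2)}")
    parts.append("Produce a Markdown deliverable synthesizing the above.")
    return "\n\n".join(parts)
```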
`input_map` dotted paths
Each step's input_map declares how to populate the agent's input from the brief and upstream outputs. Resolved by _resolve_path (runner.py:94).
Three forms:
- `brief.<field>` — value of the field from the original brief dict. Example: `"brief.handle"` → `brief["handle"]`.
- `<step_key>.output.<field>` — value of a field on an upstream step's output. Example: `"map_audience.output.segments"`.
- `<step_key>.output` — the entire output dict of an upstream step. Useful when threading a whole structured object forward.
The `output` token is conventional and may be omitted for brevity — `"map_audience.segments"` resolves the same way as `"map_audience.output.segments"`.
The merged input dict is built by _resolve_step_input (runner.py:127):
- Start with a copy of the full `brief` dict (so common fields like `handle`, `target_type`, `notes` are always available).
- Overlay each `input_map` resolution.
- Fill in step-level `options` under their own keys via `setdefault` (so an agent that reads `lookback_days` from `ctx.input["lookback_days"]` finds it whether the workflow set it via `options` or the brief carried it through; an `input_map` resolution for the same key wins).
The options dict on a step is for per-step tuning (e.g., {"region": "UK"} overrides the agent's default region).
The topo sort
_topo_sort at runner.py:69. Plain Kahn's algorithm:
- Build adjacency from `depends_on`.
- Initialize `ready` with all 0-indegree nodes.
- Pop, append to output, decrement children's indegrees, push children whose indegree drops to 0.
- If the output length doesn't match the input, there's a cycle — raise `ValueError("workflow DAG has a cycle")`.
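The algorithm above, sketched with steps reduced to `(key, depends_on)` pairs (the real `_topo_sort` at runner.py:69 operates on `WorkflowStepDef` objects):

```python
from collections import deque

def topo_sort(steps: list[tuple[str, list[str]]]) -> list[str]:
    indegree = {key: len(deps) for key, deps in steps}
    children: dict[str, list[str]] = {key: [] for key, _ in steps}
    for key, deps in steps:
        for dep in deps:
            children[dep].append(key)        # adjacency from depends_on
    ready = deque(k for k, d in indegree.items() if d == 0)
    ordered: list[str] = []
    while ready:
        node = ready.popleft()
        ordered.append(node)
        for child in children[node]:
            indegree[child] -= 1
            if indegree[child] == 0:
                ready.append(child)
    if len(ordered) != len(steps):
        raise ValueError("workflow DAG has a cycle")
    return ordered
```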
Steps with no depends_on and no shared dependencies can run in parallel. Today's runner doesn't actually parallelize them — it iterates through the topo order sequentially. Parallelization is a future refinement once we move to Cloud Tasks fan-out. For now, the topo sort is correct but execution is serial.
Where the runner persists state
Three tables. See Trace Pipeline for a deeper look.
`tiktok_workflows`
Workflow definitions. Versioned per workspace (the (workspace_id, slug, version) unique constraint). Seeded definitions get materialized into this table on first use via _ensure_workflow_row in routers/workflows_api.py:366.
The dag_jsonb column stores the serialized WorkflowDef.to_dag_jsonb() shape. New versions are written via POST /api/dashboard/workflows; old versions stay so old runs can reference what they actually ran.
`tiktok_workflow_runs`
One row per execution. Schema in ~/projects/tiktok-army/migration/009_tiktok_workflows.py:129. Key columns:
- `workflow_id` — FK to the specific `tiktok_workflows` row (which version of the workflow ran).
- `brief_id` — FK to the `tiktok_briefs` row that triggered the run (nullable for system-triggered runs).
- `status` — `pending` / `running` / `succeeded` / `failed` / `skipped`.
- `input_jsonb` — frozen copy of the brief inputs at run-start.
- `report_md` — the synthesized Markdown deliverable (filled by the synthesis step).
- `total_cost_usd`, `total_latency_ms` — rolled up from steps.
`tiktok_workflow_steps`
One row per step in a run. Schema at migration/009_tiktok_workflows.py:233. Key columns:
- `workflow_run_id` — FK back to the run.
- `idx` — 0-based position in the topo-sorted execution order. Unique per run.
- `agent_name` — what the workflow definition said to run. May be `approval_gate` or `synthesis` for special nodes.
- `status` — same enum as runs.
- `input_jsonb` — what was actually fed to the agent (post-resolution).
- `output_jsonb` — what the agent returned, or for an approval node, the recorded approval/rejection.
- `agent_run_id` — FK to `tiktok_agent_runs`. Nullable (a workflow step row exists before the agent's row; also null for approval/synthesis nodes that don't go through `BaseAgent.run()`).
- `cost_usd`, `latency_ms` — per-step.
Persistence helpers live at runner.py:198 (_insert_step_row), :233 (_update_step_row), :276 (_update_run_row).
How a run actually flows through the runner
Pseudocode-ish from runner.py:326:
```python
async def execute(self) -> str | None:
    publish("workflow_run.started")
    ordered = _topo_sort(self.workflow.steps)
    outputs: dict[str, dict] = {}
    total_cost = 0.0
    report_md: str | None = None
    for idx, step in enumerate(ordered):
        input_data = _resolve_step_input(step, brief=self.brief, outputs=outputs)
        step_id = await _insert_step_row(idx=idx, step=step, input_data=input_data)
        publish("workflow_step.started")
        if step.agent_name == APPROVAL_NODE:
            # Persist pending, publish pending_approval, halt the runner.
            return None
        if step.agent_name == SYNTHESIS_NODE:
            user_msg = _build_synthesis_user_prompt(workflow, brief, outputs)
            result = await call_claude_cached(is_synthesis=True, workflow_slug=workflow.slug, ...)
            report_md = result.text
            outputs[step.key] = {"report_md": report_md}
            await _update_step_row(status=SUCCEEDED, ...)
            continue
        agent = AGENT_REGISTRY[step.agent_name]()
        agent_result = await agent.run(workspace_id=..., input_data=input_data)
        outputs[step.key] = dict(agent_result.output)
        total_cost += agent_result.cost_usd
        await _update_step_row(status=SUCCEEDED, output_jsonb=agent_result.output, ...)
        publish("workflow_step.succeeded")
    await _update_run_row(status=SUCCEEDED, report_md=report_md, total_cost_usd=total_cost, ...)
    publish("workflow_run.completed")
    return report_md
```

Failures at any step short-circuit via `_fail_step` + `_fail_run` (`runner.py:561`, `:586`).
A complete worked example
The Profile Audit definition (definitions.py:115):
```python
PROFILE_AUDIT = WorkflowDef(
    slug="profile_audit",
    name="Profile Audit",
    description="Point at any TikTok handle and produce a Markdown audit ...",
    steps=[
        WorkflowStepDef(
            key="audit_health",
            agent_name="account_health",
            label="Audit account health",
            input_map={"handle": "brief.handle"},
        ),
        WorkflowStepDef(
            key="watch_trends",
            agent_name="trend_watcher",
            label="Identify rising trends",
            depends_on=["audit_health"],
            input_map={"handle": "brief.handle", "region": "brief.region"},
            options={"region": "US"},
        ),
        WorkflowStepDef(
            key="map_audience",
            agent_name="audience_mapper",
            label="Map audience segments",
            depends_on=["audit_health"],
            input_map={"handle": "brief.handle"},
        ),
        WorkflowStepDef(
            key="check_compliance",
            agent_name="compliance",
            label="Run compliance checks",
            depends_on=["audit_health"],
            input_map={"handle": "brief.handle"},
        ),
        WorkflowStepDef(
            key="synthesize",
            agent_name=SYNTHESIS_NODE,
            label="Synthesize audit report",
            depends_on=["watch_trends", "map_audience", "check_compliance"],
        ),
    ],
)
```

Executed against a brief `{"handle": "lakucosmetics", "target_type": "third_party", "region": "UK"}`:
- Topo order: `audit_health` → (`watch_trends`, `map_audience`, `check_compliance` in some order) → `synthesize`.
- `audit_health` runs with input `{"handle": "lakucosmetics", ...brief_fields}`. Output gets stored in `outputs["audit_health"]`.
- `watch_trends` resolves: `handle` ← `"lakucosmetics"` from the brief; `region` ← `"UK"` from the brief. Note: `brief.region` won the resolution because `input_map` is applied before `options`; the per-step `options` `{"region": "US"}` would be set under the `region` key only if `input_map` hadn't already populated it (`setdefault`, see `runner.py:144`).
- `map_audience` and `check_compliance` resolve in turn.
- `synthesize` builds a user prompt concatenating `outputs["watch_trends"]`, `outputs["map_audience"]`, and `outputs["check_compliance"]`, and asks Claude for the Markdown report.
- The Markdown is saved to `tiktok_workflow_runs.report_md`. The run is marked `succeeded`.
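To make the `region` resolution concrete, here is the `watch_trends` merge replayed inline. The merge order mirrors the described behavior (brief copy, then `input_map`, then `options` via `setdefault`), re-implemented standalone rather than calling the real helpers:

```python
brief = {"handle": "lakucosmetics", "target_type": "third_party", "region": "UK"}
input_map = {"handle": "brief.handle", "region": "brief.region"}
options = {"region": "US"}

merged = dict(brief)                     # 1. start from a copy of the brief
for field_name, path in input_map.items():
    # All paths in this step are brief.* forms, so resolve them directly.
    merged[field_name] = brief[path.removeprefix("brief.")]
for key, value in options.items():
    merged.setdefault(key, value)        # 3. input_map already set "region", so "US" loses

assert merged["region"] == "UK"          # brief.region wins over options
assert merged["handle"] == "lakucosmetics"
```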
Adding a new workflow
Full guide in Adding a New Workflow. Short version: define a WorkflowDef in definitions.py, append it to SEEDED_WORKFLOWS, optionally add a synthesis fixture to mock_claude._SYNTHESIS_FIXTURES, and write a test that runs it end-to-end in mock mode.