Workflow Contract

`WorkflowDef` + `WorkflowStepDef`, the special node types (`APPROVAL_NODE`, `SYNTHESIS_NODE`), `input_map` dotted paths, the topo sort, and where the runner persists state.

The data classes

Defined in ~/projects/tiktok-army/tiktok_army/orchestrator/definitions.py:42:

from dataclasses import dataclass, field
from typing import Any

@dataclass
class WorkflowStepDef:
    key: str                    # unique within the workflow
    agent_name: str             # an entry in AGENT_REGISTRY, or APPROVAL_NODE / SYNTHESIS_NODE
    label: str                  # human-readable label for the dashboard
    depends_on: list[str] = field(default_factory=list)      # upstream step keys
    input_map: dict[str, str] = field(default_factory=dict)  # field_name → dotted path
    options: dict[str, Any] = field(default_factory=dict)    # per-step option overrides

    # Approval-only:
    target_output_field: str | None = None              # which upstream output to render in the Review card
    approval_required_by: str = "any_workspace_member"  # role hint (today: "any_workspace_member")


@dataclass
class WorkflowDef:
    slug: str
    name: str
    description: str
    steps: list[WorkflowStepDef]
    version: int = 1

WorkflowDef.to_dag_jsonb() and from_dag_jsonb() round-trip the structure into the dag_jsonb column of tiktok_workflows.

Special nodes

Two agent_name values are reserved sentinels — they don't correspond to entries in AGENT_REGISTRY. The runner handles them inline.

`APPROVAL_NODE` = `"approval_gate"`

Pauses the workflow until a human acts. When the runner hits one of these nodes (runner.py:381):

  1. UPDATE the tiktok_workflow_steps row to status pending (the enum doesn't have pending_approval yet — reusing pending for now; comment at runner.py:390).
  2. Publish workflow_step.pending_approval to the SSE bus, including the upstream output(s) so the dashboard can render the Review card.
  3. Return None from execute(). The inline runner halts. The workflow run row stays in status running.
  4. The dashboard surfaces the Review card. User clicks Approve or Reject, which hits POST /api/dashboard/approvals/{step_id}/{approve|reject} (routers/approvals.py).
  5. On approve, the step is updated to succeeded. On reject, the step is failed and the run is failed.

Today's wrinkle. Approving doesn't re-enter the runner to continue downstream steps — routers/approvals.py:107 has a comment about this. In the seeded campaign_launch workflow this is intentional by construction: every approval gate sits at the edge of the DAG, with the final synthesis as its only downstream step, and the current orchestrator runs that synthesis after the last non-approval step succeeds rather than on runner re-entry. For workflows where approval gates feed further agent steps, you'd need to re-enter the runner with the approval result as additional input — a future refinement.

The approval node uses target_output_field to tell the dashboard which upstream output to render in the Review card. For example, target_output_field="variants" means render the variants field of the upstream Listing Optimizer output.

`SYNTHESIS_NODE` = `"synthesis"`

Final step that takes all upstream outputs and asks Claude (Sonnet by default) to produce a Markdown deliverable. Implementation at runner.py:423:

  1. Build a synthesis user prompt from the brief + every upstream step's output (_build_synthesis_user_prompt, runner.py:163).
  2. Call call_claude_cached(is_synthesis=True, workflow_slug=...). The is_synthesis flag tells mock_claude.mock_call to look up the workflow-specific synthesis fixture in _SYNTHESIS_FIXTURES rather than the generic agent fixtures.
  3. Save the resulting Markdown to outputs[step.key] = {"report_md": ...} and to the tiktok_workflow_runs.report_md column on run finalization.
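A minimal sketch of the prompt-building step, assuming a simple concatenation of the brief and each upstream output — the real helper is _build_synthesis_user_prompt in runner.py and its exact formatting may differ:

```python
import json

def build_synthesis_user_prompt(brief: dict, outputs: dict[str, dict]) -> str:
    """Concatenate the brief and every upstream step's output into
    one user message for the synthesis call (illustrative sketch)."""
    parts = ["## Brief", json.dumps(brief, indent=2)]
    for step_key, output in outputs.items():
        parts.append(f"## Output of step `{step_key}`")
        parts.append(json.dumps(output, indent=2))
    parts.append("Produce the final Markdown deliverable.")
    return "\n\n".join(parts)
```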

The system prompt for synthesis is fixed (runner.py:152):

_SYNTHESIS_SYSTEM = (
    "You are the synthesis step of a TikTok marketing workflow run. ..."
)

It's stable across calls so the cache hits.

`input_map` dotted paths

Each step's input_map declares how to populate the agent's input from the brief and upstream outputs. Resolved by _resolve_path (runner.py:94).

Three forms:

  • brief.<field> — value of the field from the original brief dict. Example: "brief.handle" → brief["handle"].
  • <step_key>.output.<field> — value of a field on an upstream step's output. Example: "map_audience.output.segments".
  • <step_key>.output — the entire output dict of an upstream step. Useful when threading a whole structured object forward.

The output token is conventional; omitting it is tolerated for brevity — "map_audience.segments" resolves the same way as "map_audience.output.segments".
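A sketch of the resolution logic covering all three forms plus the omitted-token shorthand — the real helper is _resolve_path in runner.py:

```python
def resolve_path(path: str, brief: dict, outputs: dict[str, dict]):
    """Resolve a dotted path against the brief and upstream outputs
    (illustrative sketch of _resolve_path)."""
    head, _, rest = path.partition(".")
    if head == "brief":
        return brief[rest]
    node = outputs[head]                  # upstream step's output dict
    if rest in ("", "output"):
        return node                       # whole output
    if rest.startswith("output."):
        rest = rest[len("output."):]      # strip the conventional token
    value = node
    for part in rest.split("."):          # walk nested fields
        value = value[part]
    return value
```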

The merged input dict is built by _resolve_step_input (runner.py:127):

  1. Start with a copy of the full brief dict (so common fields like handle, target_type, notes are always available).
  2. Overlay each input_map resolution.
  3. Overlay step-level options under their own keys (so an agent that reads lookback_days from ctx.input["lookback_days"] finds it whether the workflow set it via options or the brief carried it through).

The options dict on a step is for per-step tuning (e.g., {"region": "UK"} overrides the agent's default region).
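The three-layer merge can be sketched as follows (illustrative; the real helper is _resolve_step_input in runner.py). Note the precedence: input_map resolutions overwrite the brief copy, while options only fill gaps via setdefault:

```python
from typing import Any

def resolve_step_input(input_map: dict[str, str], options: dict[str, Any],
                       brief: dict, outputs: dict[str, dict]) -> dict:
    """Build a step's merged input dict (sketch of _resolve_step_input)."""
    merged = dict(brief)                        # 1. full brief copy
    for field_name, path in input_map.items():  # 2. input_map overlay
        head, _, rest = path.partition(".")
        if head == "brief":
            merged[field_name] = brief[rest]
        else:
            node = outputs[head]
            key = rest.removeprefix("output.")
            merged[field_name] = node if key in ("", "output") else node[key]
    for k, v in options.items():                # 3. options fill remaining gaps
        merged.setdefault(k, v)
    return merged
```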

The topo sort

_topo_sort at runner.py:69. Plain Kahn's algorithm:

  • Build adjacency from depends_on.
  • Initialize ready with all 0-indegree nodes.
  • Pop, append to output, decrement children's indegrees, push children whose indegree drops to 0.
  • If output length doesn't match input, there's a cycle — raise ValueError("workflow DAG has a cycle").
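The steps above can be sketched directly (over plain step dicts with "key" and "depends_on", mirroring _topo_sort but not the real code):

```python
from collections import deque

def topo_sort(steps: list[dict]) -> list[str]:
    """Kahn's algorithm: pop 0-indegree nodes, decrement children,
    detect cycles by comparing output length to input length."""
    indegree = {s["key"]: len(s.get("depends_on", [])) for s in steps}
    children: dict[str, list[str]] = {s["key"]: [] for s in steps}
    for s in steps:
        for dep in s.get("depends_on", []):
            children[dep].append(s["key"])
    ready = deque(k for k, d in indegree.items() if d == 0)
    ordered: list[str] = []
    while ready:
        key = ready.popleft()
        ordered.append(key)
        for child in children[key]:
            indegree[child] -= 1
            if indegree[child] == 0:
                ready.append(child)
    if len(ordered) != len(steps):
        raise ValueError("workflow DAG has a cycle")
    return ordered
```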

Steps with no depends_on and no shared dependencies can run in parallel. Today's runner doesn't actually parallelize them — it iterates through the topo order sequentially. Parallelization is a future refinement once we move to Cloud Tasks fan-out. For now, the topo sort is correct but execution is serial.

Where the runner persists state

Three tables. See Trace Pipeline for a deeper look.

`tiktok_workflows`

Workflow definitions. Versioned per workspace (the (workspace_id, slug, version) unique constraint). Seeded definitions get materialized into this table on first use via _ensure_workflow_row in routers/workflows_api.py:366.

The dag_jsonb column stores the serialized WorkflowDef.to_dag_jsonb() shape. New versions are written via POST /api/dashboard/workflows; old versions stay so old runs can reference what they actually ran.

`tiktok_workflow_runs`

One row per execution. Schema in ~/projects/tiktok-army/migration/009_tiktok_workflows.py:129. Key columns:

  • workflow_id — FK to the specific tiktok_workflows row (which version of the workflow ran).
  • brief_id — FK to the tiktok_briefs row that triggered the run (nullable for system-triggered runs).
  • status — pending / running / succeeded / failed / skipped.
  • input_jsonb — frozen copy of the brief inputs at run-start.
  • report_md — the synthesized Markdown deliverable (filled by the synthesis step).
  • total_cost_usd, total_latency_ms — rolled up from steps.

`tiktok_workflow_steps`

One row per step in a run. Schema at migration/009_tiktok_workflows.py:233. Key columns:

  • workflow_run_id — FK back to the run.
  • idx — 0-based position in the topo-sorted execution order. Unique per run.
  • agent_name — what the workflow definition said to run. May be approval_gate or synthesis for special nodes.
  • status — same enum as runs.
  • input_jsonb — what was actually fed to the agent (post-resolution).
  • output_jsonb — what the agent returned. Or for an approval node, the recorded approval/rejection.
  • agent_run_id — FK to tiktok_agent_runs. Nullable (because a workflow step row exists before the agent's row; also null for approval/synthesis nodes that don't go through BaseAgent.run()).
  • cost_usd, latency_ms — per-step.

Persistence helpers live at runner.py:198 (_insert_step_row), :233 (_update_step_row), :276 (_update_run_row).

How a run actually flows through the runner

Pseudocode-ish from runner.py:326:

async def execute(self) -> str | None:
    publish("workflow_run.started")
    ordered = _topo_sort(self.workflow.steps)

    outputs: dict[str, dict] = {}
    total_cost = 0.0
    report_md: str | None = None

    for idx, step in enumerate(ordered):
        input_data = _resolve_step_input(step, brief=self.brief, outputs=outputs)
        step_id = await _insert_step_row(idx=idx, step=step, input_data=input_data)
        publish("workflow_step.started")

        if step.agent_name == APPROVAL_NODE:
            # Persist pending, publish pending_approval, halt the runner.
            return None

        if step.agent_name == SYNTHESIS_NODE:
            user_msg = _build_synthesis_user_prompt(workflow, brief, outputs)
            result = await call_claude_cached(is_synthesis=True, workflow_slug=workflow.slug, ...)
            report_md = result.text
            outputs[step.key] = {"report_md": report_md}
            await _update_step_row(status=SUCCEEDED, ...)
            continue

        agent = AGENT_REGISTRY[step.agent_name]()
        agent_result = await agent.run(workspace_id=..., input_data=input_data)
        outputs[step.key] = dict(agent_result.output)
        total_cost += agent_result.cost_usd
        await _update_step_row(status=SUCCEEDED, output_jsonb=agent_result.output, ...)
        publish("workflow_step.succeeded")

    await _update_run_row(status=SUCCEEDED, report_md=report_md, total_cost_usd=total_cost, ...)
    publish("workflow_run.completed")
    return report_md

Failures at any step short-circuit via _fail_step + _fail_run (runner.py:561, :586).

A complete worked example

The Profile Audit definition (definitions.py:115):

PROFILE_AUDIT = WorkflowDef(
    slug="profile_audit",
    name="Profile Audit",
    description="Point at any TikTok handle and produce a Markdown audit ...",
    steps=[
        WorkflowStepDef(
            key="audit_health",
            agent_name="account_health",
            label="Audit account health",
            input_map={"handle": "brief.handle"},
        ),
        WorkflowStepDef(
            key="watch_trends",
            agent_name="trend_watcher",
            label="Identify rising trends",
            depends_on=["audit_health"],
            input_map={"handle": "brief.handle", "region": "brief.region"},
            options={"region": "US"},
        ),
        WorkflowStepDef(
            key="map_audience",
            agent_name="audience_mapper",
            label="Map audience segments",
            depends_on=["audit_health"],
            input_map={"handle": "brief.handle"},
        ),
        WorkflowStepDef(
            key="check_compliance",
            agent_name="compliance",
            label="Run compliance checks",
            depends_on=["audit_health"],
            input_map={"handle": "brief.handle"},
        ),
        WorkflowStepDef(
            key="synthesize",
            agent_name=SYNTHESIS_NODE,
            label="Synthesize audit report",
            depends_on=["watch_trends", "map_audience", "check_compliance"],
        ),
    ],
)

Executed against a brief {"handle": "lakucosmetics", "target_type": "third_party", "region": "UK"}:

  1. Topo order: audit_health → (watch_trends, map_audience, check_compliance in some order) → synthesize.
  2. audit_health runs with input {"handle": "lakucosmetics", ...brief_fields}. Output gets stored in outputs["audit_health"].
  3. watch_trends resolves: handle → "lakucosmetics" from the brief; region → "UK" from the brief (brief.region wins because input_map is applied before options); the per-step options {"region": "US"} would only set the region key if input_map hadn't already populated it (setdefault, see runner.py:144).
  4. map_audience and check_compliance resolve in turn.
  5. synthesize builds a user prompt concatenating outputs["watch_trends"], outputs["map_audience"], outputs["check_compliance"] and asks Claude for the Markdown report.
  6. The Markdown is saved to tiktok_workflow_runs.report_md. Run is marked succeeded.

Adding a new workflow

Full guide in Adding a New Workflow. Short version: define a WorkflowDef in definitions.py, append it to SEEDED_WORKFLOWS, optionally add a synthesis fixture to mock_claude._SYNTHESIS_FIXTURES, and write a test that runs it end-to-end in mock mode.