← Docs

Adding a New Workflow

Define a `WorkflowDef` in `orchestrator/definitions.py`, append it to `SEEDED_WORKFLOWS`, optionally add a synthesis fixture to `mock_claude._SYNTHESIS_FIXTURES`, and what to test.

Before you start

Decide:

  • Slugsnake_case identifier. This is what callers pass as workflow_slug and what mock_claude._SYNTHESIS_FIXTURES keys against.
  • Outcome — does the brief intake form need a new outcome value? Today the tiktok_brief_outcome enum has profile_audit, campaign_launch, post_launch_loop (~/projects/tiktok-army/migration/008_tiktok_briefs.py:51). Adding a new outcome means a migration. Until you do, run new workflows as ad-hoc (with handle + target_type directly, no brief_id).
  • Read-only or write-the-world? Decide your approval-gate posture. Anything that publishes / sends / spends should have an APPROVAL_NODE upstream. Read-only workflows (like Profile Audit) typically have none.
  • Final synthesis? If the workflow should produce a Markdown deliverable at the end, include a SYNTHESIS_NODE step depending on every step whose output you want considered.

Step 1: Define the `WorkflowDef`

Edit ~/projects/tiktok-army/tiktok_army/orchestrator/definitions.py. Add your workflow as a module-level constant:

MY_WORKFLOW = WorkflowDef(
    slug="my_workflow",
    name="My Workflow",
    description=(
        "What this workflow does, in 1–3 sentences. Surfaced in the dashboard's "
        "workflow list and used as context by the synthesis step."
    ),
    steps=[
        WorkflowStepDef(
            key="step_one",
            agent_name="account_health",
            label="Audit account health",
            input_map={"handle": "brief.handle"},
        ),
        WorkflowStepDef(
            key="step_two",
            agent_name="trend_watcher",
            label="Find rising trends",
            depends_on=["step_one"],
            input_map={
                "handle": "brief.handle",
                "category": "step_one.output.category",  # use upstream output
            },
            options={"region": "US", "freshness_hours": 168},
        ),
        # Optional approval before something destructive
        WorkflowStepDef(
            key="approve_thing",
            agent_name=APPROVAL_NODE,
            label="Approve trends to use",
            depends_on=["step_two"],
            target_output_field="rising_hashtags",
        ),
        # Optional final synthesis
        WorkflowStepDef(
            key="synthesize",
            agent_name=SYNTHESIS_NODE,
            label="Write the report",
            depends_on=["step_one", "step_two", "approve_thing"],
        ),
    ],
)

Constraints to mind:

  • key is unique within the workflow. It's what depends_on and input_map reference.
  • agent_name must exist in AGENT_REGISTRY (or be APPROVAL_NODE / SYNTHESIS_NODE). Otherwise the runner fails with unknown agent: <name>.
  • depends_on keys must exist. Topo sort raises ValueError if you reference an unknown step.
  • No cycles. The runner will raise ValueError("workflow DAG has a cycle") and fail the run.
  • target_output_field is required for APPROVAL_NODE. Without it the dashboard's Review card has nothing to render.
  • Steps that don't share dependencies can run in parallel — the topo sort allows it — but today's runner executes serially. Order them with that in mind for now.

Step 2: Append to `SEEDED_WORKFLOWS`

At the bottom of definitions.py:

SEEDED_WORKFLOWS: list[WorkflowDef] = [
    PROFILE_AUDIT,
    CAMPAIGN_LAUNCH,
    POST_LAUNCH_LOOP,
    MY_WORKFLOW,
]

This makes the workflow visible in GET /api/dashboard/workflows (which falls back to SEEDED_WORKFLOWS if the DB has no rows), and runnable by slug from POST /api/dashboard/workflows/run.

The first run of the workflow against a workspace will materialize a tiktok_workflows row via _ensure_workflow_row (~/projects/tiktok-army/tiktok_army/routers/workflows_api.py:366).

Step 3: Add a synthesis fixture (if using `SYNTHESIS_NODE` and running in mock mode)

Edit ~/projects/tiktok-army/tiktok_army/lib/mock_claude.py. Add an entry to _SYNTHESIS_FIXTURES keyed by your workflow's slug:

_SYNTHESIS_FIXTURES: dict[str, _MockResponse] = {
    "profile_audit": _MockResponse(text=...),
    "campaign_launch": _MockResponse(text=...),
    "post_launch_loop": _MockResponse(text=...),
    "my_workflow": _MockResponse(
        text=(
            "# My Workflow — Mock Report\n\n"
            "## Summary\n"
            "What ran, what we found.\n\n"
            "## Section A\n"
            "...\n\n"
            "## Next actions\n"
            "- Action 1\n"
            "- Action 2\n\n"
            "*Mock report — set `CLAUDE_MODE=real` to run against live Claude.*\n"
        )
    ),
}

Without this, mock mode falls back to the generic per-agent fixture for the synthesis step — which produces a {"status": "ok", "note": "..."} JSON object instead of Markdown. The dashboard's Markdown renderer will display it as a quoted JSON blob, which is ugly but not broken.

For real-mode runs, this step doesn't matter — Claude generates the synthesis Markdown live based on the upstream outputs.

Step 4: (Optional) Add a brief outcome

If you want users to be able to pick this workflow from the brief intake form, the tiktok_brief_outcome enum needs to know about your slug.

  1. Write a new alembic migration in ~/projects/tiktok-army/migration/:
def upgrade() -> None:
    op.execute("ALTER TYPE tiktok_brief_outcome ADD VALUE IF NOT EXISTS 'my_workflow'")


def downgrade() -> None:
    # Postgres doesn't support removing enum values cleanly. Document and skip.
    pass
  1. Update routers/briefs.py:BriefCreate to allow the new outcome in its regex pattern.
  1. The dashboard's brief form pulls outcomes from the API — once the backend allows it, the form will too.

Until then, you can run the workflow ad-hoc (skip the brief, supply handle + target_type directly to POST /workflows/run).

Step 5: Test it end-to-end

Two levels.

Unit-ish test of the WorkflowDef. Just verify it round-trips through JSONB and that the topo sort works:

from tiktok_army.orchestrator.definitions import MY_WORKFLOW, WorkflowDef
from tiktok_army.orchestrator.runner import _topo_sort


def test_my_workflow_topo_sorts():
    ordered = _topo_sort(MY_WORKFLOW.steps)
    assert len(ordered) == len(MY_WORKFLOW.steps)
    # Verify dependencies come before dependents
    seen: set[str] = set()
    for step in ordered:
        for dep in step.depends_on:
            assert dep in seen, f"{step.key} runs before its dep {dep}"
        seen.add(step.key)


def test_my_workflow_serializes():
    dag = MY_WORKFLOW.to_dag_jsonb()
    rebuilt = WorkflowDef.from_dag_jsonb(
        slug=MY_WORKFLOW.slug,
        name=MY_WORKFLOW.name,
        description=MY_WORKFLOW.description,
        dag=dag,
    )
    assert len(rebuilt.steps) == len(MY_WORKFLOW.steps)
    assert {s.key for s in rebuilt.steps} == {s.key for s in MY_WORKFLOW.steps}

End-to-end mock-mode run. Pattern to copy (will require a test DB or sufficient mocking — see Testing for the current state of fixtures):

from uuid import uuid4
from tiktok_army.orchestrator.runner import run_workflow_inline
from tiktok_army.orchestrator.definitions import MY_WORKFLOW


async def test_my_workflow_runs_end_to_end():
    workflow_run_id = uuid4()
    result_md = await run_workflow_inline(
        workspace_id=uuid4(),
        workflow=MY_WORKFLOW,
        brief={"handle": "lakucosmetics", "target_type": "third_party", "notes": None},
        workflow_run_id=workflow_run_id,
        brand_id=None,
    )
    # If the workflow has approval gates, run will return None and you need
    # to drive the approve flow separately.
    # If it has only a synthesis terminal step:
    if MY_WORKFLOW.steps[-1].agent_name == "synthesis":
        assert result_md is not None
        assert "# " in result_md  # has a Markdown heading

The end-to-end test exercises:

  • Topo sort
  • Input map resolution
  • Each agent's _execute against mock fixtures
  • Trace persistence
  • The synthesis step pulling the right _SYNTHESIS_FIXTURES entry

Step 6: Sanity-check via the dashboard

In dev:

# Backend
cd ~/projects/tiktok-army
uv run uvicorn tiktok_army.main:app --reload --port 8000

# Dashboard (in another terminal)
cd ~/projects/tiktok-army/dashboard
TIKTOK_ARMY_API_URL=http://localhost:8000 npm run dev

Hit http://localhost:3001. Your workflow should appear in /workflows. Trigger a run via the UI, watch the live DAG, click into each step to see the trace.

If something's off:

  • Step doesn't appear? → Check the topo sort isn't dropping it (cycle? unknown dependency?).
  • Step shows as pending forever? → It's an approval gate; check the Approvals queue.
  • Step fails? → Click into it; the error_message and trace will tell you why.

Common gotchas

  • Synthesis step has no input_map. It doesn't need one — the runner builds the synthesis prompt from the brief + every step's output that's in outputs[step.key]. But its depends_on matters: only steps it depends on (transitively) will have completed when it runs, but in practice you should depend on every step you want considered, since the runner doesn't execute downstream steps that aren't transitively reachable.
  • Cycle detection only happens at run time. The topo sort raises on the first execution. There's no static check at module load; if you write a cycle, it ships fine and explodes when someone hits Run. Add a unit test (_topo_sort(MY_WORKFLOW.steps) shouldn't raise).
  • Approval gates are leaves today. routers/approvals.py:107 documents this. If your workflow has an approval gate followed by more agent steps, the current runner won't continue after approval — re-entry is a future refinement. Either keep approval gates as leaves (with synthesis as the actual final step), or wait for the re-entry feature.
  • brief_id constraint. When users submit a brief, the outcome value must match the workflow_slug of the run. If you didn't add a brief outcome enum (Step 4), you can only run ad-hoc.
  • Don't shadow built-in slugs. If you name your slug profile_audit, you'll quietly override the seeded one. Use a unique slug.