Before you start
Decide:
- Slug — `snake_case` identifier. This is what callers pass as `workflow_slug` and what `mock_claude._SYNTHESIS_FIXTURES` keys against.
- Outcome — does the brief intake form need a new outcome value? Today the `tiktok_brief_outcome` enum has `profile_audit`, `campaign_launch`, `post_launch_loop` (~/projects/tiktok-army/migration/008_tiktok_briefs.py:51). Adding a new outcome means a migration. Until you do, run new workflows as ad-hoc (with `handle` + `target_type` directly, no `brief_id`).
- Read-only or write-the-world? Decide your approval-gate posture. Anything that publishes / sends / spends should have an `APPROVAL_NODE` upstream. Read-only workflows (like Profile Audit) typically have none.
- Final synthesis? If the workflow should produce a Markdown deliverable at the end, include a `SYNTHESIS_NODE` step depending on every step whose output you want considered.
Step 1: Define the `WorkflowDef`
Edit `~/projects/tiktok-army/tiktok_army/orchestrator/definitions.py`. Add your workflow as a module-level constant:
```python
MY_WORKFLOW = WorkflowDef(
    slug="my_workflow",
    name="My Workflow",
    description=(
        "What this workflow does, in 1–3 sentences. Surfaced in the dashboard's "
        "workflow list and used as context by the synthesis step."
    ),
    steps=[
        WorkflowStepDef(
            key="step_one",
            agent_name="account_health",
            label="Audit account health",
            input_map={"handle": "brief.handle"},
        ),
        WorkflowStepDef(
            key="step_two",
            agent_name="trend_watcher",
            label="Find rising trends",
            depends_on=["step_one"],
            input_map={
                "handle": "brief.handle",
                "category": "step_one.output.category",  # use upstream output
            },
            options={"region": "US", "freshness_hours": 168},
        ),
        # Optional approval before something destructive
        WorkflowStepDef(
            key="approve_thing",
            agent_name=APPROVAL_NODE,
            label="Approve trends to use",
            depends_on=["step_two"],
            target_output_field="rising_hashtags",
        ),
        # Optional final synthesis
        WorkflowStepDef(
            key="synthesize",
            agent_name=SYNTHESIS_NODE,
            label="Write the report",
            depends_on=["step_one", "step_two", "approve_thing"],
        ),
    ],
)
```

Constraints to mind:
- `key` is unique within the workflow. It's what `depends_on` and `input_map` reference.
- `agent_name` must exist in `AGENT_REGISTRY` (or be `APPROVAL_NODE` / `SYNTHESIS_NODE`). Otherwise the runner fails with `unknown agent: <name>`.
- `depends_on` keys must exist. Topo sort raises `ValueError` if you reference an unknown step.
- No cycles. The runner will raise `ValueError("workflow DAG has a cycle")` and fail the run.
- `target_output_field` is required for `APPROVAL_NODE`. Without it the dashboard's Review card has nothing to render.
- Steps that don't share dependencies can run in parallel — the topo sort allows it — but today's runner executes serially. Order them with that in mind for now.
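The dotted `input_map` paths ("brief.handle", "step_one.output.category") are resolved by the runner against the brief and upstream step outputs. A minimal sketch of that resolution, assuming outputs are plain dicts keyed by step key — the real resolver lives in the runner and may differ:

```python
def resolve_input_map(input_map: dict[str, str], brief: dict, outputs: dict) -> dict:
    """Toy resolver: 'brief.X' reads from the brief; '<step_key>.X.Y' walks outputs."""
    resolved = {}
    for arg, path in input_map.items():
        root, *rest = path.split(".")
        value = brief if root == "brief" else outputs[root]
        for part in rest:
            value = value[part]
        resolved[arg] = value
    return resolved

brief = {"handle": "lakucosmetics"}
outputs = {"step_one": {"output": {"category": "beauty"}}}
inputs = resolve_input_map(
    {"handle": "brief.handle", "category": "step_one.output.category"},
    brief,
    outputs,
)
# inputs == {"handle": "lakucosmetics", "category": "beauty"}
```

This is also why an `input_map` referencing a step you don't depend on is a bug: the upstream output won't be in `outputs` yet when the step runs.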
Step 2: Append to `SEEDED_WORKFLOWS`
At the bottom of `definitions.py`:
```python
SEEDED_WORKFLOWS: list[WorkflowDef] = [
    PROFILE_AUDIT,
    CAMPAIGN_LAUNCH,
    POST_LAUNCH_LOOP,
    MY_WORKFLOW,
]
```

This makes the workflow visible in `GET /api/dashboard/workflows` (which falls back to `SEEDED_WORKFLOWS` if the DB has no rows), and runnable by slug from `POST /api/dashboard/workflows/run`.
The first run of the workflow against a workspace will materialize a `tiktok_workflows` row via `_ensure_workflow_row` (~/projects/tiktok-army/tiktok_army/routers/workflows_api.py:366).
Step 3: Add a synthesis fixture (if using `SYNTHESIS_NODE` and running in mock mode)
Edit `~/projects/tiktok-army/tiktok_army/lib/mock_claude.py`. Add an entry to `_SYNTHESIS_FIXTURES` keyed by your workflow's slug:
```python
_SYNTHESIS_FIXTURES: dict[str, _MockResponse] = {
    "profile_audit": _MockResponse(text=...),
    "campaign_launch": _MockResponse(text=...),
    "post_launch_loop": _MockResponse(text=...),
    "my_workflow": _MockResponse(
        text=(
            "# My Workflow — Mock Report\n\n"
            "## Summary\n"
            "What ran, what we found.\n\n"
            "## Section A\n"
            "...\n\n"
            "## Next actions\n"
            "- Action 1\n"
            "- Action 2\n\n"
            "*Mock report — set `CLAUDE_MODE=real` to run against live Claude.*\n"
        )
    ),
}
```

Without this, mock mode falls back to the generic per-agent fixture for the synthesis step — which produces a `{"status": "ok", "note": "..."}` JSON object instead of Markdown. The dashboard's Markdown renderer will display it as a quoted JSON blob, which is ugly but not broken.
For real-mode runs, this step doesn't matter — Claude generates the synthesis Markdown live based on the upstream outputs.
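The fallback behaviour can be pictured as a dict lookup with a generic default. This is an illustrative sketch only — the names `GENERIC_FIXTURE` and `synthesis_text_for` are made up here, and the real lookup in `mock_claude.py` may be structured differently:

```python
# Stand-in for the generic per-agent JSON fixture.
GENERIC_FIXTURE = '{"status": "ok", "note": "generic mock output"}'

# Stand-in for _SYNTHESIS_FIXTURES, keyed by workflow slug.
SYNTHESIS_FIXTURES = {
    "my_workflow": "# My Workflow — Mock Report\n\n## Summary\n...",
}

def synthesis_text_for(slug: str) -> str:
    # Prefer the per-workflow Markdown fixture; otherwise fall back to
    # the generic JSON blob described above.
    return SYNTHESIS_FIXTURES.get(slug, GENERIC_FIXTURE)
```

A slug with a fixture yields Markdown; any other slug yields the JSON fallback that the dashboard renders as a quoted blob.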
Step 4: (Optional) Add a brief outcome
If you want users to be able to pick this workflow from the brief intake form, the `tiktok_brief_outcome` enum needs to know about your slug.
- Write a new Alembic migration in `~/projects/tiktok-army/migration/`:

  ```python
  def upgrade() -> None:
      op.execute("ALTER TYPE tiktok_brief_outcome ADD VALUE IF NOT EXISTS 'my_workflow'")

  def downgrade() -> None:
      # Postgres doesn't support removing enum values cleanly. Document and skip.
      pass
  ```

- Update `routers/briefs.py:BriefCreate` to allow the new outcome in its regex pattern.
- The dashboard's brief form pulls outcomes from the API — once the backend allows it, the form will too.
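As a sketch of the `BriefCreate` change — the actual pattern in `routers/briefs.py` may look different, so treat this as illustrative:

```python
import re

# Hypothetical outcome pattern mirroring the enum values named in this
# guide, extended with the new slug. Check routers/briefs.py for the
# real pattern before copying this.
OUTCOME_PATTERN = re.compile(
    r"^(profile_audit|campaign_launch|post_launch_loop|my_workflow)$"
)

def is_allowed_outcome(outcome: str) -> bool:
    return OUTCOME_PATTERN.fullmatch(outcome) is not None
```

The point is simply that the API-side allowlist and the Postgres enum must be extended together, or briefs with the new outcome will be rejected before they reach the DB.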
Until then, you can run the workflow ad-hoc (skip the brief, supply `handle` + `target_type` directly to `POST /workflows/run`).
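An ad-hoc run request body might look like the following. The field names are assumptions inferred from this guide — verify them against `routers/workflows_api.py` before relying on them:

```python
import json

# Hypothetical body for POST /api/dashboard/workflows/run (ad-hoc mode).
payload = {
    "workflow_slug": "my_workflow",
    "handle": "lakucosmetics",
    "target_type": "third_party",
    "brief_id": None,  # ad-hoc: no brief row backs this run
}
body = json.dumps(payload)
```

The key property of ad-hoc mode is that `brief_id` is absent/null, so the `tiktok_brief_outcome` enum never comes into play.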
Step 5: Test it end-to-end
Two levels.
Unit-ish test of the `WorkflowDef`. Just verify it round-trips through JSONB and that the topo sort works:
```python
from tiktok_army.orchestrator.definitions import MY_WORKFLOW, WorkflowDef
from tiktok_army.orchestrator.runner import _topo_sort

def test_my_workflow_topo_sorts():
    ordered = _topo_sort(MY_WORKFLOW.steps)
    assert len(ordered) == len(MY_WORKFLOW.steps)
    # Verify dependencies come before dependents
    seen: set[str] = set()
    for step in ordered:
        for dep in step.depends_on:
            assert dep in seen, f"{step.key} runs before its dep {dep}"
        seen.add(step.key)

def test_my_workflow_serializes():
    dag = MY_WORKFLOW.to_dag_jsonb()
    rebuilt = WorkflowDef.from_dag_jsonb(
        slug=MY_WORKFLOW.slug,
        name=MY_WORKFLOW.name,
        description=MY_WORKFLOW.description,
        dag=dag,
    )
    assert len(rebuilt.steps) == len(MY_WORKFLOW.steps)
    assert {s.key for s in rebuilt.steps} == {s.key for s in MY_WORKFLOW.steps}
```

End-to-end mock-mode run. Pattern to copy (will require a test DB or sufficient mocking — see Testing for the current state of fixtures):
```python
from uuid import uuid4
from tiktok_army.orchestrator.runner import run_workflow_inline
from tiktok_army.orchestrator.definitions import MY_WORKFLOW

async def test_my_workflow_runs_end_to_end():
    workflow_run_id = uuid4()
    result_md = await run_workflow_inline(
        workspace_id=uuid4(),
        workflow=MY_WORKFLOW,
        brief={"handle": "lakucosmetics", "target_type": "third_party", "notes": None},
        workflow_run_id=workflow_run_id,
        brand_id=None,
    )
    # If the workflow has approval gates, run will return None and you need
    # to drive the approve flow separately.
    # If it has only a synthesis terminal step:
    if MY_WORKFLOW.steps[-1].agent_name == "synthesis":
        assert result_md is not None
        assert "# " in result_md  # has a Markdown heading
```

The end-to-end test exercises:
- Topo sort
- Input map resolution
- Each agent's `_execute` against mock fixtures
- Trace persistence
- The synthesis step pulling the right `_SYNTHESIS_FIXTURES` entry
Step 6: Sanity-check via the dashboard
In dev:
```shell
# Backend
cd ~/projects/tiktok-army
uv run uvicorn tiktok_army.main:app --reload --port 8000

# Dashboard (in another terminal)
cd ~/projects/tiktok-army/dashboard
TIKTOK_ARMY_API_URL=http://localhost:8000 npm run dev
```

Hit http://localhost:3001. Your workflow should appear in /workflows. Trigger a run via the UI, watch the live DAG, click into each step to see the trace.
If something's off:
- Step doesn't appear? → Check the topo sort isn't dropping it (cycle? unknown dependency?).
- Step shows as `pending` forever? → It's an approval gate; check the Approvals queue.
- Step fails? → Click into it; the `error_message` and trace will tell you why.
Common gotchas
- Synthesis step has no `input_map`. It doesn't need one — the runner builds the synthesis prompt from the brief plus every step output in `outputs[step.key]`. But its `depends_on` matters: only steps it depends on (transitively) are guaranteed to have completed when it runs, and the runner doesn't execute steps that aren't transitively reachable — so in practice, depend on every step whose output you want considered.
- Cycle detection only happens at run time. The topo sort raises on the first execution. There's no static check at module load; if you write a cycle, it ships fine and explodes when someone hits Run. Add a unit test (`_topo_sort(MY_WORKFLOW.steps)` shouldn't raise).
- Approval gates are leaves today. `routers/approvals.py:107` documents this. If your workflow has an approval gate followed by more agent steps, the current runner won't continue after approval — re-entry is a future refinement. Either keep approval gates as leaves (with synthesis as the actual final step), or wait for the re-entry feature.
- `brief_id` constraint. When users submit a brief, the `outcome` value must match the `workflow_slug` of the run. If you didn't add a brief outcome enum value (Step 4), you can only run ad-hoc.
- Don't shadow built-in slugs. If you name your slug `profile_audit`, you'll quietly override the seeded one. Use a unique slug.
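Since cycles only surface at run time, a load-time unit test is cheap insurance. The project's `_topo_sort` can be exercised directly as suggested above; as a self-contained illustration of the failure mode, here is a toy Kahn-style sort (not the project's implementation) that raises the same way:

```python
def toy_topo_sort(deps: dict[str, list[str]]) -> list[str]:
    """deps maps step key -> dependency keys. Raises ValueError on a cycle."""
    remaining = dict(deps)
    ordered: list[str] = []
    while remaining:
        # A step is ready once none of its dependencies are still pending.
        ready = [k for k, ds in remaining.items()
                 if all(d not in remaining for d in ds)]
        if not ready:
            # Steps remain but none are runnable: the DAG has a cycle.
            raise ValueError("workflow DAG has a cycle")
        for k in sorted(ready):
            ordered.append(k)
            del remaining[k]
    return ordered

assert toy_topo_sort({"step_one": [], "step_two": ["step_one"]}) == ["step_one", "step_two"]
try:
    toy_topo_sort({"a": ["b"], "b": ["a"]})  # cycle: should raise
except ValueError:
    pass
```

A one-line test that simply calls the sort on each entry in `SEEDED_WORKFLOWS` catches both cycles and unknown `depends_on` keys before anyone hits Run.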