Wintermute Framework, Part 6: The Orchestrator
Part 5 gave us a single-test-case agent. In this post we build the orchestrator: a higher-level agent that ingests a ticket, derives engagement scope, attaches a TestPlan, generates TestCaseRuns, picks a starting test, and sequences the work. This is the backbone the per-run sub-agents in Part 7 will plug into.
I’ll lean on the LangGraph pattern from
examples/08-LangGraph-Pentest-Agent.ipynb
as the structural skeleton — but everything underneath is plain Wintermute,
so this works equally well with LangGraph, plain tool_calling_chat, or a
hand-rolled state machine.
Why an Orchestrator (and Not Just a Bigger Prompt)
Two reasons not to throw the whole engagement at one Claude call:
- Context budget. A real engagement has 10–50 test cases, each with
  3–10 reproduction steps and possibly multiple bound peripherals. The
  merged tool outputs over a full plan exceed any context window. The
  orchestrator’s job is to keep each step small and stash intermediate
  state in the Operation.
- Mixed cost/latency profiles. Reasoning (“which test case next?”,
  “this finding looks like an instance of CVE-X”) wants Claude Sonnet.
  Tool plumbing (“dump the EEPROM, run strings, classify”) wants Groq.
  The orchestrator runs on the slow/expensive lane; the per-run sub-agents
  run on the fast/cheap lane. That mapping is built into Router.choose’s
  task_tag policy.
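As a rough illustration of what a tag-to-lane policy amounts to, here is a minimal standalone sketch. It does not use Wintermute's Router; `Lane`, `POLICY`, `choose_lane`, and the `"reasoning"` tag are hypothetical names invented for this example (only the `"cheap"` tag appears in the code later in this post).

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Lane:
    label: str      # how the post describes the lane
    provider: str   # which backend serves it


# Hypothetical policy table; the "reasoning" tag is an assumption of this sketch.
POLICY = {
    "reasoning": Lane(label="slow/expensive", provider="Claude Sonnet"),
    "cheap": Lane(label="fast/cheap", provider="Groq"),
}


def choose_lane(task_tag: str) -> Lane:
    """Resolve a task tag to a lane; unknown tags are rejected rather than guessed."""
    try:
        return POLICY[task_tag]
    except KeyError:
        raise ValueError(f"no lane configured for task_tag={task_tag!r}") from None


print(choose_lane("reasoning").provider)
print(choose_lane("cheap").provider)
```

Rejecting unknown tags is a design choice of the sketch: a typo in a tag then surfaces as an error instead of silently landing on the wrong lane.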
The Orchestrator’s Five Nodes
Ticket ───────► read_ticket ───► plan_node ───► dispatch_node
                                                      │
                                                      ▼
                                              execute_runs_node
                                                      │
                                                      ▼
                                                 report_node
- read_ticket: pull the ticket via Ticket.read(...), parse scope hints.
- plan_node: pick or generate a TestPlan, attach it to the operation,
  call op.generateTestRuns().
- dispatch_node: for each generated TestCaseRun, decide:
  a) execute it now,
  b) skip it (not_applicable),
  c) queue for human review (blocked).
- execute_runs_node: for each “execute now” run, spawn a Part-7-style
  sub-agent (we hand-wave it here, fully build it in Part 7).
- report_node: Report.save(spec, [op], "out.docx") once everything
  has terminated.
Shared State
from typing import Any, Optional, TypedDict, Union

from wintermute.core import Operation, TestCaseRun
from wintermute.tickets import Ticket


class OrchestratorState(TypedDict, total=False):
    operation: Operation
    ticket: Union[Ticket, str]   # ticket id on entry, Ticket object after read_ticket
    router: Any                  # LLM router the nodes pass to simple_chat
    scope: dict[str, Any]        # {target_host, bus, address, tags, ...}
    plan_code: str               # which TestPlan we attached
    pending_runs: list[str]      # run_ids still to execute
    completed_runs: list[str]    # run_ids that finished (any terminal)
    skipped_runs: list[str]      # run_ids marked not_applicable
    failures: list[str]          # run_ids with vulnerabilities
    report_path: Optional[str]
The orchestrator’s only persistent state is the Operation. The
OrchestratorState is in-memory plumbing for the LangGraph nodes — every
result they care about (TestCaseRun.status, run.findings, op.test_plans)
lives on the operation and is auto-persisted on op.save().
Node 1 — read_ticket
import re

from wintermute.tickets import Ticket


def read_ticket_node(state: OrchestratorState) -> OrchestratorState:
    """Pull the ticket, parse scope hints, seed state['scope']."""
    tid = state["ticket"]
    if isinstance(tid, str):
        ticket = Ticket.read(tid)
    else:
        ticket = tid
    desc = ticket.data.description or ""

    # Lightweight, deterministic regex parsing so the orchestrator never
    # gambles on the LLM hallucinating a target IP. The agent reasoning
    # happens in plan_node and dispatch_node, not here.
    def find(pat: str) -> str:
        m = re.search(pat, desc)
        return m.group(1) if m else ""

    scope = {
        "target_host": find(r"Target:\s*(\S+)"),
        # [^\s,]+ rather than \S+ so a trailing comma is not captured
        "bus": find(r"Bus:\s*([^\s,]+)"),
        "address": find(r"Address:\s*(0x[0-9a-fA-F]+)"),
        "tags": re.findall(r"#(\w+)", desc),  # `#i2c #eeprom #blackbox`
        "title": ticket.data.title,
    }
    state["ticket"] = ticket
    state["scope"] = scope
    return state
This is intentionally non-LLM. Scope parsing failures should produce
operator-visible errors, not silent drift. If the ticket is unstructured
(no Target: field), the orchestrator falls back to an LLM-driven scope
extraction in plan_node — but the first attempt is always deterministic.
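To make the deterministic pass concrete, here is a minimal, self-contained sketch of the same regex approach run against a made-up ticket body. The description text, `parse_scope`, and `missing_fields` are illustrative assumptions; the bus pattern is written to stop before a trailing comma.

```python
import re

# Hypothetical ticket body, invented for this example.
DESCRIPTION = """\
Target: iot-cam-01
Bus: I2C-2, Address: 0x50
Dump the EEPROM and check for secrets. #i2c #eeprom #blackbox
"""

PATTERNS = {
    "target_host": r"Target:\s*(\S+)",
    "bus": r"Bus:\s*([^\s,]+)",
    "address": r"Address:\s*(0x[0-9a-fA-F]+)",
}


def parse_scope(desc: str) -> dict:
    """Extract each field with its regex; a missing field becomes ''."""
    scope = {}
    for key, pattern in PATTERNS.items():
        match = re.search(pattern, desc)
        scope[key] = match.group(1) if match else ""
    scope["tags"] = re.findall(r"#(\w+)", desc)
    return scope


def missing_fields(scope: dict) -> list:
    """Names of structured fields that did not parse, for an operator-visible error."""
    return [key for key in PATTERNS if not scope[key]]


scope = parse_scope(DESCRIPTION)
print(scope)
print(missing_fields(parse_scope("please look at the camera")))
```

A non-empty `missing_fields` result is the signal to surface an error or hand off to a fallback, rather than letting an empty target slip through.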
Node 2 — plan_node
plan_node is where the LLM earns its keep. It looks at the parsed scope
and the device fleet on the operation, decides which TestPlan from disk
fits (or builds one ad-hoc), attaches it, and generates runs.
import json
from pathlib import Path

from wintermute.ai.use import simple_chat
from wintermute.core import TestPlan

PLANS_DIR = Path("TestPlans")


def plan_node(state: OrchestratorState) -> OrchestratorState:
    op = state["operation"]
    scope = state["scope"]

    # 1. Deterministic mapping for common scopes: keeps the LLM out of
    #    the path when we already know the answer.
    plan_path: Path | None = None
    if "blackbox" in scope["tags"] and "hardware" in scope["tags"]:
        plan_path = PLANS_DIR / "TP-HW-BLACKBOX-001.json"
    elif "iam" in scope["tags"] and "aws" in scope["tags"]:
        plan_path = PLANS_DIR / "TP-AWS-IAM-ROLE-001.json"
    elif "redteam" in scope["tags"] and "aws" in scope["tags"]:
        plan_path = PLANS_DIR / "TP-AWS-RED-MEGA-001.json"

    # 2. LLM fallback: describe the engagement and ask which plan applies.
    #    We pin the answer to the catalogue so the model can't invent
    #    a plan code that does not exist on disk.
    if plan_path is None:
        plan_files = {p.stem: p for p in sorted(PLANS_DIR.glob("*.json"))}
        catalogue = "\n".join(
            f"- {code}: {json.loads(p.read_text())['name']}"
            for code, p in plan_files.items()
        )
        prompt = (
            f"Engagement scope: {json.dumps(scope)}\n"
            f"Operation devices: {[d.hostname for d in op.devices]}\n"
            f"Cloud accounts: {[a.name for a in op.cloud_accounts]}\n\n"
            f"Pick the single best test plan for this scope from:\n{catalogue}\n\n"
            "Reply ONLY with the plan code (e.g. TP-HW-BLACKBOX-001)."
        )
        # Cheap path: no tool use, just one classification call.
        plan_code = simple_chat(state["router"], prompt, task_tag="cheap").strip()
        # Look the reply up in the catalogue instead of building a path from
        # it, so an invented or path-like answer can never leave PLANS_DIR.
        if plan_code not in plan_files:
            raise RuntimeError(f"plan_node: model picked a non-existent plan: {plan_code}")
        plan_path = plan_files[plan_code]

    # 3. Attach + generate
    plan = TestPlan.from_dict(json.loads(plan_path.read_text()))
    op.addTestPlan(plan)
    new_runs = op.generateTestRuns()
    op.save()

    state["plan_code"] = plan.code
    state["pending_runs"] = [r.run_id for r in new_runs if r.status.value == "not_run"]
    state["completed_runs"] = []
    state["skipped_runs"] = []
    state["failures"] = []
    return state
Two ways this matters in a real engagement:
- Hardware blackbox + I²C scope → TP-HW-BLACKBOX-001.json. Generates
  runs for board survey, debug interfaces, UART/JTAG/I²C/SPI, boot chain,
  TPM 2.0. For our IoT camera ticket, that’s ~17 runs.
- AWS red team scope → TP-AWS-RED-MEGA-001.json. Generates runs for
  IAM role enumeration, S3, lambda, ECS, EKS, bound to the acme-prod
  account if it exists in the operation, none if it doesn’t.
The same orchestrator handles both because test plans are JSON. Add a plan, the orchestrator picks it up. No code changes.
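The "add a plan, no code changes" property comes from discovering plans by globbing a directory. The sketch below shows that idea in isolation, assuming each plan file carries a `name` key as in the catalogue prompt; `load_catalogue`, `resolve_plan`, and the `TP-DEMO-001` plan are hypothetical names for this example.

```python
import json
import tempfile
from pathlib import Path


def load_catalogue(plans_dir: Path) -> dict[str, str]:
    """Map plan code (file stem) to plan name for every JSON plan on disk."""
    return {
        p.stem: json.loads(p.read_text())["name"]
        for p in sorted(plans_dir.glob("*.json"))
    }


def resolve_plan(reply: str, plans_dir: Path) -> Path:
    """Accept a model reply only if it names a plan that is in the catalogue."""
    code = reply.strip()
    if code not in load_catalogue(plans_dir):
        raise ValueError(f"unknown plan code: {code!r}")
    return plans_dir / f"{code}.json"


with tempfile.TemporaryDirectory() as tmp:
    plans = Path(tmp)
    (plans / "TP-HW-BLACKBOX-001.json").write_text(json.dumps({"name": "Hardware blackbox"}))
    print(load_catalogue(plans))

    # Dropping a new file into the directory is all it takes to extend the catalogue.
    (plans / "TP-DEMO-001.json").write_text(json.dumps({"name": "Demo plan"}))
    print(sorted(load_catalogue(plans)))
    print(resolve_plan(" TP-DEMO-001\n", plans).name)
```

Because the reply is checked against the set of discovered codes, a made-up or path-like answer is rejected before any file is opened.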
Node 3 — dispatch_node
dispatch_node is the orchestrator’s first interesting decision. For each
pending TestCaseRun it picks one of execute, skip, or defer. Naive
implementations just execute everything, which wastes time on N/A cases and
ignores operational constraints. The I²C test bound to a device with no I²C
peripheral was already filtered out by resolveBindings, but
execution-blocking concerns like “do not interrupt boot during business
hours” are real.
import json

from wintermute.core import RunStatus

DISPATCH_SYSTEM = """You are the dispatcher for a sanctioned hardware
penetration test. You do NOT execute tests yourself. Your sole output is
a JSON object mapping each run_id to one of:
  "execute" - run it now
  "skip"    - mark not_applicable; the bound target makes the test moot
  "defer"   - mark blocked; needs human input or a permit
Constraints:
- Skip a test only when the bound target clearly makes it irrelevant.
- Defer a test only when destructive (e.g., DA lockout, glitching).
- Default is "execute"."""


def dispatch_node(state: OrchestratorState) -> OrchestratorState:
    op = state["operation"]
    pending = state["pending_runs"]

    # Build a compact run summary for the LLM: code, target, status, any
    # tags from the parent test case that hint at destructiveness.
    summary = []
    for run_id in pending:
        run = next(r for r in op.test_runs if r.run_id == run_id)
        tc = next((t for t in op.iterTestCases() if t.code == run.test_case_code), None)
        summary.append({
            "run_id": run_id,
            "test_case": tc.code if tc else "?",
            "name": tc.name if tc else "?",
            "tags": tc.target_scope.tags if tc else [],
            "execution_mode": tc.execution_mode.value if tc else "?",
            "bound": [{"alias": b.alias, "kind": b.kind, "object_id": b.object_id}
                      for b in run.bound],
        })

    # The dispatcher instructions are prepended to the prompt so that
    # DISPATCH_SYSTEM actually reaches the model.
    prompt = (
        f"{DISPATCH_SYSTEM}\n\n"
        f"Engagement scope: {state['scope']}\n"
        f"Pending runs: {json.dumps(summary, indent=2)}\n\n"
        "Reply with a JSON object: {\"<run_id>\": \"execute|skip|defer\", ...}"
    )
    raw = simple_chat(state["router"], prompt, task_tag="cheap")  # dispatch is cheap reasoning
    decisions: dict[str, str] = json.loads(raw)

    new_pending: list[str] = []
    for run_id in pending:
        decision = decisions.get(run_id, "execute")
        run = next(r for r in op.test_runs if r.run_id == run_id)
        if decision == "skip":
            run.status = RunStatus.not_applicable
            run.finish()
            state["skipped_runs"].append(run_id)
        elif decision == "defer":
            run.status = RunStatus.blocked
            state["completed_runs"].append(run_id)  # done for now
        else:
            new_pending.append(run_id)

    state["pending_runs"] = new_pending
    op.save()
    return state
Concrete behavior on the IoT camera ticket:
- IOT-HW-GEN-001 (engagement constraints) → defer (operator confirmation needed).
- IOT-HW-DISC-001 (board survey, photos) → defer (manual).
- IOT-HW-UART-001 (UART discovery) → execute.
- IOT-HW-I2C-001 (EEPROM read) → execute.
- IOT-HW-FAULT-001 (glitching) → defer (destructive).
- IOT-HW-TPM-001 (DA lockout) → defer (the test is destructive on real silicon).
Out of ~17 runs, maybe 9 actually execute autonomously. The other 8 are deferred to the operator or skipped as not applicable. Making that split cleanly is one of the most valuable things the orchestrator does.
Node 4 — execute_runs_node (Stub Now, Full Body in Part 7)
from wintermute.core import RunStatus


def execute_runs_node(state: OrchestratorState) -> OrchestratorState:
    """For each pending run, dispatch a sub-agent. Detailed in Part 7."""
    from wintermute.subagents import run_subagent_for_test_case_run  # Part 7

    op = state["operation"]
    while state["pending_runs"]:
        run_id = state["pending_runs"].pop(0)
        run = next(r for r in op.test_runs if r.run_id == run_id)
        try:
            run_subagent_for_test_case_run(op, run, router=state["router"])
        except Exception as exc:
            # A crashing sub-agent blocks only its own run.
            run.status = RunStatus.blocked
            run.notes = (run.notes + "\n" if run.notes else "") + f"sub-agent error: {exc}"
            run.finish()
        if run.status.value == "failed":
            state["failures"].append(run_id)
        state["completed_runs"].append(run_id)
        op.save()  # checkpoint after each run
    return state
Two operational details:
- Per-run try/except. A sub-agent crash should never poison the whole
  engagement. We catch, mark the run blocked, and move on.
- Checkpoint after every run. op.save() is cheap (one TinyDB or
  DynamoDB write); the upside is that an orchestrator crash mid-engagement
  loses at most one run.
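Both details can be seen in a toy model where a JSON file stands in for the persisted operation. This is a sketch under that assumption, not Wintermute code: `save`, `load`, `execute_all`, `Crash`, and `flaky_worker` are hypothetical, and `Crash` simulates the orchestrator process dying between checkpoints.

```python
import json
import tempfile
from pathlib import Path


def save(store: Path, runs: dict) -> None:
    store.write_text(json.dumps(runs))


def load(store: Path) -> dict:
    return json.loads(store.read_text())


def execute_all(store: Path, worker) -> None:
    runs = load(store)
    # Rebuild the queue from persisted status, not from in-memory state.
    pending = [rid for rid, status in runs.items() if status == "not_run"]
    for rid in pending:
        try:
            runs[rid] = worker(rid)
        except Exception:
            runs[rid] = "blocked"  # a failing worker must not stop the loop
        save(store, runs)          # checkpoint after every run


class Crash(BaseException):
    """Stands in for the orchestrator process dying mid-engagement."""


calls = []


def flaky_worker(rid: str) -> str:
    calls.append(rid)
    if rid == "r2" and calls.count("r2") == 1:
        raise Crash()                        # first attempt at r2 kills the process
    if rid == "r3":
        raise RuntimeError("tool timeout")   # ordinary sub-agent failure
    return "passed"


with tempfile.TemporaryDirectory() as tmp:
    store = Path(tmp) / "operation.json"
    save(store, {"r1": "not_run", "r2": "not_run", "r3": "not_run"})
    try:
        execute_all(store, flaky_worker)
    except Crash:
        pass
    after_crash = load(store)
    execute_all(store, flaky_worker)  # re-invoke: resumes from the checkpoint
    final = load(store)

print(after_crash)
print(final)
```

After the simulated crash only the in-flight run is lost: r1 stays recorded as passed, and the second invocation picks up at r2.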
Node 5 — report_node
from pathlib import Path

from wintermute.reports import Report, ReportSpec
from wintermute.backends.docx_reports import DocxTplPerVulnBackend

Report.register_backend(
    "docx",
    DocxTplPerVulnBackend(
        template_dir="templates",
        main_template="report_main.docx",
        vuln_template="report_vuln.docx",
    ),
    make_default=True,
)


def report_node(state: OrchestratorState) -> OrchestratorState:
    op = state["operation"]
    spec = ReportSpec(
        title=f"{op.operation_name} — Hardware Security Assessment",
        author="Wintermute Orchestrator",
        summary=(
            f"{state['plan_code']} executed against {len(op.devices)} device(s). "
            f"{len(state['completed_runs'])} runs completed, "
            f"{len(state['failures'])} findings, "
            f"{len(state['skipped_runs'])} N/A, "
            f"{sum(1 for r in op.test_runs if r.status.value == 'blocked')} blocked."
        ),
    )
    out = f"./reports/{op.operation_name}.docx"
    Path("reports").mkdir(exist_ok=True)  # make sure the output directory exists
    Report.save(spec, [op], out)
    state["report_path"] = out
    return state
The report walks the operation graph (every Vulnerability reachable from
every Device → Service, every Peripheral, every cloud account, plus the
TestCaseRun.findings lists) and produces a DOCX with:
- one section per Device summarizing peripherals and findings,
- one chapter per Vulnerability with reproduction steps and risk,
- one appendix per TestCaseRun with status, notes, and bound targets.
Drop the customer logo into templates/report_main.docx, swap colors,
ship.
Compiling the Graph
Plain Python is enough; LangGraph just gives you nicer streaming and
visualization. From examples/08-LangGraph-Pentest-Agent.ipynb:
from langgraph.graph import END, StateGraph

graph = StateGraph(OrchestratorState)
graph.add_node("read_ticket", read_ticket_node)
graph.add_node("plan", plan_node)
graph.add_node("dispatch", dispatch_node)
graph.add_node("execute", execute_runs_node)
graph.add_node("report", report_node)

graph.set_entry_point("read_ticket")
graph.add_edge("read_ticket", "plan")
graph.add_edge("plan", "dispatch")
graph.add_edge("dispatch", "execute")
graph.add_edge("execute", "report")
graph.add_edge("report", END)

compiled = graph.compile()

result = compiled.invoke({
    "operation": op,        # built in Part 2
    "ticket": "T000001",    # the I²C ticket
    "router": init_router(),
})
print(result["report_path"])
That single compiled.invoke(...) will:
- read ticket T000001,
- parse out target=iot-cam-01, bus=I2C-2, address=0x50,
- classify the scope as hardware blackbox,
- attach TP-HW-BLACKBOX-001.json and generate ~17 runs,
- dispatch each run to execute/skip/defer,
- spawn a sub-agent (Part 7) per execute run,
- checkpoint after each,
- emit ./reports/acme-iotcam-2026-Q2.docx.
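Since the graph is a straight line, the same sequencing can be written as a plain loop. The sketch below shows that hand-rolled variant with stand-in nodes that only shuffle dictionary keys; `run_pipeline`, `PIPELINE`, and the stub node bodies are assumptions for illustration, and the real nodes would be the functions defined above.

```python
from typing import Any, Callable, Optional

State = dict[str, Any]
Node = Callable[[State], State]


def run_pipeline(
    nodes: list[tuple[str, Node]],
    state: State,
    on_step: Optional[Callable[[str, State], None]] = None,
) -> State:
    """Run the nodes in order, threading the state dict through each one."""
    for name, node in nodes:
        state = node(state)
        if on_step is not None:
            on_step(name, state)  # hook for console streaming or logging
    return state


# Stand-in nodes: each returns a new dict with the keys the next node needs.
def read_ticket(s: State) -> State:
    return {**s, "scope": {"tags": ["i2c", "eeprom", "blackbox"]}}


def plan(s: State) -> State:
    return {**s, "pending_runs": ["r1", "r2"], "completed_runs": []}


def dispatch(s: State) -> State:
    return s  # pretend every run was marked "execute"


def execute(s: State) -> State:
    return {**s, "completed_runs": list(s["pending_runs"]), "pending_runs": []}


def report(s: State) -> State:
    return {**s, "report_path": "./reports/demo.docx"}


PIPELINE = [
    ("read_ticket", read_ticket),
    ("plan", plan),
    ("dispatch", dispatch),
    ("execute", execute),
    ("report", report),
]

trace: list[str] = []
result = run_pipeline(PIPELINE, {"ticket": "T000001"}, on_step=lambda name, _s: trace.append(name))
print(trace)
print(result["report_path"])
```

What this loop gives up relative to LangGraph is exactly what the post mentions: built-in streaming and graph visualization. The `on_step` hook is a small substitute for the former.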
Driving It From the Console (Without LangGraph)
You don’t need LangGraph at all: the same orchestrator can be driven from the Wintermute REPL or via MCP. From the operator side this would look like:
onoSendai > operation load acme-iotcam-2026-Q2
onoSendai [acme-iotcam-2026-Q2] > setup_ticket_backend bugzilla \
    --url https://bugs.acme.local --api-key $BZ_KEY \
    --product IoT-Camera --component Security
[*] Bugzilla backend registered.
onoSendai [acme-iotcam-2026-Q2] > orchestrator run T000001
[*] read_ticket: title="I2C EEPROM extraction on MCIO" tags=#i2c #eeprom #blackbox
[*] plan_node: attached TP-HW-BLACKBOX-001 (24 cases) — generated 17 runs
[*] dispatch_node: 9 execute, 6 defer, 2 skip
[*] execute_runs_node: dispatching sub-agents...
    ...
[*] report_node: ./reports/acme-iotcam-2026-Q2.docx
orchestrator run is the convenience command we would build on top of the
LangGraph graph (or a hand-rolled state machine): a one-liner that wraps
compiled.invoke(...) and streams updates to the console. It is not
shipped today; building it is the natural next REPL extension once the
sub-agents in Part 7 stabilize.
What Goes Wrong, and What the Framework Catches
- plan_node picks a non-existent plan. Mitigated by validating
  against PLANS_DIR.glob("*.json") before attaching.
- dispatch_node returns malformed JSON. Wrap in try/except, retry
  once with “your previous reply did not parse,” then default to execute
  for unspecified runs.
- A sub-agent loops forever. Per-run wall-clock budget enforced inside
  run_subagent_for_test_case_run (Part 7). The orchestrator only sees the
  final status.
- The Bedrock account hits a quota mid-execution. op.save() after
  each run means re-invoking the orchestrator on the same operation
  resumes: pending_runs is rebuilt by checking
  [r.run_id for r in op.test_runs if r.status.value == "not_run"].
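The malformed-JSON mitigation can be sketched independently of any LLM client. Here `ask` is a hypothetical callable that takes a prompt and returns the model's raw reply; `ask_for_decisions` and `parse_decisions` are names made up for this example. One choice of the sketch that the post does not specify: if the retry also fails to parse, the error propagates to the caller rather than silently executing everything.

```python
import json
from typing import Callable

VALID = {"execute", "skip", "defer"}


def parse_decisions(raw: str) -> dict:
    """Parse a reply, insisting on a JSON object at the top level."""
    parsed = json.loads(raw)
    if not isinstance(parsed, dict):
        raise ValueError("expected a JSON object")
    return parsed


def ask_for_decisions(ask: Callable[[str], str], prompt: str, run_ids: list[str]) -> dict[str, str]:
    try:
        parsed = parse_decisions(ask(prompt))
    except ValueError:  # json.JSONDecodeError is a subclass of ValueError
        retry = prompt + "\n\nYour previous reply did not parse. Reply with a JSON object only."
        parsed = parse_decisions(ask(retry))  # a second failure propagates

    decisions = {}
    for run_id in run_ids:
        value = parsed.get(run_id)
        # Missing or unrecognised values fall back to the default, "execute".
        decisions[run_id] = value if isinstance(value, str) and value in VALID else "execute"
    return decisions


# Simulated model: the first reply is chatty and unparseable, the second is JSON.
replies = iter([
    'Sure! Here is the mapping: {"r1": "skip"',
    '{"r1": "skip", "r2": "maybe"}',
])
result = ask_for_decisions(lambda _prompt: next(replies), "Classify runs r1, r2, r3.", ["r1", "r2", "r3"])
print(result)
```

Iterating over the known run_ids, rather than over the model's keys, also means run_ids the model invents are ignored.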
What’s Next
Part 7 — the per-test-case sub-agents. That is the body of run_subagent_for_test_case_run. Each sub-agent is a specialized Part-5-style loop with a curated tool surface tailored to the test case’s kind (UART, JTAG, I²C, TPM 2.0, AWS-IAM, …) and a verification step that pulls reproduction artifacts back into the orchestrator’s state. We close the series there with a worked end-to-end trace on the IoT-camera engagement.





