Wintermute Framework, Part 6: The Orchestrator — Ticket to TestPlan to TestCaseRun Fan-Out

Part 5 gave us a single-test-case agent. In this post we build the orchestrator: a higher-level agent that ingests a ticket, derives engagement scope, attaches a TestPlan, generates TestCaseRuns, picks a starting test, and sequences the work. This is the backbone the per-run sub-agents in Part 7 will plug into.

I’ll lean on the LangGraph pattern from examples/08-LangGraph-Pentest-Agent.ipynb as the structural skeleton — but everything underneath is plain Wintermute, so this works equally well with LangGraph, plain tool_calling_chat, or a hand-rolled state machine.

Why an Orchestrator (and Not Just a Bigger Prompt)

Two reasons not to throw the whole engagement at one Claude call:

  1. Context budget. A real engagement has 10–50 test cases, each with
    3–10 reproduction steps and possibly multiple bound peripherals. The
    merged tool outputs over a full plan can easily exceed a model’s context
    window. The orchestrator’s job is to keep each step small and stash
    intermediate state in the Operation.
  2. Mixed cost/latency profiles. Reasoning (“which test case next?”,
    “this finding looks like an instance of CVE-X”) wants Claude Sonnet.
    Tool plumbing (“dump the EEPROM, run strings, classify”) wants Groq.
    The orchestrator runs on the slow/expensive lane; the per-run sub-agents
    run on the fast/cheap lane. That mapping is built into Router.choose’s
    task_tag policy.
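To make the lane split in point 2 concrete, here is a toy policy table. It is only a sketch: the Lane class, the LANES table, the choose_lane helper, the "reasoning" tag, and the model strings are all hypothetical placeholders, not Wintermute's Router.choose or real model identifiers. Only the "cheap" tag appears in the code later in this post.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Lane:
    name: str
    model: str  # placeholder string, not a real model identifier


# Hypothetical policy: reasoning goes to the slow lane, plumbing to the fast one.
LANES = {
    "reasoning": Lane("slow", "sonnet-placeholder"),
    "cheap": Lane("fast", "groq-placeholder"),
}


def choose_lane(task_tag: str) -> Lane:
    """Return the lane for a tag; unknown tags fall back to the fast lane."""
    return LANES.get(task_tag, LANES["cheap"])


print(choose_lane("reasoning").name)
print(choose_lane("cheap").name)
```

Falling back to the cheap lane for unknown tags is a design choice made for this sketch; a real policy might prefer to fail loudly instead.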

The Orchestrator’s Five Nodes

   Ticket  ───────►  read_ticket   ───►   plan_node    ───►   dispatch_node
                                                                   │
                                                                   ▼
                                                            execute_runs_node
                                                                   │
                                                                   ▼
                                                              report_node


  1. read_ticket — pull the ticket via Ticket.read(...), parse scope hints.
  2. plan_node — pick or generate a TestPlan, attach it to the operation,
    call op.generateTestRuns().
  3. dispatch_node — for each generated TestCaseRun, decide:
    a) execute it now,
    b) skip it (not_applicable),
    c) queue for human review (blocked).
  4. execute_runs_node — for each “execute now” run, spawn a Part-7-style
    sub-agent (we hand-wave it here, fully build it in Part 7).
  5. report_node: call Report.save(spec, [op], "out.docx") once every
    run has terminated.

Shared State

from typing import Any, Optional, TypedDict, Union

from wintermute.core import Operation
from wintermute.tickets import Ticket


class OrchestratorState(TypedDict, total=False):
    operation: Operation
    ticket: Union[Ticket, str]   # a ticket id is accepted until read_ticket resolves it
    router: Any                  # LLM router passed in at invoke time
    scope: dict[str, Any]        # {target_host, bus, address, tags, ...}
    plan_code: str               # which TestPlan we attached
    pending_runs: list[str]      # run_ids still to execute
    completed_runs: list[str]    # run_ids that finished (any terminal)
    skipped_runs: list[str]      # run_ids marked not_applicable
    failures: list[str]          # run_ids with vulnerabilities
    report_path: Optional[str]

The orchestrator’s only persistent state is the Operation. OrchestratorState is in-memory plumbing for the LangGraph nodes: every result they care about (TestCaseRun.status, TestCaseRun.findings, op.test_plans) lives on the operation and is persisted whenever op.save() is called.

Node 1 — read_ticket

import re

from wintermute.tickets import Ticket


def read_ticket_node(state: OrchestratorState) -> OrchestratorState:
    """Pull the ticket, parse scope hints, seed state['scope']."""
    tid = state["ticket"]
    # Accept either a ticket id (str) or an already-loaded Ticket.
    ticket = Ticket.read(tid) if isinstance(tid, str) else tid
    desc = ticket.data.description or ""

    # Lightweight, deterministic regex parsing so the orchestrator never
    # gambles on the LLM hallucinating a target IP. The agent reasoning
    # happens in plan_node and dispatch_node, not here.
    def find(pat: str) -> str:
        m = re.search(pat, desc)
        return m.group(1) if m else ""

    scope = {
        # [^\s,]+ stops at a comma, so "Bus: I2C-2," yields "I2C-2", not "I2C-2,".
        "target_host": find(r"Target:\s*([^\s,]+)"),
        "bus": find(r"Bus:\s*([^\s,]+)"),
        "address": find(r"Address:\s*(0x[0-9a-fA-F]+)"),
        "tags": re.findall(r"#(\w+)", desc),  # `#i2c #eeprom #blackbox`
        "title": ticket.data.title,
    }
    state["ticket"] = ticket
    state["scope"] = scope
    return state

This is intentionally non-LLM. Scope parsing failures should produce operator-visible errors, not silent drift. If the ticket is unstructured (no Target: field), the orchestrator falls back to an LLM-driven scope extraction in plan_node — but the first attempt is always deterministic.
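One way to make parsing failures operator-visible is to check the parsed scope for empty fields before moving on. The sketch below is standalone and makes some assumptions: the sample description string is made up (the real ticket body is not shown in this post), and parse_scope, missing_fields, and REQUIRED are hypothetical helper names, not part of Wintermute.

```python
import re

# Fields we treat as mandatory in this sketch (an assumption, not a Wintermute rule).
REQUIRED = ("target_host", "bus", "address")


def parse_scope(desc: str) -> dict:
    """Deterministically extract scope hints from a ticket description."""
    def find(pat: str) -> str:
        m = re.search(pat, desc)
        return m.group(1) if m else ""

    return {
        "target_host": find(r"Target:\s*([^\s,]+)"),
        "bus": find(r"Bus:\s*([^\s,]+)"),
        "address": find(r"Address:\s*(0x[0-9a-fA-F]+)"),
        "tags": re.findall(r"#(\w+)", desc),
    }


def missing_fields(scope: dict) -> list[str]:
    """Names of required fields the regexes failed to fill."""
    return [key for key in REQUIRED if not scope[key]]


# Made-up description in the style the regexes expect.
sample = "Target: iot-cam-01, Bus: I2C-2, Address: 0x50 #i2c #eeprom #blackbox"
scope = parse_scope(sample)
print(scope)

# An unstructured ticket leaves every required field empty.
gaps = missing_fields(parse_scope("please test the camera"))
print(gaps)
```

A node could raise on a non-empty result, or record the gaps in state so a later step knows the deterministic pass came up short.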

Node 2 — plan_node

plan_node is where the LLM earns its keep. It looks at the parsed scope and the device fleet on the operation, decides which TestPlan from disk fits (or builds one ad-hoc), attaches it, and generates runs.

import json
from pathlib import Path

from wintermute.ai.use import simple_chat
from wintermute.core import TestPlan

PLANS_DIR = Path("TestPlans")


def plan_node(state: OrchestratorState) -> OrchestratorState:
    op = state["operation"]
    scope = state["scope"]

    # 1. Deterministic mapping for common scopes: keeps the LLM out of
    #    the path when we already know the answer.
    plan_path: Path | None = None
    if "blackbox" in scope["tags"] and "hardware" in scope["tags"]:
        plan_path = PLANS_DIR / "TP-HW-BLACKBOX-001.json"
    elif "iam" in scope["tags"] and "aws" in scope["tags"]:
        plan_path = PLANS_DIR / "TP-AWS-IAM-ROLE-001.json"
    elif "redteam" in scope["tags"] and "aws" in scope["tags"]:
        plan_path = PLANS_DIR / "TP-AWS-RED-MEGA-001.json"

    # 2. LLM fallback: describe the engagement and ask which plan applies.
    #    We pin the answer to the catalogue so the model can't invent
    #    a plan code that does not exist on disk.
    if plan_path is None:
        catalogue = "\n".join(
            f"- {p.stem}: {json.loads(p.read_text())['name']}"
            for p in sorted(PLANS_DIR.glob("*.json"))
        )
        prompt = (
            f"Engagement scope: {json.dumps(scope)}\n"
            f"Operation devices: {[d.hostname for d in op.devices]}\n"
            f"Cloud accounts: {[a.name for a in op.cloud_accounts]}\n\n"
            f"Pick the single best test plan for this scope from:\n{catalogue}\n\n"
            "Reply ONLY with the plan code (e.g. TP-HW-BLACKBOX-001)."
        )
        # Cheap path: no tool use, just one classification call.
        plan_code = simple_chat(state["router"], prompt, task_tag="cheap").strip()
        plan_path = PLANS_DIR / f"{plan_code}.json"
        if not plan_path.is_file():
            raise RuntimeError(f"plan_node: model picked a non-existent plan: {plan_code}")

    # 3. Attach + generate
    plan = TestPlan.from_dict(json.loads(plan_path.read_text()))
    op.addTestPlan(plan)
    new_runs = op.generateTestRuns()
    op.save()

    state["plan_code"] = plan.code
    state["pending_runs"] = [r.run_id for r in new_runs if r.status.value == "not_run"]
    state["completed_runs"] = []
    state["skipped_runs"] = []
    state["failures"] = []
    return state

Two ways this matters in a real engagement:

  • Hardware blackbox + I²C scope → TP-HW-BLACKBOX-001.json. Generates
    runs for board survey, debug interfaces, UART/JTAG/I²C/SPI, boot chain,
    TPM 2.0. For our IoT camera ticket, that’s ~17 runs.
  • AWS red team scope → TP-AWS-RED-MEGA-001.json. Generates runs for
    IAM role enumeration, S3, Lambda, ECS, EKS. These are bound to the
    acme-prod account if it exists in the operation, none if it doesn’t.

The same orchestrator handles both because test plans are JSON. Add a plan, the orchestrator picks it up. No code changes.
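The "add a plan, no code changes" property comes from building the catalogue from whatever JSON files sit in the plans directory. Here is a self-contained sketch of that idea against a temporary directory. The helpers load_catalogue and resolve_plan, the plan code TP-DEMO-NEW-001, and both plan names are made up for illustration; only the "name" key and the file-stem-as-code convention come from plan_node above.

```python
import json
import tempfile
from pathlib import Path


def load_catalogue(plans_dir: Path) -> dict[str, str]:
    """Map plan code (file stem) to the plan's human-readable name."""
    return {
        p.stem: json.loads(p.read_text())["name"]
        for p in sorted(plans_dir.glob("*.json"))
    }


def resolve_plan(plans_dir: Path, reply: str) -> Path:
    """Accept a model reply only if it names a plan present in the catalogue."""
    code = reply.strip()
    if code not in load_catalogue(plans_dir):
        raise ValueError(f"unknown plan code: {code!r}")
    return plans_dir / f"{code}.json"


with tempfile.TemporaryDirectory() as tmp:
    plans = Path(tmp)
    # Two demo plans; names and the second code are invented for this example.
    (plans / "TP-HW-BLACKBOX-001.json").write_text(json.dumps({"name": "Hardware blackbox"}))
    (plans / "TP-DEMO-NEW-001.json").write_text(json.dumps({"name": "Newly added demo plan"}))

    catalogue = load_catalogue(plans)
    chosen = resolve_plan(plans, " TP-DEMO-NEW-001\n").name
    print(sorted(catalogue))
    print(chosen)
```

Checking membership in the catalogue, rather than only checking that a file exists, also rejects replies that contain path separators.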

Node 3 — dispatch_node

dispatch_node is where the orchestrator makes its first per-run decision. For each pending TestCaseRun it picks one of execute, skip, or defer. Naive implementations just execute everything, which ignores execution-blocking concerns. Structurally irrelevant runs (an I²C test bound to a device with no I²C peripheral) were already filtered out by resolveBindings, but constraints like “do not interrupt boot during business hours” are real and still have to be handled here.

from wintermute.core import RunStatus

DISPATCH_SYSTEM = """You are the dispatcher for a sanctioned hardware
penetration test. You do NOT execute tests yourself. Your sole output is
a JSON object mapping each run_id to one of:
  "execute" - run it now
  "skip"    - mark not_applicable; the bound target makes the test moot
  "defer"   - mark blocked; needs human input or a permit
Constraints:
- Skip a test only when the bound target clearly makes it irrelevant.
- Defer a test only when it is destructive (e.g., DA lockout, glitching)
  or needs manual work or operator confirmation.
- Default is "execute".
"""


def dispatch_node(state: OrchestratorState) -> OrchestratorState:
    op = state["operation"]
    pending = state["pending_runs"]

    # Build a compact run summary for the LLM: code, target, status, any
    # tags from the parent test case that hint at destructiveness.
    summary = []
    for run_id in pending:
        run = next(r for r in op.test_runs if r.run_id == run_id)
        tc = next((t for t in op.iterTestCases() if t.code == run.test_case_code), None)
        summary.append({
            "run_id": run_id,
            "test_case": tc.code if tc else "?",
            "name": tc.name if tc else "?",
            "tags": tc.target_scope.tags if tc else [],
            "execution_mode": tc.execution_mode.value if tc else "?",
            "bound": [{"alias": b.alias, "kind": b.kind, "object_id": b.object_id}
                      for b in run.bound],
        })

    # DISPATCH_SYSTEM is prepended to the prompt so the instructions
    # actually reach the model; simple_chat is called with one prompt string.
    prompt = (
        f"{DISPATCH_SYSTEM}\n"
        f"Engagement scope: {state['scope']}\n"
        f"Pending runs: {json.dumps(summary, indent=2)}\n\n"
        "Reply with a JSON object: {\"<run_id>\": \"execute|skip|defer\", ...}"
    )
    raw = simple_chat(state["router"], prompt,
                      task_tag="cheap")  # dispatch is cheap reasoning
    decisions: dict[str, str] = json.loads(raw)

    new_pending: list[str] = []
    for run_id in pending:
        decision = decisions.get(run_id, "execute")
        run = next(r for r in op.test_runs if r.run_id == run_id)
        if decision == "skip":
            run.status = RunStatus.not_applicable
            run.finish()
            state["skipped_runs"].append(run_id)
        elif decision == "defer":
            run.status = RunStatus.blocked
            state["completed_runs"].append(run_id)  # done for now
        else:
            new_pending.append(run_id)

    state["pending_runs"] = new_pending
    op.save()
    return state

Concrete behavior on the IoT camera ticket:

  • IOT-HW-GEN-001 (engagement constraints) → defer (operator confirmation needed).
  • IOT-HW-DISC-001 (board survey, photos) → defer (manual).
  • IOT-HW-UART-001 (UART discovery) → execute.
  • IOT-HW-I2C-001 (EEPROM read) → execute.
  • IOT-HW-FAULT-001 (glitching) → defer (destructive).
  • IOT-HW-TPM-001 (DA lockout) → defer (the test is destructive on real silicon).

Out of ~17 runs, maybe 9 actually execute autonomously. The other 8 are deferred to the operator or skipped as not applicable. Making that split cleanly, before any sub-agent is spawned, is one of the most valuable things the orchestrator does.
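The bucketing step can be pulled out into a pure function, which makes the "default is execute" rule easy to test without an operation or a model. This is a sketch only: partition and VALID are hypothetical names, and the run ids are invented.

```python
VALID = {"execute", "skip", "defer"}


def partition(pending: list[str], decisions: dict[str, str]) -> dict[str, list[str]]:
    """Bucket run_ids by decision; missing or unrecognized values become 'execute'."""
    buckets: dict[str, list[str]] = {"execute": [], "skip": [], "defer": []}
    for run_id in pending:
        decision = decisions.get(run_id, "execute")
        if decision not in VALID:
            decision = "execute"
        buckets[decision].append(run_id)
    return buckets


pending = ["run-1", "run-2", "run-3", "run-4"]
# run-3 gets an unrecognized value and run-4 is missing from the reply entirely.
decisions = {"run-1": "execute", "run-2": "defer", "run-3": "maybe"}
result = partition(pending, decisions)
print(result)
```

Treating anything other than skip or defer as execute mirrors the else branch in dispatch_node; a stricter dispatcher could defer unrecognized values instead.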

Node 4 — execute_runs_node (Stub Now, Full Body in Part 7)

def execute_runs_node(state: OrchestratorState) -> OrchestratorState:
    """For each pending run, dispatch a sub-agent. Detailed in Part 7."""
    from wintermute.subagents import run_subagent_for_test_case_run  # Part 7

    op = state["operation"]
    while state["pending_runs"]:
        run_id = state["pending_runs"].pop(0)
        run = next(r for r in op.test_runs if r.run_id == run_id)
        try:
            run_subagent_for_test_case_run(op, run, router=state["router"])
        except Exception as exc:
            # A crashed sub-agent blocks only its own run, not the engagement.
            run.status = RunStatus.blocked
            run.notes = (run.notes + "\n" if run.notes else "") + f"sub-agent error: {exc}"
            run.finish()
        if run.status.value == "failed":
            state["failures"].append(run_id)
        state["completed_runs"].append(run_id)
        op.save()  # checkpoint after each run
    return state

Two operational details:

  • Per-run try/except. A sub-agent crash should never poison the whole
    engagement. We catch, mark the run blocked, and move on.
  • Checkpoint after every run. op.save() is cheap (one TinyDB or
    DynamoDB write); the upside is that an orchestrator crash mid-engagement
    loses at most one run.
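The "loses at most one run" property depends on being able to rebuild the pending list from persisted statuses after a restart. The sketch below shows that with stand-in types. The RunStatus enum and Run dataclass here are local stand-ins, not the Wintermute classes, and the "passed" member is an assumption (only not_run, failed, blocked, and not_applicable appear in this post).

```python
from dataclasses import dataclass
from enum import Enum


class RunStatus(Enum):  # stand-in for wintermute.core.RunStatus
    not_run = "not_run"
    passed = "passed"  # assumed member name
    failed = "failed"
    blocked = "blocked"
    not_applicable = "not_applicable"


@dataclass
class Run:  # stand-in for TestCaseRun
    run_id: str
    status: RunStatus = RunStatus.not_run


def rebuild_pending(runs: list[Run]) -> list[str]:
    """Everything still in not_run is work the crashed orchestrator never reached."""
    return [r.run_id for r in runs if r.status is RunStatus.not_run]


# Simulated state loaded after a crash: two runs finished, two untouched.
runs = [
    Run("run-1", RunStatus.passed),
    Run("run-2", RunStatus.blocked),
    Run("run-3"),
    Run("run-4"),
]
resumed = rebuild_pending(runs)
print(resumed)
```

A run that was in flight when the crash happened is still not_run on disk, so it is simply retried.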

Node 5 — report_node

from wintermute.reports import Report, ReportSpec
from wintermute.backends.docx_reports import DocxTplPerVulnBackend

Report.register_backend(
    "docx",
    DocxTplPerVulnBackend(
        template_dir="templates",
        main_template="report_main.docx",
        vuln_template="report_vuln.docx",
    ),
    make_default=True,
)


def report_node(state: OrchestratorState) -> OrchestratorState:
    op = state["operation"]
    spec = ReportSpec(
        title=f"{op.operation_name} — Hardware Security Assessment",
        author="Wintermute Orchestrator",
        summary=(
            f"{state['plan_code']} executed against {len(op.devices)} device(s). "
            f"{len(state['completed_runs'])} runs completed, "
            f"{len(state['failures'])} findings, "
            f"{len(state['skipped_runs'])} N/A, "
            f"{sum(1 for r in op.test_runs if r.status.value == 'blocked')} blocked."
        ),
    )
    out = f"./reports/{op.operation_name}.docx"
    Path("reports").mkdir(exist_ok=True)
    Report.save(spec, [op], out)
    state["report_path"] = out
    return state

The report walks the operation graph (every Vulnerability reachable from every Device → Service, every Peripheral, every cloud account, plus the TestCaseRun.findings lists) and produces a DOCX with:

  • one section per Device summarizing peripherals and findings,
  • one chapter per Vulnerability with reproduction steps and risk,
  • one appendix per TestCaseRun with status, notes, and bound targets.

Drop the customer logo into templates/report_main.docx, swap colors, ship.

Compiling the Graph

Plain Python is enough; LangGraph just gives you nicer streaming and visualization. From examples/08-LangGraph-Pentest-Agent.ipynb:

from langgraph.graph import END, StateGraph
graph = StateGraph(OrchestratorState)
graph.add_node("read_ticket", read_ticket_node)
graph.add_node("plan", plan_node)
graph.add_node("dispatch", dispatch_node)
graph.add_node("execute", execute_runs_node)
graph.add_node("report", report_node)
graph.set_entry_point("read_ticket")
graph.add_edge("read_ticket", "plan")
graph.add_edge("plan", "dispatch")
graph.add_edge("dispatch", "execute")
graph.add_edge("execute", "report")
graph.add_edge("report", END)
compiled = graph.compile()
result = compiled.invoke({
    "operation": op,        # built in Part 2
    "ticket": "T000001",    # the I²C ticket
    "router": init_router(),
})
print(result["report_path"])

That single compiled.invoke(...) will:

  1. read ticket T000001,
  2. parse out target=iot-cam-01, bus=I2C-2, address=0x50,
  3. classify the scope as hardware blackbox,
  4. attach TP-HW-BLACKBOX-001.json and generate ~17 runs,
  5. dispatch each run to execute/skip/defer,
  6. spawn a sub-agent (Part 7) per execute run,
  7. checkpoint after each,
  8. emit ./reports/acme-iotcam-2026-Q2.docx.

Driving It From the Console (Without LangGraph)

You don’t need LangGraph at all — the same orchestrator runs from the Wintermute REPL or via MCP. From the operator side this looks like:

onoSendai > operation load acme-iotcam-2026-Q2
onoSendai [acme-iotcam-2026-Q2] > setup_ticket_backend bugzilla \
    --url https://bugs.acme.local --api-key $BZ_KEY \
    --product IoT-Camera --component Security
[*] Bugzilla backend registered.
onoSendai [acme-iotcam-2026-Q2] > orchestrator run T000001
[*] read_ticket: title="I2C EEPROM extraction on MCIO" tags=#i2c #eeprom #blackbox
[*] plan_node: attached TP-HW-BLACKBOX-001 (24 cases) — generated 17 runs
[*] dispatch_node: 9 execute, 6 defer, 2 skip
[*] execute_runs_node: dispatching sub-agents...
...
[*] report_node: ./reports/acme-iotcam-2026-Q2.docx

orchestrator run is the convenience command we would implement on top of the LangGraph graph (or a hand-rolled state machine): a one-liner that wraps compiled.invoke(...) and streams updates to the console. It is not shipped today, so the transcript above is illustrative; building it is the natural next REPL extension after the sub-agents in Part 7 stabilize.
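For the hand-rolled variant, a sequential runner over the node functions is enough, since the graph in this post is a straight line. The sketch below uses toy nodes in place of read_ticket_node, plan_node, and the rest. run_pipeline, fake_read, and fake_plan are hypothetical names, and the progress line only imitates the console style shown above.

```python
from typing import Any, Callable

State = dict[str, Any]
Node = Callable[[State], State]


def run_pipeline(state: State, nodes: list[tuple[str, Node]]) -> State:
    """Apply each node in order, printing one progress line per step."""
    for name, node in nodes:
        state = node(state)
        print(f"[*] {name}: done")
    return state


# Toy nodes standing in for the real orchestrator nodes.
def fake_read(state: State) -> State:
    state["scope"] = {"tags": ["i2c"]}
    return state


def fake_plan(state: State) -> State:
    state["pending_runs"] = ["run-1", "run-2"]
    return state


final = run_pipeline(
    {"ticket": "T000001"},
    [("read_ticket", fake_read), ("plan", fake_plan)],
)
print(final["pending_runs"])
```

Swapping the toy nodes for the five real ones gives the same linear flow the compiled graph runs, minus LangGraph's streaming and visualization.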

What Goes Wrong, and What the Framework Catches

  • plan_node picks a non-existent plan. Mitigated by validating
    against PLANS_DIR.glob("*.json") before attaching.
  • dispatch_node returns malformed JSON. Wrap in try/except, retry
    once with “your previous reply did not parse,” then default to
    execute for unspecified runs.
  • A sub-agent loops forever. Per-run wall-clock budget enforced inside
    run_subagent_for_test_case_run (Part 7). The orchestrator only sees the
    final status.
  • The Bedrock account hits a quota mid-execution. op.save() after
    each run means re-invoking the orchestrator on the same operation
    resumes: pending_runs is rebuilt by checking
    [r.run_id for r in op.test_runs if r.status.value == "not_run"].
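The malformed-JSON mitigation from the list above can be sketched as a small wrapper around the model call. The code assumes a hypothetical ask callable that takes a prompt string and returns the raw reply; in the orchestrator that role would be played by a closure around simple_chat. ask_for_decisions and fake_ask are illustrative names, and the canned replies are invented.

```python
import json
from typing import Callable


def ask_for_decisions(ask: Callable[[str], str], prompt: str) -> dict[str, str]:
    """Ask once, retry once on unparseable output, else return {}.

    An empty dict means every run falls through to the default, "execute".
    """
    for _ in range(2):  # first attempt plus one retry
        raw = ask(prompt)
        try:
            parsed = json.loads(raw)
            if isinstance(parsed, dict):
                return {str(k): str(v) for k, v in parsed.items()}
        except json.JSONDecodeError:
            pass
        prompt += "\n\nYour previous reply did not parse. Reply with a JSON object only."
    return {}


# Fake model: the first reply is chatty and unparseable, the second is valid JSON.
replies = iter(["Sure! Here you go: {...}", '{"run-1": "skip"}'])
calls: list[str] = []


def fake_ask(prompt: str) -> str:
    calls.append(prompt)
    return next(replies)


decisions = ask_for_decisions(fake_ask, "Pending runs: [...]")
print(decisions)
print(len(calls))
```

A reply that parses but is not a JSON object (a bare list, say) is treated the same as a parse failure here.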

What’s Next

Part 7 — the per-test-case sub-agents. That is the body of run_subagent_for_test_case_run. Each sub-agent is a specialized Part-5-style loop with a curated tool surface tailored to the test case’s kind (UART, JTAG, I²C, TPM 2.0, AWS-IAM, …) and a verification step that pulls reproduction artifacts back into the orchestrator’s state. We close the series there with a worked end-to-end trace on the IoT-camera engagement.
