
Commit fdacd32

PolyphonyRequiem, Daniel Green, and Copilot authored
feat(config): add optional metadata dict to workflow definition (#107)
* feat(config): add optional metadata dict to workflow definition

  Add a metadata field to WorkflowDef that allows workflow authors to attach
  arbitrary key-value pairs for external tooling. The metadata is included
  verbatim in the workflow_started event, enabling downstream consumers
  (dashboards, trackers, enrichers) to adapt behavior without parsing the
  YAML source.

  Example usage in workflow YAML:

    workflow:
      name: twig-sdlc
      metadata:
        tracker: ado
        project_url: https://dev.azure.com/org/Project
        work_item_id_agent: intake
        work_item_id_field: epic_id

  The field defaults to an empty dict, so existing workflows are unaffected.

  Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* feat(cli): add --metadata flag for runtime metadata injection

  Add --metadata / -m flag to 'conductor run' that accepts key=value pairs,
  merged on top of YAML-declared metadata. This enables callers to inject
  dynamic values at invocation time:

    conductor run twig-sdlc.yaml --metadata work_item_id=1814

  CLI metadata is:
  - Parsed separately from --input (different binding path)
  - Merged on top of YAML metadata (CLI wins on conflicts)
  - Forwarded through --web-bg background process spawning
  - Included in the workflow_started event alongside YAML metadata

  Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* test: add metadata schema and loader tests

  7 new tests verifying:
  - Schema: metadata defaults to empty dict, accepts arbitrary keys,
    independent from input/context fields
  - Loader: metadata round-trips through YAML, omission gives empty dict,
    nested values preserved, metadata and input are separate namespaces

  All 140 config tests pass.

  Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* fix: wrap long help string to satisfy E501 line-length lint

  Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* feat: add run_id and log_file for deterministic run linking

  Propagate the event log's random hex suffix as a run_id across all systems:
  - EventLogSubscriber: expose run_id property (was already generated)
  - WorkflowEngine: accept run_id + log_file params, include in
    workflow_started event
  - PID files: include run_id + log_file fields
  - Web dashboard: add /api/info endpoint returning run_id, log_file,
    workflow_name, started_at, metadata

  This enables the central dashboard to match per-run dashboards to event
  logs by exact run_id instead of fragile name/timestamp heuristics.

  Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* feat: add system metadata to workflow_started event and checkpoints

  Auto-inject runtime diagnostics (PID, platform, Python version, cwd,
  conductor version, started_at, run_id, log_file, bg_mode) into the
  workflow_started event. Dashboard port/URL included when --web is active;
  parent_pid included in --web-bg mode.

  System metadata flows through:
  - JSONL event log (via EventLogSubscriber)
  - Web dashboard /api/info endpoint
  - Checkpoint files (for resume context)

  PID files are intentionally left unchanged.

  Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* fix: address PR #107 review feedback

  - Guard os.getcwd() with try/except OSError in _build_system_metadata()
  - Strip sensitive system info (PID, cwd, log_file) from /api/info endpoint
  - Add parse_metadata_flags() to keep metadata values as raw strings
    (no coercion)
  - Use _serialize_value() for metadata in bg_runner to handle nested dicts
  - Add public WebDashboard.port property, stop accessing _actual_port
    externally
  - Group informational params (run_id, log_file, dashboard_port, bg_mode)
    into RunContext dataclass

  Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* style: apply ruff formatting

  Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

---------

Co-authored-by: Daniel Green <dangreen@microsoft.com>
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
1 parent 0ccf1f9 commit fdacd32

12 files changed

Lines changed: 545 additions & 3 deletions

File tree

src/conductor/cli/app.py

Lines changed: 21 additions & 0 deletions
@@ -238,6 +238,17 @@ def run(
             help="Workflow inputs in name=value format. Can be repeated.",
         ),
     ] = None,
+    raw_metadata: Annotated[
+        list[str] | None,
+        typer.Option(
+            "--metadata",
+            "-m",
+            help=(
+                "Workflow metadata in key=value format. "
+                "Merged on top of YAML metadata. Can be repeated."
+            ),
+        ),
+    ] = None,
     dry_run: Annotated[
         bool,
         typer.Option(
@@ -300,12 +311,14 @@ def run(

     Execute a multi-agent workflow defined in the specified YAML file.
     Workflow inputs can be provided using --input flags.
+    Metadata can be provided using --metadata flags (merged on top of YAML metadata).

     \b
     Examples:
         conductor run workflow.yaml
         conductor run workflow.yaml --input question="What is Python?"
         conductor run workflow.yaml -i question="Hello" -i context="Programming"
+        conductor run workflow.yaml --metadata tracker=ado -m work_item_id=1814
         conductor run workflow.yaml --provider copilot
         conductor run workflow.yaml --dry-run
         conductor run workflow.yaml --skip-gates
@@ -350,6 +363,7 @@ def run(
         display_execution_plan,
         generate_log_path,
         parse_input_flags,
+        parse_metadata_flags,
         run_workflow_async,
     )

@@ -377,6 +391,11 @@ def run(
     # Also parse --input.name=value style from sys.argv
     inputs.update(InputCollector.extract_from_args())

+    # Parse --metadata key=value flags (no type coercion — values stay as strings)
+    cli_metadata: dict[str, str] = {}
+    if raw_metadata:
+        cli_metadata.update(parse_metadata_flags(raw_metadata))
+
     # Resolve log file path
     resolved_log_file: Path | None = None
     if log_file is not None:
@@ -398,6 +417,7 @@ def run(
             log_file=resolved_log_file,
             no_interactive=True,  # Always non-interactive in background
             web_port=web_port,
+            metadata=cli_metadata,
         )
         console.print(f"[bold cyan]Dashboard:[/bold cyan] {url}")
         console.print(
@@ -422,6 +442,7 @@ def run(
                 web=web,
                 web_port=web_port,
                 web_bg=web_bg,
+                metadata=cli_metadata,
             )
         )

src/conductor/cli/bg_runner.py

Lines changed: 7 additions & 0 deletions
@@ -62,6 +62,7 @@ def launch_background(
     log_file: Path | None = None,
     no_interactive: bool = True,
     web_port: int = 0,
+    metadata: dict[str, str] | None = None,
 ) -> str:
     """Fork a detached child process running the workflow with a web dashboard.
@@ -77,6 +78,7 @@ def launch_background(
         log_file: Optional log file path.
         no_interactive: Whether to disable interactive mode (always True for bg).
         web_port: Desired port (0 = auto-select).
+        metadata: Optional CLI metadata key=value pairs.

     Returns:
         The dashboard URL (e.g. ``http://127.0.0.1:8080``).
@@ -107,6 +109,11 @@ def launch_background(
         for key, value in inputs.items():
             cmd.extend(["--input", f"{key}={_serialize_value(value)}"])

+    # Forward metadata
+    if metadata:
+        for key, value in metadata.items():
+            cmd.extend(["--metadata", f"{key}={_serialize_value(value)}"])
+
     if provider_override:
         cmd.extend(["--provider", provider_override])

src/conductor/cli/pid.py

Lines changed: 11 additions & 1 deletion
@@ -38,13 +38,21 @@ def pid_dir() -> Path:
     return d


-def write_pid_file(pid: int, port: int, workflow_path: str | Path) -> Path:
+def write_pid_file(
+    pid: int,
+    port: int,
+    workflow_path: str | Path,
+    run_id: str = "",
+    log_file: str = "",
+) -> Path:
     """Write a PID file for a background workflow process.

     Args:
         pid: Process ID of the background child.
         port: TCP port the web dashboard is listening on.
         workflow_path: Path to the workflow YAML file.
+        run_id: Unique run identifier (from event log subscriber).
+        log_file: Path to the JSONL event log file.

     Returns:
         Path to the created PID file.
@@ -58,6 +66,8 @@ def write_pid_file(pid: int, port: int, workflow_path: str | Path) -> Path:
         "port": port,
         "workflow": str(workflow_path),
         "started_at": datetime.now(UTC).isoformat(),
+        "run_id": run_id,
+        "log_file": log_file,
     }

     filepath.write_text(json.dumps(data, indent=2))

src/conductor/cli/run.py

Lines changed: 49 additions & 0 deletions
@@ -841,6 +841,41 @@ def parse_input_flags(raw_inputs: list[str]) -> dict[str, Any]:
     return inputs


+def parse_metadata_flags(raw_metadata: list[str]) -> dict[str, str]:
+    """Parse --metadata key=value flags into a dictionary.
+
+    Unlike ``parse_input_flags``, values are kept as raw strings with no
+    type coercion — metadata is opaque key-value data.
+
+    Args:
+        raw_metadata: List of "key=value" strings from CLI.
+
+    Returns:
+        Dictionary of string key-value pairs.
+
+    Raises:
+        typer.BadParameter: If metadata format is invalid.
+    """
+    result: dict[str, str] = {}
+
+    for raw in raw_metadata:
+        if "=" not in raw:
+            raise typer.BadParameter(
+                f"Invalid metadata format: '{raw}'. Expected format: key=value"
+            )
+
+        key, value = raw.split("=", 1)
+        key = key.strip()
+        value = value.strip()
+
+        if not key:
+            raise typer.BadParameter(f"Empty metadata key in: '{raw}'")
+
+        result[key] = value
+
+    return result
+
+
 def coerce_value(value: str) -> Any:
     """Coerce a string value to an appropriate Python type.
@@ -990,6 +1025,7 @@ async def run_workflow_async(
     web: bool = False,
     web_port: int = 0,
     web_bg: bool = False,
+    metadata: dict[str, str] | None = None,
 ) -> dict[str, Any]:
     """Execute a workflow asynchronously.
@@ -1003,6 +1039,7 @@ async def run_workflow_async(
         web: If True, start a real-time web dashboard.
         web_port: Port for the web dashboard (0 = auto-select).
         web_bg: If True, auto-shutdown dashboard after workflow + client disconnect.
+        metadata: Optional CLI metadata to merge on top of YAML-declared metadata.

     Returns:
         The workflow output as a dictionary.
@@ -1054,6 +1091,10 @@ async def run_workflow_async(
         config = load_config(workflow_path)
         verbose_log_timing("Configuration loaded", time.time() - load_start)

+        # Merge CLI metadata on top of YAML-declared metadata
+        if metadata:
+            config.workflow.metadata.update(metadata)
+
         # Log workflow details
         verbose_log(f"Workflow: {config.workflow.name}")
         verbose_log(f"Entry point: {config.workflow.entry_point}")
@@ -1107,6 +1148,8 @@ async def run_workflow_async(
         # so POST /api/stop can interrupt the running agent mid-execution
         interrupt_event = asyncio.Event()

+        from conductor.engine.workflow import RunContext
+
         engine = WorkflowEngine(
             config,
             registry=registry,
@@ -1116,6 +1159,12 @@ async def run_workflow_async(
             event_emitter=emitter,
             keyboard_listener=listener,
             web_dashboard=dashboard,
+            run_context=RunContext(
+                run_id=event_log_subscriber.run_id if event_log_subscriber else "",
+                log_file=str(event_log_subscriber.path) if event_log_subscriber else "",
+                dashboard_port=(dashboard.port if dashboard is not None else None),
+                bg_mode=web_bg or os.environ.get("CONDUCTOR_WEB_BG") == "1",
+            ),
         )

         # Share interrupt_event with dashboard so POST /api/stop can abort agents
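The parse-then-merge behavior added in this file can be tried out with a standalone sketch. It follows the logic of `parse_metadata_flags` from the diff but raises `ValueError` instead of `typer.BadParameter` so that it runs without Typer installed; the function name and the sample values are illustrative only.

```python
def parse_metadata_sketch(raw_metadata: list[str]) -> dict[str, str]:
    """Parse "key=value" strings; values stay raw strings (no type coercion)."""
    result: dict[str, str] = {}
    for raw in raw_metadata:
        if "=" not in raw:
            raise ValueError(f"Invalid metadata format: '{raw}'. Expected format: key=value")
        # Split on the first "=" only, so values may themselves contain "=".
        key, value = raw.split("=", 1)
        key, value = key.strip(), value.strip()
        if not key:
            raise ValueError(f"Empty metadata key in: '{raw}'")
        result[key] = value
    return result


# YAML-declared metadata (sample values), then CLI metadata merged on top.
yaml_metadata = {"tracker": "ado", "work_item_id": "unset"}
cli_metadata = parse_metadata_sketch(["work_item_id=1814", "url=https://x.test/?a=b"])

merged = dict(yaml_metadata)
merged.update(cli_metadata)  # CLI wins on conflicting keys
print(merged)
```

Note that the diff calls `update()` directly on `config.workflow.metadata`, which mutates the loaded config in place; the sketch copies first only to keep the two inputs visible.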

src/conductor/config/schema.py

Lines changed: 7 additions & 0 deletions
@@ -738,6 +738,13 @@ class WorkflowDef(BaseModel):
     hooks: HooksConfig | None = None
     """Lifecycle event hooks."""

+    metadata: dict[str, Any] = Field(default_factory=dict)
+    """Arbitrary key-value metadata for external tooling (dashboards, trackers, etc.).
+
+    Included verbatim in the ``workflow_started`` event so downstream
+    consumers can use it for enrichment without parsing the YAML source.
+    """
+


 class WorkflowConfig(BaseModel):
     """Complete workflow configuration file."""

src/conductor/engine/checkpoint.py

Lines changed: 3 additions & 0 deletions
@@ -136,6 +136,7 @@ def save_checkpoint(
     error: BaseException,
     inputs: dict[str, Any],
     copilot_session_ids: dict[str, str] | None = None,
+    system_metadata: dict[str, Any] | None = None,
 ) -> Path | None:
     """Serialize workflow state to a checkpoint file.
@@ -153,6 +154,7 @@ def save_checkpoint(
         error: The exception that triggered the checkpoint.
         inputs: Workflow inputs.
         copilot_session_ids: Optional mapping of agent names to session IDs.
+        system_metadata: Optional system metadata captured at workflow start.

     Returns:
         Path to the saved checkpoint file, or ``None`` if saving failed.
@@ -193,6 +195,7 @@ def save_checkpoint(
         "context": _make_json_serializable(context.to_dict()),
         "limits": _make_json_serializable(limits.to_dict()),
         "copilot_session_ids": copilot_session_ids or {},
+        "system": system_metadata or {},
     }

     # Serialize to JSON
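The `system_metadata` dict stored here is built elsewhere in the commit (the message names `_build_system_metadata()`, which this page does not show). As a rough sketch of the idea only, assuming hypothetical key names and a hypothetical set of "sensitive" keys based on the review-feedback bullet about `/api/info`:

```python
import os
import platform
from datetime import datetime, timezone
from typing import Any

# Assumption: these key names mirror the fields listed in the commit message.
SENSITIVE_KEYS = {"pid", "cwd", "log_file"}


def build_system_metadata_sketch(run_id: str = "", log_file: str = "") -> dict[str, Any]:
    """Collect runtime diagnostics; a sketch, not the project's implementation."""
    try:
        cwd = os.getcwd()
    except OSError:
        # The working directory may have been deleted; fall back to an empty string.
        cwd = ""
    return {
        "pid": os.getpid(),
        "platform": platform.platform(),
        "python_version": platform.python_version(),
        "cwd": cwd,
        "started_at": datetime.now(timezone.utc).isoformat(),
        "run_id": run_id,
        "log_file": log_file,
    }


def public_view(system: dict[str, Any]) -> dict[str, Any]:
    """Drop sensitive keys before exposing the data over HTTP."""
    return {k: v for k, v in system.items() if k not in SENSITIVE_KEYS}


system = build_system_metadata_sketch(run_id="a1b2c3d4", log_file="/tmp/run.jsonl")
print(sorted(public_view(system)))
```

The full dict would go to the event log and checkpoint, while only the filtered view would be returned by a public endpoint.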

src/conductor/engine/event_log.py

Lines changed: 7 additions & 2 deletions
@@ -61,8 +61,8 @@ def __init__(self, workflow_name: str) -> None:
         ts = time.strftime("%Y%m%d-%H%M%S")
         # Append random suffix to avoid filename collisions
         # when multiple runs start in the same second
-        suffix = secrets.token_hex(4)
-        ts = f"{ts}-{suffix}"
+        self._run_id = secrets.token_hex(4)
+        ts = f"{ts}-{self._run_id}"
         self._path = (
             Path(tempfile.gettempdir())
             / "conductor"
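As a small illustration of the identifier this hunk now retains, the sketch below generates the same kind of timestamp-plus-suffix stamp and recovers the `run_id` from it. The helper names are hypothetical, and the full log file name is deliberately not reproduced because the rest of the path expression lies outside the shown hunk.

```python
import secrets
import time


def make_run_stamp() -> tuple[str, str]:
    """Return (run_id, stamp) where stamp is "<timestamp>-<run_id>"."""
    run_id = secrets.token_hex(4)  # 4 random bytes rendered as 8 hex characters
    ts = time.strftime("%Y%m%d-%H%M%S")
    return run_id, f"{ts}-{run_id}"


def run_id_from_stamp(stamp: str) -> str:
    """Recover the run_id as the text after the last hyphen."""
    return stamp.rsplit("-", 1)[-1]


run_id, stamp = make_run_stamp()
print(stamp, run_id_from_stamp(stamp) == run_id)
```

Keeping the suffix on the instance, rather than discarding it after building the file name, is what lets other components refer to the run without parsing the path.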
@@ -71,6 +71,11 @@ def __init__(self, workflow_name: str) -> None:
         self._path.parent.mkdir(parents=True, exist_ok=True)
         self._handle = open(self._path, "w", encoding="utf-8")  # noqa: SIM115

+    @property
+    def run_id(self) -> str:
+        """Unique run identifier (8-char hex)."""
+        return self._run_id
+
     @property
     def path(self) -> Path:
         """Path to the JSONL log file."""
