Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
215 changes: 215 additions & 0 deletions aws_doc_sdk_examples_tools/lliam/service_layer/run_ailly.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,215 @@
import json
import logging
import time
from collections import defaultdict
from datetime import timedelta
from pathlib import Path
from subprocess import run
from typing import Any, Dict, List, Optional, Set

from aws_doc_sdk_examples_tools.lliam.domain.commands import RunAilly
from aws_doc_sdk_examples_tools.lliam.config import (
AILLY_DIR_PATH,
BATCH_PREFIX,
)

# Name the logger after the module, not the file path, matching the
# convention used by the sibling service-layer modules (getLogger(__name__)).
logger = logging.getLogger(__name__)


def handle_run_ailly(cmd: RunAilly, uow: None):
    """Run Ailly over each requested batch and log the total wall-clock time."""
    batches = resolve_requested_batches(cmd.batches)
    if not batches:
        return

    started = time.time()
    for batch in batches:
        run_ailly_single_batch(batch)
    elapsed = time.time() - started

    logger.info(
        f"[TIMECHECK] {len(batches)} batches took {format_duration(elapsed)} to run"
    )


def resolve_requested_batches(batch_names: List[str]) -> List[Path]:
    """Resolve batch names to batch directories under AILLY_DIR_PATH.

    When *batch_names* is empty, discover every directory whose name starts
    with BATCH_PREFIX. Otherwise each requested name must exist as a
    directory under AILLY_DIR_PATH.

    Args:
        batch_names: Explicit batch names, or an empty list for "all".

    Returns:
        A list of batch directory paths.

    Raises:
        FileNotFoundError: a requested batch path does not exist.
        NotADirectoryError: a requested batch path exists but is a file.
    """
    if not batch_names:
        return [
            p
            for p in AILLY_DIR_PATH.iterdir()
            if p.is_dir() and p.name.startswith(BATCH_PREFIX)
        ]

    batch_paths: List[Path] = []
    for batch_name in batch_names:
        # AILLY_DIR_PATH is already a Path, so "/" yields a Path directly;
        # the previous Path(...) wrapper was redundant.
        batch_path = AILLY_DIR_PATH / batch_name
        if not batch_path.exists():
            raise FileNotFoundError(batch_path)
        if not batch_path.is_dir():
            raise NotADirectoryError(batch_path)
        batch_paths.append(batch_path)

    return batch_paths


def run_ailly_single_batch(batch: Path) -> None:
    """Run ailly and process files for a single batch."""
    started = time.time()
    updates_path = AILLY_DIR_PATH / f"updates_{batch.name}.json"

    ailly_cmd = [
        "ailly",
        "--max-depth",
        "10",
        "--root",
        str(AILLY_DIR_PATH),
        batch.name,
    ]
    logger.info(f"Running {ailly_cmd}")
    run(ailly_cmd)

    elapsed = time.time() - started
    logger.info(
        f"[TIMECHECK] {batch.name} took {format_duration(elapsed)} to run"
    )

    logger.info(f"Processing generated content for {batch.name}")
    process_ailly_files(input_dir=batch, output_file=updates_path)


# Keys every parsed Ailly block must provide (see parse_block_lines).
# Set literal instead of set([...]) — same value, idiomatic form (ruff C405).
EXPECTED_KEYS: Set[str] = {"title", "title_abbrev"}
# Optional per-key prefixes applied to extracted values in parse_ailly_file.
VALUE_PREFIXES: Dict[str, str] = {"title": "", "title_abbrev": "", "synopsis": ""}


class MissingExpectedKeys(Exception):
    """Raised when a parsed Ailly block lacks one of the expected keys."""

    pass


def parse_fenced_blocks(content: str, fence="===") -> List[List[str]]:
    """Split *content* into lists of lines, one list per fenced region.

    A region is the text between a pair of lines equal to *fence* (after
    stripping surrounding whitespace). Lines outside any fence pair — and
    any trailing unclosed region — are discarded.
    """
    blocks: List[List[str]] = []
    current: Optional[List[str]] = None  # None means "outside a fence pair"

    for raw_line in content.splitlines():
        if raw_line.strip() == fence:
            if current is None:
                current = []
            else:
                blocks.append(current)
                current = None
        elif current is not None:
            current.append(raw_line)

    return blocks


def parse_block_lines(
    block: List[str], key_pairs: Dict[str, str], expected_keys=EXPECTED_KEYS
):
    """Extract `key => value` pairs from *block* into *key_pairs* (mutated).

    Only lines containing "=>" are considered; the first "=>" separates the
    key from the value, and both sides are stripped.

    Raises:
        MissingExpectedKeys: if any of *expected_keys* is still absent after
            the block has been consumed.
    """
    for entry in block:
        if "=>" not in entry:
            continue
        key, _, value = entry.partition("=>")
        key_pairs[key.strip()] = value.strip()

    missing = expected_keys - key_pairs.keys()
    if missing:
        raise MissingExpectedKeys(missing)


def parse_ailly_file(
    file_path: str, value_prefixes: Dict[str, str] = VALUE_PREFIXES
) -> Dict[str, Any]:
    """
    Parse an .md.ailly.md file and extract key-value pairs that are between
    === fence markers. Each key value pair is assumed to be on one line and
    in the form of `key => value`. This formatting is totally dependent on
    the LLM output written by Ailly.

    Args:
        file_path: Path to the .md.ailly.md file
        value_prefixes: Per-key prefixes prepended to extracted values.

    Returns:
        Dictionary containing the extracted key-value pairs. On failure the
        error is logged and whatever was extracted so far is returned.
    """
    result: Dict[str, str] = {}

    try:
        content = Path(file_path).read_text(encoding="utf-8")

        for block in parse_fenced_blocks(content):
            parse_block_lines(block, result)

        for key, prefix in value_prefixes.items():
            if key in result:
                result[key] = f"{prefix}{result[key]}"

        # The example id is the file name minus the ".md.ailly.md" suffix.
        result["id"] = Path(file_path).name.split(".md.ailly.md")[0]
        result["_source_file"] = file_path
    except Exception as e:
        logger.error(f"Error parsing file {file_path}", exc_info=e)

    return result


def parse_package_name(policy_update: Dict[str, str]) -> Optional[str]:
    """Extract the package name from a policy update's id.

    Ids are expected to look like "iam-policies.<package>[.<rest>...]".

    Args:
        policy_update: A parsed update dict, possibly empty or malformed.

    Returns:
        The package segment of the id, or None when the update is falsy,
        not a dict, has no id, or the id is not a well-formed iam-policies id.
    """
    if not policy_update:
        return None

    if not isinstance(policy_update, dict):
        return None

    # Avoid shadowing the builtin `id`.
    update_id = policy_update.get("id")
    if not update_id:
        return None

    id_parts = [part.strip() for part in update_id.split(".")]

    # len check guards a bare "iam-policies" id, which previously raised
    # IndexError when reading the package segment.
    if id_parts[0] != "iam-policies" or len(id_parts) < 2:
        return None

    return id_parts[1]  # The package name, hopefully.


def process_ailly_files(
    input_dir: Path, output_file: Path, file_pattern: str = "*.md.ailly.md"
) -> None:
    """
    Process all .md.ailly.md files in the input directory and write the
    results, grouped by package name, as JSON to the output file.

    Any failure is logged rather than raised, so one bad batch cannot abort
    an entire run.

    Args:
        input_dir: Directory containing .md.ailly.md files
        output_file: Path to the output JSON file
        file_pattern: Pattern to match files (default: "*.md.ailly.md")
    """
    results = defaultdict(list)

    try:
        for file_path in input_dir.rglob(file_pattern):
            logger.info(f"Processing file: {file_path}")
            policy_update = parse_ailly_file(str(file_path))
            if policy_update:
                package_name = parse_package_name(policy_update)
                if not package_name:
                    # Include the offending file so the failure is traceable
                    # (the old message was an f-string with no placeholder).
                    raise TypeError(
                        f"Could not get package name from policy update in {file_path}."
                    )
                results[package_name].append(policy_update)

        with open(output_file, "w", encoding="utf-8") as out_file:
            json.dump(results, out_file, indent=2)

        logger.info(
            f"Successfully processed files. Output written to {output_file.name}"
        )

    except Exception as e:
        logger.error("Error processing files", exc_info=e)


def format_duration(seconds: float) -> str:
    """Render *seconds* as H:MM:SS, zero-padded to at least 8 characters."""
    as_delta = timedelta(seconds=seconds)
    return str(as_delta).zfill(8)
115 changes: 115 additions & 0 deletions aws_doc_sdk_examples_tools/lliam/service_layer/update_doc_gen.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
import json
import logging
from collections import Counter
from pathlib import Path
from typing import Dict, Iterable, List

from aws_doc_sdk_examples_tools.yaml_writer import prepare_write, write_many

from aws_doc_sdk_examples_tools.lliam.config import (
AILLY_DIR_PATH,
BATCH_PREFIX,
)
from aws_doc_sdk_examples_tools.lliam.domain.commands import UpdateReservoir
from aws_doc_sdk_examples_tools.doc_gen import DocGen, Example

logger = logging.getLogger(__name__)

IAM_LANGUAGE = "IAMPolicyGrammar"


def examples_from_updates(updates: List[Dict]) -> Iterable[Example]:
    """
    Takes a list of example metadata updates and returns an
    iterable of examples with the applied updates.

    Updates without an "id" are skipped; for duplicate ids the last
    update wins.
    """
    indexed_updates = {item["id"]: item for item in updates if "id" in item}

    return [
        Example(
            id=example_id,
            file=None,
            languages={},
            title=update.get("title"),
            title_abbrev=update.get("title_abbrev"),
            synopsis=update.get("synopsis"),
        )
        for example_id, update in indexed_updates.items()
    ]


def make_title_abbreviation(old: Example, new: Example, abbreviations: Counter):
    """Build a title_abbrev for *new*, disambiguating repeats with a count.

    The source document title comes from the first version of the IAM policy
    language entry on *old*; *abbreviations* is mutated to track how many
    times each base abbreviation has been produced.
    """
    first_version = old.languages[IAM_LANGUAGE].versions[0]
    source = first_version.source
    source_title = source.title if source else ""

    base = f"{new.title_abbrev} (from '{source_title}' docs)"
    abbreviations[base] += 1
    count = abbreviations[base]
    if count > 1:
        return f"{base} ({count})"
    return base


def update_examples(doc_gen: DocGen, examples: Iterable[Example]) -> Dict[str, Example]:
    """
    Merge a subset of example properties into a DocGen instance.

    Returns the examples that were found and updated, keyed by id; ids not
    present in *doc_gen* are logged and skipped.
    """
    title_abbrevs = Counter(
        existing.title_abbrev for existing in doc_gen.examples.values()
    )
    updated: Dict[str, Example] = {}

    for incoming in examples:
        target = doc_gen.examples.get(incoming.id)
        if target is None:
            logger.warning(f"Could not find example with id: {incoming.id}")
            continue

        target.title = incoming.title
        target.title_abbrev = make_title_abbreviation(
            old=target, new=incoming, abbreviations=title_abbrevs
        )
        target.synopsis = incoming.synopsis
        updated[target.id] = target

    return updated


def update_doc_gen(doc_gen_root: Path, updates: List[Dict]) -> Dict[str, Example]:
    """Load the DocGen at *doc_gen_root* and apply *updates* to its examples."""
    doc_gen = DocGen.from_root(doc_gen_root)
    return update_examples(doc_gen, examples_from_updates(updates))


def handle_update_reservoir(cmd: UpdateReservoir, uow: None):
    """Apply generated IAM updates to the DocGen reservoir at cmd.root.

    Reads each updates_<batch>.json file (all batch files when cmd.batches
    is empty), optionally filters by cmd.packages, and writes the merged
    example metadata back to disk.
    """
    update_files = (
        [AILLY_DIR_PATH / f"updates_{batch}.json" for batch in cmd.batches]
        if cmd.batches
        else list(AILLY_DIR_PATH.glob(f"updates_{BATCH_PREFIX}*.json"))
    )

    if not update_files:
        logger.warning("No IAM update files found to process")
        return

    for update_file in sorted(update_files):
        if not update_file.exists():
            logger.warning(f"Update file not found: {update_file}")
            continue

        logger.info(f"Processing updates from {update_file.name}")
        # The file maps package name -> list of updates (see
        # process_ailly_files); flatten it to the List[Dict] shape that
        # update_doc_gen expects. Previously the raw dict was passed through
        # when no package filter was given, so iteration saw only the string
        # keys and no update was ever applied.
        updates_by_package = json.loads(update_file.read_text())
        updates = [
            update
            for package, update_list in updates_by_package.items()
            if not cmd.packages or package in cmd.packages
            for update in update_list
        ]
        if not updates:
            logger.warning(f"No matching updates to run in {update_file.name}")
            continue
        examples = update_doc_gen(doc_gen_root=cmd.root, updates=updates)

        writes = prepare_write(examples)
        write_many(cmd.root, writes)
Loading