|
1 | | -from typing import Dict |
| 1 | +from pathlib import Path as P |
| 2 | +from typing import Any, Callable, ClassVar, Dict, List, Set |
| 3 | +from typing_extensions import Protocol |
2 | 4 |
|
3 | 5 | import click |
4 | 6 |
|
5 | | -from commodore.component import Component |
| 7 | +from commodore.config import Config, Component |
6 | 8 | from commodore.helpers import yaml_load |
7 | 9 |
|
8 | | -from .jsonnet import run_jsonnet_filter |
9 | | -from .builtin_filters import run_builtin_filter |
10 | 10 | from .inventory import resolve_inventory_vars, InventoryError |
11 | 11 |
|
| 12 | +from .jsonnet import run_jsonnet_filter, validate_jsonnet_filter |
| 13 | +from .builtin_filters import run_builtin_filter, validate_builtin_filter |
12 | 14 |
|
13 | | -def postprocess_components(config, kapitan_inventory, components: Dict[str, Component]): |
| 15 | + |
| 16 | +class FilterFunc(Protocol): |
| 17 | + def __call__( |
| 18 | + self, inventory: Dict, component: str, filterid: str, path: P, **filterargs: str |
| 19 | + ): |
| 20 | + ... |
| 21 | + |
| 22 | + |
| 23 | +ValidateFunc = Callable[[str, Dict], Dict] |
| 24 | + |
| 25 | + |
| 26 | +class Filter: |
| 27 | + type: str |
| 28 | + filter: str |
| 29 | + path: P |
| 30 | + filterargs: Dict |
| 31 | + enabled: bool |
| 32 | + |
| 33 | + # PyLint complains about ClassVar not being subscriptable |
| 34 | + # pylint: disable=unsubscriptable-object |
| 35 | + _run_handlers: ClassVar[Dict[str, FilterFunc]] = { |
| 36 | + "builtin": run_builtin_filter, |
| 37 | + "jsonnet": run_jsonnet_filter, |
| 38 | + } |
| 39 | + # pylint: disable=unsubscriptable-object |
| 40 | + _validate_handlers: ClassVar[Dict[str, ValidateFunc]] = { |
| 41 | + "builtin": validate_builtin_filter, |
| 42 | + "jsonnet": validate_jsonnet_filter, |
| 43 | + } |
| 44 | + # pylint: disable=unsubscriptable-object |
| 45 | + _required_keys: ClassVar[Set[str]] = {"type", "path", "filter"} |
| 46 | + |
| 47 | + def __init__(self, fd: Dict): |
| 48 | + """ |
| 49 | + Assumes that `fd` has been validated with `_validate_filter`. |
| 50 | + """ |
| 51 | + self.type = fd["type"] |
| 52 | + self.filter = fd["filter"] |
| 53 | + self.path = P(fd["path"]) |
| 54 | + self.enabled = fd.get("enabled", True) |
| 55 | + self.filterargs = fd.get("filterargs", {}) |
| 56 | + self._runner: FilterFunc = self._run_handlers[self.type] |
| 57 | + |
| 58 | + def run(self, inventory: Dict, component: str): |
| 59 | + """ |
| 60 | + Run the filter. |
| 61 | + """ |
| 62 | + if not self.enabled: |
| 63 | + click.secho( |
| 64 | + f" > Skipping disabled filter {self.filter} on path {self.path}" |
| 65 | + ) |
| 66 | + return |
| 67 | + |
| 68 | + self._runner(inventory, component, self.filter, self.path, **self.filterargs) |
| 69 | + |
| 70 | + @classmethod |
| 71 | + def validate(cls, cn: str, f: Dict): |
| 72 | + """ |
| 73 | + Validate filter definition in `f`. |
| 74 | + Raises exceptions as appropriate when the definition is invalid. |
| 75 | + Returns the definiton if it validates successfully. |
| 76 | + """ |
| 77 | + if not all(key in f for key in cls._required_keys): |
| 78 | + missing_required_keys = cls._required_keys - f.keys() |
| 79 | + raise KeyError(f"Filter is missing required key(s) {missing_required_keys}") |
| 80 | + |
| 81 | + if "enabled" in f and not isinstance(f["enabled"], bool): |
| 82 | + raise ValueError("Filter key 'enabled' is not a boolean") |
| 83 | + |
| 84 | + typ = f["type"] |
| 85 | + if typ not in cls._validate_handlers: |
| 86 | + raise ValueError(f"Filter has unknown type {typ}") |
| 87 | + |
| 88 | + # perform type-specific extra validation |
| 89 | + cls._validate_handlers[typ](cn, f) |
| 90 | + |
| 91 | + return f |
| 92 | + |
| 93 | + @classmethod |
| 94 | + def from_dict(cls, cn: str, f: Dict): |
| 95 | + """ |
| 96 | + Create Filter object from filter definition dict `f`. |
| 97 | + Raises exceptions as appropriate when the definition is invalid. |
| 98 | + Returns a Filter object if the passed definition validates successfully. |
| 99 | + """ |
| 100 | + return Filter(Filter.validate(cn, f)) |
| 101 | + |
| 102 | + |
| 103 | +def _get_inventory_filters(inventory: Dict[str, Any]) -> List[Dict[str, Any]]: |
| 104 | + """ |
| 105 | + Return list of filters defined in inventory. |
| 106 | +
|
| 107 | + Inventory filters are expected to be defined as a list in |
| 108 | + `parameters.commodore.postprocess.filters`. |
| 109 | + """ |
| 110 | + commodore = inventory["parameters"].get("commodore", {}) |
| 111 | + return commodore.get("postprocess", {}).get("filters", []) |
| 112 | + |
| 113 | + |
| 114 | +def _get_external_filters( |
| 115 | + inventory: Dict[str, Any], c: Component |
| 116 | +) -> List[Dict[str, Any]]: |
| 117 | + filters_file = c.filters_file |
| 118 | + filters = [] |
| 119 | + if filters_file.is_file(): |
| 120 | + _filters = yaml_load(filters_file).get("filters", []) |
| 121 | + for f in _filters: |
| 122 | + # Resolve any inventory references in filter definition |
| 123 | + try: |
| 124 | + f = resolve_inventory_vars(inventory, f) |
| 125 | + except InventoryError as e: |
| 126 | + raise click.ClickException( |
| 127 | + f"Failed to resolve reclass references for external filter: {e}" |
| 128 | + ) from e |
| 129 | + |
| 130 | + # external filters without 'type' always have type 'jsonnet' |
| 131 | + if "type" not in f: |
| 132 | + click.secho( |
| 133 | + " > [WARN] component uses untyped external postprocessing filter", |
| 134 | + fg="yellow", |
| 135 | + ) |
| 136 | + f["type"] = "jsonnet" |
| 137 | + |
| 138 | + if f["type"] == "jsonnet": |
| 139 | + f["path"] = f["output_path"] |
| 140 | + del f["output_path"] |
| 141 | + f["filter"] = str(P("postprocess") / f["filter"]) |
| 142 | + filters.append(f) |
| 143 | + |
| 144 | + return filters |
| 145 | + |
| 146 | + |
| 147 | +def postprocess_components( |
| 148 | + config: Config, |
| 149 | + kapitan_inventory: Dict[str, Dict[str, Any]], |
| 150 | + components: Dict[str, Component], |
| 151 | +): |
14 | 152 | click.secho("Postprocessing...", bold=True) |
| 153 | + |
15 | 154 | for cn, c in components.items(): |
16 | 155 | inventory = kapitan_inventory.get(cn) |
17 | 156 | if not inventory: |
18 | 157 | click.echo(f" > No target exists for component {cn}, skipping...") |
19 | 158 | continue |
20 | 159 |
|
21 | | - filters_file = c.filters_file |
22 | | - if filters_file.is_file(): |
| 160 | + # inventory filters |
| 161 | + invfilters = _get_inventory_filters(inventory) |
| 162 | + |
| 163 | + # "old", external filters |
| 164 | + extfilters = _get_external_filters(inventory, c) |
| 165 | + |
| 166 | + filters: List[Filter] = [] |
| 167 | + for fd in invfilters + extfilters: |
| 168 | + try: |
| 169 | + filters.append(Filter.from_dict(cn, fd)) |
| 170 | + except (KeyError, ValueError) as e: |
| 171 | + click.secho( |
| 172 | + f" > Skipping filter '{fd['filter']}' with invalid definition {fd}: {e}", |
| 173 | + fg="yellow", |
| 174 | + ) |
| 175 | + |
| 176 | + if len(filters) > 0 and config.debug: |
| 177 | + click.echo(f" > {cn}...") |
| 178 | + |
| 179 | + for f in filters: |
23 | 180 | if config.debug: |
24 | | - click.echo(f" > {cn}...") |
25 | | - filters = yaml_load(filters_file) |
26 | | - for f in filters["filters"]: |
27 | | - # Resolve any inventory references in filter definition |
28 | | - try: |
29 | | - f = resolve_inventory_vars(inventory, f) |
30 | | - except InventoryError as e: |
31 | | - raise click.ClickException( |
32 | | - f"Failed to resolve variables for old-style filter: {e}" |
33 | | - ) from e |
34 | | - |
35 | | - # filters without 'type' are always 'jsonnet' |
36 | | - if "type" not in f: |
37 | | - click.secho( |
38 | | - " > [WARN] component uses old-style postprocess filters", |
39 | | - fg="yellow", |
40 | | - ) |
41 | | - f["type"] = "jsonnet" |
42 | | - # Filters which aren't explicitly disabled are always enabled |
43 | | - if "enabled" in f and not f["enabled"]: |
44 | | - click.secho( |
45 | | - " > Skipping disabled filter" |
46 | | - + f" {f['filter']} on path {f['path']}" |
47 | | - ) |
48 | | - continue |
49 | | - if f["type"] == "jsonnet": |
50 | | - run_jsonnet_filter(inventory, cn, filters_file.parent, f) |
51 | | - elif f["type"] == "builtin": |
52 | | - run_builtin_filter(inventory, cn, f) |
53 | | - else: |
54 | | - click.secho( |
55 | | - f" > [WARN] unknown builtin filter {f['filter']}", |
56 | | - fg="yellow", |
57 | | - ) |
| 181 | + click.secho(f" > Executing filter '{f.type}:{f.filter}'") |
| 182 | + f.run(inventory, cn) |
0 commit comments