Skip to content

Commit 9899243

Browse files
authored
Feat!: improve signal CLI UX (#4812)
1 parent ee3878e commit 9899243

File tree

6 files changed

+356
-19
lines changed

6 files changed

+356
-19
lines changed

sqlmesh/core/console.py

Lines changed: 161 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -330,7 +330,36 @@ def log_test_results(self, result: ModelTextTestResult, target_dialect: str) ->
330330
"""
331331

332332

333+
class SignalConsole(abc.ABC):
334+
@abc.abstractmethod
335+
def start_signal_progress(
336+
self,
337+
snapshot: Snapshot,
338+
default_catalog: t.Optional[str],
339+
environment_naming_info: EnvironmentNamingInfo,
340+
) -> None:
341+
"""Indicates that signal checking has begun for a snapshot."""
342+
343+
@abc.abstractmethod
344+
def update_signal_progress(
345+
self,
346+
snapshot: Snapshot,
347+
signal_name: str,
348+
signal_idx: int,
349+
total_signals: int,
350+
ready_intervals: Intervals,
351+
check_intervals: Intervals,
352+
duration: float,
353+
) -> None:
354+
"""Updates the signal checking progress."""
355+
356+
@abc.abstractmethod
357+
def stop_signal_progress(self) -> None:
358+
"""Indicates that signal checking has completed for a snapshot."""
359+
360+
333361
class Console(
362+
SignalConsole,
334363
PlanBuilderConsole,
335364
LinterConsole,
336365
StateExporterConsole,
@@ -536,6 +565,29 @@ def update_snapshot_evaluation_progress(
536565
def stop_evaluation_progress(self, success: bool = True) -> None:
537566
pass
538567

568+
def start_signal_progress(
569+
self,
570+
snapshot: Snapshot,
571+
default_catalog: t.Optional[str],
572+
environment_naming_info: EnvironmentNamingInfo,
573+
) -> None:
574+
pass
575+
576+
def update_signal_progress(
577+
self,
578+
snapshot: Snapshot,
579+
signal_name: str,
580+
signal_idx: int,
581+
total_signals: int,
582+
ready_intervals: Intervals,
583+
check_intervals: Intervals,
584+
duration: float,
585+
) -> None:
586+
pass
587+
588+
def stop_signal_progress(self) -> None:
589+
pass
590+
539591
def start_creation_progress(
540592
self,
541593
snapshots: t.List[Snapshot],
@@ -860,6 +912,8 @@ def __init__(
860912
self.table_diff_model_tasks: t.Dict[str, TaskID] = {}
861913
self.table_diff_progress_live: t.Optional[Live] = None
862914

915+
self.signal_status_tree: t.Optional[Tree] = None
916+
863917
self.verbosity = verbosity
864918
self.dialect = dialect
865919
self.ignore_warnings = ignore_warnings
@@ -901,6 +955,9 @@ def start_evaluation_progress(
901955
audit_only: bool = False,
902956
) -> None:
903957
"""Indicates that a new snapshot evaluation/auditing progress has begun."""
958+
# Add a newline to separate signal checking from evaluation
959+
self._print("")
960+
904961
if not self.evaluation_progress_live:
905962
self.evaluation_total_progress = make_progress_bar(
906963
"Executing model batches" if not audit_only else "Auditing models", self.console
@@ -1050,6 +1107,88 @@ def stop_evaluation_progress(self, success: bool = True) -> None:
10501107
self.environment_naming_info = EnvironmentNamingInfo()
10511108
self.default_catalog = None
10521109

1110+
def start_signal_progress(
1111+
self,
1112+
snapshot: Snapshot,
1113+
default_catalog: t.Optional[str],
1114+
environment_naming_info: EnvironmentNamingInfo,
1115+
) -> None:
1116+
"""Indicates that signal checking has begun for a snapshot."""
1117+
display_name = snapshot.display_name(
1118+
environment_naming_info,
1119+
default_catalog if self.verbosity < Verbosity.VERY_VERBOSE else None,
1120+
dialect=self.dialect,
1121+
)
1122+
self.signal_status_tree = Tree(f"Checking signals for {display_name}")
1123+
1124+
def update_signal_progress(
1125+
self,
1126+
snapshot: Snapshot,
1127+
signal_name: str,
1128+
signal_idx: int,
1129+
total_signals: int,
1130+
ready_intervals: Intervals,
1131+
check_intervals: Intervals,
1132+
duration: float,
1133+
) -> None:
1134+
"""Updates the signal checking progress."""
1135+
tree = Tree(f"[{signal_idx + 1}/{total_signals}] {signal_name} {duration:.2f}s")
1136+
1137+
formatted_check_intervals = [_format_signal_interval(snapshot, i) for i in check_intervals]
1138+
formatted_ready_intervals = [_format_signal_interval(snapshot, i) for i in ready_intervals]
1139+
1140+
if not formatted_check_intervals:
1141+
formatted_check_intervals = ["no intervals"]
1142+
if not formatted_ready_intervals:
1143+
formatted_ready_intervals = ["no intervals"]
1144+
1145+
# Color coding to help detect partial interval ranges quickly
1146+
if ready_intervals == check_intervals:
1147+
msg = "All ready"
1148+
color = "green"
1149+
elif ready_intervals:
1150+
msg = "Some ready"
1151+
color = "yellow"
1152+
else:
1153+
msg = "None ready"
1154+
color = "red"
1155+
1156+
if self.verbosity < Verbosity.VERY_VERBOSE:
1157+
num_check_intervals = len(formatted_check_intervals)
1158+
if num_check_intervals > 3:
1159+
formatted_check_intervals = formatted_check_intervals[:3]
1160+
formatted_check_intervals.append(f"... and {num_check_intervals - 3} more")
1161+
1162+
num_ready_intervals = len(formatted_ready_intervals)
1163+
if num_ready_intervals > 3:
1164+
formatted_ready_intervals = formatted_ready_intervals[:3]
1165+
formatted_ready_intervals.append(f"... and {num_ready_intervals - 3} more")
1166+
1167+
check = ", ".join(formatted_check_intervals)
1168+
tree.add(f"Check: {check}")
1169+
1170+
ready = ", ".join(formatted_ready_intervals)
1171+
tree.add(f"[{color}]{msg}: {ready}[/{color}]")
1172+
else:
1173+
check_tree = Tree("Check")
1174+
tree.add(check_tree)
1175+
for interval in formatted_check_intervals:
1176+
check_tree.add(interval)
1177+
1178+
ready_tree = Tree(f"[{color}]{msg}[/{color}]")
1179+
tree.add(ready_tree)
1180+
for interval in formatted_ready_intervals:
1181+
ready_tree.add(f"[{color}]{interval}[/{color}]")
1182+
1183+
if self.signal_status_tree is not None:
1184+
self.signal_status_tree.add(tree)
1185+
1186+
def stop_signal_progress(self) -> None:
1187+
"""Indicates that signal checking has completed for a snapshot."""
1188+
if self.signal_status_tree is not None:
1189+
self._print(self.signal_status_tree)
1190+
self.signal_status_tree = None
1191+
10531192
def start_creation_progress(
10541193
self,
10551194
snapshots: t.List[Snapshot],
@@ -3810,19 +3949,34 @@ def _format_audits_errors(error: NodeAuditsErrors) -> str:
38103949
return " " + "\n".join(error_messages)
38113950

38123951

3952+
def _format_interval(snapshot: Snapshot, interval: Interval) -> str:
3953+
"""Format an interval with an optional prefix."""
3954+
inclusive_interval = make_inclusive(interval[0], interval[1])
3955+
if snapshot.model.interval_unit.is_date_granularity:
3956+
return f"{to_ds(inclusive_interval[0])} - {to_ds(inclusive_interval[1])}"
3957+
3958+
if inclusive_interval[0].date() == inclusive_interval[1].date():
3959+
# omit end date if interval start/end on same day
3960+
return f"{to_ds(inclusive_interval[0])} {inclusive_interval[0].strftime('%H:%M:%S')}-{inclusive_interval[1].strftime('%H:%M:%S')}"
3961+
3962+
return f"{inclusive_interval[0].strftime('%Y-%m-%d %H:%M:%S')} - {inclusive_interval[1].strftime('%Y-%m-%d %H:%M:%S')}"
3963+
3964+
3965+
def _format_signal_interval(snapshot: Snapshot, interval: Interval) -> str:
3966+
"""Format an interval for signal output (without 'insert' prefix)."""
3967+
return _format_interval(snapshot, interval)
3968+
3969+
38133970
def _format_evaluation_model_interval(snapshot: Snapshot, interval: Interval) -> str:
3971+
"""Format an interval for evaluation output (with 'insert' prefix)."""
38143972
if snapshot.is_model and (
38153973
snapshot.model.kind.is_incremental
38163974
or snapshot.model.kind.is_managed
38173975
or snapshot.model.kind.is_custom
38183976
):
3819-
inclusive_interval = make_inclusive(interval[0], interval[1])
3820-
if snapshot.model.interval_unit.is_date_granularity:
3821-
return f"insert {to_ds(inclusive_interval[0])} - {to_ds(inclusive_interval[1])}"
3822-
# omit end date if interval start/end on same day
3823-
if inclusive_interval[0].date() == inclusive_interval[1].date():
3824-
return f"insert {to_ds(inclusive_interval[0])} {inclusive_interval[0].strftime('%H:%M:%S')}-{inclusive_interval[1].strftime('%H:%M:%S')}"
3825-
return f"insert {inclusive_interval[0].strftime('%Y-%m-%d %H:%M:%S')} - {inclusive_interval[1].strftime('%Y-%m-%d %H:%M:%S')}"
3977+
formatted_interval = _format_interval(snapshot, interval)
3978+
return f"insert {formatted_interval}"
3979+
38263980
return ""
38273981

38283982

sqlmesh/core/scheduler.py

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -269,6 +269,7 @@ def batch_intervals(
269269
self,
270270
merged_intervals: SnapshotToIntervals,
271271
deployability_index: t.Optional[DeployabilityIndex],
272+
environment_naming_info: EnvironmentNamingInfo,
272273
) -> t.Dict[Snapshot, Intervals]:
273274
dag = snapshots_to_dag(merged_intervals)
274275

@@ -303,7 +304,13 @@ def batch_intervals(
303304
default_catalog=self.default_catalog,
304305
)
305306

306-
intervals = snapshot.check_ready_intervals(intervals, context)
307+
intervals = snapshot.check_ready_intervals(
308+
intervals,
309+
context,
310+
console=self.console,
311+
default_catalog=self.default_catalog,
312+
environment_naming_info=environment_naming_info,
313+
)
307314
unready -= set(intervals)
308315

309316
for parent in snapshot.parents:
@@ -324,10 +331,14 @@ def batch_intervals(
324331
):
325332
batches.append((next_batch[0][0], next_batch[-1][-1]))
326333
next_batch = []
334+
327335
next_batch.append(interval)
336+
328337
if next_batch:
329338
batches.append((next_batch[0][0], next_batch[-1][-1]))
339+
330340
snapshot_batches[snapshot] = batches
341+
331342
return snapshot_batches
332343

333344
def run_merged_intervals(
@@ -359,7 +370,9 @@ def run_merged_intervals(
359370
"""
360371
execution_time = execution_time or now_timestamp()
361372

362-
batched_intervals = self.batch_intervals(merged_intervals, deployability_index)
373+
batched_intervals = self.batch_intervals(
374+
merged_intervals, deployability_index, environment_naming_info
375+
)
363376

364377
self.console.start_evaluation_progress(
365378
batched_intervals,

sqlmesh/core/snapshot/definition.py

Lines changed: 40 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
from __future__ import annotations
22

33
import sys
4+
import time
45
import typing as t
56
from collections import defaultdict
67
from datetime import datetime, timedelta
@@ -50,6 +51,7 @@
5051
from sqlmesh.utils.pydantic import PydanticModel, field_validator
5152

5253
if t.TYPE_CHECKING:
54+
from sqlmesh.core.console import Console
5355
from sqlglot.dialects.dialect import DialectType
5456
from sqlmesh.core.environment import EnvironmentNamingInfo
5557
from sqlmesh.core.context import ExecutionContext
@@ -965,7 +967,14 @@ def missing_intervals(
965967
model_end_ts,
966968
)
967969

968-
def check_ready_intervals(self, intervals: Intervals, context: ExecutionContext) -> Intervals:
970+
def check_ready_intervals(
971+
self,
972+
intervals: Intervals,
973+
context: ExecutionContext,
974+
console: t.Optional[Console] = None,
975+
default_catalog: t.Optional[str] = None,
976+
environment_naming_info: t.Optional[EnvironmentNamingInfo] = None,
977+
) -> Intervals:
969978
"""Returns a list of intervals that are considered ready by the provided signal.
970979
971980
Note that this will handle gaps in the provided intervals. The returned intervals
@@ -979,7 +988,19 @@ def check_ready_intervals(self, intervals: Intervals, context: ExecutionContext)
979988
python_env = self.model.python_env
980989
env = prepare_env(python_env)
981990

982-
for signal_name, kwargs in signals.items():
991+
if console:
992+
console.start_signal_progress(
993+
self,
994+
default_catalog,
995+
environment_naming_info or EnvironmentNamingInfo(),
996+
)
997+
998+
for signal_idx, (signal_name, kwargs) in enumerate(signals.items()):
999+
# Capture intervals before signal check for display
1000+
intervals_to_check = merge_intervals(intervals)
1001+
1002+
signal_start_ts = time.perf_counter()
1003+
9831004
try:
9841005
intervals = _check_ready_intervals(
9851006
env[signal_name],
@@ -996,6 +1017,23 @@ def check_ready_intervals(self, intervals: Intervals, context: ExecutionContext)
9961017
f"{e} '{signal_name}' for '{self.model.name}' at {self.model._path}"
9971018
)
9981019

1020+
duration = time.perf_counter() - signal_start_ts
1021+
1022+
if console:
1023+
console.update_signal_progress(
1024+
snapshot=self,
1025+
signal_name=signal_name,
1026+
signal_idx=signal_idx,
1027+
total_signals=len(signals),
1028+
ready_intervals=merge_intervals(intervals),
1029+
check_intervals=intervals_to_check,
1030+
duration=duration,
1031+
)
1032+
1033+
# Stop signal progress tracking
1034+
if console:
1035+
console.stop_signal_progress()
1036+
9991037
return intervals
10001038

10011039
def categorize_as(self, category: SnapshotChangeCategory) -> None:

0 commit comments

Comments
 (0)