Skip to content

Feat!: improve signal CLI UX #4812

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Jul 1, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
168 changes: 161 additions & 7 deletions sqlmesh/core/console.py
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,36 @@ def log_test_results(self, result: ModelTextTestResult, target_dialect: str) ->
"""


class SignalConsole(abc.ABC):
@abc.abstractmethod
def start_signal_progress(
self,
snapshot: Snapshot,
default_catalog: t.Optional[str],
environment_naming_info: EnvironmentNamingInfo,
) -> None:
"""Indicates that signal checking has begun for a snapshot."""

@abc.abstractmethod
def update_signal_progress(
self,
snapshot: Snapshot,
signal_name: str,
signal_idx: int,
total_signals: int,
ready_intervals: Intervals,
check_intervals: Intervals,
duration: float,
) -> None:
"""Updates the signal checking progress."""

@abc.abstractmethod
def stop_signal_progress(self) -> None:
"""Indicates that signal checking has completed for a snapshot."""


class Console(
SignalConsole,
PlanBuilderConsole,
LinterConsole,
StateExporterConsole,
Expand Down Expand Up @@ -536,6 +565,29 @@ def update_snapshot_evaluation_progress(
def stop_evaluation_progress(self, success: bool = True) -> None:
pass

def start_signal_progress(
self,
snapshot: Snapshot,
default_catalog: t.Optional[str],
environment_naming_info: EnvironmentNamingInfo,
) -> None:
pass

def update_signal_progress(
self,
snapshot: Snapshot,
signal_name: str,
signal_idx: int,
total_signals: int,
ready_intervals: Intervals,
check_intervals: Intervals,
duration: float,
) -> None:
pass

def stop_signal_progress(self) -> None:
pass

def start_creation_progress(
self,
snapshots: t.List[Snapshot],
Expand Down Expand Up @@ -860,6 +912,8 @@ def __init__(
self.table_diff_model_tasks: t.Dict[str, TaskID] = {}
self.table_diff_progress_live: t.Optional[Live] = None

self.signal_status_tree: t.Optional[Tree] = None

self.verbosity = verbosity
self.dialect = dialect
self.ignore_warnings = ignore_warnings
Expand Down Expand Up @@ -901,6 +955,9 @@ def start_evaluation_progress(
audit_only: bool = False,
) -> None:
"""Indicates that a new snapshot evaluation/auditing progress has begun."""
# Add a newline to separate signal checking from evaluation
self._print("")

if not self.evaluation_progress_live:
self.evaluation_total_progress = make_progress_bar(
"Executing model batches" if not audit_only else "Auditing models", self.console
Expand Down Expand Up @@ -1050,6 +1107,88 @@ def stop_evaluation_progress(self, success: bool = True) -> None:
self.environment_naming_info = EnvironmentNamingInfo()
self.default_catalog = None

def start_signal_progress(
self,
snapshot: Snapshot,
default_catalog: t.Optional[str],
environment_naming_info: EnvironmentNamingInfo,
) -> None:
"""Indicates that signal checking has begun for a snapshot."""
display_name = snapshot.display_name(
environment_naming_info,
default_catalog if self.verbosity < Verbosity.VERY_VERBOSE else None,
dialect=self.dialect,
)
self.signal_status_tree = Tree(f"Checking signals for {display_name}")

def update_signal_progress(
self,
snapshot: Snapshot,
signal_name: str,
signal_idx: int,
total_signals: int,
ready_intervals: Intervals,
check_intervals: Intervals,
duration: float,
) -> None:
"""Updates the signal checking progress."""
tree = Tree(f"[{signal_idx + 1}/{total_signals}] {signal_name} {duration:.2f}s")
Copy link
Contributor Author

@georgesittas georgesittas Jul 1, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm leaving the right-justification of the duration for a followup PR (if we think it's worth it), as it will require some width calculations similar to what we do in the evaluation stage.


formatted_check_intervals = [_format_signal_interval(snapshot, i) for i in check_intervals]
formatted_ready_intervals = [_format_signal_interval(snapshot, i) for i in ready_intervals]

if not formatted_check_intervals:
formatted_check_intervals = ["no intervals"]
if not formatted_ready_intervals:
formatted_ready_intervals = ["no intervals"]

# Color coding to help detect partial interval ranges quickly
if ready_intervals == check_intervals:
msg = "All ready"
color = "green"
elif ready_intervals:
msg = "Some ready"
color = "yellow"
else:
msg = "None ready"
color = "red"

if self.verbosity < Verbosity.VERY_VERBOSE:
num_check_intervals = len(formatted_check_intervals)
if num_check_intervals > 3:
formatted_check_intervals = formatted_check_intervals[:3]
formatted_check_intervals.append(f"... and {num_check_intervals - 3} more")

num_ready_intervals = len(formatted_ready_intervals)
if num_ready_intervals > 3:
formatted_ready_intervals = formatted_ready_intervals[:3]
formatted_ready_intervals.append(f"... and {num_ready_intervals - 3} more")

check = ", ".join(formatted_check_intervals)
tree.add(f"Check: {check}")

ready = ", ".join(formatted_ready_intervals)
tree.add(f"[{color}]{msg}: {ready}[/{color}]")
else:
check_tree = Tree("Check")
tree.add(check_tree)
for interval in formatted_check_intervals:
check_tree.add(interval)

ready_tree = Tree(f"[{color}]{msg}[/{color}]")
tree.add(ready_tree)
for interval in formatted_ready_intervals:
ready_tree.add(f"[{color}]{interval}[/{color}]")

if self.signal_status_tree is not None:
self.signal_status_tree.add(tree)

def stop_signal_progress(self) -> None:
"""Indicates that signal checking has completed for a snapshot."""
if self.signal_status_tree is not None:
self._print(self.signal_status_tree)
self.signal_status_tree = None

def start_creation_progress(
self,
snapshots: t.List[Snapshot],
Expand Down Expand Up @@ -3810,19 +3949,34 @@ def _format_audits_errors(error: NodeAuditsErrors) -> str:
return " " + "\n".join(error_messages)


def _format_interval(snapshot: Snapshot, interval: Interval) -> str:
"""Format an interval with an optional prefix."""
inclusive_interval = make_inclusive(interval[0], interval[1])
if snapshot.model.interval_unit.is_date_granularity:
return f"{to_ds(inclusive_interval[0])} - {to_ds(inclusive_interval[1])}"

if inclusive_interval[0].date() == inclusive_interval[1].date():
# omit end date if interval start/end on same day
return f"{to_ds(inclusive_interval[0])} {inclusive_interval[0].strftime('%H:%M:%S')}-{inclusive_interval[1].strftime('%H:%M:%S')}"

return f"{inclusive_interval[0].strftime('%Y-%m-%d %H:%M:%S')} - {inclusive_interval[1].strftime('%Y-%m-%d %H:%M:%S')}"


def _format_signal_interval(snapshot: Snapshot, interval: Interval) -> str:
"""Format an interval for signal output (without 'insert' prefix)."""
return _format_interval(snapshot, interval)


def _format_evaluation_model_interval(snapshot: Snapshot, interval: Interval) -> str:
"""Format an interval for evaluation output (with 'insert' prefix)."""
if snapshot.is_model and (
snapshot.model.kind.is_incremental
or snapshot.model.kind.is_managed
or snapshot.model.kind.is_custom
):
inclusive_interval = make_inclusive(interval[0], interval[1])
if snapshot.model.interval_unit.is_date_granularity:
return f"insert {to_ds(inclusive_interval[0])} - {to_ds(inclusive_interval[1])}"
# omit end date if interval start/end on same day
if inclusive_interval[0].date() == inclusive_interval[1].date():
return f"insert {to_ds(inclusive_interval[0])} {inclusive_interval[0].strftime('%H:%M:%S')}-{inclusive_interval[1].strftime('%H:%M:%S')}"
return f"insert {inclusive_interval[0].strftime('%Y-%m-%d %H:%M:%S')} - {inclusive_interval[1].strftime('%Y-%m-%d %H:%M:%S')}"
formatted_interval = _format_interval(snapshot, interval)
return f"insert {formatted_interval}"

return ""


Expand Down
17 changes: 15 additions & 2 deletions sqlmesh/core/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,7 @@ def batch_intervals(
self,
merged_intervals: SnapshotToIntervals,
deployability_index: t.Optional[DeployabilityIndex],
environment_naming_info: EnvironmentNamingInfo,
) -> t.Dict[Snapshot, Intervals]:
dag = snapshots_to_dag(merged_intervals)

Expand Down Expand Up @@ -303,7 +304,13 @@ def batch_intervals(
default_catalog=self.default_catalog,
)

intervals = snapshot.check_ready_intervals(intervals, context)
intervals = snapshot.check_ready_intervals(
intervals,
context,
console=self.console,
default_catalog=self.default_catalog,
environment_naming_info=environment_naming_info,
)
unready -= set(intervals)

for parent in snapshot.parents:
Expand All @@ -324,10 +331,14 @@ def batch_intervals(
):
batches.append((next_batch[0][0], next_batch[-1][-1]))
next_batch = []

next_batch.append(interval)

if next_batch:
batches.append((next_batch[0][0], next_batch[-1][-1]))

snapshot_batches[snapshot] = batches

return snapshot_batches

def run_merged_intervals(
Expand Down Expand Up @@ -359,7 +370,9 @@ def run_merged_intervals(
"""
execution_time = execution_time or now_timestamp()

batched_intervals = self.batch_intervals(merged_intervals, deployability_index)
batched_intervals = self.batch_intervals(
merged_intervals, deployability_index, environment_naming_info
)

self.console.start_evaluation_progress(
batched_intervals,
Expand Down
42 changes: 40 additions & 2 deletions sqlmesh/core/snapshot/definition.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from __future__ import annotations

import sys
import time
import typing as t
from collections import defaultdict
from datetime import datetime, timedelta
Expand Down Expand Up @@ -50,6 +51,7 @@
from sqlmesh.utils.pydantic import PydanticModel, field_validator

if t.TYPE_CHECKING:
from sqlmesh.core.console import Console
from sqlglot.dialects.dialect import DialectType
from sqlmesh.core.environment import EnvironmentNamingInfo
from sqlmesh.core.context import ExecutionContext
Expand Down Expand Up @@ -965,7 +967,14 @@ def missing_intervals(
model_end_ts,
)

def check_ready_intervals(self, intervals: Intervals, context: ExecutionContext) -> Intervals:
def check_ready_intervals(
self,
intervals: Intervals,
context: ExecutionContext,
console: t.Optional[Console] = None,
default_catalog: t.Optional[str] = None,
environment_naming_info: t.Optional[EnvironmentNamingInfo] = None,
) -> Intervals:
"""Returns a list of intervals that are considered ready by the provided signal.

Note that this will handle gaps in the provided intervals. The returned intervals
Expand All @@ -979,7 +988,19 @@ def check_ready_intervals(self, intervals: Intervals, context: ExecutionContext)
python_env = self.model.python_env
env = prepare_env(python_env)

for signal_name, kwargs in signals.items():
if console:
console.start_signal_progress(
self,
default_catalog,
environment_naming_info or EnvironmentNamingInfo(),
)

for signal_idx, (signal_name, kwargs) in enumerate(signals.items()):
# Capture intervals before signal check for display
intervals_to_check = merge_intervals(intervals)

signal_start_ts = time.perf_counter()

try:
intervals = _check_ready_intervals(
env[signal_name],
Expand All @@ -996,6 +1017,23 @@ def check_ready_intervals(self, intervals: Intervals, context: ExecutionContext)
f"{e} '{signal_name}' for '{self.model.name}' at {self.model._path}"
)

duration = time.perf_counter() - signal_start_ts

if console:
console.update_signal_progress(
snapshot=self,
signal_name=signal_name,
signal_idx=signal_idx,
total_signals=len(signals),
ready_intervals=merge_intervals(intervals),
check_intervals=intervals_to_check,
duration=duration,
)

# Stop signal progress tracking
if console:
console.stop_signal_progress()

return intervals

def categorize_as(self, category: SnapshotChangeCategory) -> None:
Expand Down
Loading