16 changes: 16 additions & 0 deletions bigframes/_config/bigquery_options.py
@@ -94,6 +94,7 @@ def __init__(
requests_transport_adapters: Sequence[
Tuple[str, requests.adapters.BaseAdapter]
] = (),
enable_polars_execution: bool = False,
Collaborator: Should this be a compute option so it can be changed at runtime?

Contributor Author: Kind of just want to keep it as a constant within-session for now (we'd need to commit to that if this becomes a GA feature, though). Turning polars execution on and off mid-session would make things like caching and multi-part execution really tricky.

):
self._credentials = credentials
self._project = project
@@ -113,6 +114,7 @@ def __init__(
client_endpoints_override = {}

self._client_endpoints_override = client_endpoints_override
self._enable_polars_execution = enable_polars_execution

@property
def application_name(self) -> Optional[str]:
@@ -424,3 +426,17 @@ def requests_transport_adapters(
SESSION_STARTED_MESSAGE.format(attribute="requests_transport_adapters")
)
self._requests_transport_adapters = value

@property
def enable_polars_execution(self) -> bool:
"""If True, will use polars to execute some simple query plans locally."""
return self._enable_polars_execution

@enable_polars_execution.setter
def enable_polars_execution(self, value: bool):
Collaborator: Should we add a check that the session has already started? If not (perhaps because we want to safely ignore this when the global session has already started), maybe add a comment explaining why.

    def enable_polars_execution(self, value: bool):
        if self._session_started and self._enable_polars_execution != value:
            raise ValueError(
                SESSION_STARTED_MESSAGE.format(attribute="enable_polars_execution")
            )

Contributor Author: Yeah, it should probably error out, since a session cannot change once started. Error added in the new revision.

if value is True:
msg = bfe.format_message(
"Polars execution is an experimental feature, and may not be stable. Must have polars installed."
)
warnings.warn(msg, category=bfe.PreviewWarning)
self._enable_polars_execution = value
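
For reference, a minimal usage sketch of the new option, mirroring the fixture in tests/system/small/test_polars_execution.py below (the pandas DataFrame is illustrative):

    import pandas as pd

    import bigframes

    # Enable the experimental polars engine at session creation time. The flag is
    # read once when the session's executor is constructed and stays constant for
    # the lifetime of that session.
    context = bigframes.BigQueryOptions(location="US", enable_polars_execution=True)
    session = bigframes.Session(context=context)

    pdf = pd.DataFrame({"int64_too": [1, 2, 3], "bool_col": [True, False, True]})
    bf_df = session.read_pandas(pdf)

    # A simple sort plus column selection is a plan the polars semi-executor can
    # handle, so this round-trip should not start a BigQuery job.
    result = bf_df.sort_index(ascending=False)[["int64_too", "bool_col"]].to_pandas()

    session.close()
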
4 changes: 2 additions & 2 deletions bigframes/core/compile/polars/compiler.py
@@ -391,15 +391,15 @@ class PolarsCompiler:
expr_compiler = PolarsExpressionCompiler()
agg_compiler = PolarsAggregateCompiler()

def compile(self, array_value: bigframes.core.ArrayValue) -> pl.LazyFrame:
def compile(self, plan: nodes.BigFrameNode) -> pl.LazyFrame:
if not polars_installed:
raise ValueError(
"Polars is not installed, cannot compile to polars engine."
)

# TODO: Create standard way to configure BFET -> BFET rewrites
# Polars has incomplete slice support in lazy mode
node = array_value.node
node = plan
node = bigframes.core.rewrite.column_pruning(node)
node = nodes.bottom_up(node, bigframes.core.rewrite.rewrite_slice)
node = bigframes.core.rewrite.pull_out_window_order(node)
1 change: 1 addition & 0 deletions bigframes/session/__init__.py
@@ -255,6 +255,7 @@ def __init__(
storage_manager=self._temp_storage_manager,
strictly_ordered=self._strictly_ordered,
metrics=self._metrics,
enable_polars_execution=context.enable_polars_execution,
)

def __del__(self):
24 changes: 19 additions & 5 deletions bigframes/session/bq_caching_executor.py
@@ -40,7 +40,13 @@
import bigframes.dtypes
import bigframes.exceptions as bfe
import bigframes.features
from bigframes.session import executor, loader, local_scan_executor, read_api_execution
from bigframes.session import (
executor,
loader,
local_scan_executor,
read_api_execution,
semi_executor,
)
import bigframes.session._io.bigquery as bq_io
import bigframes.session.metrics
import bigframes.session.planner
@@ -146,6 +152,7 @@ def __init__(
*,
strictly_ordered: bool = True,
metrics: Optional[bigframes.session.metrics.ExecutionMetrics] = None,
enable_polars_execution: bool = False,
):
self.bqclient = bqclient
self.storage_manager = storage_manager
@@ -154,14 +161,21 @@ def __init__(
self.metrics = metrics
self.loader = loader
self.bqstoragereadclient = bqstoragereadclient
# Simple left-to-right precedence for now
self._semi_executors = (
self._enable_polars_execution = enable_polars_execution
self._semi_executors: Sequence[semi_executor.SemiExecutor] = (
read_api_execution.ReadApiSemiExecutor(
bqstoragereadclient=bqstoragereadclient,
project=self.bqclient.project,
),
local_scan_executor.LocalScanExecutor(),
)
if enable_polars_execution:
from bigframes.session import polars_executor

self._semi_executors = (
*self._semi_executors,
polars_executor.PolarsExecutor(),
)
self._upload_lock = threading.Lock()

def to_sql(
@@ -636,8 +650,8 @@ def _execute_plan(
"""Just execute whatever plan as is, without further caching or decomposition."""
# First try to execute fast-paths
if not output_spec.require_bq_table:
for semi_executor in self._semi_executors:
maybe_result = semi_executor.execute(plan, ordered=ordered, peek=peek)
for exec in self._semi_executors:
maybe_result = exec.execute(plan, ordered=ordered, peek=peek)
if maybe_result:
return maybe_result

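
The executor list above is consulted as a simple first-match chain; a standalone sketch of the pattern (these names are illustrative, not the bigframes API):

    from typing import Optional, Protocol, Sequence


    class LocalExecutor(Protocol):
        """Illustrative stand-in for a semi-executor: handle a plan locally, or
        return None to decline."""

        def execute(self, plan, ordered: bool, peek: Optional[int]) -> Optional[object]:
            ...


    def try_fast_paths(
        executors: Sequence[LocalExecutor], plan, ordered: bool, peek: Optional[int]
    ) -> Optional[object]:
        # Left-to-right precedence: the first executor that returns a result wins.
        # If every executor declines, the caller falls back to full BigQuery
        # execution.
        for executor in executors:
            maybe_result = executor.execute(plan, ordered=ordered, peek=peek)
            if maybe_result is not None:
                return maybe_result
        return None
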
2 changes: 1 addition & 1 deletion bigframes/session/polars_executor.py
@@ -72,7 +72,7 @@ def execute(
# Note: Ignoring ordered flag, as just executing totally ordered is fine.
try:
lazy_frame: pl.LazyFrame = self._compiler.compile(
array_value.ArrayValue(plan)
array_value.ArrayValue(plan).node
)
except Exception:
return None
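
The try/except around compile above is what drives the fallback exercised in the new system tests; a condensed sketch of that decline-on-failure behavior, assuming a compiler with the compile(node) signature from this PR:

    from typing import Optional

    import polars as pl


    def execute_or_decline(compiler, node) -> Optional[pl.DataFrame]:
        # If the plan contains anything the polars compiler cannot handle yet,
        # decline by returning None so the caller falls back to BigQuery execution.
        try:
            lazy_frame: pl.LazyFrame = compiler.compile(node)
        except Exception:
            return None
        return lazy_frame.collect()
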
4 changes: 2 additions & 2 deletions bigframes/testing/polars_session.py
@@ -41,7 +41,7 @@ def peek(
"""
A 'peek' efficiently accesses a small number of rows in the dataframe.
"""
lazy_frame: polars.LazyFrame = self.compiler.compile(array_value)
lazy_frame: polars.LazyFrame = self.compiler.compile(array_value.node)
pa_table = lazy_frame.collect().limit(n_rows).to_arrow()
# Currently, pyarrow types might not quite be exactly the ones in the bigframes schema.
# Nullability may be different, and might use large versions of list, string datatypes.
@@ -64,7 +64,7 @@
"""
Execute the ArrayValue, storing the result to a temporary session-owned table.
"""
lazy_frame: polars.LazyFrame = self.compiler.compile(array_value)
lazy_frame: polars.LazyFrame = self.compiler.compile(array_value.node)
pa_table = lazy_frame.collect().to_arrow()
# Currently, pyarrow types might not quite be exactly the ones in the bigframes schema.
# Nullability may be different, and might use large versions of list, string datatypes.
76 changes: 76 additions & 0 deletions tests/system/small/test_polars_execution.py
@@ -0,0 +1,76 @@
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import pytest

import bigframes
from bigframes.testing.utils import assert_pandas_df_equal

polars = pytest.importorskip("polars", reason="polars is required for this test")


@pytest.fixture(scope="module")
def session_w_polars():
context = bigframes.BigQueryOptions(location="US", enable_polars_execution=True)
session = bigframes.Session(context=context)
yield session
session.close() # close generated session at cleanup time


def test_polar_execution_sorted(session_w_polars, scalars_pandas_df_index):
execution_count_before = session_w_polars._metrics.execution_count
bf_df = session_w_polars.read_pandas(scalars_pandas_df_index)

pd_result = scalars_pandas_df_index.sort_index(ascending=False)[
["int64_too", "bool_col"]
]
bf_result = bf_df.sort_index(ascending=False)[["int64_too", "bool_col"]].to_pandas()

assert session_w_polars._metrics.execution_count == execution_count_before
assert_pandas_df_equal(bf_result, pd_result)


def test_polar_execution_sorted_filtered(session_w_polars, scalars_pandas_df_index):
execution_count_before = session_w_polars._metrics.execution_count
bf_df = session_w_polars.read_pandas(scalars_pandas_df_index)

pd_result = scalars_pandas_df_index.sort_index(ascending=False).dropna(
subset=["int64_col", "string_col"]
)
bf_result = (
bf_df.sort_index(ascending=False)
.dropna(subset=["int64_col", "string_col"])
.to_pandas()
)

# Filter and isnull not supported by polars engine yet, so falls back to bq execution
assert session_w_polars._metrics.execution_count == (execution_count_before + 1)
assert_pandas_df_equal(bf_result, pd_result)


def test_polar_execution_unsupported_sql_fallback(
session_w_polars, scalars_pandas_df_index
):
execution_count_before = session_w_polars._metrics.execution_count
bf_df = session_w_polars.read_pandas(scalars_pandas_df_index)

pd_df = scalars_pandas_df_index.copy()
pd_df["str_len_col"] = pd_df.string_col.str.len()
pd_result = pd_df

bf_df["str_len_col"] = bf_df.string_col.str.len()
bf_result = bf_df.to_pandas()

# str len not supported by polars engine yet, so falls back to bq execution
assert session_w_polars._metrics.execution_count == (execution_count_before + 1)
assert_pandas_df_equal(bf_result, pd_result)