Skip to content

feat: (Preview) Support automatic load of timedelta from BQ tables. #1429

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 20 commits into from
Mar 4, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions bigframes/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -3733,6 +3733,7 @@ def to_gbq(
default_project=default_project,
)
)

query_job = self._session._executor.export_gbq(
export_array.rename_columns(id_overrides),
destination=destination,
Expand Down
13 changes: 12 additions & 1 deletion bigframes/dtypes.py
Original file line number Diff line number Diff line change
Expand Up @@ -678,6 +678,12 @@ def convert_schema_field(
pa_struct = pa.struct(fields)
pa_type = pa.list_(pa_struct) if is_repeated else pa_struct
return field.name, pd.ArrowDtype(pa_type)
elif (
field.field_type == "INTEGER"
and field.description is not None
and field.description.endswith(TIMEDELTA_DESCRIPTION_TAG)
):
return field.name, TIMEDELTA_DTYPE
elif field.field_type in _TK_TO_BIGFRAMES:
if is_repeated:
pa_type = pa.list_(
Expand Down Expand Up @@ -719,7 +725,9 @@ def convert_to_schema_field(
)
if bigframes_dtype.pyarrow_dtype == pa.duration("us"):
# Timedeltas are represented as integers in microseconds.
return google.cloud.bigquery.SchemaField(name, "INTEGER")
return google.cloud.bigquery.SchemaField(
name, "INTEGER", description=TIMEDELTA_DESCRIPTION_TAG
)
raise TypeError(
f"No arrow conversion for {bigframes_dtype}. {constants.FEEDBACK_LINK}"
)
Expand Down Expand Up @@ -876,3 +884,6 @@ def lcd_type_or_throw(dtype1: Dtype, dtype2: Dtype) -> Dtype:
"STRING",
"ARRAY",
}


TIMEDELTA_DESCRIPTION_TAG = "#microseconds"
36 changes: 33 additions & 3 deletions bigframes/session/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
import weakref

import google.api_core.exceptions
import google.cloud.bigquery as bigquery
from google.cloud import bigquery
import google.cloud.bigquery.job as bq_job
import google.cloud.bigquery.table as bq_table
import google.cloud.bigquery_storage_v1
Expand All @@ -47,6 +47,7 @@
import bigframes.core.ordering as order
import bigframes.core.schema
import bigframes.core.tree_properties as tree_properties
import bigframes.dtypes
import bigframes.features
import bigframes.session._io.bigquery as bq_io
import bigframes.session.metrics
Expand Down Expand Up @@ -320,6 +321,19 @@ def export_gbq(
sql=sql,
job_config=job_config,
)

has_timedelta_col = any(
t == bigframes.dtypes.TIMEDELTA_DTYPE for t in array_value.schema.dtypes
)

if if_exists != "append" and has_timedelta_col:
# Only update schema if this is not modifying an existing table, and the
# new table contains timedelta columns.
assert query_job.destination is not None
table = self.bqclient.get_table(query_job.destination)
table.schema = array_value.schema.to_bigquery()
self.bqclient.update_table(table, ["schema"])

return query_job

def export_gcs(
Expand Down Expand Up @@ -649,12 +663,28 @@ def _validate_result_schema(
raise ValueError(
f"This error should only occur while testing. BigFrames internal schema: {internal_schema.to_bigquery()} does not match actual schema: {actual_schema}"
)
if ibis_schema.to_bigquery() != actual_schema:
sanitized_schema = _sanitize_for_ibis(actual_schema)
if ibis_schema.to_bigquery() != sanitized_schema:
raise ValueError(
f"This error should only occur while testing. Ibis schema: {ibis_schema.to_bigquery()} does not match actual schema: {actual_schema}"
f"This error should only occur while testing. Ibis schema: {ibis_schema.to_bigquery()} does not match sanitized schema: {sanitized_schema}"
)


def _sanitize_for_ibis(
    schema: Tuple[bigquery.SchemaField, ...]
) -> Tuple[bigquery.SchemaField, ...]:
    """Strip metadata from every field so the schema matches Ibis output.

    Schemas inferred through Ibis never carry a ``description``, so a
    comparison against the actual BigQuery schema must consider only the
    name, type, mode, and (recursively) any nested fields.
    """
    sanitized = []
    for original in schema:
        sanitized.append(
            bigquery.SchemaField(
                original.name,
                original.field_type,
                original.mode,  # type:ignore
                fields=_sanitize_for_ibis(original.fields),
            )
        )
    return tuple(sanitized)


def generate_head_plan(node: nodes.BigFrameNode, n: int):
    """Build a plan node that keeps only the first *n* rows of *node*."""
    head_slice = nodes.SliceNode(node, start=None, stop=n)
    return head_slice

Expand Down
55 changes: 55 additions & 0 deletions tests/system/small/test_dataframe_io.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,10 @@

import typing

from google.cloud import bigquery

import bigframes
from bigframes import dtypes
import bigframes.dataframe
import bigframes.features
import bigframes.pandas as bpd
Expand Down Expand Up @@ -697,6 +700,58 @@ def test_to_gbq_w_json(bigquery_client):
assert table.schema[1].field_type == "JSON"


def test_to_gbq_with_timedelta(bigquery_client, dataset_id):
    """A timedelta column should be written as an INTEGER tagged in its description."""
    destination_table = f"{dataset_id}.test_to_gbq_with_timedelta"
    id_col = bpd.Series([1, 2, 3, 4])
    duration_col = bpd.to_timedelta(bpd.Series([1, 2, 3, 4]), unit="s")
    frame = bpd.DataFrame({"id": id_col, "timedelta_col": duration_col})

    frame.to_gbq(destination_table)

    table = bigquery_client.get_table(destination_table)
    timedelta_field = table.schema[1]
    assert timedelta_field.name == "timedelta_col"
    assert timedelta_field.field_type == "INTEGER"
    assert dtypes.TIMEDELTA_DESCRIPTION_TAG in timedelta_field.description


def test_gbq_round_trip_with_timedelta(session, dataset_id):
    """Timedelta values written to BigQuery should load back with timedelta dtype."""
    destination_table = f"{dataset_id}.test_gbq_roundtrip_with_timedelta"
    local_frame = pd.DataFrame(
        {
            "col_1": [1],
            "col_2": [pd.Timedelta(1, "s")],
            "col_3": [1.1],
        }
    )
    bpd.DataFrame(local_frame).to_gbq(destination_table)

    result = session.read_gbq(destination_table)

    expected_dtypes = {
        "col_1": dtypes.INT_DTYPE,
        "col_2": dtypes.TIMEDELTA_DTYPE,
        "col_3": dtypes.FLOAT_DTYPE,
    }
    for column, expected in expected_dtypes.items():
        assert result[column].dtype == expected


def test_to_gbq_timedelta_tag_ignored_when_appending(bigquery_client, dataset_id):
    """Appending timedelta data must not rewrite an existing table's schema."""
    # First, create a table
    destination_table = f"{dataset_id}.test_to_gbq_timedelta_tag_ignored_when_appending"
    existing_schema = [bigquery.SchemaField("my_col", "INTEGER")]
    bigquery_client.create_table(bigquery.Table(destination_table, existing_schema))

    # Then, append to that table with timedelta values
    source = pd.DataFrame({"my_col": [pd.Timedelta(1, "s")]})
    bpd.DataFrame(source).to_gbq(destination_table, if_exists="append")

    refreshed = bigquery_client.get_table(destination_table)
    my_col_field = refreshed.schema[0]
    assert my_col_field.name == "my_col"
    assert my_col_field.field_type == "INTEGER"
    # The description tag is only applied when creating/replacing a table,
    # never when appending to one that already exists.
    assert my_col_field.description is None


@pytest.mark.parametrize(
("index"),
[True, False],
Expand Down