-
Notifications
You must be signed in to change notification settings - Fork 99
feat: convert StreamedResultSet to Pandas Dataframe #226
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
483e9fb
362c143
fc5ab60
cb5733a
0aee1d8
e7a4d4c
537d498
b4d715e
f219b0d
021b3c2
a584fa0
a83a7ce
be5c241
4057e76
ad1d039
f460174
7906b14
f919276
0028b7b
0104348
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,67 @@ | ||
# Copyright 2021 Google LLC | ||
|
||
# Licensed under the Apache License, Version 2.0 (the "License"); | ||
# you may not use this file except in compliance with the License. | ||
# You may obtain a copy of the License at | ||
|
||
# https://www.apache.org/licenses/LICENSE-2.0 | ||
|
||
# Unless required by applicable law or agreed to in writing, software | ||
# distributed under the License is distributed on an "AS IS" BASIS, | ||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
# See the License for the specific language governing permissions and | ||
# limitations under the License. | ||
|
||
"""Helper module for working with the pandas library.""" | ||
|
||
try:
    import pandas

    pandas_import_error = None
except ImportError as err:
    # Defer the failure: keep the package importable without pandas and
    # remember the original error so it can be chained when a
    # pandas-based method is actually used.
    pandas = None
    pandas_import_error = err


def check_pandas_import():
    """Raise a helpful error if the optional pandas dependency is missing.

    :raises ImportError: with install instructions, chained from the
        original import failure captured at module load time.
    """
    if pandas is None:
        raise ImportError(
            "The pandas module is required for this method.\n"
            "Try running 'pip3 install pandas'"
        ) from pandas_import_error


def to_dataframe(result_set):
    """Convert query results into a pandas DataFrame.

    :type result_set: :class:`~google.cloud.spanner_v1.StreamedResultSet`
    :param result_set: complete response data returned from a read/query

    :rtype: pandas.DataFrame
    :returns: DataFrame holding the rows of ``result_set``.  TIMESTAMP and
        DATE columns are converted to the null-safe
        ``datetime64[ns, UTC]`` dtype.

    :raises ImportError: if pandas is not installed.
    """
    check_pandas_import()

    # Download all results first, so that the fields property is populated.
    data = list(result_set)

    # Map each column name to its Spanner type code, preserving order for
    # the DataFrame column headers.
    columns_dict = {}
    column_list = []
    for item in result_set.fields:
        column_list.append(item.name)
        columns_dict[item.name] = item.type_.code

    # Creating dataframe using column headers and list of data rows
    df = pandas.DataFrame(data, columns=column_list)

    # Convert TIMESTAMP and DATE columns to appropriate type. The
    # datetime64[ns, UTC] dtype is null-safe: missing values become NaT
    # instead of raising.
    for name, type_code in columns_dict.items():
        if type_code.name in ("TIMESTAMP", "DATE"):
            # pandas.to_datetime parses both ISO-8601 strings and
            # datetime/date objects.  (The previous implementation called
            # the non-existent Series.to_datetime() and silently fell back
            # on AttributeError.)  utc=True localizes naive values and
            # converts aware ones, so both column kinds end up tz-aware.
            df[name] = pandas.to_datetime(df[name], utc=True)

    return df
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,88 @@ | ||
# Copyright 2021 Google LLC | ||
# | ||
# Licensed under the Apache License, Version 2.0 (the "License"); | ||
# you may not use this file except in compliance with the License. | ||
# You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, software | ||
# distributed under the License is distributed on an "AS IS" BASIS, | ||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
# See the License for the specific language governing permissions and | ||
# limitations under the License. | ||
|
||
from google.cloud import spanner_v1 | ||
from google.auth.credentials import AnonymousCredentials | ||
import pytest | ||
|
||
# For reference: simple two-column table queried by test_df.
TABLE_NAME = "testTable"
COLUMNS = ["id", "name"]
VALUES = [[1, "Alice"], [2, "Bob"]]


# For reference: table with one column per scalar type, including rows with
# NULLs, queried by the null-handling tests below.  Presumably the Cloud
# Spanner emulator is pre-seeded with this data — TODO confirm setup step.
TABLE_NAME_1 = "Functional_Alltypes"
COLUMNS_1 = ["id", "bool_col", "date", "float_col", "string_col", "timestamp_col"]
VALUES_1 = [
    [1, True, "2016-02-09", 2.2, "David", "2002-02-10T15:30:00.45Z"],
    [2, False, "2016-10-10", 2.5, "Ryan", "2009-02-12T10:06:00.45Z"],
    [10, True, "2019-01-06", None, None, None],
    [12, True, "2018-02-02", 2.6, None, None],
    [None, None, None, None, None, None],
]
|
||
|
||
@pytest.fixture
def snapshot_obj():
    """Return a read-only snapshot against the Cloud Spanner emulator.

    Skips the calling test when the emulator (expected at 0.0.0.0:9010)
    is unreachable or misconfigured.
    """
    try:
        spanner_client = spanner_v1.Client(
            project="test-project",
            client_options={"api_endpoint": "0.0.0.0:9010"},
            credentials=AnonymousCredentials(),
        )
        instance = spanner_client.instance("test-instance")
        database = instance.database("test-database")
        with database.snapshot() as snapshot:
            return snapshot

    # A bare "except:" would also swallow KeyboardInterrupt/SystemExit;
    # catch Exception so only genuine setup failures trigger the skip.
    except Exception:
        pytest.skip("Cloud Spanner Emulator configuration is incorrect")
|
||
|
||
@pytest.mark.parametrize(("limit"), [(0), (1), (2)])
def test_df(limit, snapshot_obj):
    """The DataFrame row count matches the query LIMIT."""
    query = "Select * from testTable limit {limit}".format(limit=limit)
    frame = snapshot_obj.execute_sql(query).to_dataframe()
    assert len(frame) == limit
|
||
|
||
@pytest.mark.parametrize(("value"), [2])
def test_rows_with_no_null_values(value, snapshot_obj):
    """Only the fully-populated rows are returned (two exist)."""
    sql = (
        "Select * from Functional_Alltypes where id IS NOT NULL AND "
        "bool_col IS NOT NULL AND date IS NOT NULL and float_col IS NOT NULL "
        "and string_col IS NOT NULL and timestamp_col IS NOT NULL "
    )
    frame = snapshot_obj.execute_sql(sql).to_dataframe()
    assert len(frame) == value
|
||
|
||
@pytest.mark.parametrize(("value"), [2])
def test_rows_with_one_or_more_null_values(value, snapshot_obj):
    """Rows with some (but not all) NULL columns are returned (two exist)."""
    sql = (
        "Select * from Functional_Alltypes where id IS NOT NULL "
        "AND string_col IS NULL AND timestamp_col IS NULL "
    )
    frame = snapshot_obj.execute_sql(sql).to_dataframe()
    assert len(frame) == value
|
||
|
||
@pytest.mark.parametrize(("value"), [1])
def test_rows_with_all_null_values(value, snapshot_obj):
    """The single all-NULL row is returned."""
    sql = (
        "Select * from Functional_Alltypes where id IS NULL AND bool_col "
        "IS NULL AND date IS NULL and float_col IS NULL and string_col "
        "IS NULL and timestamp_col IS NULL "
    )
    frame = snapshot_obj.execute_sql(sql).to_dataframe()
    assert len(frame) == value
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,121 @@ | ||
# Copyright 2021 Google LLC | ||
# | ||
# Licensed under the Apache License, Version 2.0 (the "License"); | ||
# you may not use this file except in compliance with the License. | ||
# You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, software | ||
# distributed under the License is distributed on an "AS IS" BASIS, | ||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
# See the License for the specific language governing permissions and | ||
# limitations under the License. | ||
|
||
|
||
import unittest | ||
|
||
|
||
class TestPandasDataFrame(unittest.TestCase):
    """Unit tests for ``StreamedResultSet.to_dataframe``."""

    def _getTargetClass(self):
        from google.cloud.spanner_v1.streamed import StreamedResultSet

        return StreamedResultSet

    def _make_one(self, *args, **kwargs):
        # Instantiate the class under test.
        return self._getTargetClass()(*args, **kwargs)

    @staticmethod
    def _make_scalar_field(name, type_):
        from google.cloud.spanner_v1 import StructType
        from google.cloud.spanner_v1 import Type

        return StructType.Field(name=name, type_=Type(code=type_))

    @staticmethod
    def _make_value(value):
        from google.cloud.spanner_v1._helpers import _make_value_pb

        return _make_value_pb(value)

    @staticmethod
    def _make_result_set_metadata(fields=(), transaction_id=None):
        from google.cloud.spanner_v1 import ResultSetMetadata
        from google.cloud.spanner_v1 import StructType

        metadata = ResultSetMetadata(row_type=StructType(fields=[]))
        for field in fields:
            metadata.row_type.fields.append(field)
        if transaction_id is not None:
            metadata.transaction.id = transaction_id
        return metadata

    @staticmethod
    def _make_result_set_stats(query_plan=None, **kw):
        from google.cloud.spanner_v1 import ResultSetStats
        from google.protobuf.struct_pb2 import Struct
        from google.cloud.spanner_v1._helpers import _make_value_pb

        query_stats = Struct(
            fields={key: _make_value_pb(value) for key, value in kw.items()}
        )
        return ResultSetStats(query_plan=query_plan, query_stats=query_stats)

    def _make_streamed(self, rows):
        # Shared setup: a StreamedResultSet with a (Name, Age) schema
        # whose rows are pre-populated rather than streamed.
        from google.cloud.spanner_v1 import TypeCode

        streamed = self._make_one(_MockCancellableIterator())
        fields = [
            self._make_scalar_field("Name", TypeCode.STRING),
            self._make_scalar_field("Age", TypeCode.INT64),
        ]
        streamed._metadata = self._make_result_set_metadata(fields)
        streamed._stats = self._make_result_set_stats()
        streamed._rows[:] = rows
        return streamed

    def test_multiple_rows(self):
        streamed = self._make_streamed([["Alice", 1], ["Bob", 2], ["Adam", 3]])
        df_obj = streamed.to_dataframe()
        assert len(df_obj) == 3

    def test_single_rows(self):
        streamed = self._make_streamed([["Alice", 1]])
        df_obj = streamed.to_dataframe()
        assert len(df_obj) == 1

    def test_no_rows(self):
        streamed = self._make_streamed([])
        df_obj = streamed.to_dataframe()
        assert len(df_obj) == 0
|
||
|
||
class _MockCancellableIterator(object): | ||
|
||
cancel_calls = 0 | ||
|
||
def __init__(self, *values): | ||
self.iter_values = iter(values) | ||
|
||
def next(self): | ||
return next(self.iter_values) | ||
|
||
def __next__(self): # pragma: NO COVER Py3k | ||
return self.next() |
Uh oh!
There was an error while loading. Please reload this page.