Skip to content

Commit 7a51eb9

Browse files
committed
FIX-#1765: Fix support of s3 in read_parquet
Signed-off-by: Alexey Prutskov <[email protected]>
1 parent 55e3459 commit 7a51eb9

File tree

5 files changed

+54
-5
lines changed

5 files changed

+54
-5
lines changed

modin/backends/pandas/parsers.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -355,6 +355,17 @@ class PandasParquetParser(PandasParser):
355355
def parse(fname, **kwargs):
356356
num_splits = kwargs.pop("num_splits", None)
357357
columns = kwargs.get("columns", None)
358+
if fname.startswith("s3://"):
359+
from botocore.exceptions import NoCredentialsError
360+
import s3fs
361+
362+
try:
363+
fs = s3fs.S3FileSystem()
364+
fname = fs.open(fname)
365+
except NoCredentialsError:
366+
fs = s3fs.S3FileSystem(anon=True)
367+
fname = fs.open(fname)
368+
358369
if num_splits is None:
359370
return pandas.read_parquet(fname, **kwargs)
360371
kwargs["use_pandas_metadata"] = True

modin/engines/base/frame/data.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -285,14 +285,14 @@ def _filter_empties(self):
285285
[
286286
self._partitions[i][j]
287287
for j in range(len(self._partitions[i]))
288-
if j < len(self._column_widths) and self._column_widths[j] > 0
288+
if j < len(self._column_widths) and self._column_widths[j] != 0
289289
]
290290
for i in range(len(self._partitions))
291-
if i < len(self._row_lengths) and self._row_lengths[i] > 0
291+
if i < len(self._row_lengths) and self._row_lengths[i] != 0
292292
]
293293
)
294-
self._column_widths_cache = [w for w in self._column_widths if w > 0]
295-
self._row_lengths_cache = [r for r in self._row_lengths if r > 0]
294+
self._column_widths_cache = [w for w in self._column_widths if w != 0]
295+
self._row_lengths_cache = [r for r in self._row_lengths if r != 0]
296296

297297
def _validate_axis_equality(self, axis: int, force: bool = False):
298298
"""

modin/engines/base/io/column_stores/parquet_reader.py

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
# governing permissions and limitations under the License.
1313

1414
import os
15+
import s3fs
1516

1617
from modin.engines.base.io.column_stores.column_store_reader import ColumnStoreReader
1718
from modin.error_message import ErrorMessage
@@ -48,7 +49,7 @@ def _read(cls, path, engine, columns, **kwargs):
4849
from pyarrow.parquet import ParquetFile, ParquetDataset
4950
from modin.pandas.io import PQ_INDEX_REGEX
5051

51-
if os.path.isdir(path):
52+
if isinstance(path, str) and os.path.isdir(path):
5253
partitioned_columns = set()
5354
directory = True
5455
# We do a tree walk of the path directory because partitioned
@@ -84,6 +85,22 @@ def _read(cls, path, engine, columns, **kwargs):
8485
pd = ParquetDataset(path, filesystem=fs)
8586
meta = pd.metadata
8687
column_names = pd.schema.names
88+
elif isinstance(path, s3fs.S3File) or (
89+
isinstance(path, str) and path.startswith("s3://")
90+
):
91+
from botocore.exceptions import NoCredentialsError
92+
93+
if isinstance(path, s3fs.S3File):
94+
bucket_path = path.url().split(".s3.amazonaws.com")
95+
path = "s3://" + bucket_path[0].split("://")[1] + bucket_path[1]
96+
try:
97+
fs = s3fs.S3FileSystem()
98+
pd = ParquetDataset(path, filesystem=fs)
99+
except NoCredentialsError:
100+
fs = s3fs.S3FileSystem(anon=True)
101+
pd = ParquetDataset(path, filesystem=fs)
102+
meta = pd.metadata
103+
column_names = pd.schema.names
87104
else:
88105
meta = ParquetFile(path).metadata
89106
column_names = meta.schema.names

modin/pandas/test/test_io.py

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1052,6 +1052,26 @@ def test_from_csv_s3(make_csv_file):
10521052
df_equals(modin_df, pandas_df)
10531053

10541054

1055+
@pytest.mark.skipif(
1056+
Engine.get() == "Python",
1057+
reason="S3-like paths with anonymous credentials are not supported in pandas. See issue #2301.",
1058+
)
1059+
def test_read_parquet_s3():
1060+
import s3fs
1061+
1062+
# Pandas currently supports only default credentials for boto, therefore
1063+
# we use S3FileSystem with `anon=True` to make testing possible.
1064+
dataset_url = "s3://aws-roda-hcls-datalake/chembl_27/chembl_27_public_tissue_dictionary/part-00000-66508102-96fa-4fd9-a0fd-5bc072a74293-c000.snappy.parquet"
1065+
fs = s3fs.S3FileSystem(anon=True)
1066+
pandas_df = pandas.read_parquet(fs.open(dataset_url, "rb"))
1067+
modin_df_s3fs = pd.read_parquet(fs.open(dataset_url, "rb"))
1068+
df_equals(pandas_df, modin_df_s3fs)
1069+
1070+
# Modin supports default and anonymous credentials and resolves this internally.
1071+
modin_df_s3 = pd.read_parquet(dataset_url)
1072+
df_equals(pandas_df, modin_df_s3)
1073+
1074+
10551075
def test_from_csv_default(make_csv_file):
10561076
# We haven't implemented read_csv from https, but if it's implemented, then this needs to change
10571077
dataset_url = "https://raw.githubusercontent.com/modin-project/modin/master/modin/pandas/test/data/blah.csv"

requirements/env_omnisci.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,5 +12,6 @@ dependencies:
1212
- coverage<5.0
1313
- pygithub==1.53
1414
- omniscidbe4py
15+
- s3fs>=0.4.2
1516
- pip:
1617
- ray>=1.0.0

0 commit comments

Comments
 (0)