Switching to use FileOpener instead of FileLoader #148

Closed · wants to merge 6 commits
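
Every file in this PR makes the same mechanical change: the `FileLoader` DataPipe is renamed to `FileOpener`, and call sites that previously relied on the default mode now pass `mode="b"` explicitly. A minimal before/after sketch of the new call, with placeholder file names (`FileOpener` yields `(path, stream)` tuples, which the tests below rely on):

```py
from torchdata.datapipes.iter import FileOpener, IterableWrapper

# Source DataPipe of file paths (placeholder names)
paths_dp = IterableWrapper(["a.file", "b.file"])

# Before this PR: FileLoader(paths_dp)
# After this PR: FileOpener, with the binary mode spelled out
opener_dp = FileOpener(paths_dp, mode="b")

for path, stream in opener_dp:
    payload = stream.read()  # raw bytes, since the file was opened in binary mode
```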
6 changes: 3 additions & 3 deletions README.md
@@ -217,9 +217,9 @@ The stack of DataPipes can then be constructed in functional form:

```py
>>> import torch.utils.data.datapipes as dp
->>> datapipes1 = dp.iter.FileLoader(['a.file', 'b.file']).map(fn=decoder).shuffle().batch(2)
+>>> datapipes1 = dp.iter.FileOpener(['a.file', 'b.file']).map(fn=decoder).shuffle().batch(2)

->>> datapipes2 = dp.iter.FileLoader(['a.file', 'b.file'])
+>>> datapipes2 = dp.iter.FileOpener(['a.file', 'b.file'])
>>> datapipes2 = dp.iter.Mapper(datapipes2)
>>> datapipes2 = dp.iter.Shuffler(datapipes2)
>>> datapipes2 = dp.iter.Batcher(datapipes2, 2)
@@ -259,7 +259,7 @@ Then, the pipeline can be assembled as follows:

>>> FOLDER = 'path/2/csv/folder'
>>> datapipe = dp.iter.FileLister([FOLDER]).filter(fn=lambda filename: filename.endswith('.csv'))
->>> datapipe = dp.iter.FileLoader(datapipe, mode='rt')
+>>> datapipe = dp.iter.FileOpener(datapipe, mode='rt')
>>> datapipe = datapipe.parse_csv_files(delimiter=' ')

>>> for d in datapipe: # Start loading data
413 changes: 200 additions & 213 deletions examples/text/CC100.ipynb

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions examples/text/amazonreviewpolarity.py
@@ -1,7 +1,7 @@
# Copyright (c) Facebook, Inc. and its affiliates.
import os

-from torchdata.datapipes.iter import FileLoader, GDriveReader, IterableWrapper
+from torchdata.datapipes.iter import FileOpener, GDriveReader, IterableWrapper

from .utils import _add_docstring_header, _create_dataset_directory, _wrap_split_argument

@@ -43,7 +43,7 @@ def AmazonReviewPolarity(root, split):
)
cache_dp = GDriveReader(cache_dp).end_caching(mode="wb", same_filepath_fn=True)

-cache_dp = FileLoader(cache_dp)
+cache_dp = FileOpener(cache_dp, mode="b")

# stack TAR extractor on top of loader DP
extracted_files = cache_dp.read_from_tar()
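
The text dataset examples (AmazonReviewPolarity above, IMDB and SQuAD below) all follow the same cache-then-open pattern: stage the download through a reader DataPipe, cache it on disk, reopen the cached archive in binary mode, and stack a TAR extractor on top. A condensed sketch of that pattern, with a hypothetical URL and cache root, and `HttpReader` standing in for whichever reader a given dataset needs:

```py
import os

from torchdata.datapipes.iter import FileOpener, HttpReader, IterableWrapper, OnDiskCacheHolder

# Hypothetical archive URL and cache root, for illustration only
URL = "https://example.com/dataset.tar.gz"
root = os.path.expanduser("~/.cache/dataset")

# Cache the download on disk, keyed by the destination file path
cache_dp = OnDiskCacheHolder(
    IterableWrapper([URL]),
    filepath_fn=lambda url: os.path.join(root, os.path.basename(url)),
)
cache_dp = HttpReader(cache_dp).end_caching(mode="wb", same_filepath_fn=True)

# tarfile requires a binary stream, hence mode="b"
cache_dp = FileOpener(cache_dp, mode="b")
extracted_files = cache_dp.read_from_tar()
```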
4 changes: 2 additions & 2 deletions examples/text/imdb.py
@@ -2,7 +2,7 @@
import os
from pathlib import Path

-from torchdata.datapipes.iter import FileLoader, HttpReader, IterableWrapper
+from torchdata.datapipes.iter import FileOpener, HttpReader, IterableWrapper

from .utils import _add_docstring_header, _create_dataset_directory, _wrap_split_argument

@@ -40,7 +40,7 @@ def IMDB(root, split):
)
cache_dp = HttpReader(cache_dp).end_caching(mode="wb", same_filepath_fn=True)

-cache_dp = FileLoader(cache_dp)
+cache_dp = FileOpener(cache_dp, mode="b")

# stack TAR extractor on top of load files data pipe
extracted_files = cache_dp.read_from_tar()
4 changes: 2 additions & 2 deletions examples/text/squad1.py
@@ -1,7 +1,7 @@
# Copyright (c) Facebook, Inc. and its affiliates.
import os

-from torchdata.datapipes.iter import FileLoader, HttpReader, IterableWrapper, IterDataPipe
+from torchdata.datapipes.iter import FileOpener, HttpReader, IterableWrapper, IterDataPipe

from .utils import _add_docstring_header, _create_dataset_directory, _wrap_split_argument

@@ -61,7 +61,7 @@ def SQuAD1(root, split):
)
cache_dp = HttpReader(cache_dp).end_caching(mode="wb", same_filepath_fn=True)

-cache_dp = FileLoader(cache_dp)
+cache_dp = FileOpener(cache_dp, mode="b")

# stack custom data pipe on top of JSON reader to orchestrate data samples for Q&A dataset
return _ParseSQuADQAData(cache_dp.parse_json_files())
4 changes: 2 additions & 2 deletions examples/text/squad2.py
@@ -1,7 +1,7 @@
# Copyright (c) Facebook, Inc. and its affiliates.
import os

-from torchdata.datapipes.iter import FileLoader, HttpReader, IterableWrapper, IterDataPipe
+from torchdata.datapipes.iter import FileOpener, HttpReader, IterableWrapper, IterDataPipe

from .utils import _add_docstring_header, _create_dataset_directory, _wrap_split_argument

@@ -61,7 +61,7 @@ def SQuAD2(root, split):
)
cache_dp = HttpReader(cache_dp).end_caching(mode="wb", same_filepath_fn=True)

-cache_dp = FileLoader(cache_dp)
+cache_dp = FileOpener(cache_dp, mode="b")

# stack custom data pipe on top of JSON reader to orchestrate data samples for Q&A dataset
return _ParseSQuADQAData(cache_dp.parse_json_files())
6 changes: 3 additions & 3 deletions examples/vision/caltech101.py
@@ -5,7 +5,7 @@
import torch
from torch.utils.data.datapipes.utils.decoder import imagehandler, mathandler
from torchdata.datapipes.iter import (
-FileLoader,
+FileOpener,
Filter,
IterableWrapper,
IterKeyZipper,
@@ -89,14 +89,14 @@ def collate_sample(data):

def Caltech101(root=ROOT):
anns_dp = IterableWrapper([os.path.join(root, "Annotations.tar")])
-anns_dp = FileLoader(anns_dp)
+anns_dp = FileOpener(anns_dp, mode="b")
anns_dp = TarArchiveReader(anns_dp)
anns_dp = Filter(anns_dp, is_ann)
anns_dp = RoutedDecoder(anns_dp, mathandler())
anns_dp = Mapper(anns_dp, collate_ann)

images_dp = IterableWrapper([os.path.join(root, "101_ObjectCategories.tar.gz")])
-images_dp = FileLoader(images_dp)
+images_dp = FileOpener(images_dp, mode="b")
images_dp = TarArchiveReader(images_dp)
images_dp = Filter(images_dp, is_not_background_image)
images_dp = Filter(images_dp, is_not_rogue_image)
4 changes: 2 additions & 2 deletions examples/vision/caltech256.py
@@ -3,7 +3,7 @@

from torch.utils.data.datapipes.utils.decoder import imagehandler

-from torchdata.datapipes.iter import FileLoader, IterableWrapper, Mapper, RoutedDecoder, TarArchiveReader
+from torchdata.datapipes.iter import FileOpener, IterableWrapper, Mapper, RoutedDecoder, TarArchiveReader


# Download size is ~1.2 GB so fake data is provided
@@ -23,7 +23,7 @@ def collate_sample(data):

def Caltech256(root=ROOT):
dp = IterableWrapper([os.path.join(root, "256_ObjectCategories.tar")])
-dp = FileLoader(dp)
+dp = FileOpener(dp, mode="b")
dp = TarArchiveReader(dp)
dp = RoutedDecoder(dp, imagehandler("pil"))
return Mapper(dp, collate_sample)
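
The vision examples open local archives rather than downloads, so `FileOpener` sits directly on an `IterableWrapper` of paths before the archive reader and decoder. A self-contained sketch of that shorter pipeline, with a hypothetical archive name:

```py
import os

from torch.utils.data.datapipes.utils.decoder import imagehandler
from torchdata.datapipes.iter import FileOpener, IterableWrapper, RoutedDecoder, TarArchiveReader

# Hypothetical local archive, for illustration only
root = "datasets"
dp = IterableWrapper([os.path.join(root, "images.tar")])
dp = FileOpener(dp, mode="b")  # binary streams for the tar reader
dp = TarArchiveReader(dp)  # yields (path, stream) per archive member
dp = RoutedDecoder(dp, imagehandler("pil"))  # decodes image members to PIL images
```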
32 changes: 16 additions & 16 deletions test/test_local_io.py
@@ -19,7 +19,7 @@
CSVParser,
Extractor,
FileLister,
-FileLoader,
+FileOpener,
HashChecker,
IoPathFileLister,
IoPathFileLoader,
@@ -104,7 +104,7 @@ def make_path(fname):
csv_files = {"1.csv": "key,item\na,1\nb,2", "empty.csv": "", "empty2.csv": "\n"}
self._custom_files_set_up(csv_files)
datapipe1 = IterableWrapper([make_path(fname) for fname in ["1.csv", "empty.csv", "empty2.csv"]])
-datapipe2 = FileLoader(datapipe1)
+datapipe2 = FileOpener(datapipe1, mode="b")
datapipe3 = datapipe2.map(get_name)

# Functional Test: yield one row at time from each file, skipping over empty content
@@ -140,7 +140,7 @@ def get_name(path_and_stream):
csv_files = {"1.csv": "key,item\na,1\nb,2", "empty.csv": "", "empty2.csv": "\n"}
self._custom_files_set_up(csv_files)
datapipe1 = FileLister(self.temp_dir.name, "*.csv")
-datapipe2 = FileLoader(datapipe1)
+datapipe2 = FileOpener(datapipe1, mode="b")
datapipe3 = datapipe2.map(get_name)

# Functional Test: yield one row at a time as dict, with the first row being the header (key)
@@ -184,7 +184,7 @@ def fill_hash_dict():
fill_hash_dict()

datapipe1 = FileLister(self.temp_dir.name, "*")
-datapipe2 = FileLoader(datapipe1)
+datapipe2 = FileOpener(datapipe1, mode="b")
hash_check_dp = HashChecker(datapipe2, hash_dict)

# Functional Test: Ensure the DataPipe values are unchanged if the hashes are the same
@@ -223,7 +223,7 @@ def fill_hash_dict():
self.assertEqual(expected_stream.read(), actual_stream.read())

# __len__ Test: returns the length of source DataPipe
with self.assertRaisesRegex(TypeError, "FileLoaderIterDataPipe instance doesn't have valid length"):
with self.assertRaisesRegex(TypeError, "FileOpenerIterDataPipe instance doesn't have valid length"):
len(hash_check_dp)

def test_json_parser_iterdatapipe(self):
@@ -240,7 +240,7 @@ def is_nonempty_json(path_and_stream):
}
self._custom_files_set_up(json_files)
datapipe1 = IterableWrapper([f"{self.temp_dir.name}/{fname}" for fname in ["empty.json", "1.json", "2.json"]])
-datapipe2 = FileLoader(datapipe1)
+datapipe2 = FileOpener(datapipe1, mode="b")
datapipe3 = datapipe2.map(get_name)
datapipe_empty = datapipe3.filter(is_empty_json)
datapipe_nonempty = datapipe3.filter(is_nonempty_json)
@@ -317,12 +317,12 @@ def _write_test_tar_gz_files(self):
def test_tar_archive_reader_iterdatapipe(self):
self._write_test_tar_files()
datapipe1 = FileLister(self.temp_dir.name, "*.tar")
-datapipe2 = FileLoader(datapipe1)
+datapipe2 = FileOpener(datapipe1, mode="b")
tar_reader_dp = TarArchiveReader(datapipe2)

self._write_test_tar_gz_files()
datapipe_gz_1 = FileLister(self.temp_dir.name, "*.tar.gz")
-datapipe_gz_2 = FileLoader(datapipe_gz_1)
+datapipe_gz_2 = FileOpener(datapipe_gz_1, mode="b")
gz_reader_dp = TarArchiveReader(datapipe_gz_2)

# Functional Test: Read extracted files before reaching the end of the tarfile
@@ -358,7 +358,7 @@ def _write_test_zip_files(self):
def test_zip_archive_reader_iterdatapipe(self):
self._write_test_zip_files()
datapipe1 = FileLister(self.temp_dir.name, "*.zip")
-datapipe2 = FileLoader(datapipe1)
+datapipe2 = FileOpener(datapipe1, mode="b")
zip_reader_dp = ZipArchiveReader(datapipe2)

# Functional Test: read extracted files before reaching the end of the zipfile
@@ -394,7 +394,7 @@ def test_xz_archive_reader_iterdatapipe(self):
# Whereas we create multiple .xz files in the same directories below.
self._write_test_xz_files()
datapipe1 = FileLister(self.temp_dir.name, "*.xz")
-datapipe2 = FileLoader(datapipe1)
+datapipe2 = FileOpener(datapipe1, mode="b")
xz_reader_dp = XzFileReader(datapipe2)

# Functional Test: Read extracted files before reaching the end of the xzfile
@@ -453,27 +453,27 @@ def test_extractor_iterdatapipe(self):

# Functional Test: work with .tar files
tar_file_dp = FileLister(self.temp_dir.name, "*.tar")
-tar_load_dp = FileLoader(tar_file_dp)
+tar_load_dp = FileOpener(tar_file_dp, mode="b")
tar_extract_dp = Extractor(tar_load_dp, file_type="tar")
self._extractor_tar_test_helper(self.temp_files, tar_extract_dp)

# Functional test: work with .tar.gz files
tar_gz_file_dp = FileLister(self.temp_dir.name, "*.tar.gz")
-tar_gz_load_dp = FileLoader(tar_gz_file_dp)
+tar_gz_load_dp = FileOpener(tar_gz_file_dp, mode="b")
tar_gz_extract_dp = Extractor(tar_gz_load_dp, file_type="tar")
self._extractor_tar_test_helper(self.temp_files, tar_gz_extract_dp)

# Functional Test: work with .gz files
gz_file_dp = IterableWrapper([f"{self.temp_dir.name}/temp.gz"])
-gz_load_dp = FileLoader(gz_file_dp)
+gz_load_dp = FileOpener(gz_file_dp, mode="b")
gz_extract_dp = Extractor(gz_load_dp, file_type="gzip")
for _, gz_stream in gz_extract_dp:
with open(self.temp_files[0], "rb") as f:
self.assertEqual(f.read(), gz_stream.read())

# Functional Test: work with .zip files
zip_file_dp = FileLister(self.temp_dir.name, "*.zip")
-zip_load_dp = FileLoader(zip_file_dp)
+zip_load_dp = FileOpener(zip_file_dp, mode="b")
zip_extract_dp = zip_load_dp.extract(file_type="zip")
for _, zip_stream in zip_extract_dp:
for fname in self.temp_files:
@@ -482,7 +482,7 @@ def test_extractor_iterdatapipe(self):

# Functional Test: work with .xz files
xz_file_dp = FileLister(self.temp_dir.name, "*.xz")
-xz_load_dp = FileLoader(xz_file_dp)
+xz_load_dp = FileOpener(xz_file_dp, mode="b")
xz_extract_dp = Extractor(xz_load_dp, file_type="lzma")
self._extractor_xz_test_helper(xz_extract_dp)

@@ -599,7 +599,7 @@ def test_rar_archive_loader(self):
self._write_test_rar_files()

datapipe1 = FileLister(self.temp_dir.name, "*.rar")
-datapipe2 = FileLoader(datapipe1)
+datapipe2 = FileOpener(datapipe1, mode="b")
rar_loader_dp = RarArchiveLoader(datapipe2)

# Functional Test: read extracted files before reaching the end of the rarfile
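
Beyond the mechanical renames, the expected error message in the length test changes too, since the underlying class itself is renamed. A minimal sketch of the behavior that test asserts:

```py
from torchdata.datapipes.iter import FileOpener, IterableWrapper

dp = FileOpener(IterableWrapper(["a.file"]), mode="b")
# Raises TypeError: FileOpenerIterDataPipe instance doesn't have valid length
len(dp)
```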
4 changes: 2 additions & 2 deletions test/test_remote_io.py
@@ -8,7 +8,7 @@

from _utils._common_utils_for_test import check_hash_fn, create_temp_dir

-from torchdata.datapipes.iter import EndOnDiskCacheHolder, FileLoader, HttpReader, IterableWrapper, OnDiskCacheHolder
+from torchdata.datapipes.iter import EndOnDiskCacheHolder, FileOpener, HttpReader, IterableWrapper, OnDiskCacheHolder


class TestDataPipeRemoteIO(expecttest.TestCase):
@@ -110,7 +110,7 @@ def _gen_filepath_fn(tar_path):

# DataPipe Constructor
file_cache_dp = OnDiskCacheHolder(tar_cache_dp, filepath_fn=_gen_filepath_fn)
file_cache_dp = FileLoader(file_cache_dp, mode="rb")
file_cache_dp = FileOpener(file_cache_dp, mode="rb")

# Functional API
file_cache_dp = file_cache_dp.read_from_tar()
2 changes: 2 additions & 0 deletions torchdata/datapipes/iter/__init__.py
@@ -7,6 +7,7 @@
Demultiplexer,
FileLister,
FileLoader,
+FileOpener,
Filter,
Forker,
Grouper,
@@ -94,6 +95,7 @@
"FSSpecSaver",
"FileLister",
"FileLoader",
"FileOpener",
"Filter",
"Forker",
"GDriveReader",
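
Note that both names remain exported from `torchdata.datapipes.iter`, which suggests `FileLoader` is being kept around temporarily for backward compatibility rather than removed outright. A quick check, assuming that reading of the change:

```py
# Both imports resolve after this PR; new code should prefer FileOpener.
from torchdata.datapipes.iter import FileLoader, FileOpener
```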