[DataLoader] Close open in DataPipe streams on best effort basis #78952


Closed

Conversation

Contributor

@VitalyFedyunin VitalyFedyunin commented Jun 6, 2022

Adding the ability to:

  • Track open StreamWrappers with `StreamWrapper.session_streams`
  • Automatically close the parent StreamWrapper (e.g. a torchdata tar is the parent and the extracted file streams are its children)
  • Close streams left in structures discarded by filtering
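The bullets above can be sketched as a toy version of the bookkeeping (a sketch under assumed semantics: only `session_streams`, the parent/child relationship, and `close_on_last_child` come from this PR; the rest of the class body is illustrative):

```python
import io
from typing import Any, Dict

class StreamWrapper:
    # Class-level registry of all streams opened in this session
    # (a Dict used as an ordered set, as in the PR description).
    session_streams: Dict[Any, int] = {}

    def __init__(self, file_obj, parent_stream=None):
        self.file_obj = file_obj
        self.parent_stream = parent_stream
        self.child_counter = 0
        self.close_on_last_child = False
        if parent_stream is not None:
            parent_stream.child_counter += 1  # e.g. tar archive -> extracted file
        StreamWrapper.session_streams[self] = 1

    def close(self):
        # Deregister, then propagate the close to an autoclosing parent.
        StreamWrapper.session_streams.pop(self, None)
        if self.parent_stream is not None:
            self.parent_stream.child_counter -= 1
            if not self.parent_stream.child_counter and self.parent_stream.close_on_last_child:
                self.parent_stream.close()
        self.file_obj.close()

# Demo: a tar-like parent stream autocloses after its last child closes.
parent = StreamWrapper(io.BytesIO(b"archive"))
parent.close_on_last_child = True
child = StreamWrapper(io.BytesIO(b"member"), parent_stream=parent)
child.close()
```

Closing the only child here also closes the parent and removes both from the session registry.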

Stack from ghstack (oldest at bottom):

Differential Revision: D37489935

[ghstack-poisoned]
Contributor

facebook-github-bot commented Jun 6, 2022


✅ No Failures (0 Pending)

As of commit c9c6bf8 (more details on the Dr. CI page):


💚 💚 Looks good so far! There are no failures yet. 💚 💚


This comment was automatically generated by Dr. CI.

@VitalyFedyunin VitalyFedyunin changed the title Debug pipes overflow [DataLoader] Close open in DataPipe streams on best effort basis Jun 6, 2022
@VitalyFedyunin VitalyFedyunin requested review from NivekT and ejguan June 6, 2022 18:42
Contributor

@NivekT NivekT left a comment

What happens if someone does fork and then filter (or zip)? Would this close the stream held by the other ChildDataPipe?

```python
stream_dp = ...
cdp1, cdp2 = stream_dp.fork(2)
cdp2 = cdp2.filter(...)
list(cdp1.readlines())
```

I think we have to increment the counter when fork is used, and only close a stream when there is no other reference to it.
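A minimal sketch of the counting scheme suggested here (hypothetical `RefCountedStream` and `fork` names; this is not the DataPipe `fork` API):

```python
import io

class RefCountedStream:
    """Hypothetical wrapper: fork() bumps a shared refcount and the
    underlying file is only closed when the last reference releases it."""
    def __init__(self, file_obj, _refs=None):
        self.file_obj = file_obj
        self._refs = _refs if _refs is not None else [1]

    def fork(self):
        # Both handles share the same mutable refcount cell.
        self._refs[0] += 1
        return RefCountedStream(self.file_obj, self._refs)

    def close(self):
        self._refs[0] -= 1
        if self._refs[0] == 0:  # no other branch still holds the stream
            self.file_obj.close()

stream = RefCountedStream(io.BytesIO(b"data"))
forked = stream.fork()
forked.close()                        # the other branch still holds a reference
still_open = not stream.file_obj.closed
stream.close()                        # last reference gone -> underlying file closes
```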

@VitalyFedyunin
Contributor Author

> What happens if someone does fork and then filter (or zip)? Would this close the stream held by the other ChildDataPipe?
>
> ```python
> stream_dp = ...
> cdp1, cdp2 = stream_dp.fork(2)
> cdp2 = cdp2.filter(...)
> list(cdp1.readlines())
> ```
>
> I think we have to increment the counter when fork is used, and only close a stream when there is no other reference to it.

Ideally, we should warn and raise a RuntimeError if a stream is forked. IMO there is no good way to allow reading an abstract buffer twice.
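The warn-or-raise behavior could look roughly like this (hypothetical `SingleReadStream`; illustrative only):

```python
class SingleReadStream:
    """Hypothetical guard: an abstract buffer that raises instead of
    silently being read a second time."""
    def __init__(self, data):
        self._data = data
        self._consumed = False

    def read(self):
        if self._consumed:
            raise RuntimeError(
                "stream already consumed; reading a forked open stream is not supported")
        self._consumed = True
        return self._data

s = SingleReadStream(b"payload")
first = s.read()
try:
    s.read()
    second_read_raised = False
except RuntimeError:
    second_read_raised = True
```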

VitalyFedyunin added a commit that referenced this pull request Jun 6, 2022
ghstack-source-id: 3a5317f
Pull Request resolved: #78952
Comment on lines 514 to 524:

```python
iterators = [iter(datapipe) for datapipe in self.datapipes]
for data in zip(*iterators):
    yield data

unused = []
for iterator in iterators:
    unused += list(iterator)

# TODO(VitalyFedyunin): This should be Exception or warning when torchdata.debug is enabled
for item in unused:
    StreamWrapper.close_streams(item)
```
Contributor

@ejguan ejguan Jun 6, 2022

I am not sure about the behavior of generator functions here. When a not-fully-consumed iterator is destroyed, I think the generator is simply torn down, the code after the loop never runs, and these StreamWrappers are never closed.

Do we need to handle this case?

Contributor

@NivekT NivekT Jun 6, 2022

I think there are two options here:

1. Do `try...finally` within `__iter__`, where the `finally` clause does the clean-up.
   - This should work regardless of how/why the deconstruction of the iterator happens. I just tried this with `IterableWrapper`:

```python
def __iter__(self):
    try:
        source_data = self.iterable
        if self.deepcopy:
            try:
                source_data = copy.deepcopy(self.iterable)
            except TypeError:
                warnings.warn("....")
        for data in source_data:
            yield data
    finally:
        print("FINALLLY")
```

```python
dp = IterableWrapper(range(10))
it = iter(dp)
print(next(it))
print("About to create a new iterator")
it = iter(dp)
print("Created a new iterator")
```

Output:

```
0
About to create a new iterator
FINALLLY
Created a new iterator
```

2. We can put the clean-up logic in `hook_iterator` or the `reset` method of `IterDataPipe`, whenever that is triggered.
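Option 1 relies on a generator guarantee that can be checked in isolation: closing (or garbage-collecting) a partially consumed generator raises `GeneratorExit` inside it, so the `finally` clause still runs. A minimal sketch:

```python
cleanup_log = []

def numbers():
    # Mimics a DataPipe __iter__ with cleanup in a finally clause.
    try:
        yield from range(10)
    finally:
        cleanup_log.append("closed")  # runs even if the generator is abandoned

it = numbers()
first = next(it)   # consume only one element
it.close()         # what CPython also does when the generator is garbage-collected
```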

Contributor Author

I like the `finally` idea: it produces readable code and can be established as a development pattern.

Contributor Author

Done

Comment on `@@ -157,13 +157,59 @@ class StreamWrapper:`:

```python
    DataPipe operation like `FileOpener`. StreamWrapper would guarantee
    the wrapped file handler is closed when it's out of scope.
    '''
    session_streams: Dict[Any, int] = {}

    def __init__(self, file_obj):
```
Contributor

We can use a set in this case.
Do you think we might also register a function with `atexit` to make sure all `session_streams` are closed?
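The `atexit` idea could look roughly like this (a sketch: `SessionTrackedStream` and the hook name are made up; only the `session_streams` registry mirrors this PR):

```python
import atexit
import io

class SessionTrackedStream:
    """Sketch of the suggestion above; the class name is hypothetical."""
    session_streams = {}

    def __init__(self, file_obj):
        self.file_obj = file_obj
        SessionTrackedStream.session_streams[self] = 1

    def close(self):
        SessionTrackedStream.session_streams.pop(self, None)
        self.file_obj.close()

def _close_session_streams():
    # Close anything still registered when the interpreter shuts down.
    for stream in list(SessionTrackedStream.session_streams):
        stream.close()

atexit.register(_close_session_streams)

# Simulate the shutdown cleanup directly for demonstration.
leaked = SessionTrackedStream(io.BytesIO(b"never closed explicitly"))
_close_session_streams()
```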

Contributor Author

Oddly, it doesn't allow putting `self` into a `Set`. Switching back to `Dict`.

VitalyFedyunin added a commit to pytorch/data that referenced this pull request Jun 6, 2022
…d streams"

Blocked by pytorch/pytorch#78952

- Automatically cleans StreamWrappers from various buffers
- Automatically closes parent unarchive streams

[ghstack-poisoned]
VitalyFedyunin added a commit that referenced this pull request Jun 27, 2022
ghstack-source-id: 251e5cf
Pull Request resolved: #78952
@VitalyFedyunin VitalyFedyunin requested review from ejguan and NivekT June 27, 2022 21:57
VitalyFedyunin added a commit that referenced this pull request Jun 28, 2022
ghstack-source-id: df41ef1
Pull Request resolved: #78952
@VitalyFedyunin
Contributor Author

@VitalyFedyunin has imported this pull request. If you are a Facebook employee, you can view this diff on Phabricator.

Contributor

@NivekT NivekT left a comment

  1. I think we need some tests to reason through the common and complicated cases.
  2. Question: when a buffer is cleared (say in Shuffler), do we need to attempt to call close_streams?

```python
        self.name = name
        if parent_stream is not None:
            if not isinstance(parent_stream, StreamWrapper):
                raise RuntimeError('Parent steam should be StreamWrapper, {} was given'.format(type(parent_stream)))
```
Contributor

Suggested change:

```diff
-raise RuntimeError('Parent steam should be StreamWrapper, {} was given'.format(type(parent_stream)))
+raise RuntimeError('Parent stream should be StreamWrapper, {} was given'.format(type(parent_stream)))
```

Comment on lines 266 to 272:

```python
def autoclose(self):
    '''
    Marks Steam to close automatically as soon as all child streams are closed.
    '''
    if self.child_counter == 0:
        self.close()
    self.close_on_last_child = True
```
Contributor

What will be the usage of this? To provide an option to mark streams as autoclose?
If that is the case, maybe it could be an optional argument to `__init__`?

Contributor Author

It will close the stream if there are no active children, or mark it to be closed automatically if there is at least one.

```python
if self.child_counter == 0:
    self.close()
self.close_on_last_child = True
```

Contributor

Do we need to change the implementation of `__del__` here to remove `self` from `StreamWrapper.session_streams`?

Contributor

I agree with this comment.

```python
        if isinstance(v, dict):
            for kk, vv in v.items():
                cls.close_streams(vv, depth=depth + 1)
        elif isinstance(v, list) or isinstance(v, tuple):
```
Contributor

nit: `isinstance(v, (list, tuple))`
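The traversal quoted above can be sketched end-to-end (with a `FakeStream` stand-in; `max_depth` is an assumed safety cap, not necessarily what the real implementation uses):

```python
class FakeStream:
    """Stand-in for StreamWrapper; only close() matters here."""
    def __init__(self):
        self.closed = False
    def close(self):
        self.closed = True

def close_streams(v, depth=0, max_depth=10):
    # Walk dicts, lists, and tuples, closing any stream found,
    # mirroring the traversal in the quoted snippet.
    if depth > max_depth:
        return
    if isinstance(v, FakeStream):
        v.close()
    elif isinstance(v, dict):
        for vv in v.values():
            close_streams(vv, depth=depth + 1)
    elif isinstance(v, (list, tuple)):
        for vv in v:
            close_streams(vv, depth=depth + 1)

sample = {"a": [FakeStream(), (FakeStream(), 3)], "b": "text"}
close_streams(sample)
```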

```python
            self.parent_stream.child_counter -= 1
            if not self.parent_stream.child_counter and self.parent_stream.close_on_last_child:
                self.parent_stream.close()
        self.file_obj.close(*args, **kwargs)
```
Contributor

Suggested change:

```diff
-self.file_obj.close(*args, **kwargs)
+try:
+    self.file_obj.close(*args, **kwargs)
+except AttributeError:
+    pass
```
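The suggested guard matters when the wrapped object has no `close()` at all; a sketch (hypothetical `ClosingWrapper`, not the real class):

```python
import io

class ClosingWrapper:
    """Sketch of the suggested guard: tolerate wrapped objects without close()."""
    def __init__(self, obj):
        self.obj = obj

    def close(self):
        try:
            self.obj.close()
        except AttributeError:
            pass  # wrapped object exposes no close(); nothing to do

ClosingWrapper(object()).close()   # no close() attribute -> silently ignored
buf = io.BytesIO(b"x")
ClosingWrapper(buf).close()        # real stream -> actually closed
```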

VitalyFedyunin added a commit that referenced this pull request Jun 29, 2022
ghstack-source-id: d77d7c0
Pull Request resolved: #78952
@VitalyFedyunin
Contributor Author

@VitalyFedyunin has imported this pull request. If you are a Facebook employee, you can view this diff on Phabricator.

@VitalyFedyunin
Contributor Author

@pytorchbot merge -g

@pytorchmergebot
Collaborator

@pytorchbot successfully started a merge job. Check the current status here

@pytorchmergebot
Collaborator

@VitalyFedyunin your PR has been successfully merged.

@github-actions
Contributor

Hey @VitalyFedyunin.
You've committed this PR, but it does not have both a 'release notes: ...' and 'topics: ...' label. Please add one of each to the PR. The 'release notes: ...' label should represent the part of PyTorch that this PR changes (fx, autograd, distributed, etc) and the 'topics: ...' label should represent the kind of PR it is (not user facing, new feature, bug fix, perf improvement, etc). The list of valid labels can be found here for the 'release notes: ...' and here for the 'topics: ...'.
For changes that are 'topic: not user facing' there is no need for a release notes label.

@NivekT NivekT added the `module: data` (torch.utils.data), `release notes: dataloader` (release notes category), and `topic: improvements` (topic category) labels Jun 29, 2022
facebook-github-bot pushed a commit that referenced this pull request Jun 30, 2022
) (#78952)

Summary:
Adding the ability to:
- Track open StreamWrappers with `StreamWrapper.session_streams`
- Automatically close the parent StreamWrapper (e.g. a torchdata tar is the parent and the extracted file streams are its children)
- Close streams left in structures discarded by filtering

Pull Request resolved: #78952
Approved by: https://github.com/ejguan

Test Plan: contbuild & OSS CI, see https://hud.pytorch.org/commit/pytorch/pytorch/331c0c18033bf4138c9ea468a8c759865dd8ffff

Reviewed By: ejguan

Differential Revision: D37489935

fbshipit-source-id: 6c0fa4d03b1d957cae9ab6062a00b27b120d68e6
@facebook-github-bot facebook-github-bot deleted the gh/VitalyFedyunin/179/head branch July 3, 2022 14:17
5 participants