Skip to content

Commit 05537a6

Browse files
committed
Replace recursive strategy with iterative approach for generating splits to overcome max recursion depth limitation
1 parent 3737d5d commit 05537a6

File tree

1 file changed

+66
-65
lines changed

1 file changed

+66
-65
lines changed

src/split.py

Lines changed: 66 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -194,71 +194,72 @@ def _process(self, reader: BytesIO, limit: int,
194194
carryover: bytes = kwargs.get('carryover', None)
195195
header: bytes = kwargs.get('header', None)
196196
manifest: csv.DictWriter = kwargs.get('manifest', None)
197-
processed = 0
198-
splitfilename = self._getnextsplit(splitnum)
199-
splitfile = os.path.join(self.outputdir, splitfilename)
200-
if includeheader and not header:
201-
newline = True
202-
header = reader.readline()
203-
writer = open(splitfile, mode='wb+')
204-
try:
205-
if header:
206-
writer.write(header)
207-
processed += len(header) if splitby == 'size' else 1
208-
if carryover:
209-
writer.write(carryover)
210-
processed += len(carryover) if splitby == 'size' else 1
211-
carryover = None
212-
if splitby == 'size':
213-
buffersize = Split._getreadbuffersize(splitsize=limit)
214-
while 1:
215-
if self.terminate:
216-
log.info('Term flag has been set by the user.')
217-
log.info('Terminating the process.')
218-
break
219-
if newline:
197+
while True:
198+
processed = 0
199+
splitfilename = self._getnextsplit(splitnum)
200+
splitfile = os.path.join(self.outputdir, splitfilename)
201+
if includeheader and not header:
202+
newline = True
203+
header = reader.readline()
204+
writer = open(splitfile, mode='wb+')
205+
try:
206+
if header:
207+
writer.write(header)
208+
processed += len(header) if splitby == 'size' else 1
209+
if carryover:
210+
writer.write(carryover)
211+
processed += len(carryover) if splitby == 'size' else 1
212+
carryover = None
213+
if splitby == 'size':
214+
buffersize = Split._getreadbuffersize(splitsize=limit)
215+
while 1:
216+
if self.terminate:
217+
log.info('Term flag has been set by the user.')
218+
log.info('Terminating the process.')
219+
break
220+
if newline:
221+
chunk = reader.readline()
222+
else:
223+
chunk = reader.read(buffersize)
224+
if not chunk:
225+
break
226+
chunksize = len(chunk)
227+
if processed + chunksize <= limit:
228+
writer.write(chunk)
229+
processed += chunksize
230+
else:
231+
carryover = chunk
232+
break
233+
elif splitby == 'linecount':
234+
while 1:
235+
if self.terminate:
236+
log.info('Term flag has been set by the user.')
237+
log.info('Terminating the process.')
238+
break
220239
chunk = reader.readline()
221-
else:
222-
chunk = reader.read(buffersize)
223-
if not chunk:
224-
break
225-
chunksize = len(chunk)
226-
if processed + chunksize <= limit:
227-
writer.write(chunk)
228-
processed += chunksize
229-
else:
230-
carryover = chunk
231-
break
232-
elif splitby == 'linecount':
233-
while 1:
234-
if self.terminate:
235-
log.info('Term flag has been set by the user.')
236-
log.info('Terminating the process.')
237-
break
238-
chunk = reader.readline()
239-
if not chunk:
240-
break
241-
processed += 1
242-
if processed <= limit:
243-
writer.write(chunk)
244-
else:
245-
carryover = chunk
246-
break
240+
if not chunk:
241+
break
242+
processed += 1
243+
if processed <= limit:
244+
writer.write(chunk)
245+
else:
246+
carryover = chunk
247+
break
248+
else:
249+
raise ValueError('Unsupported split type provided.')
250+
finally:
251+
writer.close()
252+
splitsize = os.path.getsize(splitfile)
253+
if manifest:
254+
manifest.writerow(
255+
{'filename': splitfilename, 'filesize': splitsize, 'header': includeheader})
256+
if callback:
257+
callback(splitfile, splitsize)
258+
if carryover:
259+
splitnum += 1
260+
continue
247261
else:
248-
raise ValueError('Unsupported split type provided.')
249-
finally:
250-
writer.close()
251-
splitsize = os.path.getsize(splitfile)
252-
if manifest:
253-
manifest.writerow(
254-
{'filename': splitfilename, 'filesize': splitsize, 'header': includeheader})
255-
if callback:
256-
callback(splitfile, splitsize)
257-
if carryover:
258-
splitnum += 1
259-
self._process(reader, limit, splitby, newline, includeheader, callback,
260-
splitnum=splitnum, carryover=carryover, header=header,
261-
manifest=manifest)
262+
break
262263

263264
def _endprocess(self):
264265
"""Runs statements that marks the completion of the process
@@ -336,7 +337,7 @@ def bylinecount(self, linecount: int, includeheader: bool = False,
336337
# th.daemon = True
337338
# th.start()
338339

339-
# #split.bysize(10, includeheader=True, callback=cb)
340-
# split.bylinecount(10000, includeheader=True, callback=cb)
340+
# split.bysize(10000, includeheader=True, callback=cb)
341+
# # split.bylinecount(10000, includeheader=False, callback=cb)
341342

342343
# th.join()

0 commit comments

Comments
 (0)