Skip to content

Fix chunk synchronization #301

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Jun 5, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 13 additions & 11 deletions splunklib/searchcommands/generating_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -204,19 +204,21 @@ def _execute(self, ifile, process):

"""
if self._protocol_version == 2:
result = self._read_chunk(ifile)
self._execute_v2(ifile, self.generate())
else:
assert self._protocol_version == 1
self._record_writer.write_records(self.generate())
self.finish()

if not result:
def _execute_chunk_v2(self, process, chunk):
count = 0
for row in process:
self._record_writer.write_record(row)
count += 1
if count == self._record_writer._maxresultrows:
self._finished = False
return

metadata, body = result
action = getattr(metadata, 'action', None)

if action != 'execute':
raise RuntimeError('Expected execute action, not {}'.format(action))

self._record_writer.write_records(self.generate())
self.finish()
self._finished = True

# endregion

Expand Down
61 changes: 31 additions & 30 deletions splunklib/searchcommands/internals.py
Original file line number Diff line number Diff line change
Expand Up @@ -556,7 +556,6 @@ def _clear(self):
self._buffer.truncate()
self._inspector.clear()
self._record_count = 0
self._flushed = False

def _ensure_validity(self):
if self._finished is True:
Expand Down Expand Up @@ -748,37 +747,39 @@ class RecordWriterV2(RecordWriter):
def flush(self, finished=None, partial=None):

RecordWriter.flush(self, finished, partial) # validates arguments and the state of this instance
inspector = self._inspector

if self._flushed is False:

self._total_record_count += self._record_count
self._chunk_count += 1

# TODO: DVPL-6448: splunklib.searchcommands | Add support for partial: true when it is implemented in
# ChunkedExternProcessor (See SPL-103525)
#
# We will need to replace the following block of code with this block:
#
# metadata = [
# ('inspector', self._inspector if len(self._inspector) else None),
# ('finished', finished),
# ('partial', partial)]

if len(inspector) == 0:
inspector = None

if partial is True:
finished = False

metadata = [item for item in (('inspector', inspector), ('finished', finished))]
self._write_chunk(metadata, self._buffer.getvalue())
self._clear()
if partial or not finished:
# Don't flush partial chunks, since the SCP v2 protocol does not
# provide a way to send partial chunks yet.
return

elif finished is True:
self._write_chunk((('finished', True),), '')
if not self.is_flushed:
self.write_chunk(finished=True)

self._finished = finished is True
def write_chunk(self, finished=None):
    """ Write the buffered records out as a single SCP v2 chunk.

    Emits the chunk metadata (the inspector messages, when any exist, and
    the ``finished`` flag) followed by the buffered record body, then
    clears the buffer so the writer is ready to accumulate the next chunk.

    :param finished: pass :const:`True` on the final chunk of the
        response; :const:`False` or :const:`None` otherwise.

    :return: :const:`None`

    """
    self._total_record_count += self._record_count
    self._chunk_count += 1

    # TODO: DVPL-6448: splunklib.searchcommands | Add support for partial: true when it is implemented in
    # ChunkedExternProcessor (See SPL-103525). Until then only the 'inspector' and 'finished' metadata
    # fields are emitted; a 'partial' field would be added here once the processor supports it.

    # An empty inspector is serialized as null rather than as an empty map.
    inspector = self._inspector if len(self._inspector) else None

    metadata = [('inspector', inspector), ('finished', finished)]
    self._write_chunk(metadata, self._buffer.getvalue())
    self._clear()

def write_metadata(self, configuration):
self._ensure_validity()
Expand Down Expand Up @@ -818,4 +819,4 @@ def _write_chunk(self, metadata, body):
self.write(metadata)
self.write(body)
self._ofile.flush()
self._flushed = False
self._flushed = True
53 changes: 21 additions & 32 deletions splunklib/searchcommands/search_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -398,7 +398,7 @@ def flush(self):
:return: :const:`None`

"""
self._record_writer.flush(partial=True)
self._record_writer.flush(finished=False)

def prepare(self):
""" Prepare for execution.
Expand Down Expand Up @@ -776,7 +776,7 @@ def _process_protocol_v2(self, argv, ifile, ofile):
# noinspection PyBroadException
try:
debug('Executing under protocol_version=2')
self._records = self._records_protocol_v2
#self._records = self._records_protocol_v2
self._metadata.action = 'execute'
self._execute(ifile, None)
except SystemExit:
Expand Down Expand Up @@ -833,6 +833,8 @@ def _decode_list(mv):

_encoded_value = re.compile(r'\$(?P<item>(?:\$\$|[^$])*)\$(?:;|$)') # matches a single value in an encoded list

# Note: Subclasses must override this method so that it can be
# called as self._execute(ifile, None)
def _execute(self, ifile, process):
""" Default processing loop

Expand All @@ -846,8 +848,12 @@ def _execute(self, ifile, process):
:rtype: NoneType

"""
self._record_writer.write_records(process(self._records(ifile)))
self.finish()
if self.protocol_version == 1:
self._record_writer.write_records(process(self._records(ifile)))
self.finish()
else:
assert self._protocol_version == 2
self._execute_v2(ifile, process)

@staticmethod
def _read_chunk(ifile):
Expand Down Expand Up @@ -896,7 +902,9 @@ def _read_chunk(ifile):
_header = re.compile(r'chunked\s+1.0\s*,\s*(\d+)\s*,\s*(\d+)\s*\n')

def _records_protocol_v1(self, ifile):
return self._read_csv_records(ifile)

def _read_csv_records(self, ifile):
reader = csv.reader(ifile, dialect=CsvDialect)

try:
Expand All @@ -921,7 +929,7 @@ def _records_protocol_v1(self, ifile):
record[fieldname] = value
yield record

def _records_protocol_v2(self, ifile):
def _execute_v2(self, ifile, process):

while True:
result = self._read_chunk(ifile)
Expand All @@ -931,41 +939,22 @@ def _records_protocol_v2(self, ifile):

metadata, body = result
action = getattr(metadata, 'action', None)

if action != 'execute':
raise RuntimeError('Expected execute action, not {}'.format(action))

finished = getattr(metadata, 'finished', False)
self._finished = getattr(metadata, 'finished', False)
self._record_writer.is_flushed = False

if len(body) > 0:
reader = csv.reader(StringIO(body), dialect=CsvDialect)

try:
fieldnames = next(reader)
except StopIteration:
return
self._execute_chunk_v2(process, result)

mv_fieldnames = dict([(name, name[len('__mv_'):]) for name in fieldnames if name.startswith('__mv_')])
self._record_writer.write_chunk(finished=self._finished)

if len(mv_fieldnames) == 0:
for values in reader:
yield OrderedDict(izip(fieldnames, values))
else:
for values in reader:
record = OrderedDict()
for fieldname, value in izip(fieldnames, values):
if fieldname.startswith('__mv_'):
if len(value) > 0:
record[mv_fieldnames[fieldname]] = self._decode_list(value)
elif fieldname not in record:
record[fieldname] = value
yield record

if finished:
return
def _execute_chunk_v2(self, process, chunk):
    """ Process a single SCP v2 chunk.

    Parses the chunk's CSV body into records, runs them through
    ``process``, writes the transformed records, and flushes the writer.
    A chunk with an empty body still triggers a flush so the peer
    receives a response.

    :param process: callable applied to the parsed record stream.
    :param chunk: ``(metadata, body)`` pair as produced by ``_read_chunk``.

    """
    _, body = chunk

    if body:
        records = self._read_csv_records(StringIO(body))
        self._record_writer.write_records(process(records))

    self.flush()

def _report_unexpected_error(self):

Expand Down
Loading