From e37c69146687aa051ffee8a4132d0bea7f793981 Mon Sep 17 00:00:00 2001 From: vmalaviya Date: Tue, 12 Oct 2021 20:31:50 +0530 Subject: [PATCH 01/18] Changes added to preserve the custom field --- splunklib/searchcommands/internals.py | 3 +++ splunklib/searchcommands/search_command.py | 5 +++++ 2 files changed, 8 insertions(+) diff --git a/splunklib/searchcommands/internals.py b/splunklib/searchcommands/internals.py index 85f9e0fe..411c7025 100644 --- a/splunklib/searchcommands/internals.py +++ b/splunklib/searchcommands/internals.py @@ -508,6 +508,7 @@ def __init__(self, ofile, maxresultrows=None): self._chunk_count = 0 self._pending_record_count = 0 self._committed_record_count = 0 + self.custom_fields = set() @property def is_flushed(self): @@ -593,6 +594,8 @@ def _write_record(self, record): if fieldnames is None: self._fieldnames = fieldnames = list(record.keys()) + self._fieldnames.extend(self.custom_fields) + fieldnames.extend(self.custom_fields) value_list = imap(lambda fn: (str(fn), str('__mv_') + str(fn)), fieldnames) self._writerow(list(chain.from_iterable(value_list))) diff --git a/splunklib/searchcommands/search_command.py b/splunklib/searchcommands/search_command.py index 270569ad..96b3a4c3 100644 --- a/splunklib/searchcommands/search_command.py +++ b/splunklib/searchcommands/search_command.py @@ -173,6 +173,11 @@ def logging_level(self, value): raise ValueError('Unrecognized logging level: {}'.format(value)) self._logger.setLevel(level) + def add_field(self, current_record, field_name, field_value): + self._record_writer.custom_fields.add(field_name) + current_record[field_name] = field_value + return current_record + record = Option(doc=''' **Syntax: record= From ae817d927c092f9fde207b4fce9746f392b7549f Mon Sep 17 00:00:00 2001 From: vmalaviya Date: Tue, 19 Oct 2021 17:45:03 +0530 Subject: [PATCH 02/18] Update README.md Description added for how to use add_field method to preserve conditionally added new fields and values. --- README.md | 25 +++++++++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/README.md b/README.md index 6b92a179..21f22723 100644 --- a/README.md +++ b/README.md @@ -150,6 +150,31 @@ The test suite uses Python's standard library, the built-in `unittest` library, |/tests | Source for unit tests | |/utils | Source for utilities shared by the examples and unit tests | +### Customization +* When working with custom search commands such as Custom Streaming Commands or Custom Generating Commands, We may need to add new fields to the records based on certain conditions. +* Structural changes like this may not be preserved. +* Make sure to use ``add_field(record, fieldname, value)`` method from SearchCommand to add a new field and value to the record. + +Do +```python +class CustomStreamingCommand(StreamingCommand): + def stream(self, records): + for index, record in enumerate(records): + if index % 1 == 0: + self.add_field(record, "odd_record", "true") + yield record +``` + +Don't +```python +class CustomStreamingCommand(StreamingCommand): + def stream(self, records): + for index, record in enumerate(records): + if index % 1 == 0: + record["odd_record"] = "true" + yield record +``` + ### Changelog The [CHANGELOG](CHANGELOG.md) contains a description of changes for each version of the SDK. For the latest version, see the [CHANGELOG.md](https://github.com/splunk/splunk-sdk-python/blob/master/CHANGELOG.md) on GitHub. From 0875ba95f9bc5a37458f962c92570498a2c9069c Mon Sep 17 00:00:00 2001 From: vmalaviya Date: Wed, 20 Oct 2021 19:32:41 +0530 Subject: [PATCH 03/18] Update README.md --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 21f22723..bdda1855 100644 --- a/README.md +++ b/README.md @@ -161,7 +161,7 @@ class CustomStreamingCommand(StreamingCommand): def stream(self, records): for index, record in enumerate(records): if index % 1 == 0: - self.add_field(record, "odd_record", "true") + record = self.add_field(record, "odd_record", "true") yield record ``` From 35a8ff1e97fb874b3635d14aa568d5811078c786 Mon Sep 17 00:00:00 2001 From: vmalaviya Date: Thu, 21 Oct 2021 14:19:50 +0530 Subject: [PATCH 04/18] Update internals.py --- splunklib/searchcommands/internals.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/splunklib/searchcommands/internals.py b/splunklib/searchcommands/internals.py index 411c7025..e116d091 100644 --- a/splunklib/searchcommands/internals.py +++ b/splunklib/searchcommands/internals.py @@ -573,6 +573,7 @@ def write_record(self, record): def write_records(self, records): self._ensure_validity() + records = list(records) write_record = self._write_record for record in records: write_record(record) @@ -593,9 +594,7 @@ def _write_record(self, record): fieldnames = self._fieldnames if fieldnames is None: - self._fieldnames = fieldnames = list(record.keys()) - self._fieldnames.extend(self.custom_fields) - fieldnames.extend(self.custom_fields) + self._fieldnames = fieldnames = {*list(record.keys())} | self.custom_fields value_list = imap(lambda fn: (str(fn), str('__mv_') + str(fn)), fieldnames) self._writerow(list(chain.from_iterable(value_list))) From 114f2e8cecba76e2c32646433ccd45f18a8cdcdd Mon Sep 17 00:00:00 2001 From: vmalaviya Date: Thu, 21 Oct 2021 14:20:17 +0530 Subject: [PATCH 05/18] Update search_command.py --- splunklib/searchcommands/search_command.py | 1 - 1 file changed, 1 deletion(-) diff --git a/splunklib/searchcommands/search_command.py b/splunklib/searchcommands/search_command.py index 96b3a4c3..5c1eb988 100644 --- a/splunklib/searchcommands/search_command.py +++ b/splunklib/searchcommands/search_command.py @@ -176,7 +176,6 @@ def logging_level(self, value): def add_field(self, current_record, field_name, field_value): self._record_writer.custom_fields.add(field_name) current_record[field_name] = field_value - return current_record record = Option(doc=''' **Syntax: record= From a622420f0a1161e583776319329a7f3494b46eb2 Mon Sep 17 00:00:00 2001 From: vmalaviya Date: Thu, 21 Oct 2021 16:48:42 +0530 Subject: [PATCH 06/18] Update internals.py --- splunklib/searchcommands/internals.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/splunklib/searchcommands/internals.py b/splunklib/searchcommands/internals.py index e116d091..88d017db 100644 --- a/splunklib/searchcommands/internals.py +++ b/splunklib/searchcommands/internals.py @@ -594,7 +594,7 @@ def _write_record(self, record): fieldnames = self._fieldnames if fieldnames is None: - self._fieldnames = fieldnames = {*list(record.keys())} | self.custom_fields + self._fieldnames = fieldnames = set(list(record.keys())) | self.custom_fields value_list = imap(lambda fn: (str(fn), str('__mv_') + str(fn)), fieldnames) self._writerow(list(chain.from_iterable(value_list))) From c4995a6d26b284af2c68a014aaf06ccf43f75b39 Mon Sep 17 00:00:00 2001 From: vmalaviya Date: Thu, 21 Oct 2021 18:03:27 +0530 Subject: [PATCH 07/18] Update internals.py --- splunklib/searchcommands/internals.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/splunklib/searchcommands/internals.py b/splunklib/searchcommands/internals.py index 88d017db..ae8b0393 100644 --- a/splunklib/searchcommands/internals.py +++ b/splunklib/searchcommands/internals.py @@ -594,7 +594,8 @@ def _write_record(self, record): fieldnames = self._fieldnames if fieldnames is None: - self._fieldnames = fieldnames = set(list(record.keys())) | self.custom_fields + self._fieldnames = fieldnames = list(record.keys()) + self._fieldnames.extend(self.custom_fields) value_list = imap(lambda fn: (str(fn), str('__mv_') + str(fn)), fieldnames) self._writerow(list(chain.from_iterable(value_list))) From c6ec68986f20e02ef636ceeb310f2f6720d8078e Mon Sep 17 00:00:00 2001 From: vmalaviya Date: Fri, 22 Oct 2021 13:46:41 +0530 Subject: [PATCH 08/18] Merged fieldnames --- splunklib/searchcommands/internals.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/splunklib/searchcommands/internals.py b/splunklib/searchcommands/internals.py index ae8b0393..41169bb4 100644 --- a/splunklib/searchcommands/internals.py +++ b/splunklib/searchcommands/internals.py @@ -594,8 +594,8 @@ def _write_record(self, record): fieldnames = self._fieldnames if fieldnames is None: - self._fieldnames = fieldnames = list(record.keys()) - self._fieldnames.extend(self.custom_fields) + self.custom_fields |= set(record.keys()) + self._fieldnames = fieldnames = [*self.custom_fields] value_list = imap(lambda fn: (str(fn), str('__mv_') + str(fn)), fieldnames) self._writerow(list(chain.from_iterable(value_list))) From b9571363b8235924a4a0e77fc74d3561060a3a1f Mon Sep 17 00:00:00 2001 From: vmalaviya Date: Fri, 22 Oct 2021 13:49:52 +0530 Subject: [PATCH 09/18] Fixed: test failed due to fieldname merged --- tests/searchcommands/test_internals_v2.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tests/searchcommands/test_internals_v2.py b/tests/searchcommands/test_internals_v2.py index bdef65c4..34e6b61c 100755 --- a/tests/searchcommands/test_internals_v2.py +++ b/tests/searchcommands/test_internals_v2.py @@ -233,6 +233,8 @@ def test_record_writer_with_random_data(self, save_recording=False): self.assertGreater(writer._buffer.tell(), 0) self.assertEqual(writer._total_record_count, 0) self.assertEqual(writer.committed_record_count, 0) + fieldnames.sort() + writer._fieldnames.sort() self.assertListEqual(writer._fieldnames, fieldnames) self.assertListEqual(writer._inspector['messages'], messages) From d20f1946a73ff6d5ba6f7cb40b854ccc34406707 Mon Sep 17 00:00:00 2001 From: vmalaviya Date: Fri, 22 Oct 2021 13:55:09 +0530 Subject: [PATCH 10/18] Update internals.py --- splunklib/searchcommands/internals.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/splunklib/searchcommands/internals.py b/splunklib/searchcommands/internals.py index 41169bb4..a5d76c9c 100644 --- a/splunklib/searchcommands/internals.py +++ b/splunklib/searchcommands/internals.py @@ -595,7 +595,7 @@ def _write_record(self, record): if fieldnames is None: self.custom_fields |= set(record.keys()) - self._fieldnames = fieldnames = [*self.custom_fields] + self._fieldnames = fieldnames = list(self.custom_fields) value_list = imap(lambda fn: (str(fn), str('__mv_') + str(fn)), fieldnames) self._writerow(list(chain.from_iterable(value_list))) From d6952070b4fbc708aa40cc375bf059cc3c6ac7bd Mon Sep 17 00:00:00 2001 From: akaila-splunk Date: Mon, 25 Oct 2021 11:31:21 +0530 Subject: [PATCH 11/18] add gen_record() method for create a new record --- splunklib/searchcommands/generating_command.py | 8 ++++++++ splunklib/searchcommands/search_command.py | 4 ++++ 2 files changed, 12 insertions(+) diff --git a/splunklib/searchcommands/generating_command.py b/splunklib/searchcommands/generating_command.py index e766effb..6a2f2c6d 100644 --- a/splunklib/searchcommands/generating_command.py +++ b/splunklib/searchcommands/generating_command.py @@ -213,7 +213,15 @@ def _execute(self, ifile, process): def _execute_chunk_v2(self, process, chunk): count = 0 + records = [] for row in process: + records.append(row) + count+=1 + if count == self._record_writer._maxresultrows: + break + + count = 0 + for row in records: self._record_writer.write_record(row) count += 1 if count == self._record_writer._maxresultrows: diff --git a/splunklib/searchcommands/search_command.py b/splunklib/searchcommands/search_command.py index 5c1eb988..3c6329b1 100644 --- a/splunklib/searchcommands/search_command.py +++ b/splunklib/searchcommands/search_command.py @@ -177,6 +177,10 @@ def add_field(self, current_record, field_name, field_value): self._record_writer.custom_fields.add(field_name) current_record[field_name] = field_value + def gen_record(self, **record): + self._record_writer.custom_fields |= record.keys() + return {**record} + record = Option(doc=''' **Syntax: record= From 0003f65c521f421e04d2b5c8f3b39afe8f877680 Mon Sep 17 00:00:00 2001 From: vmalaviya Date: Mon, 25 Oct 2021 17:06:09 +0530 Subject: [PATCH 12/18] Update internals.py --- splunklib/searchcommands/internals.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/splunklib/searchcommands/internals.py b/splunklib/searchcommands/internals.py index a5d76c9c..fa32f0b1 100644 --- a/splunklib/searchcommands/internals.py +++ b/splunklib/searchcommands/internals.py @@ -594,8 +594,8 @@ def _write_record(self, record): fieldnames = self._fieldnames if fieldnames is None: - self.custom_fields |= set(record.keys()) - self._fieldnames = fieldnames = list(self.custom_fields) + self._fieldnames = fieldnames = list(record.keys()) + self._fieldnames.extend([i for i in self.custom_fields if i not in self._fieldnames]) value_list = imap(lambda fn: (str(fn), str('__mv_') + str(fn)), fieldnames) self._writerow(list(chain.from_iterable(value_list))) From c8f793d89cbdf70cffa6542d334a49e041d40d96 Mon Sep 17 00:00:00 2001 From: akaila-splunk Date: Tue, 26 Oct 2021 13:58:12 +0530 Subject: [PATCH 13/18] Update search_command.py --- splunklib/searchcommands/search_command.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/splunklib/searchcommands/search_command.py b/splunklib/searchcommands/search_command.py index 3c6329b1..1d106143 100644 --- a/splunklib/searchcommands/search_command.py +++ b/splunklib/searchcommands/search_command.py @@ -179,7 +179,7 @@ def add_field(self, current_record, field_name, field_value): def gen_record(self, **record): self._record_writer.custom_fields |= record.keys() - return {**record} + return record record = Option(doc=''' **Syntax: record= From aa3bab7dad6c07f17b1b5222512f44fff5174262 Mon Sep 17 00:00:00 2001 From: akaila-splunk Date: Wed, 27 Oct 2021 19:26:23 +0530 Subject: [PATCH 14/18] added test case for generating CSC and updated README.md --- README.md | 21 +++++++++++++++ .../searchcommands/generating_command.py | 13 +++++---- .../searchcommands/test_generator_command.py | 27 ++++++++++++++++++- 3 files changed, 53 insertions(+), 8 deletions(-) diff --git a/README.md b/README.md index bdda1855..124e7f98 100644 --- a/README.md +++ b/README.md @@ -174,6 +174,27 @@ class CustomStreamingCommand(StreamingCommand): record["odd_record"] = "true" yield record ``` +### Customization for Generating Custom Search Command +* Generating Custom Search Command is used to generate events using SDK code. +* Make sure to use ``gen_record()`` method from SearchCommand to add a new record and pass event data as a key=value pair separated by , (mentioned in below example). + +Do +```python +@Configuration() + class GeneratorTest(GeneratingCommand): + def generate(self): + yield self.gen_record(_time=time.time(), one=1) + yield self.gen_record(_time=time.time(), two=2) +``` + +Don't +```python +@Configuration() + class GeneratorTest(GeneratingCommand): + def generate(self): + yield {'_time': time.time(), 'one': 1} + yield {'_time': time.time(), 'two': 2} +``` ### Changelog diff --git a/splunklib/searchcommands/generating_command.py b/splunklib/searchcommands/generating_command.py index 6a2f2c6d..6a75d2c2 100644 --- a/splunklib/searchcommands/generating_command.py +++ b/splunklib/searchcommands/generating_command.py @@ -216,18 +216,17 @@ def _execute_chunk_v2(self, process, chunk): records = [] for row in process: records.append(row) - count+=1 + count += 1 if count == self._record_writer._maxresultrows: break - count = 0 for row in records: self._record_writer.write_record(row) - count += 1 - if count == self._record_writer._maxresultrows: - self._finished = False - return - self._finished = True + + if count == self._record_writer._maxresultrows: + self._finished = False + else: + self._finished = True def process(self, argv=sys.argv, ifile=sys.stdin, ofile=sys.stdout, allow_empty_input=True): """ Process data. diff --git a/tests/searchcommands/test_generator_command.py b/tests/searchcommands/test_generator_command.py index 3b2281e8..0af79f96 100644 --- a/tests/searchcommands/test_generator_command.py +++ b/tests/searchcommands/test_generator_command.py @@ -40,7 +40,6 @@ def generate(self): assert expected.issubset(seen) assert finished_seen - def test_allow_empty_input_for_generating_command(): """ Passing allow_empty_input for generating command will cause an error @@ -59,3 +58,29 @@ def generate(self): except ValueError as error: assert str(error) == "allow_empty_input cannot be False for Generating Commands" +def test_all_fieldnames_present_for_generated_records(): + @Configuration() + class GeneratorTest(GeneratingCommand): + def generate(self): + yield self.gen_record(_time=time.time(), one=1) + yield self.gen_record(_time=time.time(), two=2) + yield self.gen_record(_time=time.time(), three=3) + yield self.gen_record(_time=time.time(), four=4) + yield self.gen_record(_time=time.time(), five=5) + + generator = GeneratorTest() + in_stream = io.BytesIO() + in_stream.write(chunky.build_getinfo_chunk()) + in_stream.write(chunky.build_chunk({'action': 'execute'})) + in_stream.seek(0) + out_stream = io.BytesIO() + generator._process_protocol_v2([], in_stream, out_stream) + out_stream.seek(0) + + ds = chunky.ChunkedDataStream(out_stream) + fieldnames_expected = {'_time', 'one', 'two', 'three', 'four', 'five'} + fieldnames_actual = set() + for chunk in ds: + for row in chunk.data: + fieldnames_actual |= row.keys() + assert fieldnames_expected.issubset(fieldnames_actual) From cc17181d593b48716d6063290bcaada4afac75a5 Mon Sep 17 00:00:00 2001 From: akaila-splunk Date: Thu, 28 Oct 2021 14:52:38 +0530 Subject: [PATCH 15/18] updated search_command.py file --- splunklib/searchcommands/search_command.py | 2 +- tests/searchcommands/test_generator_command.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/splunklib/searchcommands/search_command.py b/splunklib/searchcommands/search_command.py index 1d106143..5a626cc5 100644 --- a/splunklib/searchcommands/search_command.py +++ b/splunklib/searchcommands/search_command.py @@ -178,7 +178,7 @@ def add_field(self, current_record, field_name, field_value): current_record[field_name] = field_value def gen_record(self, **record): - self._record_writer.custom_fields |= record.keys() + self._record_writer.custom_fields |= set(record.keys()) return record record = Option(doc=''' diff --git a/tests/searchcommands/test_generator_command.py b/tests/searchcommands/test_generator_command.py index 0af79f96..63ae3ac8 100644 --- a/tests/searchcommands/test_generator_command.py +++ b/tests/searchcommands/test_generator_command.py @@ -82,5 +82,5 @@ def generate(self): fieldnames_actual = set() for chunk in ds: for row in chunk.data: - fieldnames_actual |= row.keys() + fieldnames_actual |= set(row.keys()) assert fieldnames_expected.issubset(fieldnames_actual) From 834f570894c54618fe09fb435f00f3fad755f07f Mon Sep 17 00:00:00 2001 From: vmalaviya Date: Mon, 1 Nov 2021 15:21:39 +0530 Subject: [PATCH 16/18] Update README.md --- README.md | 1 + 1 file changed, 1 insertion(+) diff --git a/README.md b/README.md index 124e7f98..16e4f448 100644 --- a/README.md +++ b/README.md @@ -154,6 +154,7 @@ The test suite uses Python's standard library, the built-in `unittest` library, * When working with custom search commands such as Custom Streaming Commands or Custom Generating Commands, We may need to add new fields to the records based on certain conditions. * Structural changes like this may not be preserved. * Make sure to use ``add_field(record, fieldname, value)`` method from SearchCommand to add a new field and value to the record. +* ___Note:__ Usage of ``add_field`` method is completely optional, if you are not facing any issues with field retention._ Do ```python From f847b4106ebaf92ef04207ee99cff6eceba1513c Mon Sep 17 00:00:00 2001 From: vmalaviya Date: Wed, 3 Nov 2021 12:20:46 +0530 Subject: [PATCH 17/18] Update README.md --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 16e4f448..2a8ea22e 100644 --- a/README.md +++ b/README.md @@ -162,7 +162,7 @@ class CustomStreamingCommand(StreamingCommand): def stream(self, records): for index, record in enumerate(records): if index % 1 == 0: - record = self.add_field(record, "odd_record", "true") + self.add_field(record, "odd_record", "true") yield record ``` From 1ffab118f22f8c74611c8d42f082816227fb350d Mon Sep 17 00:00:00 2001 From: vmalaviya Date: Wed, 3 Nov 2021 15:23:24 +0530 Subject: [PATCH 18/18] Update test_streaming_command.py --- .../searchcommands/test_streaming_command.py | 68 +++++++++++++++++++ 1 file changed, 68 insertions(+) diff --git a/tests/searchcommands/test_streaming_command.py b/tests/searchcommands/test_streaming_command.py index dcc00b53..ffe6a737 100644 --- a/tests/searchcommands/test_streaming_command.py +++ b/tests/searchcommands/test_streaming_command.py @@ -27,3 +27,71 @@ def stream(self, records): output = chunky.ChunkedDataStream(ofile) getinfo_response = output.read_chunk() assert getinfo_response.meta["type"] == "streaming" + + +def test_field_preservation_negative(): + @Configuration() + class TestStreamingCommand(StreamingCommand): + + def stream(self, records): + for index, record in enumerate(records): + if index % 2 != 0: + record["odd_field"] = True + else: + record["even_field"] = True + yield record + + cmd = TestStreamingCommand() + ifile = io.BytesIO() + ifile.write(chunky.build_getinfo_chunk()) + data = list() + for i in range(0, 10): + data.append({"in_index": str(i)}) + ifile.write(chunky.build_data_chunk(data, finished=True)) + ifile.seek(0) + ofile = io.BytesIO() + cmd._process_protocol_v2([], ifile, ofile) + ofile.seek(0) + output_iter = chunky.ChunkedDataStream(ofile).__iter__() + output_iter.next() + output_records = [i for i in output_iter.next().data] + + # Assert that count of records having "odd_field" is 0 + assert len(list(filter(lambda r: "odd_field" in r, output_records))) == 0 + + # Assert that count of records having "even_field" is 10 + assert len(list(filter(lambda r: "even_field" in r, output_records))) == 10 + + +def test_field_preservation_positive(): + @Configuration() + class TestStreamingCommand(StreamingCommand): + + def stream(self, records): + for index, record in enumerate(records): + if index % 2 != 0: + self.add_field(record, "odd_field", True) + else: + self.add_field(record, "even_field", True) + yield record + + cmd = TestStreamingCommand() + ifile = io.BytesIO() + ifile.write(chunky.build_getinfo_chunk()) + data = list() + for i in range(0, 10): + data.append({"in_index": str(i)}) + ifile.write(chunky.build_data_chunk(data, finished=True)) + ifile.seek(0) + ofile = io.BytesIO() + cmd._process_protocol_v2([], ifile, ofile) + ofile.seek(0) + output_iter = chunky.ChunkedDataStream(ofile).__iter__() + output_iter.next() + output_records = [i for i in output_iter.next().data] + + # Assert that count of records having "odd_field" is 10 + assert len(list(filter(lambda r: "odd_field" in r, output_records))) == 10 + + # Assert that count of records having "even_field" is 10 + assert len(list(filter(lambda r: "even_field" in r, output_records))) == 10