Skip to content

Commit e6caf2c

Browse files
authored
Merge pull request #407 from splunk/DVPL-9943
Modified Streaming and Generating Custom Search Command
2 parents 809ae2a + 1ffab11 commit e6caf2c

File tree

7 files changed

+165
-5
lines changed

7 files changed

+165
-5
lines changed

README.md

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,53 @@ The test suite uses Python's standard library, the built-in `unittest` library,
160160
|/tests | Source for unit tests |
161161
|/utils | Source for utilities shared by the examples and unit tests |
162162

163+
### Customization
164+
* When working with custom search commands, such as Custom Streaming Commands or Custom Generating Commands, you may need to add new fields to the records based on certain conditions.
165+
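* Structural changes like this may not be preserved: the output field names are taken from the first record written, so a field that is added only to later records can be dropped from the results.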
166+
* Make sure to use the ``add_field(record, fieldname, value)`` method from SearchCommand to add a new field and its value to the record.
167+
* ___Note:__ Using the ``add_field`` method is optional if you are not facing any issues with field retention._
168+
169+
Do
170+
```python
171+
class CustomStreamingCommand(StreamingCommand):
172+
def stream(self, records):
173+
for index, record in enumerate(records):
174+
if index % 2 != 0:
175+
self.add_field(record, "odd_record", "true")
176+
yield record
177+
```
178+
179+
Don't
180+
```python
181+
class CustomStreamingCommand(StreamingCommand):
182+
def stream(self, records):
183+
for index, record in enumerate(records):
184+
if index % 2 != 0:
185+
record["odd_record"] = "true"
186+
yield record
187+
```
188+
### Customization for Generating Custom Search Command
189+
* A Generating Custom Search Command is used to generate events using SDK code.
190+
* Make sure to use the ``gen_record()`` method from SearchCommand to create each new record, passing the event data as comma-separated ``key=value`` arguments (as shown in the example below).
191+
192+
Do
193+
```python
194+
@Configuration()
195+
class GeneratorTest(GeneratingCommand):
196+
def generate(self):
197+
yield self.gen_record(_time=time.time(), one=1)
198+
yield self.gen_record(_time=time.time(), two=2)
199+
```
200+
201+
Don't
202+
```python
203+
@Configuration()
204+
class GeneratorTest(GeneratingCommand):
205+
def generate(self):
206+
yield {'_time': time.time(), 'one': 1}
207+
yield {'_time': time.time(), 'two': 2}
208+
```
209+
163210
### Changelog
164211

165212
The [CHANGELOG](CHANGELOG.md) contains a description of changes for each version of the SDK. For the latest version, see the [CHANGELOG.md](https://github.com/splunk/splunk-sdk-python/blob/master/CHANGELOG.md) on GitHub.

splunklib/searchcommands/generating_command.py

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -213,13 +213,20 @@ def _execute(self, ifile, process):
213213

214214
def _execute_chunk_v2(self, process, chunk):
215215
count = 0
216+
records = []
216217
for row in process:
217-
self._record_writer.write_record(row)
218+
records.append(row)
218219
count += 1
219220
if count == self._record_writer._maxresultrows:
220-
self._finished = False
221-
return
222-
self._finished = True
221+
break
222+
223+
for row in records:
224+
self._record_writer.write_record(row)
225+
226+
if count == self._record_writer._maxresultrows:
227+
self._finished = False
228+
else:
229+
self._finished = True
223230

224231
def process(self, argv=sys.argv, ifile=sys.stdin, ofile=sys.stdout, allow_empty_input=True):
225232
""" Process data.

splunklib/searchcommands/internals.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -508,6 +508,7 @@ def __init__(self, ofile, maxresultrows=None):
508508
self._chunk_count = 0
509509
self._pending_record_count = 0
510510
self._committed_record_count = 0
511+
self.custom_fields = set()
511512

512513
@property
513514
def is_flushed(self):
@@ -572,6 +573,7 @@ def write_record(self, record):
572573

573574
def write_records(self, records):
574575
self._ensure_validity()
576+
records = list(records)
575577
write_record = self._write_record
576578
for record in records:
577579
write_record(record)
@@ -593,6 +595,7 @@ def _write_record(self, record):
593595

594596
if fieldnames is None:
595597
self._fieldnames = fieldnames = list(record.keys())
598+
self._fieldnames.extend([i for i in self.custom_fields if i not in self._fieldnames])
596599
value_list = imap(lambda fn: (str(fn), str('__mv_') + str(fn)), fieldnames)
597600
self._writerow(list(chain.from_iterable(value_list)))
598601

splunklib/searchcommands/search_command.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -173,6 +173,14 @@ def logging_level(self, value):
173173
raise ValueError('Unrecognized logging level: {}'.format(value))
174174
self._logger.setLevel(level)
175175

176+
def add_field(self, current_record, field_name, field_value):
177+
self._record_writer.custom_fields.add(field_name)
178+
current_record[field_name] = field_value
179+
180+
def gen_record(self, **record):
181+
self._record_writer.custom_fields |= set(record.keys())
182+
return record
183+
176184
record = Option(doc='''
177185
**Syntax: record=<bool>
178186

tests/searchcommands/test_generator_command.py

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,6 @@ def generate(self):
4040
assert expected.issubset(seen)
4141
assert finished_seen
4242

43-
4443
def test_allow_empty_input_for_generating_command():
4544
"""
4645
Passing allow_empty_input for generating command will cause an error
@@ -59,3 +58,29 @@ def generate(self):
5958
except ValueError as error:
6059
assert str(error) == "allow_empty_input cannot be False for Generating Commands"
6160

61+
def test_all_fieldnames_present_for_generated_records():
62+
@Configuration()
63+
class GeneratorTest(GeneratingCommand):
64+
def generate(self):
65+
yield self.gen_record(_time=time.time(), one=1)
66+
yield self.gen_record(_time=time.time(), two=2)
67+
yield self.gen_record(_time=time.time(), three=3)
68+
yield self.gen_record(_time=time.time(), four=4)
69+
yield self.gen_record(_time=time.time(), five=5)
70+
71+
generator = GeneratorTest()
72+
in_stream = io.BytesIO()
73+
in_stream.write(chunky.build_getinfo_chunk())
74+
in_stream.write(chunky.build_chunk({'action': 'execute'}))
75+
in_stream.seek(0)
76+
out_stream = io.BytesIO()
77+
generator._process_protocol_v2([], in_stream, out_stream)
78+
out_stream.seek(0)
79+
80+
ds = chunky.ChunkedDataStream(out_stream)
81+
fieldnames_expected = {'_time', 'one', 'two', 'three', 'four', 'five'}
82+
fieldnames_actual = set()
83+
for chunk in ds:
84+
for row in chunk.data:
85+
fieldnames_actual |= set(row.keys())
86+
assert fieldnames_expected.issubset(fieldnames_actual)

tests/searchcommands/test_internals_v2.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -233,6 +233,8 @@ def test_record_writer_with_random_data(self, save_recording=False):
233233
self.assertGreater(writer._buffer.tell(), 0)
234234
self.assertEqual(writer._total_record_count, 0)
235235
self.assertEqual(writer.committed_record_count, 0)
236+
fieldnames.sort()
237+
writer._fieldnames.sort()
236238
self.assertListEqual(writer._fieldnames, fieldnames)
237239
self.assertListEqual(writer._inspector['messages'], messages)
238240

tests/searchcommands/test_streaming_command.py

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,3 +27,71 @@ def stream(self, records):
2727
output = chunky.ChunkedDataStream(ofile)
2828
getinfo_response = output.read_chunk()
2929
assert getinfo_response.meta["type"] == "streaming"
30+
31+
32+
def test_field_preservation_negative():
33+
@Configuration()
34+
class TestStreamingCommand(StreamingCommand):
35+
36+
def stream(self, records):
37+
for index, record in enumerate(records):
38+
if index % 2 != 0:
39+
record["odd_field"] = True
40+
else:
41+
record["even_field"] = True
42+
yield record
43+
44+
cmd = TestStreamingCommand()
45+
ifile = io.BytesIO()
46+
ifile.write(chunky.build_getinfo_chunk())
47+
data = list()
48+
for i in range(0, 10):
49+
data.append({"in_index": str(i)})
50+
ifile.write(chunky.build_data_chunk(data, finished=True))
51+
ifile.seek(0)
52+
ofile = io.BytesIO()
53+
cmd._process_protocol_v2([], ifile, ofile)
54+
ofile.seek(0)
55+
output_iter = chunky.ChunkedDataStream(ofile).__iter__()
56+
output_iter.next()
57+
output_records = [i for i in output_iter.next().data]
58+
59+
# Assert that count of records having "odd_field" is 0
60+
assert len(list(filter(lambda r: "odd_field" in r, output_records))) == 0
61+
62+
# Assert that count of records having "even_field" is 10
63+
assert len(list(filter(lambda r: "even_field" in r, output_records))) == 10
64+
65+
66+
def test_field_preservation_positive():
67+
@Configuration()
68+
class TestStreamingCommand(StreamingCommand):
69+
70+
def stream(self, records):
71+
for index, record in enumerate(records):
72+
if index % 2 != 0:
73+
self.add_field(record, "odd_field", True)
74+
else:
75+
self.add_field(record, "even_field", True)
76+
yield record
77+
78+
cmd = TestStreamingCommand()
79+
ifile = io.BytesIO()
80+
ifile.write(chunky.build_getinfo_chunk())
81+
data = list()
82+
for i in range(0, 10):
83+
data.append({"in_index": str(i)})
84+
ifile.write(chunky.build_data_chunk(data, finished=True))
85+
ifile.seek(0)
86+
ofile = io.BytesIO()
87+
cmd._process_protocol_v2([], ifile, ofile)
88+
ofile.seek(0)
89+
output_iter = chunky.ChunkedDataStream(ofile).__iter__()
90+
output_iter.next()
91+
output_records = [i for i in output_iter.next().data]
92+
93+
# Assert that count of records having "odd_field" is 10
94+
assert len(list(filter(lambda r: "odd_field" in r, output_records))) == 10
95+
96+
# Assert that count of records having "even_field" is 10
97+
assert len(list(filter(lambda r: "even_field" in r, output_records))) == 10

0 commit comments

Comments
 (0)