@@ -27,3 +27,71 @@ def stream(self, records):
27
27
output = chunky .ChunkedDataStream (ofile )
28
28
getinfo_response = output .read_chunk ()
29
29
assert getinfo_response .meta ["type" ] == "streaming"
30
+
31
+
32
+ def test_field_preservation_negative ():
33
+ @Configuration ()
34
+ class TestStreamingCommand (StreamingCommand ):
35
+
36
+ def stream (self , records ):
37
+ for index , record in enumerate (records ):
38
+ if index % 2 != 0 :
39
+ record ["odd_field" ] = True
40
+ else :
41
+ record ["even_field" ] = True
42
+ yield record
43
+
44
+ cmd = TestStreamingCommand ()
45
+ ifile = io .BytesIO ()
46
+ ifile .write (chunky .build_getinfo_chunk ())
47
+ data = list ()
48
+ for i in range (0 , 10 ):
49
+ data .append ({"in_index" : str (i )})
50
+ ifile .write (chunky .build_data_chunk (data , finished = True ))
51
+ ifile .seek (0 )
52
+ ofile = io .BytesIO ()
53
+ cmd ._process_protocol_v2 ([], ifile , ofile )
54
+ ofile .seek (0 )
55
+ output_iter = chunky .ChunkedDataStream (ofile ).__iter__ ()
56
+ output_iter .next ()
57
+ output_records = [i for i in output_iter .next ().data ]
58
+
59
+ # Assert that count of records having "odd_field" is 0
60
+ assert len (list (filter (lambda r : "odd_field" in r , output_records ))) == 0
61
+
62
+ # Assert that count of records having "even_field" is 10
63
+ assert len (list (filter (lambda r : "even_field" in r , output_records ))) == 10
64
+
65
+
66
+ def test_field_preservation_positive ():
67
+ @Configuration ()
68
+ class TestStreamingCommand (StreamingCommand ):
69
+
70
+ def stream (self , records ):
71
+ for index , record in enumerate (records ):
72
+ if index % 2 != 0 :
73
+ self .add_field (record , "odd_field" , True )
74
+ else :
75
+ self .add_field (record , "even_field" , True )
76
+ yield record
77
+
78
+ cmd = TestStreamingCommand ()
79
+ ifile = io .BytesIO ()
80
+ ifile .write (chunky .build_getinfo_chunk ())
81
+ data = list ()
82
+ for i in range (0 , 10 ):
83
+ data .append ({"in_index" : str (i )})
84
+ ifile .write (chunky .build_data_chunk (data , finished = True ))
85
+ ifile .seek (0 )
86
+ ofile = io .BytesIO ()
87
+ cmd ._process_protocol_v2 ([], ifile , ofile )
88
+ ofile .seek (0 )
89
+ output_iter = chunky .ChunkedDataStream (ofile ).__iter__ ()
90
+ output_iter .next ()
91
+ output_records = [i for i in output_iter .next ().data ]
92
+
93
+ # Assert that count of records having "odd_field" is 10
94
+ assert len (list (filter (lambda r : "odd_field" in r , output_records ))) == 10
95
+
96
+ # Assert that count of records having "even_field" is 10
97
+ assert len (list (filter (lambda r : "even_field" in r , output_records ))) == 10
0 commit comments