Skip to content

Commit dc13f85

Browse files
UTs for Out-forward and Out-file
Signed-off-by: Athish Pranav D <[email protected]>
1 parent e0cde8b commit dc13f85

File tree

2 files changed

+130
-34
lines changed

2 files changed

+130
-34
lines changed

test/plugin/test_out_file.rb

Lines changed: 74 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
require 'time'
66
require 'timecop'
77
require 'zlib'
8+
require 'zstd-ruby'
89
require 'fluent/file_wrapper'
910

1011
class FileOutputTest < Test::Unit::TestCase
@@ -397,20 +398,32 @@ def create_driver(conf = CONFIG, opts = {})
397398
end
398399
end
399400

400-
def check_gzipped_result(path, expect)
401+
def check_zipped_result(path, expect, type: :gzip)
401402
# Zlib::GzipReader has a bug of concatenated file: https://bugs.ruby-lang.org/issues/9790
402403
# Following code from https://www.ruby-forum.com/topic/971591#979520
403404
result = ''
404-
File.open(path, "rb") { |io|
405-
loop do
406-
gzr = Zlib::GzipReader.new(StringIO.new(io.read))
407-
result << gzr.read
408-
unused = gzr.unused
409-
gzr.finish
410-
break if unused.nil?
411-
io.pos -= unused.length
412-
end
413-
}
405+
if type == :gzip || type == :gz
406+
File.open(path, "rb") { |io|
407+
loop do
408+
gzr = Zlib::GzipReader.new(StringIO.new(io.read))
409+
result << gzr.read
410+
unused = gzr.unused
411+
gzr.finish
412+
break if unused.nil?
413+
io.pos -= unused.length
414+
end
415+
}
416+
elsif type == :zstd
417+
File.open(path, "rb") { |io|
418+
loop do
419+
reader = Zstd::StreamReader.new(StringIO.new(io.read))
420+
result << reader.read(1024)
421+
break if io.eof?
422+
end
423+
}
424+
else
425+
raise "Invalid compression type to check"
426+
end
414427

415428
assert_equal expect, result
416429
end
@@ -421,7 +434,7 @@ def check_result(path, expect)
421434
end
422435

423436
sub_test_case 'write' do
424-
test 'basic case' do
437+
test 'basic case with gz' do
425438
d = create_driver
426439

427440
assert_false File.exist?("#{TMP_DIR}/out_file_test.20110102_0.log.gz")
@@ -433,7 +446,29 @@ def check_result(path, expect)
433446
end
434447

435448
assert File.exist?("#{TMP_DIR}/out_file_test.20110102_0.log.gz")
436-
check_gzipped_result("#{TMP_DIR}/out_file_test.20110102_0.log.gz", %[2011-01-02T13:14:15Z\ttest\t{"a":1}#{@default_newline}] + %[2011-01-02T13:14:15Z\ttest\t{"a":2}#{@default_newline}])
449+
check_zipped_result("#{TMP_DIR}/out_file_test.20110102_0.log.gz", %[2011-01-02T13:14:15Z\ttest\t{"a":1}#{@default_newline}] + %[2011-01-02T13:14:15Z\ttest\t{"a":2}#{@default_newline}])
450+
end
451+
452+
test 'write with zstd compression' do
453+
d = create_driver %[
454+
path #{TMP_DIR}/out_file_test
455+
compress zstd
456+
utc
457+
<buffer>
458+
timekey_use_utc true
459+
</buffer>
460+
]
461+
462+
assert_false File.exist?("#{TMP_DIR}/out_file_test.20110102_0.log.zstd")
463+
464+
time = event_time("2011-01-02 13:14:15 UTC")
465+
d.run(default_tag: 'test') do
466+
d.feed(time, {"a"=>1})
467+
d.feed(time, {"a"=>2})
468+
end
469+
470+
assert File.exist?("#{TMP_DIR}/out_file_test.20110102_0.log.zstd")
471+
check_zipped_result("#{TMP_DIR}/out_file_test.20110102_0.log.zstd", %[2011-01-02T13:14:15Z\ttest\t{"a":1}#{@default_newline}] + %[2011-01-02T13:14:15Z\ttest\t{"a":2}#{@default_newline}], type: :zstd)
437472
end
438473
end
439474

@@ -481,7 +516,7 @@ def parse_system(text)
481516

482517
assert File.exist?("#{TMP_DIR_WITH_SYSTEM}/out_file_test.20110102_0.log.gz")
483518

484-
check_gzipped_result("#{TMP_DIR_WITH_SYSTEM}/out_file_test.20110102_0.log.gz", %[2011-01-02T13:14:15Z\ttest\t{"a":1}\n] + %[2011-01-02T13:14:15Z\ttest\t{"a":2}\n])
519+
check_zipped_result("#{TMP_DIR_WITH_SYSTEM}/out_file_test.20110102_0.log.gz", %[2011-01-02T13:14:15Z\ttest\t{"a":1}\n] + %[2011-01-02T13:14:15Z\ttest\t{"a":2}\n])
485520
dir_mode = "%o" % File::stat(TMP_DIR_WITH_SYSTEM).mode
486521
assert_equal(OVERRIDE_DIR_PERMISSION, dir_mode[-3, 3].to_i)
487522
file_mode = "%o" % File::stat("#{TMP_DIR_WITH_SYSTEM}/out_file_test.20110102_0.log.gz").mode
@@ -500,7 +535,7 @@ def parse_system(text)
500535
end
501536

502537
path = d.instance.last_written_path
503-
check_gzipped_result(path, %[#{Yajl.dump({"a" => 1, 'time' => time.to_i})}#{@default_newline}] + %[#{Yajl.dump({"a" => 2, 'time' => time.to_i})}#{@default_newline}])
538+
check_zipped_result(path, %[#{Yajl.dump({"a" => 1, 'time' => time.to_i})}#{@default_newline}] + %[#{Yajl.dump({"a" => 2, 'time' => time.to_i})}#{@default_newline}])
504539
end
505540

506541
test 'ltsv' do
@@ -513,7 +548,7 @@ def parse_system(text)
513548
end
514549

515550
path = d.instance.last_written_path
516-
check_gzipped_result(path, %[a:1\ttime:2011-01-02T13:14:15Z#{@default_newline}] + %[a:2\ttime:2011-01-02T13:14:15Z#{@default_newline}])
551+
check_zipped_result(path, %[a:1\ttime:2011-01-02T13:14:15Z#{@default_newline}] + %[a:2\ttime:2011-01-02T13:14:15Z#{@default_newline}])
517552
end
518553

519554
test 'single_value' do
@@ -526,7 +561,7 @@ def parse_system(text)
526561
end
527562

528563
path = d.instance.last_written_path
529-
check_gzipped_result(path, %[1#{@default_newline}] + %[2#{@default_newline}])
564+
check_zipped_result(path, %[1#{@default_newline}] + %[2#{@default_newline}])
530565
end
531566
end
532567

@@ -547,23 +582,24 @@ def parse_system(text)
547582

548583
path = write_once.call
549584
assert_equal "#{TMP_DIR}/out_file_test.20110102_0.log.gz", path
550-
check_gzipped_result(path, formatted_lines)
585+
check_zipped_result(path, formatted_lines)
551586
assert_equal 1, Dir.glob("#{TMP_DIR}/out_file_test.*").size
552587

553588
path = write_once.call
554589
assert_equal "#{TMP_DIR}/out_file_test.20110102_1.log.gz", path
555-
check_gzipped_result(path, formatted_lines)
590+
check_zipped_result(path, formatted_lines)
556591
assert_equal 2, Dir.glob("#{TMP_DIR}/out_file_test.*").size
557592

558593
path = write_once.call
559594
assert_equal "#{TMP_DIR}/out_file_test.20110102_2.log.gz", path
560-
check_gzipped_result(path, formatted_lines)
595+
check_zipped_result(path, formatted_lines)
561596
assert_equal 3, Dir.glob("#{TMP_DIR}/out_file_test.*").size
562597
end
563598

564599
data(
565-
"with compression" => true,
566-
"without compression" => false,
600+
"without compression" => "text",
601+
"with gzip compression" => "gz",
602+
"with zstd compression" => "zstd"
567603
)
568604
test 'append' do |compression|
569605
time = event_time("2011-01-02 13:14:15 UTC")
@@ -578,8 +614,8 @@ def parse_system(text)
578614
timekey_use_utc true
579615
</buffer>
580616
]
581-
if compression
582-
config << " compress gz"
617+
if compression != :text
618+
config << " compress #{compression}"
583619
end
584620
d = create_driver(config)
585621
d.run(default_tag: 'test'){
@@ -590,16 +626,16 @@ def parse_system(text)
590626
}
591627

592628
log_file_name = "out_file_test.20110102.log"
593-
if compression
594-
log_file_name << ".gz"
629+
if compression != "text"
630+
log_file_name << ".#{compression}"
595631
end
596632

597633
1.upto(3) do |i|
598634
path = write_once.call
599635
assert_equal "#{TMP_DIR}/#{log_file_name}", path
600636
expect = formatted_lines * i
601-
if compression
602-
check_gzipped_result(path, expect)
637+
if compression != "text"
638+
check_zipped_result(path, expect, type: compression.to_sym)
603639
else
604640
check_result(path, expect)
605641
end
@@ -630,15 +666,15 @@ def parse_system(text)
630666

631667
path = write_once.call
632668
assert_equal "#{TMP_DIR}/out_file_test.20110102.log.gz", path
633-
check_gzipped_result(path, formatted_lines)
669+
check_zipped_result(path, formatted_lines)
634670

635671
path = write_once.call
636672
assert_equal "#{TMP_DIR}/out_file_test.20110102.log.gz", path
637-
check_gzipped_result(path, formatted_lines * 2)
673+
check_zipped_result(path, formatted_lines * 2)
638674

639675
path = write_once.call
640676
assert_equal "#{TMP_DIR}/out_file_test.20110102.log.gz", path
641-
check_gzipped_result(path, formatted_lines * 3)
677+
check_zipped_result(path, formatted_lines * 3)
642678
end
643679
end
644680

@@ -667,15 +703,15 @@ def parse_system(text)
667703
path = write_once.call
668704
# Rotated at 2011-01-02 17:00:00+02:00
669705
assert_equal "#{TMP_DIR}/out_file_test.20110103.log.gz", path
670-
check_gzipped_result(path, formatted_lines)
706+
check_zipped_result(path, formatted_lines)
671707

672708
path = write_once.call
673709
assert_equal "#{TMP_DIR}/out_file_test.20110103.log.gz", path
674-
check_gzipped_result(path, formatted_lines * 2)
710+
check_zipped_result(path, formatted_lines * 2)
675711

676712
path = write_once.call
677713
assert_equal "#{TMP_DIR}/out_file_test.20110103.log.gz", path
678-
check_gzipped_result(path, formatted_lines * 3)
714+
check_zipped_result(path, formatted_lines * 3)
679715
end
680716
end
681717

@@ -871,6 +907,10 @@ def run_and_check(d, symlink_path)
871907
test 'returns .gz for gzip' do
872908
assert_equal '.gz', @i.compression_suffix(:gzip)
873909
end
910+
911+
test 'returns .zstd for zstd' do
912+
assert_equal '.zstd', @i.compression_suffix(:zstd)
913+
end
874914
end
875915

876916
sub_test_case '#generate_path_template' do

test/plugin/test_out_forward.rb

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -342,6 +342,15 @@ def try_write(chunk)
342342
assert_equal :gzip, node.instance_variable_get(:@compress)
343343
end
344344

345+
test 'set_compress_is_zstd' do
346+
@d = d = create_driver(config + %[compress zstd])
347+
assert_equal :zstd, d.instance.compress
348+
assert_equal :zstd, d.instance.buffer.compress
349+
350+
node = d.instance.nodes.first
351+
assert_equal :zstd, node.instance_variable_get(:@compress)
352+
end
353+
345354
test 'set_compress_is_gzip_in_buffer_section' do
346355
mock = flexmock($log)
347356
mock.should_receive(:log).with("buffer is compressed. If you also want to save the bandwidth of a network, Add `compress` configuration in <match>")
@@ -359,6 +368,23 @@ def try_write(chunk)
359368
assert_equal :text, node.instance_variable_get(:@compress)
360369
end
361370

371+
test 'set_compress_is_zstd_in_buffer_section' do
372+
mock = flexmock($log)
373+
mock.should_receive(:log).with("buffer is compressed. If you also want to save the bandwidth of a network, Add `compress` configuration in <match>")
374+
375+
@d = d = create_driver(config + %[
376+
<buffer>
377+
type memory
378+
compress zstd
379+
</buffer>
380+
])
381+
assert_equal :text, d.instance.compress
382+
assert_equal :zstd, d.instance.buffer.compress
383+
384+
node = d.instance.nodes.first
385+
assert_equal :text, node.instance_variable_get(:@compress)
386+
end
387+
362388
test 'phi_failure_detector disabled' do
363389
@d = d = create_driver(config + %[phi_failure_detector false \n phi_threshold 0])
364390
node = d.instance.nodes.first
@@ -549,6 +575,36 @@ def try_write(chunk)
549575
assert_equal ['test', time, records[1]], events[1]
550576
end
551577

578+
test 'send_comprssed_message_pack_stream_if_compress_is_zstd' do
579+
target_input_driver = create_target_input_driver
580+
581+
@d = d = create_driver(config + %[
582+
flush_interval 1s
583+
compress zstd
584+
])
585+
586+
time = event_time('2011-01-02 13:14:15 UTC')
587+
588+
records = [
589+
{"a" => 1},
590+
{"a" => 2}
591+
]
592+
target_input_driver.run(expect_records: 2) do
593+
d.run(default_tag: 'test') do
594+
records.each do |record|
595+
d.feed(time, record)
596+
end
597+
end
598+
end
599+
600+
event_streams = target_input_driver.event_streams
601+
assert_true event_streams[0][1].is_a?(Fluent::CompressedMessagePackEventStream)
602+
603+
events = target_input_driver.events
604+
assert_equal ['test', time, records[0]], events[0]
605+
assert_equal ['test', time, records[1]], events[1]
606+
end
607+
552608
test 'send_to_a_node_supporting_responses' do
553609
target_input_driver = create_target_input_driver
554610

0 commit comments

Comments
 (0)