@@ -508,7 +508,7 @@ def create_driver(conf=base_config)
508
508
end
509
509
510
510
sub_test_case 'compressed packed forward' do
511
- test 'set_compress_to_option ' do
511
+ test 'set_compress_to_option_gzip ' do
512
512
@d = d = create_driver
513
513
514
514
time_i = event_time ( "2011-01-02 13:14:15 UTC" ) . to_i
@@ -535,6 +535,33 @@ def create_driver(conf=base_config)
535
535
assert_equal events , d . events
536
536
end
537
537
538
+ test 'set_compress_to_option_zstd' do
539
+ @d = d = create_driver
540
+
541
+ time_i = event_time ( "2011-01-02 13:14:15 UTC" ) . to_i
542
+ events = [
543
+ [ "tag1" , time_i , { "a" => 1 } ] ,
544
+ [ "tag1" , time_i , { "a" => 2 } ]
545
+ ]
546
+
547
+ # create compressed entries
548
+ entries = ''
549
+ events . each do |_tag , _time , record |
550
+ v = [ _time , record ] . to_msgpack
551
+ entries << compress ( v , type : :zstd )
552
+ end
553
+ chunk = [ "tag1" , entries , { 'compressed' => 'zstd' } ] . to_msgpack
554
+
555
+ d . run do
556
+ Fluent ::MessagePackFactory . msgpack_unpacker . feed_each ( chunk ) do |obj |
557
+ option = d . instance . send ( :on_message , obj , chunk . size , DUMMY_SOCK )
558
+ assert_equal 'zstd' , option [ 'compressed' ]
559
+ end
560
+ end
561
+
562
+ assert_equal events , d . events
563
+ end
564
+
538
565
test 'create_CompressedMessagePackEventStream_with_gzip_compress_option' do
539
566
@d = d = create_driver
540
567
@@ -562,6 +589,34 @@ def create_driver(conf=base_config)
562
589
end
563
590
end
564
591
end
592
+
593
+ test 'create_CompressedMessagePackEventStream_with_zstd_compress_option' do
594
+ @d = d = create_driver
595
+
596
+ time_i = event_time ( "2011-01-02 13:14:15 UTC" ) . to_i
597
+ events = [
598
+ [ "tag1" , time_i , { "a" => 1 } ] ,
599
+ [ "tag1" , time_i , { "a" => 2 } ]
600
+ ]
601
+
602
+ # create compressed entries
603
+ entries = ''
604
+ events . each do |_tag , _time , record |
605
+ v = [ _time , record ] . to_msgpack
606
+ entries << compress ( v )
607
+ end
608
+ chunk = [ "tag1" , entries , { 'compressed' => 'zstd' } ] . to_msgpack
609
+
610
+ # check CompressedMessagePackEventStream is created
611
+ mock ( Fluent ::CompressedMessagePackEventStream ) . new ( entries , nil , 0 , compress : :zstd )
612
+
613
+ d . run do
614
+ Fluent ::MessagePackFactory . msgpack_unpacker . feed_each ( chunk ) do |obj |
615
+ option = d . instance . send ( :on_message , obj , chunk . size , DUMMY_SOCK )
616
+ assert_equal 'zstd' , option [ 'compressed' ]
617
+ end
618
+ end
619
+ end
565
620
end
566
621
567
622
sub_test_case 'warning' do
0 commit comments