Skip to content

Commit 0d0bf95

Browse files
author
yongwoo.kim
committed
Merge pull request #1 from ddukbg/feature/zst-compression
feat: Add Zstd compression support for S3 plugin Signed-off-by: ddukbg <[email protected]>
2 parents 803cac2 + 4e1a17a commit 0d0bf95

File tree

4 files changed

+64
-1
lines changed

4 files changed

+64
-1
lines changed

fluent-plugin-s3.gemspec

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,4 +26,5 @@ Gem::Specification.new do |gem|
2626
# aws-sdk-core requires one of ox, oga, libxml, nokogiri or rexml,
2727
# and rexml is no longer default gem as of Ruby 3.0.
2828
gem.add_development_dependency "rexml"
29+
gem.add_development_dependency 'zstd-ruby'
2930
end

lib/fluent/plugin/out_s3.rb

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
require 'time'
77
require 'tempfile'
88
require 'securerandom'
9+
require 'zstd-ruby'
910

1011
module Fluent::Plugin
1112
class S3Output < Output
@@ -630,6 +631,28 @@ def compress(chunk, tmp)
630631
end
631632
end
632633

634+
class ZstdCompressor < Compressor
635+
def ext
636+
'zst'.freeze
637+
end
638+
639+
def content_type
640+
'application/x-zst'.freeze
641+
end
642+
643+
def compress(chunk, tmp)
644+
uncompressed_data = ''
645+
chunk.open do |io|
646+
uncompressed_data = io.read
647+
end
648+
compressed_data = Zstd.compress(uncompressed_data, level: @level)
649+
tmp.write(compressed_data)
650+
rescue => e
651+
log.warn "zstd compression failed: #{e.message}"
652+
raise e
653+
end
654+
end
655+
633656
class TextCompressor < Compressor
634657
def ext
635658
'txt'.freeze
@@ -658,7 +681,8 @@ def content_type
658681
{
659682
'gzip' => GzipCompressor,
660683
'json' => JsonCompressor,
661-
'text' => TextCompressor
684+
'text' => TextCompressor,
685+
'zstd' => ZstdCompressor
662686
}.each { |name, compressor|
663687
COMPRESSOR_REGISTRY.register(name, compressor)
664688
}
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
require 'zstd-ruby'
2+
3+
module Fluent::Plugin
4+
class S3Output
5+
class ZstdCompressor < Compressor
6+
S3Output.register_compressor('zstd', self)
7+
8+
config_param :level, :integer, default: 3, desc: "Compression level for zstd (1-22)"
9+
10+
def initialize(opts = {})
11+
super()
12+
@buffer_type = opts[:buffer_type]
13+
@log = opts[:log]
14+
end
15+
16+
def ext
17+
'zst'.freeze
18+
end
19+
20+
def content_type
21+
'application/x-zstd'.freeze
22+
end
23+
24+
def compress(chunk, tmp)
25+
uncompressed_data = ''
26+
chunk.open do |io|
27+
uncompressed_data = io.read
28+
end
29+
compressed_data = Zstd.compress(uncompressed_data, level: @level)
30+
tmp.write(compressed_data)
31+
rescue => e
32+
log.warn "zstd compression failed: #{e.message}"
33+
raise e
34+
end
35+
end
36+
end
37+
end

test/test_in_s3.rb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,7 @@ def test_unknown_store_as
9393
"text" => ["text", "txt", "text/plain"],
9494
"gzip" => ["gzip", "gz", "application/x-gzip"],
9595
"gzip_command" => ["gzip_command", "gz", "application/x-gzip"],
96+
"zstd" => ["zstd", "zst", "application/x-zstd"],
9697
"lzo" => ["lzo", "lzo", "application/x-lzop"],
9798
"lzma2" => ["lzma2", "xz", "application/x-xz"])
9899
def test_extractor(data)

0 commit comments

Comments
 (0)