Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions clickhouse-activerecord.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,5 @@ Gem::Specification.new do |spec|
spec.add_development_dependency 'rake', '~> 13.0'
spec.add_development_dependency 'rspec', '~> 3.4'
spec.add_development_dependency 'pry', '~> 0.12'
spec.add_development_dependency 'rails', '>= 7.1.3'
end
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,12 @@ def add_column_options!(sql, options)
if options[:codec]
sql.gsub!(/\s+(.*)/, " \\1 CODEC(#{options[:codec]})")
end
if options[:aggregate_function]
sql.gsub!(/(\w+)\s+(.*)/, "\\1 AggregateFunction(#{options[:aggregate_function]}, \\2)")
end
if options[:simple_aggregate_function]
sql.gsub!(/(\w+)\s+(.*)/, "\\1 SimpleAggregateFunction(#{options[:simple_aggregate_function]}, \\2)")
end
sql.gsub!(/(\sString)\(\d+\)/, '\1')
sql << " DEFAULT #{quote_default_expression(options[:default], options[:column])}" if options_include_default?(options)
sql
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ def process_response(res, format, sql = nil)
body = res.body

if body.include?("DB::Exception") && body.match?(DB_EXCEPTION_REGEXP)
raise ActiveRecord::ActiveRecordError, "Response code: #{res.code}:\n#{res.body}#{sql ? "\nQuery: #{sql}" : ''}"
raise ActiveRecord::ActiveRecordError, "Response code: #{res.code}:\nQuery ID: #{res.header["x-clickhouse-query-id"]}:\n#{res.body}#{sql ? "\nQuery: #{sql}" : ''}"
else
format_body_response(res.body, format)
end
Expand All @@ -207,7 +207,7 @@ def process_response(res, format, sql = nil)
when /DB::Exception:.*\(DATABASE_ALREADY_EXISTS\)/
raise ActiveRecord::DatabaseAlreadyExists
else
raise ActiveRecord::ActiveRecordError, "Response code: #{res.code}:\n#{res.body}"
raise ActiveRecord::ActiveRecordError, "Response code: #{res.code}:\nQuery ID: #{res.header["x-clickhouse-query-id"]}:\n#{res.body}"
end
end
rescue JSON::ParserError
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,14 @@ def integer(*args, **options)
args.each { |name| column(name, kind, **options.except(:limit, :unsigned)) }
end

def float(*args, **options)
kind = :float
kind = :float32 if options[:limit] == 4
kind = :float64 if options[:limit] == 8

args.each { |name| column(name, kind, **options.except(:limit)) }
end

def datetime(*args, **options)
kind = :datetime

Expand Down Expand Up @@ -102,7 +110,7 @@ def column(name, type, index: nil, **options)
private

def valid_column_definition_options
super + [:array, :low_cardinality, :fixed_string, :value, :type, :map, :codec, :unsigned]
super + [:array, :low_cardinality, :fixed_string, :value, :type, :map, :codec, :unsigned, :aggregate_function, :simple_aggregate_function]
end
end

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ class ClickhouseAdapter < AbstractAdapter
integer: { name: 'UInt32' },
big_integer: { name: 'UInt64' },
float: { name: 'Float32' },
float64: { name: 'Float64' },
decimal: { name: 'Decimal' },
datetime: { name: 'DateTime' },
datetime64: { name: 'DateTime64' },
Expand Down
25 changes: 21 additions & 4 deletions lib/clickhouse-activerecord/schema_dumper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ def tables(stream)
def table(table, stream)
if table.match(/^\.inner/).nil?
sql= ""
simple || = ENV['simple'] == 'true'
simple ||= ENV['simple'] == 'true'
unless simple
stream.puts " # TABLE: #{table}"
sql = @connection.show_create_table(table)
Expand Down Expand Up @@ -128,7 +128,7 @@ def function(function, stream)
sql = @connection.show_create_function(function)
if sql
stream.puts " # SQL: #{sql}"
stream.puts " create_function \"#{function}\", \"#{sql.sub(/\ACREATE(OR REPLACE)? FUNCTION .*? AS/, '').strip}\", force: true"
stream.puts " create_function \"#{function}\", \"#{sql.sub(/\ACREATE( OR REPLACE)? FUNCTION .*? AS/, '').strip}\", force: true"
stream.puts
end
end
Expand All @@ -149,8 +149,12 @@ def format_colspec(colspec)
end

def schema_limit(column)
return nil if column.type == :float
super
if column.type == :float
return 4 if column.sql_type == "Float32"
return 8 if column.sql_type == "Float64"
else
super
end
end

def schema_unsigned(column)
Expand All @@ -174,6 +178,18 @@ def schema_low_cardinality(column)
(column.sql_type =~ /LowCardinality\(/).nil? ? nil : true
end

def schema_aggregate_function(column)
match = column.sql_type.match(/((?:Simple|)AggregateFunction)\((.+), (\S+)\)/)

return {} if match.nil? || match.size != 4

type = match[1] == "AggregateFunction" ? :aggregate_function : :simple_aggregate_function
{ type => match[2].inspect }.tap do |spec|
spec[:limit] = 4 if match[3] == "Float32"
spec[:limit] = 8 if match[3] == "Float64"
end
end

# @param [ActiveRecord::ConnectionAdapters::Clickhouse::Column] column
def prepare_column_options(column)
spec = {}
Expand All @@ -185,6 +201,7 @@ def prepare_column_options(column)
end
spec[:low_cardinality] = schema_low_cardinality(column)
spec[:codec] = column.codec.inspect if column.codec
spec.merge! schema_aggregate_function(column)
spec.merge(super).compact
end

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
# frozen_string_literal: true

class CreateSomeTable < ActiveRecord::Migration[7.1]
def up
create_table :some, id: false do |t|
t.column :col1, "AggregateFunction(sum, Float32)", null: false
t.column :col2, "AggregateFunction(anyLast, Float64)", null: false
t.column :col3, "AggregateFunction(anyLast, DateTime64)", null: false
t.column :col4, "SimpleAggregateFunction(anyLast, DateTime64)", null: false
end
end
end
64 changes: 64 additions & 0 deletions spec/single/schema_dumper_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
# frozen_string_literal: true

require 'clickhouse-activerecord/schema_dumper'

RSpec.describe ClickhouseActiverecord::SchemaDumper, :migrations do
let(:directory) { raise 'NotImplemented' }
let(:migrations_dir) { File.join(FIXTURES_PATH, 'migrations', directory) }
let(:migration_context) { ActiveRecord::MigrationContext.new(migrations_dir) }

before do
quietly { migration_context.up }
end

subject do
ClickhouseActiverecord::SchemaDumper.dump
end

describe ".dump" do
context 'aggregate_function' do
let(:directory) { 'schema_table_with_aggregate_function_creation' }

it 'creates a table with aggregate function column for an Int32' do
expect { subject }.to output(
satisfy do |schema|
expect(schema).to match(/t\.float "col1"/)
expect(schema).to match(/"col1"[^\n]+aggregate_function: "sum"/)
expect(schema).to match(/"col1"[^\n]+limit: 4/)
end
).to_stdout_from_any_process
end

it 'creates a table with aggregate function column for an Int64' do
expect { subject }.to output(
satisfy do |schema|
expect(schema).to match(/t\.float "col2"/)
expect(schema).to match(/"col2"[^\n]+aggregate_function: "anyLast"/)
expect(schema).to match(/"col2"[^\n]+limit: 8/)
end
).to_stdout_from_any_process
end

it 'creates a table with aggregate function column for an DateTime64' do
expect { subject }.to output(
satisfy do |schema|
expect(schema).to match(/t\.datetime "col3"/)
expect(schema).to match(/"col3"[^\n]+aggregate_function: "anyLast"/)
expect(schema).to match(/"col3"[^\n]+precision: 3/)
end
).to_stdout_from_any_process
end

it 'creates a table with simple aggregate function column for an DateTime64' do
expect { subject }.to output(
satisfy do |schema|
expect(schema).to match(/t\.datetime "col3"/)
expect(schema).to match(/"col3"[^\n]+aggregate_function: "anyLast"/)
expect(schema).to match(/"col3"[^\n]+precision: 3/)
expect(schema).to match(/t\.datetime "col4", simple_aggregate_function: "anyLast"/)
end
).to_stdout_from_any_process
end
end
end
end