
Auto-detection of nullable columns in local.order_by incorrectly applies allow_nullable_key to Distributed table instead of local tables #520

@jeffrytyn

Description


When creating a Distributed table with local.order_by containing nullable columns, the connector's auto-detection logic adds settings.allow_nullable_key=1 to the Distributed table instead of the local MergeTree tables, causing a failure because Distributed tables don't support this setting.

Environment:

  • ClickHouse version: 24.8.14.1
  • Spark connector version: 0.10.0
  • Spark version: 3.5.6

Steps to Reproduce:

from pyspark.sql import SparkSession

# Wrap the builder chain in parentheses so the multi-line call is valid Python
spark = (
    SparkSession.builder
    .config("spark.jars.packages", "com.clickhouse.spark:clickhouse-spark-runtime-3.5_2.12:0.10.0,com.clickhouse:clickhouse-jdbc:0.9.4::all")
    .getOrCreate()
)

# Create a DataFrame; Spark infers nullable string columns from Python tuples
data = [
    ("v1", "v2"),
    ("v3", "v4"),
]
df = spark.createDataFrame(data, ["col1", "col2"])

# Write with the Distributed engine and a local.order_by that contains a nullable column
(
    df.write.format("clickhouse")
    .option("host", "clickhouse-host")
    .option("database", "test_db")
    .option("table", "test_table")
    .option("cluster", "cluster1shards")
    .option("engine", "Distributed")
    .option("local.order_by", "col1")
    .option("local.settings.allow_nullable_key", "1")
    .save()
)

Expected Behavior:

The connector should:

  1. Detect nullable columns in local.order_by
  2. Apply settings.allow_nullable_key=1 to the local MergeTree tables being created on cluster nodes
  3. Successfully create both local and distributed tables
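To make the expected outcome concrete, here is a small sketch that renders the two CREATE TABLE statements the write should roughly produce: the MergeTree local table carries `allow_nullable_key = 1`, while the Distributed table carries no MergeTree settings. The helper `build_create_statements` and the `{table}_local` naming convention are hypothetical illustrations, not the connector's actual DDL generator.

```python
def build_create_statements(database, table, cluster, columns, order_by,
                            local_settings, local_table=None):
    """Hypothetical helper: render local (MergeTree) and Distributed DDL."""
    # Assumption: the per-node local table is named "<table>_local".
    local_table = local_table or f"{table}_local"
    column_defs = ", ".join(f"{name} {ch_type}" for name, ch_type in columns)

    # MergeTree-only settings such as allow_nullable_key belong here.
    local_ddl = (
        f"CREATE TABLE {database}.{local_table} ON CLUSTER {cluster} ({column_defs}) "
        f"ENGINE = MergeTree() ORDER BY ({order_by})"
    )
    if local_settings:
        rendered = ", ".join(f"{k} = {v}" for k, v in local_settings.items())
        local_ddl += f" SETTINGS {rendered}"

    # The Distributed table only points at the local tables; no MergeTree settings.
    dist_ddl = (
        f"CREATE TABLE {database}.{table} ON CLUSTER {cluster} ({column_defs}) "
        f"ENGINE = Distributed({cluster}, {database}, {local_table})"
    )
    return local_ddl, dist_ddl


local_ddl, dist_ddl = build_create_statements(
    database="test_db",
    table="test_table",
    cluster="cluster1shards",
    columns=[("col1", "Nullable(String)"), ("col2", "Nullable(String)")],
    order_by="col1",
    local_settings={"allow_nullable_key": 1},
)
print(local_ddl)
print(dist_ddl)
```

The point of the sketch is only the placement: the setting appears in the local table's SETTINGS clause and nowhere in the Distributed definition.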

Actual Behavior:

The connector logs show:
INFO ClickHouseTableProvider: ORDER BY contains nullable columns, adding settings.allow_nullable_key=1

But the setting is applied to the Distributed table instead of the local tables, causing:

 com.clickhouse.spark.exception.CHServerException: Code: 115. DB::Exception:
 Unknown setting 'allow_nullable_key': for storage Distributed. (UNKNOWN_SETTING)

Root Cause:

The ClickHouseTableProvider detects nullable columns from the local.order_by option but adds the setting without the local. prefix (as settings.allow_nullable_key), so it is applied when creating the Distributed table instead of only when creating the local tables.
The issue appears to be in this function:

private def ensureNullableKeySupport(schema: StructType, props: Map[String, String]): Map[String, String] = {

Suggested Fix:

When engine="Distributed" is detected and nullable columns are found in local.order_by, the auto-added setting should be prefixed with local. (i.e., local.settings.allow_nullable_key=1) so it only applies to the local MergeTree tables.
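The suggested fix can be modelled in a few lines. This is a hypothetical Python sketch of the logic `ensureNullableKeySupport` could follow, not the connector's Scala code: it takes a plain list of nullable column names instead of a Spark `StructType`, assumes non-Distributed tables read their sorting key from an `order_by` option, and parses the ORDER BY expression naively (comma split only).

```python
from typing import Dict, Iterable

SETTING = "settings.allow_nullable_key"


def ensure_nullable_key_support(nullable_columns: Iterable[str],
                                props: Dict[str, str]) -> Dict[str, str]:
    """Hypothetical model of the fix; the real method is Scala and inspects a StructType."""
    is_distributed = props.get("engine", "").strip().lower() == "distributed"
    # Assumption: non-Distributed tables take their sorting key from "order_by".
    order_by_key = "local.order_by" if is_distributed else "order_by"

    # Naive parsing: strip outer parentheses, split on commas (no function calls).
    raw = props.get(order_by_key, "").strip().strip("()")
    key_columns = {c.strip() for c in raw.split(",") if c.strip()}

    if not key_columns & set(nullable_columns):
        return props  # no nullable sorting-key columns, nothing to add

    # The fix: route the setting to the local MergeTree tables for Distributed.
    target = f"local.{SETTING}" if is_distributed else SETTING
    if target in props:
        return props  # keep an explicit user-provided value
    return {**props, target: "1"}


props = {"engine": "Distributed", "local.order_by": "col1"}
print(ensure_nullable_key_support(["col1", "col2"], props))
# → {'engine': 'Distributed', 'local.order_by': 'col1', 'local.settings.allow_nullable_key': '1'}
```

Checking for an existing key before adding one also means that a user who already passes local.settings.allow_nullable_key, as in the reproduction, gets their props back unchanged.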

Workaround:

Currently, the only workaround is to make every column in local.order_by non-nullable in the Spark schema (for example, by coalescing nulls to a default value) before writing.
