@@ -22,6 +22,7 @@ use std::pin::Pin;
22
22
use std:: sync:: Arc ;
23
23
use std:: time:: { Duration , Instant } ;
24
24
25
+ use greptime_proto:: v1:: SemanticType ;
25
26
use tokio:: select;
26
27
use tokio:: time:: timeout;
27
28
@@ -166,8 +167,9 @@ impl BulkStreamWriter {
166
167
. columns ( )
167
168
. iter ( )
168
169
. map ( |col| {
170
+ let nullable = col. semantic_type != SemanticType :: Timestamp ;
169
171
column_to_arrow_data_type ( col)
170
- . map ( |data_type| Field :: new ( & col. name , data_type, true ) )
172
+ . map ( |data_type| Field :: new ( & col. name , data_type, nullable ) )
171
173
} )
172
174
. collect ( ) ;
173
175
let arrow_schema = Arc :: new ( Schema :: new ( fields?) ) ;
@@ -827,8 +829,9 @@ impl RowBatchBuilder {
827
829
let fields: Result < Vec < Field > > = column_schemas
828
830
. iter ( )
829
831
. map ( |col| {
832
+ let nullable = col. semantic_type != SemanticType :: Timestamp ;
830
833
column_to_arrow_data_type ( col)
831
- . map ( |data_type| Field :: new ( & col. name , data_type, true ) )
834
+ . map ( |data_type| Field :: new ( & col. name , data_type, nullable ) )
832
835
} )
833
836
. collect ( ) ;
834
837
let schema = Arc :: new ( Schema :: new ( fields?) ) ;
@@ -1299,4 +1302,135 @@ mod tests {
1299
1302
assert_eq ! ( rows. len( ) , 2 ) ;
1300
1303
assert ! ( !rows. is_empty( ) ) ;
1301
1304
}
1305
+
1306
+ #[ test]
1307
+ fn test_non_nullable_timestamp_field_with_null_should_error ( ) {
1308
+ // Create schema with timestamp field (non-nullable)
1309
+ let schema = vec ! [
1310
+ Column {
1311
+ name: "ts" . to_string( ) ,
1312
+ data_type: ColumnDataType :: TimestampMillisecond ,
1313
+ semantic_type: SemanticType :: Timestamp ,
1314
+ data_type_extension: None ,
1315
+ } ,
1316
+ Column {
1317
+ name: "value" . to_string( ) ,
1318
+ data_type: ColumnDataType :: Int64 ,
1319
+ semantic_type: SemanticType :: Field ,
1320
+ data_type_extension: None ,
1321
+ } ,
1322
+ ] ;
1323
+
1324
+ let mut rows = Rows :: new ( & schema, 5 , 5 ) . expect ( "Failed to create rows" ) ;
1325
+
1326
+ // Add a row with null timestamp (should cause error when converting to RecordBatch)
1327
+ let row_with_null_timestamp =
1328
+ crate :: table:: Row :: new ( ) . add_values ( vec ! [ Value :: Null , Value :: Int64 ( 42 ) ] ) ;
1329
+
1330
+ rows. add_row ( row_with_null_timestamp)
1331
+ . expect ( "Failed to add row" ) ;
1332
+
1333
+ // Converting to RecordBatch should fail because timestamp is null but field is non-nullable
1334
+ let result = RecordBatch :: try_from ( rows) ;
1335
+ assert ! (
1336
+ result. is_err( ) ,
1337
+ "Should fail when timestamp field contains null value"
1338
+ ) ;
1339
+ }
1340
+
1341
+ #[ test]
1342
+ fn test_nullable_field_with_null_should_succeed ( ) {
1343
+ // Create schema with nullable field
1344
+ let schema = vec ! [
1345
+ Column {
1346
+ name: "ts" . to_string( ) ,
1347
+ data_type: ColumnDataType :: TimestampMillisecond ,
1348
+ semantic_type: SemanticType :: Timestamp ,
1349
+ data_type_extension: None ,
1350
+ } ,
1351
+ Column {
1352
+ name: "value" . to_string( ) ,
1353
+ data_type: ColumnDataType :: Int64 ,
1354
+ semantic_type: SemanticType :: Field ,
1355
+ data_type_extension: None ,
1356
+ } ,
1357
+ ] ;
1358
+
1359
+ let mut rows = Rows :: new ( & schema, 5 , 5 ) . expect ( "Failed to create rows" ) ;
1360
+
1361
+ // Add a row with null value field (should succeed since value field is nullable)
1362
+ let row_with_null_value = crate :: table:: Row :: new ( )
1363
+ . add_values ( vec ! [ Value :: TimestampMillisecond ( 1234567890 ) , Value :: Null ] ) ;
1364
+
1365
+ rows. add_row ( row_with_null_value)
1366
+ . expect ( "Failed to add row" ) ;
1367
+
1368
+ // Converting to RecordBatch should succeed because value field is nullable
1369
+ let result = RecordBatch :: try_from ( rows) ;
1370
+ assert ! (
1371
+ result. is_ok( ) ,
1372
+ "Should succeed when nullable field contains null value"
1373
+ ) ;
1374
+ }
1375
+
1376
+ #[ test]
1377
+ fn test_arrow_schema_nullable_fields ( ) {
1378
+ use arrow_schema:: { DataType , Field } ;
1379
+
1380
+ // Create columns with different semantic types
1381
+ let columns = [
1382
+ Column {
1383
+ name : "ts" . to_string ( ) ,
1384
+ data_type : ColumnDataType :: TimestampMillisecond ,
1385
+ semantic_type : SemanticType :: Timestamp ,
1386
+ data_type_extension : None ,
1387
+ } ,
1388
+ Column {
1389
+ name : "value" . to_string ( ) ,
1390
+ data_type : ColumnDataType :: Int64 ,
1391
+ semantic_type : SemanticType :: Field ,
1392
+ data_type_extension : None ,
1393
+ } ,
1394
+ Column {
1395
+ name : "tag" . to_string ( ) ,
1396
+ data_type : ColumnDataType :: String ,
1397
+ semantic_type : SemanticType :: Tag ,
1398
+ data_type_extension : None ,
1399
+ } ,
1400
+ ] ;
1401
+
1402
+ // Test the logic that creates Arrow schema fields
1403
+ let fields: Vec < Field > = columns
1404
+ . iter ( )
1405
+ . map ( |col| {
1406
+ let nullable = col. semantic_type != SemanticType :: Timestamp ;
1407
+ let data_type = match col. data_type {
1408
+ ColumnDataType :: TimestampMillisecond => {
1409
+ DataType :: Timestamp ( TimeUnit :: Millisecond , None )
1410
+ }
1411
+ ColumnDataType :: Int64 => DataType :: Int64 ,
1412
+ ColumnDataType :: String => DataType :: Utf8 ,
1413
+ _ => DataType :: Utf8 , // fallback
1414
+ } ;
1415
+ Field :: new ( & col. name , data_type, nullable)
1416
+ } )
1417
+ . collect ( ) ;
1418
+
1419
+ assert_eq ! ( fields. len( ) , 3 ) ;
1420
+
1421
+ // Timestamp field should be non-nullable
1422
+ assert ! (
1423
+ !fields[ 0 ] . is_nullable( ) ,
1424
+ "Timestamp field should be non-nullable"
1425
+ ) ;
1426
+ assert_eq ! ( fields[ 0 ] . name( ) , "ts" ) ;
1427
+
1428
+ // Value field should be nullable
1429
+ assert ! ( fields[ 1 ] . is_nullable( ) , "Value field should be nullable" ) ;
1430
+ assert_eq ! ( fields[ 1 ] . name( ) , "value" ) ;
1431
+
1432
+ // Tag field should be nullable
1433
+ assert ! ( fields[ 2 ] . is_nullable( ) , "Tag field should be nullable" ) ;
1434
+ assert_eq ! ( fields[ 2 ] . name( ) , "tag" ) ;
1435
+ }
1302
1436
}
0 commit comments