@@ -40,6 +40,8 @@ use crate::table::{Column, Row, Table};
40
40
use crate :: { error, Result } ;
41
41
use snafu:: { ensure, ResultExt } ;
42
42
43
+ pub type RequestId = i64 ;
44
+
43
45
// Macro to generate array conversion for simple types
44
46
macro_rules! build_primitive_array {
45
47
( $rows: expr, $col_idx: expr, $getter: ident, $array_type: ty) => { {
@@ -157,16 +159,16 @@ pub struct BulkStreamWriter {
157
159
table_schema : Vec < Column > ,
158
160
// Cache the Arrow schema to avoid recreating it for each batch
159
161
arrow_schema : Arc < Schema > ,
160
- next_request_id : i64 ,
162
+ next_request_id : RequestId ,
161
163
encoder : FlightEncoder ,
162
164
schema_sent : bool ,
163
165
// Parallel processing fields
164
166
parallelism : usize ,
165
167
timeout : Duration ,
166
168
// Track pending requests: request_id -> sent_time
167
- pending_requests : HashMap < i64 , Instant > ,
169
+ pending_requests : HashMap < RequestId , Instant > ,
168
170
// Cache completed responses that were processed but not yet retrieved
169
- completed_responses : HashMap < i64 , DoPutResponse > ,
171
+ completed_responses : HashMap < RequestId , DoPutResponse > ,
170
172
}
171
173
172
174
impl BulkStreamWriter {
@@ -219,23 +221,14 @@ impl BulkStreamWriter {
219
221
220
222
/// Write rows to the stream using the fixed table schema
221
223
pub async fn write_rows ( & mut self , rows : Vec < Row > ) -> Result < DoPutResponse > {
222
- if rows. is_empty ( ) {
223
- // Return a dummy response for empty input
224
- return Ok ( DoPutResponse :: new ( 0 , 0 ) ) ;
225
- }
226
-
227
224
// Use the async implementation and wait for the response
228
225
let request_id = self . write_rows_async ( rows) . await ?;
229
226
self . wait_for_response ( request_id) . await
230
227
}
231
228
232
229
/// Submit rows for writing without waiting for response
233
230
/// Returns a request_id that can be used to wait for the specific response
234
- pub async fn write_rows_async ( & mut self , rows : Vec < Row > ) -> Result < i64 > {
235
- if rows. is_empty ( ) {
236
- return Ok ( 0 ) ;
237
- }
238
-
231
+ pub async fn write_rows_async ( & mut self , rows : Vec < Row > ) -> Result < RequestId > {
239
232
let record_batch = self . rows_to_record_batch ( & rows) ?;
240
233
let request_id = self . submit_record_batch ( record_batch) . await ?;
241
234
@@ -381,8 +374,7 @@ impl BulkStreamWriter {
381
374
}
382
375
383
376
// Send the request
384
- self . next_request_id += 1 ;
385
- let request_id = self . next_request_id ;
377
+ let request_id = self . next_request_id ( ) ;
386
378
let message = FlightMessage :: RecordBatch ( batch) ;
387
379
let mut data = self . encoder . encode ( message) ;
388
380
let metadata = DoPutMetadata :: new ( request_id) ;
@@ -437,7 +429,7 @@ impl BulkStreamWriter {
437
429
438
430
/// Convert rows to Arrow RecordBatch using cached schema
439
431
fn rows_to_record_batch ( & self , rows : & [ Row ] ) -> Result < RecordBatch > {
440
- ensure ! ( !rows. is_empty( ) , error:: EmptyTableSnafu ) ;
432
+ ensure ! ( !rows. is_empty( ) , error:: EmptyRowsSnafu ) ;
441
433
442
434
// Convert all rows to arrays
443
435
let arrays = self . rows_to_arrays ( rows) ?;
@@ -618,6 +610,15 @@ impl BulkStreamWriter {
618
610
619
611
Ok ( all_responses)
620
612
}
613
+
614
+ fn next_request_id ( & mut self ) -> RequestId {
615
+ // Skip ID 0 as it's reserved for special cases
616
+ self . next_request_id = self . next_request_id . wrapping_add ( 1 ) ;
617
+ if self . next_request_id == 0 {
618
+ self . next_request_id = 1 ;
619
+ }
620
+ self . next_request_id
621
+ }
621
622
}
622
623
623
624
// Helper function to convert ColumnDataType to Arrow DataType
0 commit comments