11use crate :: rest_ingest:: rest_source:: SrcTableId ;
2- use crate :: rest_ingest:: rest_source:: { EventOperation , RestEvent } ;
2+ use crate :: rest_ingest:: rest_source:: { FileEventOperation , RestEvent , RowEventOperation } ;
33use crate :: { Error , Result } ;
44use moonlink:: TableEvent ;
55use std:: collections:: HashMap ;
6+ use std:: sync:: Arc ;
7+ use tokio:: sync:: Mutex ;
68use tokio:: sync:: { mpsc, watch} ;
79use tracing:: { debug, warn} ;
810
@@ -64,6 +66,10 @@ impl RestSink {
6466 /// This is the main entry point for REST event processing, similar to moonlink_sink's process_cdc_event
6567 pub async fn process_rest_event ( & mut self , rest_event : RestEvent ) -> Result < ( ) > {
6668 match rest_event {
69+ // ==================
70+ // Row events
71+ // ==================
72+ //
6773 RestEvent :: RowEvent {
6874 src_table_id,
6975 operation,
@@ -83,19 +89,48 @@ impl RestSink {
8389 self . process_commit_event ( lsn, src_table_id, timestamp)
8490 . await
8591 }
92+ // ==================
93+ // Table events
94+ // ==================
95+ //
96+ RestEvent :: FileEvent {
97+ operation,
98+ table_events,
99+ } => self . process_file_event_boxed ( operation, table_events) . await ,
100+ }
101+ }
102+
103+ /// Process a file event (upload files).
104+ fn process_file_event_boxed < ' a > (
105+ & ' a mut self ,
106+ operation : FileEventOperation ,
107+ table_events : Arc < Mutex < tokio:: sync:: mpsc:: UnboundedReceiver < RestEvent > > > ,
108+ ) -> std:: pin:: Pin < Box < dyn std:: future:: Future < Output = Result < ( ) > > + Send + ' a > > {
109+ Box :: pin ( async move { self . process_file_event_impl ( operation, table_events) . await } )
110+ }
111+ async fn process_file_event_impl (
112+ & mut self ,
113+ operation : FileEventOperation ,
114+ table_events : Arc < Mutex < tokio:: sync:: mpsc:: UnboundedReceiver < RestEvent > > > ,
115+ ) -> Result < ( ) > {
116+ assert_eq ! ( operation, FileEventOperation :: Upload ) ;
117+ let mut guard = table_events. lock ( ) . await ;
118+ while let Some ( event) = guard. recv ( ) . await {
119+ self . process_rest_event ( event) . await ?;
86120 }
121+ Ok ( ( ) )
87122 }
88123
89- /// Process a row event (Insert, Update, Delete)
124+ /// Process a row event (Insert, Update, Delete).
90125 async fn process_row_event (
91126 & self ,
92127 src_table_id : SrcTableId ,
93- operation : EventOperation ,
128+ operation : RowEventOperation ,
94129 row : moonlink:: row:: MoonlinkRow ,
95130 lsn : u64 ,
96131 ) -> Result < ( ) > {
97132 match operation {
98- EventOperation :: Insert => {
133+ RowEventOperation :: Insert => {
99134 let table_event = TableEvent :: Append {
100135 row,
101136 lsn,
@@ -107,7 +142,7 @@ impl RestSink {
107142 self . send_table_event ( src_table_id, table_event) . await ?;
108143 debug ! ( src_table_id, lsn, "processed REST insert event" ) ;
109144 }
110- EventOperation :: Update => {
145+ RowEventOperation :: Update => {
111146 // For updates, we send both delete and append events
112147 // First send delete for the old row (using the same row for simplicity)
113148 let delete_event = TableEvent :: Delete {
@@ -132,7 +167,7 @@ impl RestSink {
132167 self . send_table_event ( src_table_id, append_event) . await ?;
133168 debug ! ( src_table_id, lsn, "processed REST update append event" ) ;
134169 }
135- EventOperation :: Delete => {
170+ RowEventOperation :: Delete => {
136171 let table_event = TableEvent :: Delete {
137172 row,
138173 lsn,
@@ -195,7 +230,7 @@ impl RestSink {
195230#[ cfg( test) ]
196231mod tests {
197232 use super :: * ;
198- use crate :: rest_ingest:: rest_source:: { EventOperation , RestEvent } ;
233+ use crate :: rest_ingest:: rest_source:: { RestEvent , RowEventOperation } ;
199234 use moonlink:: row:: { MoonlinkRow , RowValue } ;
200235 use std:: time:: SystemTime ;
201236 use tokio:: sync:: { mpsc, watch} ;
@@ -320,7 +355,7 @@ mod tests {
320355 // Test Insert event
321356 let insert_event = RestEvent :: RowEvent {
322357 src_table_id,
323- operation : EventOperation :: Insert ,
358+ operation : RowEventOperation :: Insert ,
324359 row : test_row. clone ( ) ,
325360 lsn : 10 ,
326361 timestamp : SystemTime :: now ( ) ,
@@ -338,7 +373,7 @@ mod tests {
338373 // Test Update event (should produce both Delete and Append)
339374 let update_event = RestEvent :: RowEvent {
340375 src_table_id,
341- operation : EventOperation :: Update ,
376+ operation : RowEventOperation :: Update ,
342377 row : test_row. clone ( ) ,
343378 lsn : 20 ,
344379 timestamp : SystemTime :: now ( ) ,
0 commit comments