@@ -31,6 +31,19 @@ import (
3131
3232const merger = "merger"
3333
34+ // used in the context of mergerCtrl to provide a way to verify
35+ // the completion of a merge operation
36+ const mergeDoneKey = "mergeDone"
37+
38+ type mergeDoneChan chan error
39+
40+ // used in the context of mergerCtrl to provide a way to use
41+ // a custom merge plan instead of the one generated by the
42+ // default merge planner
43+ const mergePlanFuncKey = "mergePlanFunc"
44+
45+ type mergePlanFunc func (* IndexSnapshot ) (* mergeplan.MergePlan , error )
46+
3447func (s * Scorch ) mergerLoop () {
3548 defer func () {
3649 if r := recover (); r != nil {
@@ -95,11 +108,9 @@ OUTER:
95108 continue OUTER
96109 }
97110
98- startTime := time .Now ()
99-
100111 // lets get started
101- err := s . planMergeAtSnapshot ( ctrlMsg . ctx , ctrlMsg . options ,
102- ourSnapshot )
112+ startTime := time . Now ()
113+ err := s . planMergeAtSnapshot ( ctrlMsg , ourSnapshot )
103114 if err != nil {
104115 atomic .StoreUint64 (& s .iStats .mergeEpoch , 0 )
105116 if err == segment .ErrClosed {
@@ -286,42 +297,64 @@ func (w *closeChWrapper) listen() {
286297 }
287298}
288299
289- func (s * Scorch ) planMergeAtSnapshot (ctx context.Context ,
290- options * mergeplan.MergePlanOptions , ourSnapshot * IndexSnapshot ) error {
291- // build list of persisted segments in this snapshot
292- var onlyPersistedSnapshots []mergeplan.Segment
293- for _ , segmentSnapshot := range ourSnapshot .segment {
294- if _ , ok := segmentSnapshot .segment .(segment.PersistedSegment ); ok {
295- onlyPersistedSnapshots = append (onlyPersistedSnapshots , segmentSnapshot )
300+ // planMergeAtSnapshot plans and executes the merge operations for a given snapshot
301+ // if there is a custom merge plan function provided, it uses that to get the merge plan
302+ // otherwise, it builds the merge plan using the default planner and executes the merge tasks in the plan.
303+ func (s * Scorch ) planMergeAtSnapshot (ctrlMsg * mergerCtrl , ourSnapshot * IndexSnapshot ) error {
304+ var mergePlan * mergeplan.MergePlan
305+ // if a merge plan function is provided in the context, use it to get the merge plan
306+ if mergePlanFunc , ok := ctrlMsg .ctx .Value (mergePlanFuncKey ).(mergePlanFunc ); ok {
307+ var err error
308+ mergePlan , err = mergePlanFunc (ourSnapshot )
309+ if err != nil {
310+ atomic .AddUint64 (& s .stats .TotFileMergePlanErr , 1 )
311+ return fmt .Errorf ("merge planning err: %v" , err )
296312 }
297313 }
298314
299- atomic .AddUint64 (& s .stats .TotFileMergePlan , 1 )
315+ // default to making a merge plan if a custom one is not provided
316+ if mergePlan == nil {
317+ // build list of persisted segments in this snapshot
318+ var onlyPersistedSnapshots []mergeplan.Segment
319+ for _ , segmentSnapshot := range ourSnapshot .segment {
320+ if _ , ok := segmentSnapshot .segment .(segment.PersistedSegment ); ok {
321+ onlyPersistedSnapshots = append (onlyPersistedSnapshots , segmentSnapshot )
322+ }
323+ }
300324
301- // give this list to the planner
302- resultMergePlan , err := mergeplan .Plan (onlyPersistedSnapshots , options )
303- if err != nil {
304- atomic .AddUint64 (& s .stats .TotFileMergePlanErr , 1 )
305- return fmt .Errorf ("merge planning err: %v" , err )
306- }
307- if resultMergePlan == nil {
308- // nothing to do
309- atomic .AddUint64 (& s .stats .TotFileMergePlanNone , 1 )
310- return nil
311- }
312- atomic .AddUint64 (& s .stats .TotFileMergePlanOk , 1 )
325+ atomic .AddUint64 (& s .stats .TotFileMergePlan , 1 )
313326
314- atomic .AddUint64 (& s .stats .TotFileMergePlanTasks , uint64 (len (resultMergePlan .Tasks )))
327+ // give this list to the planner
328+ var err error
329+ mergePlan , err = mergeplan .Plan (onlyPersistedSnapshots , ctrlMsg .options )
330+ if err != nil {
331+ atomic .AddUint64 (& s .stats .TotFileMergePlanErr , 1 )
332+ return fmt .Errorf ("merge planning err: %v" , err )
333+ }
334+ if mergePlan == nil {
335+ // nothing to do
336+ atomic .AddUint64 (& s .stats .TotFileMergePlanNone , 1 )
337+ return nil
338+ }
339+ }
315340
316- // process tasks in serial for now
317- var filenames [] string
341+ atomic . AddUint64 ( & s . stats . TotFileMergePlanOk , 1 )
342+ atomic . AddUint64 ( & s . stats . TotFileMergePlanTasks , uint64 ( len ( mergePlan . Tasks )))
318343
319- cw := newCloseChWrapper (s .closeCh , ctx )
344+ cw := newCloseChWrapper (s .closeCh , ctrlMsg . ctx )
320345 defer cw .close ()
321-
322346 go cw .listen ()
323347
324- for _ , task := range resultMergePlan .Tasks {
348+ var filenames []string
349+ var err error
350+ defer func () {
351+ // send error to done channel if present
352+ if done , ok := cw .ctx .Value (mergeDoneKey ).(chan error ); ok {
353+ done <- err
354+ }
355+ }()
356+
357+ for _ , task := range mergePlan .Tasks {
325358 if len (task .Segments ) == 0 {
326359 atomic .AddUint64 (& s .stats .TotFileMergePlanTasksSegmentsEmpty , 1 )
327360 continue
@@ -369,7 +402,8 @@ func (s *Scorch) planMergeAtSnapshot(ctx context.Context,
369402
370403 atomic .AddUint64 (& s .stats .TotFileMergeZapBeg , 1 )
371404 prevBytesReadTotal := cumulateBytesRead (segmentsToMerge )
372- newDocNums , _ , err := s .segPlugin .MergeUsing (segmentsToMerge , docsToDrop , path ,
405+ var newDocNums [][]uint64
406+ newDocNums , _ , err = s .segPlugin .MergeUsing (segmentsToMerge , docsToDrop , path ,
373407 cw .cancelCh , s , s .segmentConfig )
374408 atomic .AddUint64 (& s .stats .TotFileMergeZapEnd , 1 )
375409
@@ -422,7 +456,8 @@ func (s *Scorch) planMergeAtSnapshot(ctx context.Context,
422456 select {
423457 case <- s .closeCh :
424458 _ = seg .Close ()
425- return segment .ErrClosed
459+ err = segment .ErrClosed
460+ return err
426461 case s .merges <- sm :
427462 atomic .AddUint64 (& s .stats .TotFileMergeIntroductions , 1 )
428463 }
0 commit comments