Commit 3c021f1

abushwang committed:
fix panic when chunk size exceeds merge buffer size
Signed-off-by: abushwang <[email protected]>
Parent: ddf07e9
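
The new doc comment in the diff below explains the crash: when a single chunk is larger than the merge buffer, slicing that buffer to the chunk size goes out of range at runtime. A minimal, self-contained sketch of this class of panic; the names mergeBuffer, mergeBufferSize, and chunkSize are illustrative stand-ins, not the actual reader.go variables:

package main

import "fmt"

func main() {
	// Hypothetical sizes for illustration: a 4-byte merge buffer
	// and an 8-byte chunk, i.e. chunkSize > mergeBufferSize.
	mergeBufferSize := int64(4)
	chunkSize := int64(8)

	mergeBuffer := make([]byte, mergeBufferSize)

	// Slicing past the buffer's capacity panics:
	//   panic: runtime error: slice bounds out of range [:8] with capacity 4
	chunk := mergeBuffer[:chunkSize]
	fmt.Println(len(chunk)) // never reached
}

This is the condition the commit detects up front (hasLargeChunk) so it can route such files to a sequential prefetch path instead of the merged one.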

File tree

1 file changed: +75 -2 lines

1 file changed

+75
-2
lines changed

fs/reader/reader.go

Lines changed: 75 additions & 2 deletions
@@ -508,6 +508,7 @@ func (sf *file) GetPassthroughFd(mergeBufferSize int64, mergeWorkerCount int) (u
 		offset           int64
 		firstChunkOffset int64
 		totalSize        int64
+		hasLargeChunk    bool
 	)
 
 	var chunks []chunkData
@@ -516,6 +517,10 @@ func (sf *file) GetPassthroughFd(mergeBufferSize int64, mergeWorkerCount int) (u
 		if !ok {
 			break
 		}
+		// Check if any chunk size exceeds merge buffer size to avoid bounds out of range
+		if chunkSize > mergeBufferSize {
+			hasLargeChunk = true
+		}
 		chunks = append(chunks, chunkData{
 			offset: chunkOffset,
 			size:   chunkSize,
@@ -530,8 +535,14 @@ func (sf *file) GetPassthroughFd(mergeBufferSize int64, mergeWorkerCount int) (u
 	// cache.PassThrough() is necessary to take over files
 	r, err := sf.gr.cache.Get(id, cache.PassThrough())
 	if err != nil {
-		if err := sf.prefetchEntireFile(id, chunks, totalSize, mergeBufferSize, mergeWorkerCount); err != nil {
-			return 0, err
+		if hasLargeChunk {
+			if err := sf.prefetchEntireFileSequential(id); err != nil {
+				return 0, err
+			}
+		} else {
+			if err := sf.prefetchEntireFile(id, chunks, totalSize, mergeBufferSize, mergeWorkerCount); err != nil {
+				return 0, err
+			}
 		}
 
 		// just retry once to avoid exception stuck
@@ -553,6 +564,68 @@ func (sf *file) GetPassthroughFd(mergeBufferSize int64, mergeWorkerCount int) (u
 	return fd, nil
 }
 
+// prefetchEntireFileSequential uses the legacy sequential approach for processing chunks
+// when chunk size exceeds merge buffer size to avoid slice bounds out of range panic
+func (sf *file) prefetchEntireFileSequential(entireCacheID string) error {
+	w, err := sf.gr.cache.Add(entireCacheID)
+	if err != nil {
+		return fmt.Errorf("failed to create cache writer: %w", err)
+	}
+	defer w.Close()
+
+	var offset int64
+
+	for {
+		chunkOffset, chunkSize, chunkDigestStr, ok := sf.fr.ChunkEntryForOffset(offset)
+		if !ok {
+			break
+		}
+
+		id := genID(sf.id, chunkOffset, chunkSize)
+		b := sf.gr.bufPool.Get().(*bytes.Buffer)
+		b.Reset()
+		b.Grow(int(chunkSize))
+		ip := b.Bytes()[:chunkSize]
+
+		if r, err := sf.gr.cache.Get(id); err == nil {
+			n, err := r.ReadAt(ip, 0)
+			if (err == nil || err == io.EOF) && int64(n) == chunkSize {
+				if _, err := w.Write(ip[:n]); err != nil {
+					r.Close()
+					sf.gr.putBuffer(b)
+					w.Abort()
+					return fmt.Errorf("failed to write cached data: %w", err)
+				}
+				offset = chunkOffset + int64(n)
+				r.Close()
+				sf.gr.putBuffer(b)
+				continue
+			}
+			r.Close()
+		}
+
+		if _, err := sf.fr.ReadAt(ip, chunkOffset); err != nil && err != io.EOF {
+			sf.gr.putBuffer(b)
+			w.Abort()
+			return fmt.Errorf("failed to read data: %w", err)
+		}
+		if err := sf.gr.verifyOneChunk(sf.id, ip, chunkDigestStr); err != nil {
+			sf.gr.putBuffer(b)
+			w.Abort()
+			return err
+		}
+		if _, err := w.Write(ip); err != nil {
+			sf.gr.putBuffer(b)
+			w.Abort()
+			return fmt.Errorf("failed to write fetched data: %w", err)
+		}
+		offset = chunkOffset + chunkSize
+		sf.gr.putBuffer(b)
+	}
+
+	return w.Commit()
+}
+
 type batchWorkerArgs struct {
 	workerID int
 	chunks   []chunkData
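
Note on the fallback: rather than enlarging the merge buffers, the sequential path sizes a pooled buffer per chunk. After Reset, b.Grow(int(chunkSize)) guarantees capacity for the whole chunk, so the re-slice b.Bytes()[:chunkSize] can never go out of range, whatever the merge buffer size. A standalone sketch of that per-chunk buffer pattern; pool here is a stand-in for sf.gr.bufPool (the real code returns buffers via sf.gr.putBuffer rather than pool.Put):

package main

import (
	"bytes"
	"fmt"
	"sync"
)

// pool is a hypothetical stand-in for the reader's bufPool.
var pool = sync.Pool{New: func() any { return new(bytes.Buffer) }}

func main() {
	chunkSize := int64(8)

	b := pool.Get().(*bytes.Buffer)
	b.Reset()
	b.Grow(int(chunkSize)) // reserve capacity for the entire chunk

	// Safe: Grow guarantees cap(b.Bytes()) >= chunkSize, so this
	// slice expression stays within capacity.
	ip := b.Bytes()[:chunkSize]

	fmt.Println(len(ip), cap(ip) >= int(chunkSize)) // 8 true
	pool.Put(b)
}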
