Commit 498ed55

abushwang committed

fix panic when chunk size exceeds merge buffer size

Signed-off-by: abushwang <[email protected]>

1 parent ddf07e9 commit 498ed55
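
The panic this commit guards against is Go's runtime bounds check on re-slicing: the batched prefetch path stages each chunk through a merge buffer of mergeBufferSize bytes, so a chunk larger than that buffer overflows the re-slice. The following is a minimal, hypothetical sketch of the failure mode only; the sizes and names are illustrative, not the repository's actual code.

package main

import "fmt"

func main() {
	// Stand-ins for the real reader: a fixed-size merge buffer and a
	// chunk that is larger than it. Sizes are tiny to keep the demo obvious.
	const mergeBufferSize = 4
	buf := make([]byte, mergeBufferSize)

	chunk := []byte("0123456789") // chunkSize (10) > mergeBufferSize (4)
	chunkSize := int64(len(chunk))

	// Re-slicing the buffer to the chunk's length trips the runtime check:
	// panic: runtime error: slice bounds out of range [:10] with capacity 4
	copy(buf[:chunkSize], chunk)
	fmt.Println("unreachable")
}

The fix below detects this case up front and routes such files to a new prefetchEntireFileSequential, which sizes a scratch buffer per chunk (b.Grow(int(chunkSize))) instead of assuming every chunk fits the shared merge buffer.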

File tree

fs/reader/reader.go: +84 −2

fs/reader/reader.go

Lines changed: 84 additions & 2 deletions
@@ -508,6 +508,7 @@ func (sf *file) GetPassthroughFd(mergeBufferSize int64, mergeWorkerCount int) (u
 		offset           int64
 		firstChunkOffset int64
 		totalSize        int64
+		hasLargeChunk    bool
 	)
 
 	var chunks []chunkData
@@ -516,6 +517,10 @@ func (sf *file) GetPassthroughFd(mergeBufferSize int64, mergeWorkerCount int) (u
 		if !ok {
 			break
 		}
+		// Check if any chunk size exceeds merge buffer size to avoid bounds out of range
+		if chunkSize > mergeBufferSize {
+			hasLargeChunk = true
+		}
 		chunks = append(chunks, chunkData{
 			offset: chunkOffset,
 			size:   chunkSize,
@@ -530,8 +535,14 @@ func (sf *file) GetPassthroughFd(mergeBufferSize int64, mergeWorkerCount int) (u
 	// cache.PassThrough() is necessary to take over files
 	r, err := sf.gr.cache.Get(id, cache.PassThrough())
 	if err != nil {
-		if err := sf.prefetchEntireFile(id, chunks, totalSize, mergeBufferSize, mergeWorkerCount); err != nil {
-			return 0, err
+		if hasLargeChunk {
+			if err := sf.prefetchEntireFileSequential(id); err != nil {
+				return 0, err
+			}
+		} else {
+			if err := sf.prefetchEntireFile(id, chunks, totalSize, mergeBufferSize, mergeWorkerCount); err != nil {
+				return 0, err
+			}
 		}
 
 		// just retry once to avoid exception stuck
@@ -553,6 +564,77 @@ func (sf *file) GetPassthroughFd(mergeBufferSize int64, mergeWorkerCount int) (u
 	return fd, nil
 }
 
+// prefetchEntireFileSequential uses the legacy sequential approach for processing chunks
+// when chunk size exceeds merge buffer size to avoid slice bounds out of range panic
+func (sf *file) prefetchEntireFileSequential(entireCacheID string) error {
+	w, err := sf.gr.cache.Add(entireCacheID)
+	if err != nil {
+		return fmt.Errorf("failed to create cache writer: %w", err)
+	}
+	defer w.Close()
+
+	var (
+		offset           int64
+		firstChunkOffset int64 = -1
+		totalSize        int64
+	)
+
+	for {
+		chunkOffset, chunkSize, chunkDigestStr, ok := sf.fr.ChunkEntryForOffset(offset)
+		if !ok {
+			break
+		}
+		if firstChunkOffset == -1 {
+			firstChunkOffset = chunkOffset
+		}
+
+		id := genID(sf.id, chunkOffset, chunkSize)
+		b := sf.gr.bufPool.Get().(*bytes.Buffer)
+		b.Reset()
+		b.Grow(int(chunkSize))
+		ip := b.Bytes()[:chunkSize]
+
+		if r, err := sf.gr.cache.Get(id); err == nil {
+			n, err := r.ReadAt(ip, 0)
+			if (err == nil || err == io.EOF) && int64(n) == chunkSize {
+				if _, err := w.Write(ip[:n]); err != nil {
+					r.Close()
+					sf.gr.putBuffer(b)
+					w.Abort()
+					return fmt.Errorf("failed to write cached data: %w", err)
+				}
+				totalSize += int64(n)
+				offset = chunkOffset + int64(n)
+				r.Close()
+				sf.gr.putBuffer(b)
+				continue
+			}
+			r.Close()
+		}
+
+		if _, err := sf.fr.ReadAt(ip, chunkOffset); err != nil && err != io.EOF {
+			sf.gr.putBuffer(b)
+			w.Abort()
+			return fmt.Errorf("failed to read data: %w", err)
+		}
+		if err := sf.gr.verifyOneChunk(sf.id, ip, chunkDigestStr); err != nil {
+			sf.gr.putBuffer(b)
+			w.Abort()
+			return err
+		}
+		if _, err := w.Write(ip); err != nil {
+			sf.gr.putBuffer(b)
+			w.Abort()
+			return fmt.Errorf("failed to write fetched data: %w", err)
+		}
+		totalSize += chunkSize
+		offset = chunkOffset + chunkSize
+		sf.gr.putBuffer(b)
+	}
+
+	return w.Commit()
+}
+
 type batchWorkerArgs struct {
 	workerID int
 	chunks   []chunkData
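
Read in isolation, the guard added above is a single pass over the chunk list before choosing a prefetch strategy. Below is a minimal standalone sketch of the same check with hypothetical names; the real code sets a hasLargeChunk flag while iterating ChunkEntryForOffset, as the diff shows.

package main

import "fmt"

// chunkData mirrors the shape used in the diff; this is a standalone
// stand-in for illustration only.
type chunkData struct {
	offset int64
	size   int64
}

// hasOversizedChunk reports whether any chunk is larger than the merge
// buffer -- the condition under which the commit now takes the sequential
// prefetch path instead of the batched one.
func hasOversizedChunk(chunks []chunkData, mergeBufferSize int64) bool {
	for _, c := range chunks {
		if c.size > mergeBufferSize {
			return true
		}
	}
	return false
}

func main() {
	chunks := []chunkData{
		{offset: 0, size: 1 << 20},       // 1 MiB chunk fits
		{offset: 1 << 20, size: 8 << 20}, // 8 MiB chunk does not
	}
	// An 8 MiB chunk cannot be staged in a 4 MiB merge buffer.
	fmt.Println(hasOversizedChunk(chunks, 4<<20)) // true
}

The sequential fallback trades the parallel merge workers for one per-chunk buffer at a time, so it is likely slower for many-chunk files, but by construction it cannot overrun a shared buffer.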
