Skip to content

Commit b077a9d

Browse files
authored
Merge pull request #52 from databendcloud/fix/enlarge-retry
fix: enlarge retry count
2 parents e18d0af + 5b75003 commit b077a9d

File tree

2 files changed: 21 additions (+21), 7 deletions (-7)

2 files changed: 21 additions (+21), 7 deletions (-7)

ingester/ingest_databend.go

Lines changed: 20 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -25,6 +25,7 @@ import (
2525
var (
2626
ErrUploadStageFailed = errors.New("upload stage failed")
2727
ErrCopyIntoFailed = errors.New("copy into failed")
28+
ErrGetPresignUrl = errors.New("failed to get presigned url")
2829
)
2930

3031
type databendIngester struct {
@@ -86,14 +87,12 @@ func (ig *databendIngester) IngestData(threadNum int, columns []string, batchDat
8687

8788
stage, err := ig.uploadToStage(fileName)
8889
if err != nil {
89-
l.Errorf("upload to stage failed: %v\n", err)
9090
return err
9191
}
9292

9393
copyIntoStartTime := time.Now()
9494
err = ig.copyInto(stage)
9595
if err != nil {
96-
l.Errorf("copy into failed: %v\n", err)
9796
return err
9897
}
9998
l.Infof("thread-%d: copy into cost: %v ms", threadNum, time.Since(copyIntoStartTime).Milliseconds())
@@ -137,7 +136,7 @@ func (ig *databendIngester) uploadToStage(fileName string) (*godatabend.StageLoc
137136
presignedStartTime := time.Now()
138137
presigned, err := apiClient.GetPresignedURL(context.Background(), stage)
139138
if err != nil {
140-
return nil, errors.Wrap(err, "failed to get presigned url")
139+
return nil, errors.Wrap(ErrGetPresignUrl, err.Error())
141140
}
142141
logrus.Infof("get presigned url cost: %v ms", time.Since(presignedStartTime).Milliseconds())
143142

@@ -206,22 +205,37 @@ func execute(db *sql.DB, sql string) error {
206205

207206
func (ig *databendIngester) DoRetry(f retry.RetryableFunc) error {
208207
delay := time.Second
209-
maxDelay := 30 * time.Minute
208+
maxDelay := 60 * time.Minute
209+
maxAttempts := 500
210+
attempt := 0
211+
210212
return retry.Do(
211213
func() error {
212-
return f()
214+
err := f()
215+
if err != nil {
216+
logrus.Infof("Attempt %d failed: %v", attempt, err)
217+
}
218+
attempt++
219+
return err
213220
},
214221
retry.RetryIf(func(err error) bool {
215222
if err == nil {
216223
return false
217224
}
218-
if errors.Is(err, ErrUploadStageFailed) || errors.Is(err, ErrCopyIntoFailed) {
225+
if attempt >= maxAttempts {
226+
logrus.Warnf("Reached maximum retry attempts (%d)", maxAttempts)
227+
return false
228+
}
229+
if errors.Is(err, ErrUploadStageFailed) ||
230+
errors.Is(err, ErrCopyIntoFailed) ||
231+
errors.Is(err, ErrGetPresignUrl) {
219232
return true
220233
}
221234
return false
222235
}),
223236
retry.Delay(delay),
224237
retry.MaxDelay(maxDelay),
225238
retry.DelayType(retry.BackOffDelay),
239+
retry.Attempts(uint(maxAttempts)),
226240
)
227241
}

worker/worker.go

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -111,7 +111,7 @@ func (w *Worker) stepBatch() error {
111111
logrus.Infof("condition: %s", condition)
112112
err := w.stepBatchWithCondition(idx, condition)
113113
if err != nil {
114-
logrus.Errorf("stepBatchWithCondition failed: %v", err)
114+
logrus.Errorf("Thread %d, stepBatchWithCondition failed: %v", idx, err)
115115
}
116116
}
117117
}(i)

0 commit comments

Comments
 (0)