Skip to content

Commit 37ecf9f

Browse files
nevedarenSkCQ
authored andcommitted
[perf] Gracefully handle trace values duplicates
Should fix errors like `pgx - Exec map[args:[] err:ERROR: com.google.api.gax.rpc.OutOfRangeException: io.grpc.StatusRuntimeException: OUT_OF_RANGE: Cannot affect a row second time for key (<unprintable>, 42) (SQLSTATE P0001) pid:44 sql:INSERT INTO`. There is no reason to drop the whole batch of data because of the duplicates. Change-Id: I412ac5d8445b356c150bf4f170776d0758b7aaef Reviewed-on: https://skia-review.googlesource.com/c/buildbot/+/1124858 Reviewed-by: Maxim Sheshukov <[email protected]> Commit-Queue: Anri Sidorov <[email protected]>
1 parent 86e69ea commit 37ecf9f

File tree

2 files changed

+34
-1
lines changed

2 files changed

+34
-1
lines changed

perf/go/tracestore/sqltracestore/sqltracestore.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1189,7 +1189,12 @@ func (s *SQLTraceStore) WriteTraces(ctx context.Context, commitNumber types.Comm
11891189
continue
11901190
}
11911191
traceID := traceIDForSQLFromTraceName(traceName)
1192-
traceParams[string(traceID)] = p
1192+
traceIDKey := string(traceID)
1193+
if _, exists := traceParams[traceIDKey]; exists {
1194+
sklog.Warningf("Many values for the same trace name found, skipping all but the first: %s", traceName)
1195+
continue
1196+
}
1197+
traceParams[traceIDKey] = p
11931198
valuesTemplateContext = append(valuesTemplateContext, insertIntoTraceValuesContext{
11941199
MD5HexTraceID: traceID,
11951200
CommitNumber: commitNumber,

perf/go/tracestore/sqltracestore/sqltracestore_test.go

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -657,6 +657,34 @@ func TestWriteTraces_InsertDifferentValueAndFile_OverwriteExistingTraceValues(t
657657
assert.Equal(t, float32(2.4), trace[1])
658658
}
659659

660+
func TestWriteTraces_DuplicateTraceIDsInBatch_Succeeds(t *testing.T) {
661+
ctx, s := commonTestSetupWithCommits(t)
662+
// We are simulating a scenario where the ingestion process sends two values
663+
// for the exactly same trace in a single batch.
664+
traceNames := []paramtools.Params{
665+
{"config": "8888", "arch": "x86"}, // Entry A
666+
{"config": "8888", "arch": "x86"}, // Entry A (Duplicate of above)
667+
}
668+
values := []float32{1.0, 99.0}
669+
ps := paramtools.ParamSet{
670+
"config": {"8888"},
671+
"arch": {"x86"},
672+
}
673+
674+
err := s.WriteTraces(ctx, types.CommitNumber(2), traceNames, values, ps, file1, time.Time{})
675+
require.NoError(t, err)
676+
677+
keys := []string{",arch=x86,config=8888,"}
678+
ts, _, _, err := s.ReadTraces(ctx, types.TileNumber(0), keys)
679+
require.NoError(t, err)
680+
trace := ts[",arch=x86,config=8888,"]
681+
actualValue := trace[2]
682+
// Verify that the stored value is one of the inputs.
683+
// We don't care if it's 1.0 (first-write-wins) or 99.0 (last-write-wins),
684+
// as long as the write succeeded and data is present.
685+
assert.Contains(t, []float32{1.0, 99.0}, actualValue, "The stored value should be one of the input values from the batch")
686+
}
687+
660688
func TestReadTraces_WithDiscontinueCommitNumbers_Succeed(t *testing.T) {
661689
ctx, s := commonTestSetupWithCommits(t)
662690

0 commit comments

Comments
 (0)