Skip to content

Commit 9c73318

Browse files
authored
feat: [hase-cbt-replication]Add a metrics for hbase<>bigtable timestamp overflow (#3540)
Add a metrics for recording the incompatible mutation when hbase<>bigtable timestamp overflows.
1 parent 164738b commit 9c73318

File tree

7 files changed

+183
-1
lines changed

7 files changed

+183
-1
lines changed

hbase-migration-tools/bigtable-hbase-replication/bigtable-hbase-1.x-replication/src/test/java/com/google/cloud/bigtable/hbase1_x/replication/HbaseToCloudBigtableReplicationEndpointTest.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -406,4 +406,28 @@ public void testWriteFailureToBigtableDoesNotStallReplication()
406406
"Value mismatch", TestUtils.getValue(0), CellUtil.cloneValue(actualCells.get(1)));
407407
Assert.assertEquals(0, actualCells.get(1).getTimestamp());
408408
}
409+
410+
@Test
411+
public void testHBaseCBTTimestampTruncation() throws IOException, InterruptedException {
412+
Put put = new Put(TestUtils.ROW_KEY);
413+
byte[] val = Bytes.toBytes(1);
414+
put.addColumn(TestUtils.CF1, TestUtils.COL_QUALIFIER, Long.MAX_VALUE - 1, val);
415+
put.addColumn(TestUtils.CF1, TestUtils.COL_QUALIFIER, Long.MAX_VALUE - 1000, val);
416+
hbaseTable.put(put);
417+
418+
TestUtils.waitForReplication(
419+
() -> {
420+
// 1put
421+
return TestReplicationEndpoint.replicatedEntries.get() >= 1;
422+
});
423+
424+
List<Cell> hbaseCells = hbaseTable.get(new Get(TestUtils.ROW_KEY).setMaxVersions()).listCells();
425+
List<Cell> bigtableCells =
426+
cbtTable.get(new Get(TestUtils.ROW_KEY).setMaxVersions()).listCells();
427+
Assert.assertEquals("bigtable cells", 1, bigtableCells.size());
428+
Assert.assertNotEquals(
429+
"Timestamp match for row " + TestUtils.ROW_KEY,
430+
hbaseCells.get(0).getTimestamp(),
431+
bigtableCells.get(0).getTimestamp());
432+
}
409433
}

hbase-migration-tools/bigtable-hbase-replication/bigtable-hbase-2.x-replication/src/test/java/com/google/cloud/bigtable/hbase2_x/replication/HbaseToCloudBigtableReplicationEndpointTest.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -418,4 +418,28 @@ public void testWriteFailureToBigtableDoesNotStallReplication()
418418
"Value mismatch", TestUtils.getValue(0), CellUtil.cloneValue(actualCells.get(1)));
419419
Assert.assertEquals(0, actualCells.get(1).getTimestamp());
420420
}
421+
422+
@Test
423+
public void testHBaseCBTTimestampTruncation() throws IOException, InterruptedException {
424+
Put put = new Put(TestUtils.ROW_KEY);
425+
byte[] val = Bytes.toBytes(1);
426+
put.addColumn(TestUtils.CF1, TestUtils.COL_QUALIFIER, Long.MAX_VALUE - 1, val);
427+
put.addColumn(TestUtils.CF1, TestUtils.COL_QUALIFIER, Long.MAX_VALUE - 1000, val);
428+
hbaseTable.put(put);
429+
430+
TestUtils.waitForReplication(
431+
() -> {
432+
// 1put
433+
return TestReplicationEndpoint.replicatedEntries.get() >= 1;
434+
});
435+
436+
List<Cell> hbaseCells = hbaseTable.get(new Get(TestUtils.ROW_KEY).setMaxVersions()).listCells();
437+
List<Cell> bigtableCells =
438+
cbtTable.get(new Get(TestUtils.ROW_KEY).setMaxVersions()).listCells();
439+
Assert.assertEquals("bigtable cells", 1, bigtableCells.size());
440+
Assert.assertNotEquals(
441+
"Timestamp match for row " + TestUtils.ROW_KEY,
442+
hbaseCells.get(0).getTimestamp(),
443+
bigtableCells.get(0).getTimestamp());
444+
}
421445
}

hbase-migration-tools/bigtable-hbase-replication/bigtable-hbase-replication-core/src/main/java/com/google/cloud/bigtable/hbase/replication/adapters/IncompatibleMutationAdapter.java

Lines changed: 37 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,9 @@
1717
package com.google.cloud.bigtable.hbase.replication.adapters;
1818

1919
import static com.google.cloud.bigtable.hbase.replication.metrics.HBaseToCloudBigtableReplicationMetrics.DROPPED_INCOMPATIBLE_MUTATION_METRIC_KEY;
20+
import static com.google.cloud.bigtable.hbase.replication.metrics.HBaseToCloudBigtableReplicationMetrics.INCOMPATIBLE_MUTATION_DELETES_METRICS_KEY;
2021
import static com.google.cloud.bigtable.hbase.replication.metrics.HBaseToCloudBigtableReplicationMetrics.INCOMPATIBLE_MUTATION_METRIC_KEY;
22+
import static com.google.cloud.bigtable.hbase.replication.metrics.HBaseToCloudBigtableReplicationMetrics.INCOMPATIBLE_MUTATION_TIMESTAMP_OVERFLOW_METRIC_KEY;
2123
import static com.google.cloud.bigtable.hbase.replication.metrics.HBaseToCloudBigtableReplicationMetrics.PUTS_IN_FUTURE_METRIC_KEY;
2224

2325
import com.google.cloud.bigtable.hbase.adapters.DeleteAdapter;
@@ -27,6 +29,7 @@
2729
import org.apache.hadoop.conf.Configuration;
2830
import org.apache.hadoop.hbase.Cell;
2931
import org.apache.hadoop.hbase.CellUtil;
32+
import org.apache.hadoop.hbase.HConstants;
3033
import org.apache.hadoop.hbase.KeyValue;
3134
import org.apache.hadoop.hbase.client.Connection;
3235
import org.slf4j.Logger;
@@ -48,6 +51,8 @@ public abstract class IncompatibleMutationAdapter {
4851
private final Connection connection;
4952
private final Configuration conf;
5053
private final MetricsExporter metricsExporter;
54+
// Maximum timestamp that hbase can send to bigtable in ms.
55+
static final long BIGTABLE_EFFECTIVE_MAX = Long.MAX_VALUE / 1000L;
5156

5257
private void incrementDroppedIncompatibleMutations() {
5358
metricsExporter.incCounters(DROPPED_INCOMPATIBLE_MUTATION_METRIC_KEY, 1);
@@ -57,6 +62,16 @@ private void incrementIncompatibleMutations() {
5762
metricsExporter.incCounters(INCOMPATIBLE_MUTATION_METRIC_KEY, 1);
5863
}
5964

65+
private void incrementTimestampOverflowMutations() {
66+
metricsExporter.incCounters(INCOMPATIBLE_MUTATION_TIMESTAMP_OVERFLOW_METRIC_KEY, 1);
67+
incrementIncompatibleMutations();
68+
}
69+
70+
private void incrementIncompatibleDeletesMutations() {
71+
metricsExporter.incCounters(INCOMPATIBLE_MUTATION_DELETES_METRICS_KEY, 1);
72+
incrementIncompatibleMutations();
73+
}
74+
6075
private void incrementPutsInFutureMutations() {
6176
metricsExporter.incCounters(PUTS_IN_FUTURE_METRIC_KEY, 1);
6277
}
@@ -82,6 +97,9 @@ public IncompatibleMutationAdapter(
8297
// Make sure that the counters show up.
8398
metricsExporter.incCounters(DROPPED_INCOMPATIBLE_MUTATION_METRIC_KEY, 0);
8499
metricsExporter.incCounters(INCOMPATIBLE_MUTATION_METRIC_KEY, 0);
100+
metricsExporter.incCounters(INCOMPATIBLE_MUTATION_DELETES_METRICS_KEY, 0);
101+
metricsExporter.incCounters(INCOMPATIBLE_MUTATION_TIMESTAMP_OVERFLOW_METRIC_KEY, 0);
102+
metricsExporter.incCounters(PUTS_IN_FUTURE_METRIC_KEY, 0);
85103
}
86104

87105
private boolean isValidDelete(Cell delete) {
@@ -104,6 +122,24 @@ public final List<Cell> adaptIncompatibleMutations(BigtableWALEntry walEntry) {
104122
List<Cell> returnedCells = new ArrayList<>(cellsToAdapt.size());
105123
for (int index = 0; index < cellsToAdapt.size(); index++) {
106124
Cell cell = cellsToAdapt.get(index);
125+
// check whether there is timestamp overflow from HBase -> CBT and make sure
126+
// it does clash with valid delete which require the timestamp to be
127+
// HConstants.LATEST_TIMESTAMP,
128+
// this will be true for reverse timestamps.
129+
// Do not enable trace logging as there can be many writes of this type.
130+
// The following log message will spam the logs and degrade the replication performance.
131+
if (cell.getTimestamp() >= BIGTABLE_EFFECTIVE_MAX
132+
&& cell.getTimestamp() != HConstants.LATEST_TIMESTAMP) {
133+
incrementTimestampOverflowMutations();
134+
LOG.trace(
135+
"Incompatible entry: "
136+
+ cell
137+
+ " cell time: "
138+
+ cell.getTimestamp()
139+
+ " max timestamp from hbase to bigtable: "
140+
+ BIGTABLE_EFFECTIVE_MAX);
141+
}
142+
107143
// All puts are valid.
108144
if (cell.getTypeByte() == KeyValue.Type.Put.getCode()) {
109145
// flag if put is issued for future timestamp
@@ -126,7 +162,7 @@ public final List<Cell> adaptIncompatibleMutations(BigtableWALEntry walEntry) {
126162
// Incompatible delete: Adapt it.
127163
try {
128164
LOG.info("Encountered incompatible mutation: " + cell);
129-
incrementIncompatibleMutations();
165+
incrementIncompatibleDeletesMutations();
130166
returnedCells.addAll(adaptIncompatibleMutation(walEntry, index));
131167
} catch (UnsupportedOperationException use) {
132168
// Drop the mutation, not dropping it will lead to stalling of replication.

hbase-migration-tools/bigtable-hbase-replication/bigtable-hbase-replication-core/src/main/java/com/google/cloud/bigtable/hbase/replication/metrics/HBaseToCloudBigtableReplicationMetrics.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,5 +24,10 @@ private HBaseToCloudBigtableReplicationMetrics() {}
2424
public static final String INCOMPATIBLE_MUTATION_METRIC_KEY = "bigtableIncompatibleMutations";
2525
public static final String DROPPED_INCOMPATIBLE_MUTATION_METRIC_KEY =
2626
"bigtableDroppedIncompatibleMutations";
27+
28+
public static final String INCOMPATIBLE_MUTATION_DELETES_METRICS_KEY =
29+
"bigtableIncompatibleDeleteMutations";
30+
public static final String INCOMPATIBLE_MUTATION_TIMESTAMP_OVERFLOW_METRIC_KEY =
31+
"bigtableIncompatibleTimestampOverflowMutation";
2732
public static final String PUTS_IN_FUTURE_METRIC_KEY = "bigtablePutsInFutureMutations";
2833
}

hbase-migration-tools/bigtable-hbase-replication/bigtable-hbase-replication-core/src/test/java/com/google/cloud/bigtable/hbase/replication/CloudBigtableReplicatorTest.java

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,10 @@
1616
package com.google.cloud.bigtable.hbase.replication;
1717

1818
import static com.google.cloud.bigtable.hbase.replication.metrics.HBaseToCloudBigtableReplicationMetrics.DROPPED_INCOMPATIBLE_MUTATION_METRIC_KEY;
19+
import static com.google.cloud.bigtable.hbase.replication.metrics.HBaseToCloudBigtableReplicationMetrics.INCOMPATIBLE_MUTATION_DELETES_METRICS_KEY;
1920
import static com.google.cloud.bigtable.hbase.replication.metrics.HBaseToCloudBigtableReplicationMetrics.INCOMPATIBLE_MUTATION_METRIC_KEY;
21+
import static com.google.cloud.bigtable.hbase.replication.metrics.HBaseToCloudBigtableReplicationMetrics.INCOMPATIBLE_MUTATION_TIMESTAMP_OVERFLOW_METRIC_KEY;
22+
import static com.google.cloud.bigtable.hbase.replication.metrics.HBaseToCloudBigtableReplicationMetrics.PUTS_IN_FUTURE_METRIC_KEY;
2023
import static com.google.cloud.bigtable.hbase.replication.utils.TestUtils.CF1;
2124
import static com.google.cloud.bigtable.hbase.replication.utils.TestUtils.ROW_KEY;
2225
import static com.google.cloud.bigtable.hbase.replication.utils.TestUtils.TABLE_NAME_STRING;
@@ -107,8 +110,13 @@ public void testReplicateDryRun() {
107110
// Called during the constructor
108111
verify(mockMetricExporter).incCounters(INCOMPATIBLE_MUTATION_METRIC_KEY, 0);
109112
verify(mockMetricExporter).incCounters(DROPPED_INCOMPATIBLE_MUTATION_METRIC_KEY, 0);
113+
verify(mockMetricExporter).incCounters(INCOMPATIBLE_MUTATION_DELETES_METRICS_KEY, 0);
114+
verify(mockMetricExporter).incCounters(INCOMPATIBLE_MUTATION_TIMESTAMP_OVERFLOW_METRIC_KEY, 0);
115+
verify(mockMetricExporter).incCounters(PUTS_IN_FUTURE_METRIC_KEY, 0);
116+
110117
// Incremented due to incompatible DeleteFamilyVersion mutation
111118
verify(mockMetricExporter).incCounters(INCOMPATIBLE_MUTATION_METRIC_KEY, 1);
119+
verify(mockMetricExporter).incCounters(INCOMPATIBLE_MUTATION_DELETES_METRICS_KEY, 1);
112120
verify(mockMetricExporter).incCounters(DROPPED_INCOMPATIBLE_MUTATION_METRIC_KEY, 1);
113121

114122
Mockito.verifyNoMoreInteractions(mockMetricExporter);
@@ -151,6 +159,10 @@ public void testReplicateDoesNotSplitInBatches() throws IOException {
151159
// called during constructor
152160
verify(mockMetricExporter).incCounters(INCOMPATIBLE_MUTATION_METRIC_KEY, 0);
153161
verify(mockMetricExporter).incCounters(DROPPED_INCOMPATIBLE_MUTATION_METRIC_KEY, 0);
162+
verify(mockMetricExporter).incCounters(INCOMPATIBLE_MUTATION_DELETES_METRICS_KEY, 0);
163+
verify(mockMetricExporter).incCounters(INCOMPATIBLE_MUTATION_TIMESTAMP_OVERFLOW_METRIC_KEY, 0);
164+
verify(mockMetricExporter).incCounters(PUTS_IN_FUTURE_METRIC_KEY, 0);
165+
154166
// Replicator should submit just 1 CloudBigtableReplicationTask for both WALEntries
155167
verify(mockExecutorService)
156168
.submit(
@@ -198,6 +210,9 @@ public void testReplicateSplitsBatchesOnRowBoundary() throws IOException {
198210
// called during constructor
199211
verify(mockMetricExporter).incCounters(INCOMPATIBLE_MUTATION_METRIC_KEY, 0);
200212
verify(mockMetricExporter).incCounters(DROPPED_INCOMPATIBLE_MUTATION_METRIC_KEY, 0);
213+
verify(mockMetricExporter).incCounters(INCOMPATIBLE_MUTATION_DELETES_METRICS_KEY, 0);
214+
verify(mockMetricExporter).incCounters(INCOMPATIBLE_MUTATION_TIMESTAMP_OVERFLOW_METRIC_KEY, 0);
215+
verify(mockMetricExporter).incCounters(PUTS_IN_FUTURE_METRIC_KEY, 0);
201216

202217
// Replicator should split WALs into 2 CloudBigtableReplicationTask at row keys
203218
verify(mockExecutorService)
@@ -251,6 +266,9 @@ public void testReplicateSplitsBatchesOnTableBoundary() throws IOException {
251266
// called during constructor
252267
verify(mockMetricExporter).incCounters(INCOMPATIBLE_MUTATION_METRIC_KEY, 0);
253268
verify(mockMetricExporter).incCounters(DROPPED_INCOMPATIBLE_MUTATION_METRIC_KEY, 0);
269+
verify(mockMetricExporter).incCounters(INCOMPATIBLE_MUTATION_DELETES_METRICS_KEY, 0);
270+
verify(mockMetricExporter).incCounters(INCOMPATIBLE_MUTATION_TIMESTAMP_OVERFLOW_METRIC_KEY, 0);
271+
verify(mockMetricExporter).incCounters(PUTS_IN_FUTURE_METRIC_KEY, 0);
254272

255273
// Replicator should split WALs into 2 CloudBigtableReplicationTask at table boundary
256274
verify(mockExecutorService)
@@ -300,6 +318,9 @@ public void testReplicateFailsOnAnyFailure() throws IOException {
300318
// called during constructor
301319
verify(mockMetricExporter).incCounters(INCOMPATIBLE_MUTATION_METRIC_KEY, 0);
302320
verify(mockMetricExporter).incCounters(DROPPED_INCOMPATIBLE_MUTATION_METRIC_KEY, 0);
321+
verify(mockMetricExporter).incCounters(INCOMPATIBLE_MUTATION_DELETES_METRICS_KEY, 0);
322+
verify(mockMetricExporter).incCounters(INCOMPATIBLE_MUTATION_TIMESTAMP_OVERFLOW_METRIC_KEY, 0);
323+
verify(mockMetricExporter).incCounters(PUTS_IN_FUTURE_METRIC_KEY, 0);
303324

304325
// Replicator should split WALs into 2 CloudBigtableReplicationTask at row keys
305326
verify(mockExecutorService)
@@ -349,6 +370,9 @@ public void testReplicateFailsOnAnyFutureFailure() throws IOException {
349370
// called during constructor
350371
verify(mockMetricExporter).incCounters(INCOMPATIBLE_MUTATION_METRIC_KEY, 0);
351372
verify(mockMetricExporter).incCounters(DROPPED_INCOMPATIBLE_MUTATION_METRIC_KEY, 0);
373+
verify(mockMetricExporter).incCounters(INCOMPATIBLE_MUTATION_DELETES_METRICS_KEY, 0);
374+
verify(mockMetricExporter).incCounters(INCOMPATIBLE_MUTATION_TIMESTAMP_OVERFLOW_METRIC_KEY, 0);
375+
verify(mockMetricExporter).incCounters(PUTS_IN_FUTURE_METRIC_KEY, 0);
352376

353377
// Replicator should split WALs into 2 CloudBigtableReplicationTask at row keys
354378
verify(mockExecutorService)
@@ -398,6 +422,9 @@ public void testReplicateFailsToSubmitTask() throws IOException {
398422
// called during constructor
399423
verify(mockMetricExporter).incCounters(INCOMPATIBLE_MUTATION_METRIC_KEY, 0);
400424
verify(mockMetricExporter).incCounters(DROPPED_INCOMPATIBLE_MUTATION_METRIC_KEY, 0);
425+
verify(mockMetricExporter).incCounters(INCOMPATIBLE_MUTATION_DELETES_METRICS_KEY, 0);
426+
verify(mockMetricExporter).incCounters(INCOMPATIBLE_MUTATION_TIMESTAMP_OVERFLOW_METRIC_KEY, 0);
427+
verify(mockMetricExporter).incCounters(PUTS_IN_FUTURE_METRIC_KEY, 0);
401428
// Replicator should split WALs into 2 CloudBigtableReplicationTask at row keys
402429
verify(mockExecutorService)
403430
.submit(

hbase-migration-tools/bigtable-hbase-replication/bigtable-hbase-replication-core/src/test/java/com/google/cloud/bigtable/hbase/replication/adapters/ApproximatingIncompatibleMutationAdapterTest.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import static com.google.cloud.bigtable.hbase.replication.configuration.HBaseToCloudBigtableReplicationConfiguration.DELETE_FAMILY_WRITE_THRESHOLD_KEY;
2020
import static com.google.cloud.bigtable.hbase.replication.metrics.HBaseToCloudBigtableReplicationMetrics.DROPPED_INCOMPATIBLE_MUTATION_METRIC_KEY;
21+
import static com.google.cloud.bigtable.hbase.replication.metrics.HBaseToCloudBigtableReplicationMetrics.INCOMPATIBLE_MUTATION_DELETES_METRICS_KEY;
2122
import static com.google.cloud.bigtable.hbase.replication.metrics.HBaseToCloudBigtableReplicationMetrics.INCOMPATIBLE_MUTATION_METRIC_KEY;
2223
import static org.apache.hadoop.hbase.HConstants.LATEST_TIMESTAMP;
2324
import static org.mockito.Mockito.atLeast;
@@ -102,6 +103,7 @@ public void testDeletesAreAdapted() {
102103

103104
verify(metricsExporter).incCounters(INCOMPATIBLE_MUTATION_METRIC_KEY, 0);
104105
verify(metricsExporter, times(1)).incCounters(INCOMPATIBLE_MUTATION_METRIC_KEY, 1);
106+
verify(metricsExporter, times(1)).incCounters(INCOMPATIBLE_MUTATION_DELETES_METRICS_KEY, 1);
105107
verify(metricsExporter, times(1)).incCounters(DROPPED_INCOMPATIBLE_MUTATION_METRIC_KEY, 0);
106108
}
107109

@@ -128,6 +130,7 @@ public void testIncompatibleDeletesAreDropped() {
128130

129131
verify(metricsExporter).incCounters(INCOMPATIBLE_MUTATION_METRIC_KEY, 0);
130132
verify(metricsExporter, times(3)).incCounters(INCOMPATIBLE_MUTATION_METRIC_KEY, 1);
133+
verify(metricsExporter, times(3)).incCounters(INCOMPATIBLE_MUTATION_DELETES_METRICS_KEY, 1);
131134
verify(metricsExporter, times(3)).incCounters(DROPPED_INCOMPATIBLE_MUTATION_METRIC_KEY, 1);
132135
}
133136
}

0 commit comments

Comments
 (0)