Skip to content

Commit 7dd7653

Browse files
authored
feat: flag a put in future timestamp and record a metrics (#3534)
* add future delete mutation metrics * feat: add test * flag puts at future timestamp * fix formatting * add thorough test * address comments:
1 parent 2d57f14 commit 7dd7653

File tree

3 files changed

+33
-0
lines changed

3 files changed

+33
-0
lines changed

hbase-migration-tools/bigtable-hbase-replication/bigtable-hbase-replication-core/src/main/java/com/google/cloud/bigtable/hbase/replication/adapters/IncompatibleMutationAdapter.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import static com.google.cloud.bigtable.hbase.replication.metrics.HBaseToCloudBigtableReplicationMetrics.DROPPED_INCOMPATIBLE_MUTATION_METRIC_KEY;
2020
import static com.google.cloud.bigtable.hbase.replication.metrics.HBaseToCloudBigtableReplicationMetrics.INCOMPATIBLE_MUTATION_METRIC_KEY;
21+
import static com.google.cloud.bigtable.hbase.replication.metrics.HBaseToCloudBigtableReplicationMetrics.PUTS_IN_FUTURE_METRIC_KEY;
2122

2223
import com.google.cloud.bigtable.hbase.adapters.DeleteAdapter;
2324
import com.google.cloud.bigtable.hbase.replication.metrics.MetricsExporter;
@@ -56,6 +57,10 @@ private void incrementIncompatibleMutations() {
5657
metricsExporter.incCounters(INCOMPATIBLE_MUTATION_METRIC_KEY, 1);
5758
}
5859

60+
private void incrementPutsInFutureMutations() {
61+
metricsExporter.incCounters(PUTS_IN_FUTURE_METRIC_KEY, 1);
62+
}
63+
5964
/**
6065
* Creates an IncompatibleMutationAdapter with HBase configuration, MetricSource, and CBT
6166
* connection.
@@ -101,6 +106,11 @@ public final List<Cell> adaptIncompatibleMutations(BigtableWALEntry walEntry) {
101106
Cell cell = cellsToAdapt.get(index);
102107
// All puts are valid.
103108
if (cell.getTypeByte() == KeyValue.Type.Put.getCode()) {
109+
// flag if put is issued for future timestamp
110+
// do not log as we might fill up disk space due condition being true from clock skew
111+
if (cell.getTimestamp() > walEntry.getWalWriteTime()) {
112+
incrementPutsInFutureMutations();
113+
}
104114
returnedCells.add(cell);
105115
continue;
106116
}

hbase-migration-tools/bigtable-hbase-replication/bigtable-hbase-replication-core/src/main/java/com/google/cloud/bigtable/hbase/replication/metrics/HBaseToCloudBigtableReplicationMetrics.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,4 +24,5 @@ private HBaseToCloudBigtableReplicationMetrics() {}
2424
public static final String INCOMPATIBLE_MUTATION_METRIC_KEY = "bigtableIncompatibleMutations";
2525
public static final String DROPPED_INCOMPATIBLE_MUTATION_METRIC_KEY =
2626
"bigtableDroppedIncompatibleMutations";
27+
public static final String PUTS_IN_FUTURE_METRIC_KEY = "bigtablePutsInFutureMutations";
2728
}

hbase-migration-tools/bigtable-hbase-replication/bigtable-hbase-replication-core/src/test/java/com/google/cloud/bigtable/hbase/replication/adapters/IncompatibleMutationAdapterTest.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,12 @@
1818

1919
import static com.google.cloud.bigtable.hbase.replication.metrics.HBaseToCloudBigtableReplicationMetrics.DROPPED_INCOMPATIBLE_MUTATION_METRIC_KEY;
2020
import static com.google.cloud.bigtable.hbase.replication.metrics.HBaseToCloudBigtableReplicationMetrics.INCOMPATIBLE_MUTATION_METRIC_KEY;
21+
import static com.google.cloud.bigtable.hbase.replication.metrics.HBaseToCloudBigtableReplicationMetrics.PUTS_IN_FUTURE_METRIC_KEY;
2122
import static org.mockito.Mockito.reset;
2223
import static org.mockito.Mockito.times;
2324
import static org.mockito.Mockito.verify;
2425
import static org.mockito.Mockito.verifyNoInteractions;
26+
import static org.mockito.Mockito.verifyNoMoreInteractions;
2527

2628
import com.google.cloud.bigtable.hbase.replication.metrics.MetricsExporter;
2729
import java.nio.charset.StandardCharsets;
@@ -35,6 +37,7 @@
3537
import org.apache.hadoop.conf.Configuration;
3638
import org.apache.hadoop.hbase.Cell;
3739
import org.apache.hadoop.hbase.KeyValue;
40+
import org.apache.hadoop.hbase.KeyValue.Type;
3841
import org.apache.hadoop.hbase.client.Connection;
3942
import org.junit.After;
4043
import org.junit.Assert;
@@ -226,4 +229,23 @@ public void testIncompatibleDeletesAreDropped() {
226229
verify(metricsExporter, times(1)).incCounters(INCOMPATIBLE_MUTATION_METRIC_KEY, 1);
227230
verify(metricsExporter, times(1)).incCounters(DROPPED_INCOMPATIBLE_MUTATION_METRIC_KEY, 1);
228231
}
232+
233+
@Test
234+
public void testFuturePutAreFlagged() {
235+
ArrayList<Cell> walEntryCells = new ArrayList<>();
236+
Cell put1 = new KeyValue(rowKey, cf, qual, 900, Type.Put);
237+
Cell put2 = new KeyValue(rowKey, cf, qual, 1005L, Type.Put);
238+
walEntryCells.add(put1);
239+
walEntryCells.add(put2);
240+
BigtableWALEntry walEntry = new BigtableWALEntry(1000L, walEntryCells, tableName);
241+
242+
Assert.assertEquals(
243+
Arrays.asList(put1, put2),
244+
incompatibleMutationAdapter.adaptIncompatibleMutations(walEntry));
245+
246+
verify(metricsExporter).incCounters(INCOMPATIBLE_MUTATION_METRIC_KEY, 0);
247+
verify(metricsExporter).incCounters(DROPPED_INCOMPATIBLE_MUTATION_METRIC_KEY, 0);
248+
verify(metricsExporter, times(1)).incCounters(PUTS_IN_FUTURE_METRIC_KEY, 1);
249+
verifyNoMoreInteractions(metricsExporter);
250+
}
229251
}

0 commit comments

Comments
 (0)