1616
1717package com .google .cloud .bigtable .hbase1_x .replication ;
1818
19+ import static com .google .cloud .bigtable .hbase .replication .utils .TestUtils .CF1 ;
20+ import static com .google .cloud .bigtable .hbase .replication .utils .TestUtils .FILTERED_ROW_KEY ;
21+ import static com .google .cloud .bigtable .hbase .replication .utils .TestUtils .ROW_KEY ;
22+ import static com .google .cloud .bigtable .hbase .replication .utils .TestUtils .assertEquals ;
23+
1924import com .google .cloud .bigtable .emulator .v2 .BigtableEmulatorRule ;
2025import com .google .cloud .bigtable .hbase .BigtableConfiguration ;
2126import com .google .cloud .bigtable .hbase .replication .utils .TestUtils ;
2227import java .io .IOException ;
28+ import java .util .ArrayList ;
2329import java .util .List ;
2430import java .util .UUID ;
2531import java .util .concurrent .atomic .AtomicInteger ;
3743import org .apache .hadoop .hbase .client .Get ;
3844import org .apache .hadoop .hbase .client .Increment ;
3945import org .apache .hadoop .hbase .client .Put ;
46+ import org .apache .hadoop .hbase .client .Result ;
4047import org .apache .hadoop .hbase .client .Table ;
4148import org .apache .hadoop .hbase .client .replication .ReplicationAdmin ;
42- import org .apache .hadoop .hbase .replication .BaseReplicationEndpoint ;
49+ import org .apache .hadoop .hbase .replication .ChainWALEntryFilter ;
4350import org .apache .hadoop .hbase .replication .ReplicationException ;
4451import org .apache .hadoop .hbase .replication .ReplicationPeerConfig ;
52+ import org .apache .hadoop .hbase .replication .WALEntryFilter ;
4553import org .apache .hadoop .hbase .util .Bytes ;
54+ import org .apache .hadoop .hbase .wal .WAL .Entry ;
4655import org .junit .AfterClass ;
4756import org .junit .Assert ;
4857import org .junit .Before ;
5766@ RunWith (JUnit4 .class )
5867public class HbaseToCloudBigtableReplicationEndpointTest {
5968
60- public static class TestReplicationEndpoint extends BaseReplicationEndpoint {
69+ public static class TestReplicationEndpoint extends HbaseToCloudBigtableReplicationEndpoint {
6170
6271 static AtomicInteger replicatedEntries = new AtomicInteger ();
63- static HbaseToCloudBigtableReplicationEndpoint delegate ;
64-
65- public TestReplicationEndpoint () {
66- delegate = new HbaseToCloudBigtableReplicationEndpoint ();
67- }
68-
69- @ Override
70- protected void doStart () {
71- try {
72- delegate .start ().get ();
73- } catch (Exception e ) {
74- new RuntimeException ("Failed to start Replication Endpoint." , e );
75- }
76- notifyStarted ();
77- }
78-
79- @ Override
80- protected void doStop () {
81- try {
82- delegate .stop ().get ();
83- } catch (Exception e ) {
84- new RuntimeException ("Failed to stop Replication Endpoint." , e );
85- }
86- notifyStopped ();
87- }
88-
89- @ Override
90- public UUID getPeerUUID () {
91- return delegate .getPeerUUID ();
92- }
93-
94- @ Override
95- public void init (Context ctx ) throws IOException {
96- super .init (ctx );
97- delegate .init (ctx );
98- }
9972
10073 @ Override
10174 public boolean replicate (ReplicateContext replicateContext ) {
102- boolean result = delegate .replicate (replicateContext );
75+ boolean result = super .replicate (replicateContext );
10376 replicatedEntries .getAndAdd (replicateContext .getEntries ().size ());
10477 return result ;
10578 }
79+
80+ // return a WALEntry filter which accepts all rows except FILTERED_ROW_ENTRY
81+ @ Override
82+ public WALEntryFilter getWALEntryfilter () {
83+ return new ChainWALEntryFilter (
84+ super .getWALEntryfilter (),
85+ new WALEntryFilter () {
86+ @ Override
87+ public Entry filter (Entry entry ) {
88+ ArrayList <Cell > cells = entry .getEdit ().getCells ();
89+ int size = cells .size ();
90+ for (int i = size - 1 ; i >= 0 ; i --) {
91+ Cell cell = cells .get (i );
92+ if (Bytes .equals (
93+ cell .getRowArray (),
94+ cell .getRowOffset (),
95+ cell .getRowLength (),
96+ FILTERED_ROW_KEY ,
97+ 0 ,
98+ FILTERED_ROW_KEY .length )) {
99+ cells .remove (i );
100+ }
101+ }
102+ return entry ;
103+ }
104+ });
105+ }
106106 }
107107
108108 private static final Logger LOG =
@@ -155,8 +155,8 @@ public void setupTestCase() throws IOException {
155155 // Create and set the empty tables
156156 TableName table1 = TableName .valueOf (UUID .randomUUID ().toString ());
157157 TableName table2 = TableName .valueOf (UUID .randomUUID ().toString ());
158- createTables (table1 );
159- createTables (table2 );
158+ createTables (table1 , HConstants . REPLICATION_SCOPE_GLOBAL , HConstants . REPLICATION_SCOPE_GLOBAL );
159+ createTables (table2 , HConstants . REPLICATION_SCOPE_GLOBAL , HConstants . REPLICATION_SCOPE_GLOBAL );
160160
161161 cbtTable = cbtConnection .getTable (table1 );
162162 cbtTable2 = cbtConnection .getTable (table2 );
@@ -167,7 +167,7 @@ public void setupTestCase() throws IOException {
167167 TestReplicationEndpoint .replicatedEntries .set (0 );
168168 }
169169
170- private void createTables (TableName tableName ) throws IOException {
170+ private void createTables (TableName tableName , int cf1Scope , int cf2Scope ) throws IOException {
171171 // Create table in HBase
172172 HTableDescriptor htd = hbaseTestingUtil .createTableDescriptor (tableName .getNameAsString ());
173173 HColumnDescriptor cf1 = new HColumnDescriptor (TestUtils .CF1 );
@@ -178,8 +178,8 @@ private void createTables(TableName tableName) throws IOException {
178178 htd .addFamily (cf2 );
179179
180180 // Enables replication to all peers, including CBT
181- cf1 .setScope (HConstants . REPLICATION_SCOPE_GLOBAL );
182- cf2 .setScope (HConstants . REPLICATION_SCOPE_GLOBAL );
181+ cf1 .setScope (cf1Scope );
182+ cf2 .setScope (cf2Scope );
183183 hbaseTestingUtil .getHBaseAdmin ().createTable (htd );
184184
185185 cbtConnection .getAdmin ().createTable (htd );
@@ -407,6 +407,75 @@ public void testWriteFailureToBigtableDoesNotStallReplication()
407407 Assert .assertEquals (0 , actualCells .get (1 ).getTimestamp ());
408408 }
409409
410+ @ Test
411+ public void testMutationReplicationWithWALEntryFilter () throws IOException , InterruptedException {
412+ Put put1 = new Put (FILTERED_ROW_KEY );
413+ put1 .addColumn (TestUtils .CF1 , TestUtils .COL_QUALIFIER , 0 , FILTERED_ROW_KEY );
414+ put1 .addColumn (TestUtils .CF2 , TestUtils .COL_QUALIFIER , 0 , FILTERED_ROW_KEY );
415+ hbaseTable .put (put1 );
416+
417+ Put put2 = new Put (ROW_KEY );
418+ put2 .addColumn (TestUtils .CF1 , TestUtils .COL_QUALIFIER , 0 , ROW_KEY );
419+ put2 .addColumn (TestUtils .CF2 , TestUtils .COL_QUALIFIER , 0 , ROW_KEY );
420+ hbaseTable .put (put2 );
421+
422+ TestUtils .waitForReplication (
423+ () -> {
424+ // replicate Entries will be one as FILTERED KEY will not be passed.
425+ return TestReplicationEndpoint .replicatedEntries .get () >= 1 ;
426+ });
427+
428+ Result cbtResult = cbtTable .get (new Get (FILTERED_ROW_KEY ).setMaxVersions ());
429+ Result hbaseResult = hbaseTable .get (new Get (FILTERED_ROW_KEY ).setMaxVersions ());
430+ Assert .assertTrue (cbtResult .isEmpty ());
431+ Assert .assertFalse (hbaseResult .isEmpty ());
432+ Assert .assertEquals (
433+ "Number of cells , actual cells: " + hbaseResult .listCells (),
434+ 2 ,
435+ 2 ,
436+ hbaseResult .listCells ().size ());
437+
438+ Result cbtResult2 = cbtTable .get (new Get (ROW_KEY ).setMaxVersions ());
439+ Result hbaseResult2 = hbaseTable .get (new Get (ROW_KEY ).setMaxVersions ());
440+ TestUtils .assertEquals (cbtResult2 , hbaseResult2 );
441+ }
442+
443+ @ Test
444+ public void testReplicationWithScope () throws IOException , InterruptedException {
445+ TableName tableName = TableName .valueOf (UUID .randomUUID ().toString ());
446+ createTables (
447+ tableName , HConstants .REPLICATION_SCOPE_GLOBAL , HConstants .REPLICATION_SCOPE_LOCAL );
448+ hbaseTable = hbaseConnection .getTable (tableName );
449+ cbtTable = cbtConnection .getTable (tableName );
450+
451+ Put put1 = new Put (ROW_KEY );
452+ put1 .addColumn (TestUtils .CF1 , TestUtils .COL_QUALIFIER , 0 , ROW_KEY );
453+ put1 .addColumn (TestUtils .CF2 , TestUtils .COL_QUALIFIER_2 , 0 , ROW_KEY );
454+ hbaseTable .put (put1 );
455+
456+ TestUtils .waitForReplication (
457+ () -> {
458+ return TestReplicationEndpoint .replicatedEntries .get () >= 1 ;
459+ });
460+
461+ Result cbtResult = cbtTable .get (new Get (ROW_KEY ).setMaxVersions ());
462+ Result hbaseResult = hbaseTable .get (new Get (ROW_KEY ).setMaxVersions ());
463+
464+ // make sure we have replicated cells
465+ Assert .assertFalse (cbtResult .isEmpty ());
466+ List <Cell > hbaseCells = hbaseResult .listCells ();
467+ List <Cell > cbtCells = cbtResult .listCells ();
468+
469+ Assert .assertEquals ("bigtable cells" , 1 , cbtCells .size ());
470+ // make sure that only CF1 is replicated
471+ for (int i = 0 ; i < hbaseCells .size (); i ++) {
472+ // make sure CF1 is only replicated
473+ if (CellUtil .cloneFamily (hbaseCells .get (i )) == CF1 ) {
474+ assertEquals (hbaseCells .get (i ), cbtCells .get (0 ));
475+ }
476+ }
477+ }
478+
410479 @ Test
411480 public void testHBaseCBTTimestampTruncation () throws IOException , InterruptedException {
412481 Put put = new Put (TestUtils .ROW_KEY );
@@ -424,7 +493,9 @@ public void testHBaseCBTTimestampTruncation() throws IOException, InterruptedExc
424493 List <Cell > hbaseCells = hbaseTable .get (new Get (TestUtils .ROW_KEY ).setMaxVersions ()).listCells ();
425494 List <Cell > bigtableCells =
426495 cbtTable .get (new Get (TestUtils .ROW_KEY ).setMaxVersions ()).listCells ();
427- Assert .assertEquals ("bigtable cells" , 1 , bigtableCells .size ());
496+ // timestamp will get truncated and value will be overwritten at BIGTABLE_MAX_TIMESTAMP.
497+ Assert .assertEquals (
498+ "bigtable cells truncated at BIGTABLE_MAX_TIMESTAMP." , 1 , bigtableCells .size ());
428499 Assert .assertNotEquals (
429500 "Timestamp match for row " + TestUtils .ROW_KEY ,
430501 hbaseCells .get (0 ).getTimestamp (),
0 commit comments