Skip to content

Commit 8c1854d

Browse files
Authored by TracyCuiCan (Tracy Cui) and gcf-owl-bot[bot]

feat: add enableSnappy flag to import snapshot pipeline and select the right container image based on beam version if flag is set (#3586)
* feat: add enableSnappy flag to import snapshot pipeline and select the right container image based on beam version if flag is set.
* 🦉 Updates from OwlBot post-processor — see https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md
* Use DataflowServiceOptions for runner_v2 and update snapshot name in test.
* Resolve merge conflict and update E2E test.
* Address review comments.
* Fix zip data and generate script.
* 🦉 Updates from OwlBot post-processor — see https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md
* Upload correct zip file.
* Fix clirr and address comment.
* Filter by job name and add back pagination in snappy snapshot test.

Co-authored-by: Tracy Cui <tracycui@google.com>
Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com>
1 parent 413a2fd commit 8c1854d

File tree

5 files changed

+326
-116
lines changed

5 files changed

+326
-116
lines changed
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<!-- see http://www.mojohaus.org/clirr-maven-plugin/examples/ignored-differences.html -->
3+
<differences>
4+
<difference>
5+
<differenceType>7012</differenceType>
6+
<className>com/google/cloud/bigtable/beam/hbasesnapshots/ImportJobFromHbaseSnapshot$ImportOptions</className>
7+
<method>*</method>
8+
</difference>
9+
</differences>

bigtable-dataflow-parent/bigtable-beam-import/src/main/java/com/google/cloud/bigtable/beam/hbasesnapshots/ImportJobFromHbaseSnapshot.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,16 +22,21 @@
2222
import com.google.cloud.bigtable.beam.sequencefiles.ImportJob;
2323
import com.google.cloud.bigtable.beam.sequencefiles.Utils;
2424
import com.google.common.annotations.VisibleForTesting;
25+
import com.google.common.base.MoreObjects;
26+
import java.util.ArrayList;
2527
import java.util.Arrays;
2628
import java.util.List;
29+
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
2730
import org.apache.beam.sdk.Pipeline;
2831
import org.apache.beam.sdk.PipelineResult;
2932
import org.apache.beam.sdk.io.hadoop.format.HadoopFormatIO;
33+
import org.apache.beam.sdk.options.Default;
3034
import org.apache.beam.sdk.options.Description;
3135
import org.apache.beam.sdk.options.PipelineOptionsFactory;
3236
import org.apache.beam.sdk.transforms.Create;
3337
import org.apache.beam.sdk.transforms.ParDo;
3438
import org.apache.beam.sdk.transforms.Wait;
39+
import org.apache.beam.sdk.util.ReleaseInfo;
3540
import org.apache.beam.sdk.values.KV;
3641
import org.apache.beam.sdk.values.PCollection;
3742
import org.apache.commons.logging.Log;
@@ -66,6 +71,8 @@
6671
@InternalExtensionOnly
6772
public class ImportJobFromHbaseSnapshot {
6873
private static final Log LOG = LogFactory.getLog(ImportJobFromHbaseSnapshot.class);
74+
private static final String CONTAINER_IMAGE_PATH_PREFIX =
75+
"gcr.io/cloud-bigtable-ecosystem/unified-harness:";
6976

7077
public interface ImportOptions extends ImportJob.ImportOptions {
7178
@Description("The HBase root dir where HBase snapshot files resides.")
@@ -79,6 +86,13 @@ public interface ImportOptions extends ImportJob.ImportOptions {
7986

8087
@SuppressWarnings("unused")
8188
void setSnapshotName(String snapshotName);
89+
90+
@Description("Is importing Snappy compressed snapshot.")
91+
@Default.Boolean(false)
92+
Boolean getEnableSnappy();
93+
94+
@SuppressWarnings("unused")
95+
void setEnableSnappy(Boolean enableSnappy);
8296
}
8397

8498
public static void main(String[] args) throws Exception {
@@ -100,6 +114,20 @@ public static void main(String[] args) throws Exception {
100114
@VisibleForTesting
101115
static Pipeline buildPipeline(ImportOptions opts) throws Exception {
102116

117+
if (opts.getEnableSnappy()) {
118+
DataflowPipelineOptions dataFlowOpts = opts.as(DataflowPipelineOptions.class);
119+
dataFlowOpts.setSdkContainerImage(
120+
CONTAINER_IMAGE_PATH_PREFIX + ReleaseInfo.getReleaseInfo().getVersion());
121+
122+
List<String> expOpts =
123+
MoreObjects.firstNonNull(dataFlowOpts.getExperiments(), new ArrayList());
124+
if (!expOpts.contains("use_runner_v2")) {
125+
expOpts = new ArrayList<>(expOpts);
126+
expOpts.add("use_runner_v2");
127+
}
128+
dataFlowOpts.setExperiments(expOpts);
129+
}
130+
103131
Pipeline pipeline = Pipeline.create(Utils.tweakOptions(opts));
104132
HBaseSnapshotInputConfigBuilder configurationBuilder =
105133
new HBaseSnapshotInputConfigBuilder()

bigtable-dataflow-parent/bigtable-beam-import/src/test/java/com/google/cloud/bigtable/beam/hbasesnapshots/EndToEndIT.java

Lines changed: 54 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,7 @@ public class EndToEndIT {
8484
private static final Logger LOG = LoggerFactory.getLogger(HBaseResultToMutationFn.class);
8585
private static final String SNAPSHOT_FIXTURE_NAME = "EndToEndIT-snapshot.zip";
8686
private static final String TEST_SNAPSHOT_NAME = "test-snapshot";
87+
private static final String TEST_SNAPPY_SNAPSHOT_NAME = "test-snappy-snapshot";
8788
private static final String CF = "cf";
8889

8990
private TestProperties properties;
@@ -340,7 +341,7 @@ public void testHBaseSnapshotImport() throws Exception {
340341

341342
// Validate the counters.
342343
Map<String, Long> counters = getCountMap(result);
343-
Assert.assertEquals(counters.get("ranges_matched"), (Long) 101L);
344+
Assert.assertEquals(counters.get("ranges_matched"), (Long) 100L);
344345
Assert.assertEquals(counters.get("ranges_not_matched"), (Long) 0L);
345346
}
346347

@@ -403,7 +404,58 @@ public void testHBaseSnapshotImportWithCorruptions() throws Exception {
403404

404405
// Assert that the output collection is the right one.
405406
Map<String, Long> counters = getCountMap(result);
406-
Assert.assertEquals(counters.get("ranges_matched"), (Long) 97L);
407+
Assert.assertEquals(counters.get("ranges_matched"), (Long) 96L);
407408
Assert.assertEquals(counters.get("ranges_not_matched"), (Long) 4L);
408409
}
410+
411+
@Test
412+
public void testSnappyCompressedHBaseSnapshotImport() throws Exception {
413+
// Start import
414+
ImportOptions importOpts = createImportOptions();
415+
importOpts.setEnableSnappy(true);
416+
importOpts.setSnapshotName(TEST_SNAPPY_SNAPSHOT_NAME);
417+
418+
// run pipeline
419+
State state = ImportJobFromHbaseSnapshot.buildPipeline(importOpts).run().waitUntilFinish();
420+
Assert.assertEquals(State.DONE, state);
421+
422+
// check that the .restore dir used for temp files has been removed
423+
// The restore directory is stored relative to the snapshot directory and contains the job name
424+
String bucket = GcsPath.fromUri(hbaseSnapshotDir).getBucket();
425+
String restorePathPrefix =
426+
CleanupHBaseSnapshotRestoreFilesFn.getListPrefix(
427+
HBaseSnapshotInputConfigBuilder.RESTORE_DIR);
428+
429+
List<StorageObject> allObjects = new ArrayList<>();
430+
String nextToken;
431+
do {
432+
Objects objects = gcsUtil.listObjects(bucket, restorePathPrefix, null);
433+
List<StorageObject> items = objects.getItems();
434+
if (items != null) {
435+
allObjects.addAll(items);
436+
}
437+
nextToken = objects.getNextPageToken();
438+
} while (nextToken != null);
439+
440+
List<StorageObject> myObjects =
441+
allObjects.stream()
442+
.filter(o -> o.getName().contains(importOpts.getJobName()))
443+
.collect(Collectors.toList());
444+
Assert.assertTrue("Restore directory wasn't deleted", myObjects.isEmpty());
445+
446+
// Verify the import using the sync job
447+
SyncTableOptions syncOpts = createSyncTableOptions();
448+
449+
PipelineResult result = SyncTableJob.buildPipeline(syncOpts).run();
450+
state = result.waitUntilFinish();
451+
Assert.assertEquals(State.DONE, state);
452+
453+
// Read the output files and validate that there are no mismatches.
454+
Assert.assertEquals(0, readMismatchesFromOutputFiles().size());
455+
456+
// Validate the counters.
457+
Map<String, Long> counters = getCountMap(result);
458+
Assert.assertEquals(counters.get("ranges_matched"), (Long) 100L);
459+
Assert.assertEquals(counters.get("ranges_not_matched"), (Long) 0L);
460+
}
409461
}

0 commit comments

Comments (0)