Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;

import com.google.api.services.bigquery.model.TableSchema;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.io.IOException;
import java.io.InputStream;
import java.io.Reader;
Expand Down Expand Up @@ -88,13 +87,16 @@ private void parseJson(String jsonSchema) throws UnsupportedOperationException {
jsonBeamSchema = BigQueryHelpers.toJsonString(schema.getFields());
}

@SuppressFBWarnings("DCN_NULLPOINTER_EXCEPTION")
private void validateSchemaTypes(TableSchema bigQuerySchema) {
if (bigQuerySchema == null) {
LOG.error("Provided BigQuery schema is null. Please check your input.");
return;
}
try {
beamSchema = fromTableSchema(bigQuerySchema);
} catch (UnsupportedOperationException exception) {
LOG.error("Check json schema, {}", exception.getMessage());
} catch (NullPointerException npe) {
} catch (Exception e) {
LOG.error("Missing schema keywords, please check what all required fields presented");
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ public SubProcessConfiguration create(PipelineOptions options) {
configuration.setWorkerPath(subProcessPipelineOptions.getWorkerPath());
configuration.setWaitTime(subProcessPipelineOptions.getWaitTime());
configuration.setOnlyUpLoadLogsOnError(subProcessPipelineOptions.getOnlyUpLoadLogsOnError());
configuration.concurrency = subProcessPipelineOptions.getConcurrency();
configuration.setConcurrency(subProcessPipelineOptions.getConcurrency());

return configuration;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,31 +17,29 @@
*/
package org.apache.beam.examples.subprocess.configuration;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.io.Serializable;

/**
* Configuration file used to setup the Process kernel for execution of the external library Values
* are copied from the Options to all them to be Serializable.
*/
@SuppressWarnings({"serial", "nullness"}) // TODO(https://github.com/apache/beam/issues/20497)
@SuppressFBWarnings("PA_PUBLIC_PRIMITIVE_ATTRIBUTE") // TODO(#35312)
public class SubProcessConfiguration implements Serializable {

// Source GCS directory where the C++ library is located gs://bucket/tests
public String sourcePath;
private String sourcePath;

// Working directory for the process I/O
public String workerPath;
private String workerPath;

// The maximum time to wait for the sub-process to complete
public Integer waitTime;
private Integer waitTime;

// "As sub-processes can be heavy weight match the concurrency level to num cores on the machines"
public Integer concurrency;
private Integer concurrency;

// Should log files only be uploaded if error
public Boolean onlyUpLoadLogsOnError;
private Boolean onlyUpLoadLogsOnError;

public Boolean getOnlyUpLoadLogsOnError() {
return onlyUpLoadLogsOnError;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -486,10 +486,8 @@ object TriggerExample {

@ProcessElement
@Throws(Exception::class)
@SuppressFBWarnings("DMI_RANDOM_USED_ONLY_ONCE")
fun processElement(c: ProcessContext) {
var timestamp = Instant.now()
val random = Random()
if (random.nextDouble() < THRESHOLD) {
val range = MAX_DELAY - MIN_DELAY
val delayInMinutes = random.nextInt(range) + MIN_DELAY
Expand All @@ -504,6 +502,7 @@ object TriggerExample {
// MIN_DELAY and MAX_DELAY in minutes.
private const val MIN_DELAY = 1
private const val MAX_DELAY = 100
private val random = Random()
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@

import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
Expand Down Expand Up @@ -238,6 +237,7 @@ public void processElement(@StateId(NEXT_ID) ValueState<Long> nextIdState, Proce
private static class ExactlyOnceWriter<K, V>
extends DoFn<KV<Integer, Iterable<KV<Long, TimestampedValue<ProducerRecord<K, V>>>>>, Void> {

private static final Random RANDOM = new Random();
private static final String NEXT_ID = "nextId";
private static final String MIN_BUFFERED_ID = "minBufferedId";
private static final String OUT_OF_ORDER_BUFFER = "outOfOrderBuffer";
Expand Down Expand Up @@ -551,7 +551,6 @@ void commitTxn(long lastRecordId, Counter numTransactions) throws IOException {
}
}

@SuppressFBWarnings("DMI_RANDOM_USED_ONLY_ONCE") // TODO(#35312)
private ShardWriter<K, V> initShardWriter(
int shard, ValueState<String> writerIdState, long nextId) throws IOException {

Expand Down Expand Up @@ -586,7 +585,7 @@ private ShardWriter<K, V> initShardWriter(
writerId =
String.format(
"%X - %s",
new Random().nextInt(Integer.MAX_VALUE),
RANDOM.nextInt(Integer.MAX_VALUE),
DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss")
.withZone(DateTimeZone.UTC)
.print(DateTimeUtils.currentTimeMillis()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
Expand All @@ -40,6 +39,9 @@
* KafkaIO.ReadSourceDescriptors}.
*/
public final class KafkaIOUtils {

private static final Random RANDOM = new Random();

// A set of config defaults.
static final Map<String, Object> DEFAULT_CONSUMER_PROPERTIES =
ImmutableMap.of(
Expand Down Expand Up @@ -99,7 +101,6 @@ static Map<String, Object> updateKafkaProperties(
return config;
}

@SuppressFBWarnings("DMI_RANDOM_USED_ONLY_ONCE") // TODO(#35312)
static Map<String, Object> getOffsetConsumerConfig(
String name, @Nullable Map<String, Object> offsetConfig, Map<String, Object> consumerConfig) {
Map<String, Object> offsetConsumerConfig = new HashMap<>(consumerConfig);
Expand All @@ -110,7 +111,7 @@ static Map<String, Object> getOffsetConsumerConfig(
String offsetGroupId =
String.format(
"%s_offset_consumer_%d_%s",
name, new Random().nextInt(Integer.MAX_VALUE), (groupId == null ? "none" : groupId));
name, RANDOM.nextInt(Integer.MAX_VALUE), (groupId == null ? "none" : groupId));
offsetConsumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, offsetGroupId);

if (offsetConfig != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;

import com.fasterxml.jackson.annotation.JsonProperty;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import org.apache.beam.sdk.io.synthetic.delay.SyntheticDelay;
Expand Down Expand Up @@ -53,6 +52,7 @@
*/
public class SyntheticStep extends DoFn<KV<byte[], byte[]>, KV<byte[], byte[]>> {

private static final Random RANDOM = new Random();
private final Options options;

// used when maxWorkerThroughput is set
Expand All @@ -75,13 +75,11 @@ public RateLimiter load(KV<Long, Long> pair) {
}
});

@SuppressFBWarnings("DMI_RANDOM_USED_ONLY_ONCE") // TODO(#35312)
public SyntheticStep(Options options) {
options.validate();
this.options = options;
Random rand = new Random();
// use a random id so that a pipeline could have multiple SyntheticSteps
this.idAndThroughput = KV.of(rand.nextLong(), options.maxWorkerThroughput);
this.idAndThroughput = KV.of(RANDOM.nextLong(), options.maxWorkerThroughput);
}

@ProcessElement
Expand Down
Loading