diff --git a/src/main/java/com/google/pubsub/kafka/source/CloudPubSubSourceTask.java b/src/main/java/com/google/pubsub/kafka/source/CloudPubSubSourceTask.java index 39a8f2ab..27e4ae6b 100644 --- a/src/main/java/com/google/pubsub/kafka/source/CloudPubSubSourceTask.java +++ b/src/main/java/com/google/pubsub/kafka/source/CloudPubSubSourceTask.java @@ -42,6 +42,7 @@ import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.TimeUnit; +import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.connect.data.Field; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.SchemaBuilder; @@ -365,4 +366,9 @@ public void commitRecord(SourceRecord record) { MoreExecutors.directExecutor()); log.trace("Committed {}", ackId); } + + @Override + public void commitRecord(SourceRecord record, RecordMetadata metadata) { + this.commitRecord(record); + } }