From 99e21e594eab16195c43828ad6fb98f836e20cfc Mon Sep 17 00:00:00 2001 From: Ashwin Giridharan Date: Sun, 8 Sep 2019 19:55:11 -0700 Subject: [PATCH 1/4] Fix to prevent the onNext event going to stale subscription when restart happens in poller --- .../polling/PrefetchRecordsPublisher.java | 43 +++++++++++-------- .../polling/PrefetchRecordsPublisherTest.java | 10 ++--- 2 files changed, 31 insertions(+), 22 deletions(-) diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisher.java index 4c513d6a3..dea7b476b 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisher.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisher.java @@ -216,12 +216,15 @@ public void restartFrom(RecordsRetrieved recordsRetrieved) { PrefetchRecordsRetrieved prefetchRecordsRetrieved = (PrefetchRecordsRetrieved) recordsRetrieved; resetLock.writeLock().lock(); try { + // Reset the demand from ShardConsumer, to prevent this publisher from delivering events to stale RX-Java + // Subscriber. Publishing will be unblocked when the demand is communicated by the new Rx-Java subscriber. + requestedResponses.set(0); + + // Clear the queue, so that the publisher repopulates the queue based on sequence number from subscriber. getRecordsResultQueue.clear(); // Give the drain control to publisher/demand-notifier thread. - log.debug("{} : Publisher thread takes over the draining control. Queue Size : {}, Demand : {}", shardId, - getRecordsResultQueue.size(), requestedResponses.get()); - shouldDrainEventOnlyOnAck.set(false); + giveDrainingControlToPublisherOrDemandNotifier(); prefetchCounters.reset(); @@ -260,9 +263,7 @@ public synchronized void notify(RecordsDeliveryAck recordsDeliveryAck) { pollNextResultAndUpdatePrefetchCounters(); // Upon evicting, check if queue is empty. if yes, then give the drain control back to publisher thread. if (getRecordsResultQueue.isEmpty()) { - log.debug("{} : Publisher thread takes over the draining control. Queue Size : {}, Demand : {}", - shardId, getRecordsResultQueue.size(), requestedResponses.get()); - shouldDrainEventOnlyOnAck.set(false); + giveDrainingControlToPublisherOrDemandNotifier(); } else { // Else attempt to drain the queue. drainQueueForRequests(); @@ -317,21 +318,29 @@ private synchronized void drainQueueForRequests() { // If there is an event available to drain and if there is at least one demand, // then schedule it for delivery if (requestedResponses.get() > 0 && recordsToDeliver != null) { - lastEventDeliveryTime = Instant.now(); subscriber.onNext(recordsToDeliver); - if (!shouldDrainEventOnlyOnAck.get()) { - log.debug("{} : Notifier thread takes over the draining control. Queue Size : {}, Demand : {}", shardId, - getRecordsResultQueue.size(), requestedResponses.get()); - shouldDrainEventOnlyOnAck.set(true); - } + lastEventDeliveryTime = Instant.now(); + giveDrainingControlToEventNotifier(); } else { // Since we haven't scheduled the event delivery, give the drain control back to publisher/demand-notifier // thread. - if (shouldDrainEventOnlyOnAck.get()) { - log.debug("{} : Publisher thread takes over the draining control. 
Queue Size : {}, Demand : {}", - shardId, getRecordsResultQueue.size(), requestedResponses.get()); - shouldDrainEventOnlyOnAck.set(false); - } + giveDrainingControlToPublisherOrDemandNotifier(); + } + } + + private void giveDrainingControlToEventNotifier() { + if (!shouldDrainEventOnlyOnAck.get()) { + log.debug("{} : Notifier thread takes over the draining control. Queue Size : {}, Demand : {}", shardId, + getRecordsResultQueue.size(), requestedResponses.get()); + shouldDrainEventOnlyOnAck.set(true); + } + } + + private void giveDrainingControlToPublisherOrDemandNotifier() { + if (shouldDrainEventOnlyOnAck.get()) { + log.debug("{} : Publisher thread takes over the draining control. Queue Size : {}, Demand : {}", + shardId, getRecordsResultQueue.size(), requestedResponses.get()); + shouldDrainEventOnlyOnAck.set(false); } } diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisherTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisherTest.java index 8c1aa7438..1b741b38b 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisherTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisherTest.java @@ -276,7 +276,7 @@ public void testRetryableRetrievalExceptionContinues() { assertThat(records.processRecordsInput().millisBehindLatest(), equalTo(response.millisBehindLatest())); } - @Test(timeout = 20000L) + @Test(timeout = 10000L) public void testNoDeadlockOnFullQueue() { // // Fixes https://github.com/awslabs/amazon-kinesis-client/issues/448 @@ -304,7 +304,7 @@ public void testNoDeadlockOnFullQueue() { log.info("Queue is currently at {} starting subscriber", getRecordsCache.getRecordsResultQueue.size()); AtomicInteger receivedItems = new AtomicInteger(0); - final int expectedItems = MAX_SIZE * 1000; + final int expectedItems = MAX_SIZE * 20; Object lock = new Object(); @@ -359,7 +359,7 @@ public void onComplete() { assertThat(receivedItems.get(), equalTo(expectedItems)); } - @Test(timeout = 20000L) + @Test(timeout = 10000L) public void testNoDeadlockOnFullQueueAndLossOfNotification() { // // Fixes https://github.com/awslabs/amazon-kinesis-client/issues/602 @@ -383,7 +383,7 @@ public void testNoDeadlockOnFullQueueAndLossOfNotification() { log.info("Queue is currently at {} starting subscriber", getRecordsCache.getRecordsResultQueue.size()); AtomicInteger receivedItems = new AtomicInteger(0); - final int expectedItems = MAX_SIZE * 100; + final int expectedItems = MAX_SIZE * 50; Object lock = new Object(); @@ -521,7 +521,7 @@ public GetRecordsResponse answer(InvocationOnMock invocation) throws Throwable { private static class LossyNotificationSubscriber extends ShardConsumerNotifyingSubscriber { - private static final int LOSS_EVERY_NTH_RECORD = 100; + private static final int LOSS_EVERY_NTH_RECORD = 50; private static int recordCounter = 0; private static final ScheduledExecutorService consumerHealthChecker = Executors.newScheduledThreadPool(1); From f008f5c3e06ba8f0b77cd927f9cac0fc93a4ad94 Mon Sep 17 00:00:00 2001 From: Ashwin Giridharan Date: Thu, 12 Sep 2019 00:58:25 -0700 Subject: [PATCH 2/4] Isolating session variables into a new class. 
Replacing thread control shifting logic for publishing with monitor based control --- .../polling/PrefetchRecordsPublisher.java | 196 +++++++++--------- ...efetchRecordsPublisherIntegrationTest.java | 16 +- .../polling/PrefetchRecordsPublisherTest.java | 61 +++--- 3 files changed, 137 insertions(+), 136 deletions(-) diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisher.java index dea7b476b..b8b9fee08 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisher.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisher.java @@ -22,11 +22,13 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.stream.Collectors; +import lombok.AccessLevel; +import lombok.Getter; +import lombok.Setter; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.Validate; import org.reactivestreams.Subscriber; @@ -79,8 +81,6 @@ @KinesisClientInternalApi public class PrefetchRecordsPublisher implements RecordsPublisher { private static final String EXPIRED_ITERATOR_METRIC = "ExpiredIterator"; - @VisibleForTesting - LinkedBlockingQueue getRecordsResultQueue; private int maxPendingProcessRecordsInput; private int maxByteSize; private int maxRecordsCount; @@ -91,26 +91,71 @@ public class PrefetchRecordsPublisher implements RecordsPublisher { private final long idleMillisBetweenCalls; private Instant lastSuccessfulCall; private final DefaultGetRecordsCacheDaemon defaultGetRecordsCacheDaemon; - private PrefetchCounters prefetchCounters; private boolean started = false; private final String operation; - private final KinesisDataFetcher dataFetcher; private final String shardId; - private Subscriber subscriber; - private final AtomicLong requestedResponses = new AtomicLong(0); - - private String highestSequenceNumber; - private InitialPositionInStreamExtended initialPositionInStreamExtended; - + @VisibleForTesting @Getter + private final PublisherSession publisherSession; private final ReentrantReadWriteLock resetLock = new ReentrantReadWriteLock(); private boolean wasReset = false; private Instant lastEventDeliveryTime = Instant.EPOCH; - // This flag controls who should drain the next request in the prefetch queue. - // When set to false, the publisher and demand-notifier thread would have the control. - // When set to true, the event-notifier thread would have the control. - private AtomicBoolean shouldDrainEventOnlyOnAck = new AtomicBoolean(false); + + @Data + @Accessors(fluent = true) + static final class PublisherSession { + private final AtomicLong requestedResponses = new AtomicLong(0); + @VisibleForTesting @Getter + private final LinkedBlockingQueue prefetchRecordsQueue; + private final PrefetchCounters prefetchCounters; + private final KinesisDataFetcher dataFetcher; + private InitialPositionInStreamExtended initialPositionInStreamExtended; + private String highestSequenceNumber; + + // Initialize the session on publisher start. 
+ void init(ExtendedSequenceNumber extendedSequenceNumber, + InitialPositionInStreamExtended initialPositionInStreamExtended) { + this.initialPositionInStreamExtended = initialPositionInStreamExtended; + this.highestSequenceNumber = extendedSequenceNumber.sequenceNumber(); + this.dataFetcher.initialize(extendedSequenceNumber, initialPositionInStreamExtended); + } + + // Reset the session when publisher restarts. + void reset(PrefetchRecordsRetrieved prefetchRecordsRetrieved) { + // Reset the demand from ShardConsumer, to prevent this publisher from delivering events to stale RX-Java + // Subscriber. Publishing will be unblocked when the demand is communicated by the new Rx-Java subscriber. + requestedResponses.set(0); + // Clear the queue, so that the publisher repopulates the queue based on sequence number from subscriber. + prefetchRecordsQueue.clear(); + prefetchCounters.reset(); + highestSequenceNumber = prefetchRecordsRetrieved.lastBatchSequenceNumber(); + dataFetcher.resetIterator(prefetchRecordsRetrieved.shardIterator(), highestSequenceNumber, + initialPositionInStreamExtended); + } + + // Take action on successful event delivery. + RecordsRetrieved eventDeliveredAction(String shardId) { + final PrefetchRecordsRetrieved result = prefetchRecordsQueue.poll(); + if (result != null) { + updateDemandTrackersOnPublish(result); + } else { + log.info( + "{}: No record batch found while evicting from the prefetch queue. This indicates the prefetch buffer" + + "was reset.", shardId); + } + return result; + } + + boolean hasDemandToPublish() { + return requestedResponses.get() > 0; + } + + private void updateDemandTrackersOnPublish(PrefetchRecordsRetrieved result) { + prefetchCounters.removed(result.processRecordsInput); + requestedResponses.decrementAndGet(); + } + } /** * Constructor for the PrefetchRecordsPublisher. 
This cache prefetches records from Kinesis and stores them in a @@ -140,15 +185,14 @@ public PrefetchRecordsPublisher(final int maxPendingProcessRecordsInput, final i this.maxPendingProcessRecordsInput = maxPendingProcessRecordsInput; this.maxByteSize = maxByteSize; this.maxRecordsCount = maxRecordsCount; - this.getRecordsResultQueue = new LinkedBlockingQueue<>(this.maxPendingProcessRecordsInput); - this.prefetchCounters = new PrefetchCounters(); + this.publisherSession = new PublisherSession(new LinkedBlockingQueue<>(this.maxPendingProcessRecordsInput), + new PrefetchCounters(), this.getRecordsRetrievalStrategy.getDataFetcher()); this.executorService = executorService; this.metricsFactory = new ThreadSafeMetricsDelegatingFactory(metricsFactory); this.idleMillisBetweenCalls = idleMillisBetweenCalls; this.defaultGetRecordsCacheDaemon = new DefaultGetRecordsCacheDaemon(); Validate.notEmpty(operation, "Operation cannot be empty"); this.operation = operation; - this.dataFetcher = this.getRecordsRetrievalStrategy.getDataFetcher(); this.shardId = shardId; } @@ -158,9 +202,7 @@ public void start(ExtendedSequenceNumber extendedSequenceNumber, InitialPosition throw new IllegalStateException("ExecutorService has been shutdown."); } - this.initialPositionInStreamExtended = initialPositionInStreamExtended; - highestSequenceNumber = extendedSequenceNumber.sequenceNumber(); - dataFetcher.initialize(extendedSequenceNumber, initialPositionInStreamExtended); + publisherSession.init(extendedSequenceNumber, initialPositionInStreamExtended); if (!started) { log.info("{} : Starting prefetching thread.", shardId); @@ -181,23 +223,13 @@ private void throwOnIllegalState() { private RecordsRetrieved peekNextResult() { throwOnIllegalState(); - final PrefetchRecordsRetrieved result = getRecordsResultQueue.peek(); - return result == null ? result : result.prepareForPublish(); + return publisherSession.prefetchRecordsQueue().peek(); } @VisibleForTesting - RecordsRetrieved pollNextResultAndUpdatePrefetchCounters() { + RecordsRetrieved evictPublishedEvent() { throwOnIllegalState(); - final PrefetchRecordsRetrieved result = getRecordsResultQueue.poll(); - if (result != null) { - prefetchCounters.removed(result.processRecordsInput); - requestedResponses.decrementAndGet(); - } else { - log.info( - "{}: No record batch found while evicting from the prefetch queue. This indicates the prefetch buffer" - + "was reset.", shardId); - } - return result; + return publisherSession.eventDeliveredAction(shardId); } @Override @@ -213,24 +245,9 @@ public void restartFrom(RecordsRetrieved recordsRetrieved) { throw new IllegalArgumentException( "Provided RecordsRetrieved was not produced by the PrefetchRecordsPublisher"); } - PrefetchRecordsRetrieved prefetchRecordsRetrieved = (PrefetchRecordsRetrieved) recordsRetrieved; resetLock.writeLock().lock(); try { - // Reset the demand from ShardConsumer, to prevent this publisher from delivering events to stale RX-Java - // Subscriber. Publishing will be unblocked when the demand is communicated by the new Rx-Java subscriber. - requestedResponses.set(0); - - // Clear the queue, so that the publisher repopulates the queue based on sequence number from subscriber. - getRecordsResultQueue.clear(); - - // Give the drain control to publisher/demand-notifier thread. 
- giveDrainingControlToPublisherOrDemandNotifier(); - - prefetchCounters.reset(); - - highestSequenceNumber = prefetchRecordsRetrieved.lastBatchSequenceNumber(); - dataFetcher.resetIterator(prefetchRecordsRetrieved.shardIterator(), highestSequenceNumber, - initialPositionInStreamExtended); + publisherSession.reset((PrefetchRecordsRetrieved)recordsRetrieved); wasReset = true; } finally { resetLock.writeLock().unlock(); @@ -243,13 +260,13 @@ public void subscribe(Subscriber s) { subscriber.onSubscribe(new Subscription() { @Override public void request(long n) { - requestedResponses.addAndGet(n); - drainQueueForRequestsIfAllowed(); + publisherSession.requestedResponses().addAndGet(n); + drainQueueForRequests(); } @Override public void cancel() { - requestedResponses.set(0); + publisherSession.requestedResponses().set(0); } }); } @@ -260,14 +277,9 @@ public synchronized void notify(RecordsDeliveryAck recordsDeliveryAck) { // Verify if the ack matches the head of the queue and evict it. if (recordsToCheck != null && recordsToCheck.batchUniqueIdentifier() .equals(recordsDeliveryAck.batchUniqueIdentifier())) { - pollNextResultAndUpdatePrefetchCounters(); + evictPublishedEvent(); // Upon evicting, check if queue is empty. if yes, then give the drain control back to publisher thread. - if (getRecordsResultQueue.isEmpty()) { - giveDrainingControlToPublisherOrDemandNotifier(); - } else { - // Else attempt to drain the queue. - drainQueueForRequests(); - } + drainQueueForRequests(); } else { // Log and ignore any other ack received. As long as an ack is received for head of the queue // we are good. Any stale or future ack received can be ignored, though the latter is not feasible @@ -284,7 +296,7 @@ public synchronized void notify(RecordsDeliveryAck recordsDeliveryAck) { // Note : Do not make this method synchronous as notify() will not be able to evict any entry from the queue. private void addArrivedRecordsInput(PrefetchRecordsRetrieved recordsRetrieved) throws InterruptedException { wasReset = false; - while (!getRecordsResultQueue.offer(recordsRetrieved, idleMillisBetweenCalls, TimeUnit.MILLISECONDS)) { + while (!publisherSession.prefetchRecordsQueue().offer(recordsRetrieved, idleMillisBetweenCalls, TimeUnit.MILLISECONDS)) { // // Unlocking the read lock, and then reacquiring the read lock, should allow any waiters on the write lock a // chance to run. If the write lock is acquired by restartFrom than the readLock will now block until @@ -297,51 +309,25 @@ private void addArrivedRecordsInput(PrefetchRecordsRetrieved recordsRetrieved) t throw new PositionResetException(); } } - prefetchCounters.added(recordsRetrieved.processRecordsInput); - } - - /** - * Method that will be called by the 'publisher thread' and the 'demand notifying thread', - * to drain the events if the 'event notifying thread' do not have the control. - */ - private synchronized void drainQueueForRequestsIfAllowed() { - if (!shouldDrainEventOnlyOnAck.get()) { - drainQueueForRequests(); - } + publisherSession.prefetchCounters().added(recordsRetrieved.processRecordsInput); } /** * Method to drain the queue based on the demand and the events availability in the queue. 
*/ private synchronized void drainQueueForRequests() { - final RecordsRetrieved recordsToDeliver = peekNextResult(); + final PrefetchRecordsRetrieved recordsToDeliver = (PrefetchRecordsRetrieved) peekNextResult(); // If there is an event available to drain and if there is at least one demand, // then schedule it for delivery - if (requestedResponses.get() > 0 && recordsToDeliver != null) { - subscriber.onNext(recordsToDeliver); + if (publisherSession.hasDemandToPublish() && canDispatchRecord(recordsToDeliver)) { + subscriber.onNext(recordsToDeliver.prepareForPublish()); + recordsToDeliver.dispatched(); lastEventDeliveryTime = Instant.now(); - giveDrainingControlToEventNotifier(); - } else { - // Since we haven't scheduled the event delivery, give the drain control back to publisher/demand-notifier - // thread. - giveDrainingControlToPublisherOrDemandNotifier(); - } - } - - private void giveDrainingControlToEventNotifier() { - if (!shouldDrainEventOnlyOnAck.get()) { - log.debug("{} : Notifier thread takes over the draining control. Queue Size : {}, Demand : {}", shardId, - getRecordsResultQueue.size(), requestedResponses.get()); - shouldDrainEventOnlyOnAck.set(true); } } - private void giveDrainingControlToPublisherOrDemandNotifier() { - if (shouldDrainEventOnlyOnAck.get()) { - log.debug("{} : Publisher thread takes over the draining control. Queue Size : {}, Demand : {}", - shardId, getRecordsResultQueue.size(), requestedResponses.get()); - shouldDrainEventOnlyOnAck.set(false); - } + private static boolean canDispatchRecord(PrefetchRecordsRetrieved recordsToDeliver) { + return recordsToDeliver != null && !recordsToDeliver.isDispatched(); } @Accessors(fluent = true) @@ -352,6 +338,7 @@ static class PrefetchRecordsRetrieved implements RecordsRetrieved { final String lastBatchSequenceNumber; final String shardIterator; final BatchUniqueIdentifier batchUniqueIdentifier; + @Accessors() @Setter(AccessLevel.NONE) boolean dispatched = false; PrefetchRecordsRetrieved prepareForPublish() { return new PrefetchRecordsRetrieved(processRecordsInput.toBuilder().cacheExitTime(Instant.now()).build(), @@ -363,6 +350,9 @@ public BatchUniqueIdentifier batchUniqueIdentifier() { return batchUniqueIdentifier; } + // Indicates if this record batch was already dispatched for delivery. + void dispatched() { dispatched = true; } + /** * Generate batch unique identifier for PrefetchRecordsRetrieved, where flow will be empty. 
* @return BatchUniqueIdentifier @@ -371,10 +361,11 @@ public static BatchUniqueIdentifier generateBatchUniqueIdentifier() { return new BatchUniqueIdentifier(UUID.randomUUID().toString(), StringUtils.EMPTY); } + } private String calculateHighestSequenceNumber(ProcessRecordsInput processRecordsInput) { - String result = this.highestSequenceNumber; + String result = publisherSession.highestSequenceNumber(); if (processRecordsInput.records() != null && !processRecordsInput.records().isEmpty()) { result = processRecordsInput.records().get(processRecordsInput.records().size() - 1).sequenceNumber(); } @@ -413,7 +404,7 @@ public void run() { private void makeRetrievalAttempt() { MetricsScope scope = MetricsUtil.createMetricsWithOperation(metricsFactory, operation); - if (prefetchCounters.shouldGetNewRecords()) { + if (publisherSession.prefetchCounters().shouldGetNewRecords()) { try { sleepBeforeNextCall(); GetRecordsResponse getRecordsResult = getRecordsRetrievalStrategy.getRecords(maxRecordsPerCall); @@ -428,13 +419,12 @@ private void makeRetrievalAttempt() { .isAtShardEnd(getRecordsRetrievalStrategy.getDataFetcher().isShardEndReached()) .build(); - highestSequenceNumber = calculateHighestSequenceNumber(processRecordsInput); PrefetchRecordsRetrieved recordsRetrieved = new PrefetchRecordsRetrieved(processRecordsInput, - highestSequenceNumber, getRecordsResult.nextShardIterator(), + calculateHighestSequenceNumber(processRecordsInput), getRecordsResult.nextShardIterator(), PrefetchRecordsRetrieved.generateBatchUniqueIdentifier()); - highestSequenceNumber = recordsRetrieved.lastBatchSequenceNumber; + publisherSession.highestSequenceNumber(recordsRetrieved.lastBatchSequenceNumber); addArrivedRecordsInput(recordsRetrieved); - drainQueueForRequestsIfAllowed(); + drainQueueForRequests(); } catch (PositionResetException pse) { throw pse; } catch (RetryableRetrievalException rre) { @@ -447,7 +437,7 @@ private void makeRetrievalAttempt() { scope.addData(EXPIRED_ITERATOR_METRIC, 1, StandardUnit.COUNT, MetricsLevel.SUMMARY); - dataFetcher.restartIterator(); + publisherSession.dataFetcher().restartIterator(); } catch (SdkException e) { log.error("{} : Exception thrown while fetching records from Kinesis", shardId, e); } catch (Throwable e) { @@ -463,7 +453,7 @@ private void makeRetrievalAttempt() { // Consumer isn't ready to receive new records will allow prefetch counters to pause // try { - prefetchCounters.waitForConsumer(); + publisherSession.prefetchCounters().waitForConsumer(); } catch (InterruptedException ie) { log.info("{} : Thread was interrupted while waiting for the consumer. 
" + "Shutdown has probably been started", shardId); @@ -532,7 +522,7 @@ void reset() { @Override public String toString() { - return String.format("{ Requests: %d, Records: %d, Bytes: %d }", getRecordsResultQueue.size(), size, + return String.format("{ Requests: %d, Records: %d, Bytes: %d }", publisherSession.prefetchRecordsQueue().size(), size, byteSize); } } diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisherIntegrationTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisherIntegrationTest.java index 55e15b415..048d3ae91 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisherIntegrationTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisherIntegrationTest.java @@ -122,14 +122,14 @@ public void testRollingCache() { getRecordsCache.start(extendedSequenceNumber, initialPosition); sleep(IDLE_MILLIS_BETWEEN_CALLS); - ProcessRecordsInput processRecordsInput1 = blockUntilRecordsAvailable(getRecordsCache::pollNextResultAndUpdatePrefetchCounters, 1000L) + ProcessRecordsInput processRecordsInput1 = blockUntilRecordsAvailable(getRecordsCache::evictPublishedEvent, 1000L) .processRecordsInput(); assertTrue(processRecordsInput1.records().isEmpty()); assertEquals(processRecordsInput1.millisBehindLatest(), new Long(1000)); assertNotNull(processRecordsInput1.cacheEntryTime()); - ProcessRecordsInput processRecordsInput2 = blockUntilRecordsAvailable(getRecordsCache::pollNextResultAndUpdatePrefetchCounters, 1000L) + ProcessRecordsInput processRecordsInput2 = blockUntilRecordsAvailable(getRecordsCache::evictPublishedEvent, 1000L) .processRecordsInput(); assertNotEquals(processRecordsInput1, processRecordsInput2); @@ -140,11 +140,11 @@ public void testFullCache() { getRecordsCache.start(extendedSequenceNumber, initialPosition); sleep(MAX_SIZE * IDLE_MILLIS_BETWEEN_CALLS); - assertEquals(getRecordsCache.getRecordsResultQueue.size(), MAX_SIZE); + assertEquals(getRecordsCache.getPublisherSession().prefetchRecordsQueue().size(), MAX_SIZE); - ProcessRecordsInput processRecordsInput1 = blockUntilRecordsAvailable(getRecordsCache::pollNextResultAndUpdatePrefetchCounters, 1000L) + ProcessRecordsInput processRecordsInput1 = blockUntilRecordsAvailable(getRecordsCache::evictPublishedEvent, 1000L) .processRecordsInput(); - ProcessRecordsInput processRecordsInput2 = blockUntilRecordsAvailable(getRecordsCache::pollNextResultAndUpdatePrefetchCounters, 1000L) + ProcessRecordsInput processRecordsInput2 = blockUntilRecordsAvailable(getRecordsCache::evictPublishedEvent, 1000L) .processRecordsInput(); assertNotEquals(processRecordsInput1, processRecordsInput2); @@ -184,9 +184,9 @@ public void testDifferentShardCaches() { sleep(IDLE_MILLIS_BETWEEN_CALLS); - ProcessRecordsInput p1 = getRecordsCache.pollNextResultAndUpdatePrefetchCounters().processRecordsInput(); + ProcessRecordsInput p1 = getRecordsCache.evictPublishedEvent().processRecordsInput(); - ProcessRecordsInput p2 = recordsPublisher2.pollNextResultAndUpdatePrefetchCounters().processRecordsInput(); + ProcessRecordsInput p2 = recordsPublisher2.evictPublishedEvent().processRecordsInput(); assertNotEquals(p1, p2); assertTrue(p1.records().isEmpty()); @@ -212,7 +212,7 @@ public DataFetcherResult answer(final InvocationOnMock invocationOnMock) throws getRecordsCache.start(extendedSequenceNumber, initialPosition); 
sleep(IDLE_MILLIS_BETWEEN_CALLS); - ProcessRecordsInput processRecordsInput = blockUntilRecordsAvailable(getRecordsCache::pollNextResultAndUpdatePrefetchCounters, 1000L) + ProcessRecordsInput processRecordsInput = blockUntilRecordsAvailable(getRecordsCache::evictPublishedEvent, 1000L) .processRecordsInput(); assertNotNull(processRecordsInput); diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisherTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisherTest.java index 1b741b38b..68ee0a56f 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisherTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisherTest.java @@ -18,6 +18,7 @@ import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; @@ -53,7 +54,6 @@ import java.util.stream.IntStream; import java.util.stream.Stream; -import io.reactivex.plugins.RxJavaPlugins; import org.apache.commons.lang3.StringUtils; import org.junit.After; import org.junit.Before; @@ -131,7 +131,7 @@ public void setup() { new NullMetricsFactory(), operation, "shardId"); - spyQueue = spy(getRecordsCache.getRecordsResultQueue); + spyQueue = spy(getRecordsCache.getPublisherSession().prefetchRecordsQueue()); records = spy(new ArrayList<>()); getRecordsResponse = GetRecordsResponse.builder().records(records).build(); @@ -148,7 +148,7 @@ record = Record.builder().data(createByteBufferWithSize(SIZE_512_KB)).build(); .map(KinesisClientRecord::fromRecord).collect(Collectors.toList()); getRecordsCache.start(sequenceNumber, initialPosition); - ProcessRecordsInput result = blockUntilRecordsAvailable(getRecordsCache::pollNextResultAndUpdatePrefetchCounters, 1000L) + ProcessRecordsInput result = blockUntilRecordsAvailable(getRecordsCache::evictPublishedEvent, 1000L) .processRecordsInput(); assertEquals(expectedRecords, result.records()); @@ -218,7 +218,7 @@ record = Record.builder().data(createByteBufferWithSize(1024)).build(); .map(KinesisClientRecord::fromRecord).collect(Collectors.toList()); getRecordsCache.start(sequenceNumber, initialPosition); - ProcessRecordsInput processRecordsInput = getRecordsCache.pollNextResultAndUpdatePrefetchCounters().processRecordsInput(); + ProcessRecordsInput processRecordsInput = getRecordsCache.evictPublishedEvent().processRecordsInput(); verify(executorService).execute(any()); assertEquals(expectedRecords, processRecordsInput.records()); @@ -227,7 +227,7 @@ record = Record.builder().data(createByteBufferWithSize(1024)).build(); sleep(2000); - ProcessRecordsInput processRecordsInput2 = getRecordsCache.pollNextResultAndUpdatePrefetchCounters().processRecordsInput(); + ProcessRecordsInput processRecordsInput2 = getRecordsCache.evictPublishedEvent().processRecordsInput(); assertNotEquals(processRecordsInput, processRecordsInput2); assertEquals(expectedRecords, processRecordsInput2.records()); assertNotEquals(processRecordsInput2.timeSpentInCache(), Duration.ZERO); @@ -238,13 +238,13 @@ record = Record.builder().data(createByteBufferWithSize(1024)).build(); @Test(expected = IllegalStateException.class) public void testGetNextRecordsWithoutStarting() 
{ verify(executorService, times(0)).execute(any()); - getRecordsCache.pollNextResultAndUpdatePrefetchCounters(); + getRecordsCache.evictPublishedEvent(); } @Test(expected = IllegalStateException.class) public void testCallAfterShutdown() { when(executorService.isShutdown()).thenReturn(true); - getRecordsCache.pollNextResultAndUpdatePrefetchCounters(); + getRecordsCache.evictPublishedEvent(); } @Test @@ -257,7 +257,7 @@ public void testExpiredIteratorException() { doNothing().when(dataFetcher).restartIterator(); - blockUntilRecordsAvailable(() -> getRecordsCache.pollNextResultAndUpdatePrefetchCounters(), 1000L); + blockUntilRecordsAvailable(() -> getRecordsCache.evictPublishedEvent(), 1000L); sleep(1000); @@ -272,7 +272,7 @@ public void testRetryableRetrievalExceptionContinues() { getRecordsCache.start(sequenceNumber, initialPosition); - RecordsRetrieved records = blockUntilRecordsAvailable(getRecordsCache::pollNextResultAndUpdatePrefetchCounters, 1000); + RecordsRetrieved records = blockUntilRecordsAvailable(getRecordsCache::evictPublishedEvent, 1000); assertThat(records.processRecordsInput().millisBehindLatest(), equalTo(response.millisBehindLatest())); } @@ -285,12 +285,12 @@ public void testNoDeadlockOnFullQueue() { // If the test times out before starting the subscriber it means something went wrong while filling the queue. // After the subscriber is started one of the things that can trigger a timeout is a deadlock. // - GetRecordsResponse response = GetRecordsResponse.builder().records( - Record.builder().data(SdkBytes.fromByteArray(new byte[] { 1, 2, 3 })).sequenceNumber("123").build()) - .build(); - when(getRecordsRetrievalStrategy.getRecords(anyInt())).thenReturn(response); - RxJavaPlugins.setErrorHandler(e -> e.printStackTrace()); + final int[] sequenceNumberInResponse = { 0 }; + + when(getRecordsRetrievalStrategy.getRecords(anyInt())).thenAnswer( i -> GetRecordsResponse.builder().records( + Record.builder().data(SdkBytes.fromByteArray(new byte[] { 1, 2, 3 })).sequenceNumber(++sequenceNumberInResponse[0] + "").build()) + .build()); getRecordsCache.start(sequenceNumber, initialPosition); @@ -298,18 +298,22 @@ public void testNoDeadlockOnFullQueue() { // Wait for the queue to fill up, and the publisher to block on adding items to the queue. 
// log.info("Waiting for queue to fill up"); - while (getRecordsCache.getRecordsResultQueue.size() < MAX_SIZE) { + while (getRecordsCache.getPublisherSession().prefetchRecordsQueue().size() < MAX_SIZE) { Thread.yield(); } - log.info("Queue is currently at {} starting subscriber", getRecordsCache.getRecordsResultQueue.size()); + log.info("Queue is currently at {} starting subscriber", getRecordsCache.getPublisherSession().prefetchRecordsQueue().size()); AtomicInteger receivedItems = new AtomicInteger(0); final int expectedItems = MAX_SIZE * 20; Object lock = new Object(); + final boolean[] isRecordNotInorder = { false }; + final String[] recordNotInOrderMessage = { "" }; + Subscriber delegateSubscriber = new Subscriber() { Subscription sub; + int receivedSeqNum = 0; @Override public void onSubscribe(Subscription s) { @@ -320,6 +324,13 @@ public void onSubscribe(Subscription s) { @Override public void onNext(RecordsRetrieved recordsRetrieved) { receivedItems.incrementAndGet(); + if (Integer.parseInt(((PrefetchRecordsPublisher.PrefetchRecordsRetrieved) recordsRetrieved) + .lastBatchSequenceNumber()) != ++receivedSeqNum) { + isRecordNotInorder[0] = true; + recordNotInOrderMessage[0] = "Expected : " + receivedSeqNum + " Actual : " + + ((PrefetchRecordsPublisher.PrefetchRecordsRetrieved) recordsRetrieved) + .lastBatchSequenceNumber(); + } if (receivedItems.get() >= expectedItems) { synchronized (lock) { log.info("Notifying waiters"); @@ -357,6 +368,7 @@ public void onComplete() { } verify(getRecordsRetrievalStrategy, atLeast(expectedItems)).getRecords(anyInt()); assertThat(receivedItems.get(), equalTo(expectedItems)); + assertFalse(recordNotInOrderMessage[0], isRecordNotInorder[0]); } @Test(timeout = 10000L) @@ -377,11 +389,11 @@ public void testNoDeadlockOnFullQueueAndLossOfNotification() { // Wait for the queue to fill up, and the publisher to block on adding items to the queue. 
// log.info("Waiting for queue to fill up"); - while (getRecordsCache.getRecordsResultQueue.size() < MAX_SIZE) { + while (getRecordsCache.getPublisherSession().prefetchRecordsQueue().size() < MAX_SIZE) { Thread.yield(); } - log.info("Queue is currently at {} starting subscriber", getRecordsCache.getRecordsResultQueue.size()); + log.info("Queue is currently at {} starting subscriber", getRecordsCache.getPublisherSession().prefetchRecordsQueue().size()); AtomicInteger receivedItems = new AtomicInteger(0); final int expectedItems = MAX_SIZE * 50; @@ -459,23 +471,23 @@ public void testResetClearsRemainingData() { getRecordsCache.start(sequenceNumber, initialPosition); - RecordsRetrieved lastProcessed = blockUntilRecordsAvailable(getRecordsCache::pollNextResultAndUpdatePrefetchCounters, 1000); - RecordsRetrieved expected = blockUntilRecordsAvailable(getRecordsCache::pollNextResultAndUpdatePrefetchCounters, 1000); + RecordsRetrieved lastProcessed = blockUntilRecordsAvailable(getRecordsCache::evictPublishedEvent, 1000); + RecordsRetrieved expected = blockUntilRecordsAvailable(getRecordsCache::evictPublishedEvent, 1000); // // Skip some of the records the cache // - blockUntilRecordsAvailable(getRecordsCache::pollNextResultAndUpdatePrefetchCounters, 1000); - blockUntilRecordsAvailable(getRecordsCache::pollNextResultAndUpdatePrefetchCounters, 1000); + blockUntilRecordsAvailable(getRecordsCache::evictPublishedEvent, 1000); + blockUntilRecordsAvailable(getRecordsCache::evictPublishedEvent, 1000); verify(getRecordsRetrievalStrategy, atLeast(2)).getRecords(anyInt()); - while(getRecordsCache.getRecordsResultQueue.remainingCapacity() > 0) { + while(getRecordsCache.getPublisherSession().prefetchRecordsQueue().remainingCapacity() > 0) { Thread.yield(); } getRecordsCache.restartFrom(lastProcessed); - RecordsRetrieved postRestart = blockUntilRecordsAvailable(getRecordsCache::pollNextResultAndUpdatePrefetchCounters, 1000); + RecordsRetrieved postRestart = blockUntilRecordsAvailable(getRecordsCache::evictPublishedEvent, 1000); assertThat(postRestart.processRecordsInput(), eqProcessRecordsInput(expected.processRecordsInput())); verify(dataFetcher).resetIterator(eq(responses.get(0).nextShardIterator()), @@ -531,7 +543,6 @@ public LossyNotificationSubscriber(Subscriber delegate, Record @Override public void onNext(RecordsRetrieved recordsRetrieved) { - log.info("Subscriber received onNext"); if (!(recordCounter % LOSS_EVERY_NTH_RECORD == LOSS_EVERY_NTH_RECORD - 1)) { getRecordsPublisher().notify(getRecordsDeliveryAck(recordsRetrieved)); getDelegateSubscriber().onNext(recordsRetrieved); From f03e40763a6864fc775b605ac240f484b8909a02 Mon Sep 17 00:00:00 2001 From: Ashwin Giridharan Date: Fri, 13 Sep 2019 17:54:00 -0700 Subject: [PATCH 3/4] Refactoring based on review comments --- .../polling/PrefetchRecordsPublisher.java | 68 +++++++++++-------- 1 file changed, 41 insertions(+), 27 deletions(-) diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisher.java index b8b9fee08..47fcf3b8b 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisher.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisher.java @@ -72,10 +72,7 @@ * the record processor is blocked till records are retrieved from Kinesis. 
* * There are three threads namely publisher, demand-notifier and ack-notifier which will contend to drain the events - * to the Subscriber (ShardConsumer in KCL). The publisher/demand-notifier thread gains the control to drain only when - * there is no pending event in the prefetch queue waiting for the ack. Otherwise, it will be the ack-notifier thread - * which will drain an event on the receipt of an ack. - * + * to the Subscriber (ShardConsumer in KCL). */ @Slf4j @KinesisClientInternalApi @@ -134,8 +131,29 @@ void reset(PrefetchRecordsRetrieved prefetchRecordsRetrieved) { initialPositionInStreamExtended); } - // Take action on successful event delivery. - RecordsRetrieved eventDeliveredAction(String shardId) { + // Handle records delivery ack and execute nextEventDispatchAction. + // This method is not thread-safe and needs to be called after acquiring a monitor. + void handleRecordsDeliveryAck(RecordsDeliveryAck recordsDeliveryAck, String shardId, Runnable nextEventDispatchAction) { + final PrefetchRecordsRetrieved recordsToCheck = peekNextRecord(); + // Verify if the ack matches the head of the queue and evict it. + if (recordsToCheck != null && recordsToCheck.batchUniqueIdentifier().equals(recordsDeliveryAck.batchUniqueIdentifier())) { + evictPublishedRecordAndUpdateDemand(shardId); + nextEventDispatchAction.run(); + } else { + // Log and ignore any other ack received. As long as an ack is received for head of the queue + // we are good. Any stale or future ack received can be ignored, though the latter is not feasible + // to happen. + final BatchUniqueIdentifier peekedBatchUniqueIdentifier = + recordsToCheck == null ? null : recordsToCheck.batchUniqueIdentifier(); + log.info("{} : Received a stale notification with id {} instead of expected id {} at {}. Will ignore.", + shardId, recordsDeliveryAck.batchUniqueIdentifier(), peekedBatchUniqueIdentifier, Instant.now()); + } + } + + // Evict the published record from the prefetch queue. + // This method is not thread-safe and needs to be called after acquiring a monitor. 
+ @VisibleForTesting + RecordsRetrieved evictPublishedRecordAndUpdateDemand(String shardId) { final PrefetchRecordsRetrieved result = prefetchRecordsQueue.poll(); if (result != null) { updateDemandTrackersOnPublish(result); @@ -151,10 +169,19 @@ boolean hasDemandToPublish() { return requestedResponses.get() > 0; } + PrefetchRecordsRetrieved peekNextRecord() { + return prefetchRecordsQueue.peek(); + } + + boolean offerRecords(PrefetchRecordsRetrieved recordsRetrieved, long idleMillisBetweenCalls) throws InterruptedException { + return prefetchRecordsQueue.offer(recordsRetrieved, idleMillisBetweenCalls, TimeUnit.MILLISECONDS); + } + private void updateDemandTrackersOnPublish(PrefetchRecordsRetrieved result) { prefetchCounters.removed(result.processRecordsInput); requestedResponses.decrementAndGet(); } + } /** @@ -221,15 +248,15 @@ private void throwOnIllegalState() { } } - private RecordsRetrieved peekNextResult() { + private PrefetchRecordsRetrieved peekNextResult() { throwOnIllegalState(); - return publisherSession.prefetchRecordsQueue().peek(); + return publisherSession.peekNextRecord(); } @VisibleForTesting RecordsRetrieved evictPublishedEvent() { throwOnIllegalState(); - return publisherSession.eventDeliveredAction(shardId); + return publisherSession.evictPublishedRecordAndUpdateDemand(shardId); } @Override @@ -273,22 +300,7 @@ public void cancel() { @Override public synchronized void notify(RecordsDeliveryAck recordsDeliveryAck) { - final RecordsRetrieved recordsToCheck = peekNextResult(); - // Verify if the ack matches the head of the queue and evict it. - if (recordsToCheck != null && recordsToCheck.batchUniqueIdentifier() - .equals(recordsDeliveryAck.batchUniqueIdentifier())) { - evictPublishedEvent(); - // Upon evicting, check if queue is empty. if yes, then give the drain control back to publisher thread. - drainQueueForRequests(); - } else { - // Log and ignore any other ack received. As long as an ack is received for head of the queue - // we are good. Any stale or future ack received can be ignored, though the latter is not feasible - // to happen. - final BatchUniqueIdentifier peekedBatchUniqueIdentifier = - recordsToCheck == null ? null : recordsToCheck.batchUniqueIdentifier(); - log.info("{} : Received a stale notification with id {} instead of expected id {} at {}. Will ignore.", - shardId, recordsDeliveryAck.batchUniqueIdentifier(), peekedBatchUniqueIdentifier, Instant.now()); - } + publisherSession.handleRecordsDeliveryAck(recordsDeliveryAck, shardId, () -> drainQueueForRequests()); // Take action based on the time spent by the event in queue. takeDelayedDeliveryActionIfRequired(shardId, lastEventDeliveryTime, log); } @@ -296,7 +308,7 @@ public synchronized void notify(RecordsDeliveryAck recordsDeliveryAck) { // Note : Do not make this method synchronous as notify() will not be able to evict any entry from the queue. private void addArrivedRecordsInput(PrefetchRecordsRetrieved recordsRetrieved) throws InterruptedException { wasReset = false; - while (!publisherSession.prefetchRecordsQueue().offer(recordsRetrieved, idleMillisBetweenCalls, TimeUnit.MILLISECONDS)) { + while (!publisherSession.offerRecords(recordsRetrieved, idleMillisBetweenCalls)) { // // Unlocking the read lock, and then reacquiring the read lock, should allow any waiters on the write lock a // chance to run. 
If the write lock is acquired by restartFrom than the readLock will now block until @@ -316,7 +328,7 @@ private void addArrivedRecordsInput(PrefetchRecordsRetrieved recordsRetrieved) t * Method to drain the queue based on the demand and the events availability in the queue. */ private synchronized void drainQueueForRequests() { - final PrefetchRecordsRetrieved recordsToDeliver = (PrefetchRecordsRetrieved) peekNextResult(); + final PrefetchRecordsRetrieved recordsToDeliver = peekNextResult(); // If there is an event available to drain and if there is at least one demand, // then schedule it for delivery if (publisherSession.hasDemandToPublish() && canDispatchRecord(recordsToDeliver)) { @@ -326,6 +338,8 @@ private synchronized void drainQueueForRequests() { } } + // This method is thread-safe and informs the caller on whether this record is eligible to be dispatched. + // If this record was already dispatched earlier, then this method would return false. private static boolean canDispatchRecord(PrefetchRecordsRetrieved recordsToDeliver) { return recordsToDeliver != null && !recordsToDeliver.isDispatched(); } From e788f10f8228e4d161826783963c521dfcf9cb5d Mon Sep 17 00:00:00 2001 From: Ashwin Giridharan Date: Tue, 17 Sep 2019 17:50:30 -0700 Subject: [PATCH 4/4] Addressing review comments on unit test cases --- .../polling/PrefetchRecordsPublisher.java | 12 ++++---- ...efetchRecordsPublisherIntegrationTest.java | 19 +++++++----- .../polling/PrefetchRecordsPublisherTest.java | 29 +++++++++++-------- 3 files changed, 34 insertions(+), 26 deletions(-) diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisher.java index 47fcf3b8b..3f28ddf44 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisher.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisher.java @@ -253,12 +253,6 @@ private PrefetchRecordsRetrieved peekNextResult() { return publisherSession.peekNextRecord(); } - @VisibleForTesting - RecordsRetrieved evictPublishedEvent() { - throwOnIllegalState(); - return publisherSession.evictPublishedRecordAndUpdateDemand(shardId); - } - @Override public void shutdown() { defaultGetRecordsCacheDaemon.isShutdown = true; @@ -293,6 +287,9 @@ public void request(long n) { @Override public void cancel() { + // When the subscription is cancelled, the demand is set to 0, to prevent further + // records from being dispatched to the consumer/subscriber. The publisher session state will be + // reset when restartFrom(*) is called by the consumer/subscriber. publisherSession.requestedResponses().set(0); } }); @@ -327,7 +324,8 @@ private void addArrivedRecordsInput(PrefetchRecordsRetrieved recordsRetrieved) t /** * Method to drain the queue based on the demand and the events availability in the queue. 
*/ - private synchronized void drainQueueForRequests() { + @VisibleForTesting + synchronized void drainQueueForRequests() { final PrefetchRecordsRetrieved recordsToDeliver = peekNextResult(); // If there is an event available to drain and if there is at least one demand, // then schedule it for delivery diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisherIntegrationTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisherIntegrationTest.java index 048d3ae91..f940faf29 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisherIntegrationTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisherIntegrationTest.java @@ -62,6 +62,7 @@ import software.amazon.kinesis.metrics.NullMetricsFactory; import software.amazon.kinesis.retrieval.DataFetcherResult; import software.amazon.kinesis.retrieval.GetRecordsRetrievalStrategy; +import software.amazon.kinesis.retrieval.RecordsRetrieved; import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; /** @@ -122,14 +123,14 @@ public void testRollingCache() { getRecordsCache.start(extendedSequenceNumber, initialPosition); sleep(IDLE_MILLIS_BETWEEN_CALLS); - ProcessRecordsInput processRecordsInput1 = blockUntilRecordsAvailable(getRecordsCache::evictPublishedEvent, 1000L) + ProcessRecordsInput processRecordsInput1 = blockUntilRecordsAvailable(() -> evictPublishedEvent(getRecordsCache, "shardId"), 1000L) .processRecordsInput(); assertTrue(processRecordsInput1.records().isEmpty()); assertEquals(processRecordsInput1.millisBehindLatest(), new Long(1000)); assertNotNull(processRecordsInput1.cacheEntryTime()); - ProcessRecordsInput processRecordsInput2 = blockUntilRecordsAvailable(getRecordsCache::evictPublishedEvent, 1000L) + ProcessRecordsInput processRecordsInput2 = blockUntilRecordsAvailable(() -> evictPublishedEvent(getRecordsCache, "shardId"), 1000L) .processRecordsInput(); assertNotEquals(processRecordsInput1, processRecordsInput2); @@ -142,9 +143,9 @@ public void testFullCache() { assertEquals(getRecordsCache.getPublisherSession().prefetchRecordsQueue().size(), MAX_SIZE); - ProcessRecordsInput processRecordsInput1 = blockUntilRecordsAvailable(getRecordsCache::evictPublishedEvent, 1000L) + ProcessRecordsInput processRecordsInput1 = blockUntilRecordsAvailable(() -> evictPublishedEvent(getRecordsCache, "shardId"), 1000L) .processRecordsInput(); - ProcessRecordsInput processRecordsInput2 = blockUntilRecordsAvailable(getRecordsCache::evictPublishedEvent, 1000L) + ProcessRecordsInput processRecordsInput2 = blockUntilRecordsAvailable(() -> evictPublishedEvent(getRecordsCache, "shardId"), 1000L) .processRecordsInput(); assertNotEquals(processRecordsInput1, processRecordsInput2); @@ -184,9 +185,9 @@ public void testDifferentShardCaches() { sleep(IDLE_MILLIS_BETWEEN_CALLS); - ProcessRecordsInput p1 = getRecordsCache.evictPublishedEvent().processRecordsInput(); + ProcessRecordsInput p1 = evictPublishedEvent(getRecordsCache, shardId).processRecordsInput(); - ProcessRecordsInput p2 = recordsPublisher2.evictPublishedEvent().processRecordsInput(); + ProcessRecordsInput p2 = evictPublishedEvent(recordsPublisher2, shardId).processRecordsInput(); assertNotEquals(p1, p2); assertTrue(p1.records().isEmpty()); @@ -212,7 +213,7 @@ public DataFetcherResult answer(final InvocationOnMock invocationOnMock) throws 
getRecordsCache.start(extendedSequenceNumber, initialPosition); sleep(IDLE_MILLIS_BETWEEN_CALLS); - ProcessRecordsInput processRecordsInput = blockUntilRecordsAvailable(getRecordsCache::evictPublishedEvent, 1000L) + ProcessRecordsInput processRecordsInput = blockUntilRecordsAvailable(() -> evictPublishedEvent(getRecordsCache, "shardId"), 1000L) .processRecordsInput(); assertNotNull(processRecordsInput); @@ -220,6 +221,10 @@ public DataFetcherResult answer(final InvocationOnMock invocationOnMock) throws verify(dataFetcher).restartIterator(); } + private RecordsRetrieved evictPublishedEvent(PrefetchRecordsPublisher publisher, String shardId) { + return publisher.getPublisherSession().evictPublishedRecordAndUpdateDemand(shardId); + } + @After public void shutdown() { getRecordsCache.shutdown(); diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisherTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisherTest.java index 68ee0a56f..d9b992b7d 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisherTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisherTest.java @@ -54,6 +54,7 @@ import java.util.stream.IntStream; import java.util.stream.Stream; +import com.google.common.annotations.VisibleForTesting; import org.apache.commons.lang3.StringUtils; import org.junit.After; import org.junit.Before; @@ -148,7 +149,7 @@ record = Record.builder().data(createByteBufferWithSize(SIZE_512_KB)).build(); .map(KinesisClientRecord::fromRecord).collect(Collectors.toList()); getRecordsCache.start(sequenceNumber, initialPosition); - ProcessRecordsInput result = blockUntilRecordsAvailable(getRecordsCache::evictPublishedEvent, 1000L) + ProcessRecordsInput result = blockUntilRecordsAvailable(() -> evictPublishedEvent(getRecordsCache, "shardId"), 1000L) .processRecordsInput(); assertEquals(expectedRecords, result.records()); @@ -218,7 +219,7 @@ record = Record.builder().data(createByteBufferWithSize(1024)).build(); .map(KinesisClientRecord::fromRecord).collect(Collectors.toList()); getRecordsCache.start(sequenceNumber, initialPosition); - ProcessRecordsInput processRecordsInput = getRecordsCache.evictPublishedEvent().processRecordsInput(); + ProcessRecordsInput processRecordsInput = evictPublishedEvent(getRecordsCache, "shardId").processRecordsInput(); verify(executorService).execute(any()); assertEquals(expectedRecords, processRecordsInput.records()); @@ -227,7 +228,7 @@ record = Record.builder().data(createByteBufferWithSize(1024)).build(); sleep(2000); - ProcessRecordsInput processRecordsInput2 = getRecordsCache.evictPublishedEvent().processRecordsInput(); + ProcessRecordsInput processRecordsInput2 = evictPublishedEvent(getRecordsCache, "shardId").processRecordsInput(); assertNotEquals(processRecordsInput, processRecordsInput2); assertEquals(expectedRecords, processRecordsInput2.records()); assertNotEquals(processRecordsInput2.timeSpentInCache(), Duration.ZERO); @@ -238,13 +239,13 @@ record = Record.builder().data(createByteBufferWithSize(1024)).build(); @Test(expected = IllegalStateException.class) public void testGetNextRecordsWithoutStarting() { verify(executorService, times(0)).execute(any()); - getRecordsCache.evictPublishedEvent(); + getRecordsCache.drainQueueForRequests(); } @Test(expected = IllegalStateException.class) public void testCallAfterShutdown() { 
when(executorService.isShutdown()).thenReturn(true); - getRecordsCache.evictPublishedEvent(); + getRecordsCache.drainQueueForRequests(); } @Test @@ -257,7 +258,7 @@ public void testExpiredIteratorException() { doNothing().when(dataFetcher).restartIterator(); - blockUntilRecordsAvailable(() -> getRecordsCache.evictPublishedEvent(), 1000L); + blockUntilRecordsAvailable(() -> evictPublishedEvent(getRecordsCache, "shardId"), 1000L); sleep(1000); @@ -272,7 +273,7 @@ public void testRetryableRetrievalExceptionContinues() { getRecordsCache.start(sequenceNumber, initialPosition); - RecordsRetrieved records = blockUntilRecordsAvailable(getRecordsCache::evictPublishedEvent, 1000); + RecordsRetrieved records = blockUntilRecordsAvailable(() -> evictPublishedEvent(getRecordsCache, "shardId"), 1000); assertThat(records.processRecordsInput().millisBehindLatest(), equalTo(response.millisBehindLatest())); } @@ -471,14 +472,14 @@ public void testResetClearsRemainingData() { getRecordsCache.start(sequenceNumber, initialPosition); - RecordsRetrieved lastProcessed = blockUntilRecordsAvailable(getRecordsCache::evictPublishedEvent, 1000); - RecordsRetrieved expected = blockUntilRecordsAvailable(getRecordsCache::evictPublishedEvent, 1000); + RecordsRetrieved lastProcessed = blockUntilRecordsAvailable(() -> evictPublishedEvent(getRecordsCache, "shardId"), 1000); + RecordsRetrieved expected = blockUntilRecordsAvailable(() -> evictPublishedEvent(getRecordsCache, "shardId"), 1000); // // Skip some of the records the cache // - blockUntilRecordsAvailable(getRecordsCache::evictPublishedEvent, 1000); - blockUntilRecordsAvailable(getRecordsCache::evictPublishedEvent, 1000); + blockUntilRecordsAvailable(() -> evictPublishedEvent(getRecordsCache, "shardId"), 1000); + blockUntilRecordsAvailable(() -> evictPublishedEvent(getRecordsCache, "shardId"), 1000); verify(getRecordsRetrievalStrategy, atLeast(2)).getRecords(anyInt()); @@ -487,7 +488,7 @@ public void testResetClearsRemainingData() { } getRecordsCache.restartFrom(lastProcessed); - RecordsRetrieved postRestart = blockUntilRecordsAvailable(getRecordsCache::evictPublishedEvent, 1000); + RecordsRetrieved postRestart = blockUntilRecordsAvailable(() -> evictPublishedEvent(getRecordsCache, "shardId"), 1000); assertThat(postRestart.processRecordsInput(), eqProcessRecordsInput(expected.processRecordsInput())); verify(dataFetcher).resetIterator(eq(responses.get(0).nextShardIterator()), @@ -495,6 +496,10 @@ public void testResetClearsRemainingData() { } + private RecordsRetrieved evictPublishedEvent(PrefetchRecordsPublisher publisher, String shardId) { + return publisher.getPublisherSession().evictPublishedRecordAndUpdateDemand(shardId); + } + private static class RetrieverAnswer implements Answer { private final List responses;
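
Reading aid: the series above replaces the original "who owns the drain" flag (shouldDrainEventOnlyOnAck) with a PublisherSession whose demand and queue are reset atomically on restart, plus a per-batch dispatched flag and ack-driven eviction. The sketch below is a minimal, self-contained illustration of that delivery protocol under simplifying assumptions; the names (DemoAckedPublisher, Batch, offer, ack, request, reset) are hypothetical and are not part of the KCL code base, and the sketch omits locking around restart, metrics, and iterator handling. It only shows the demand-gated, at-most-once dispatch and ack-only eviction that patches 2-4 converge on.

    // Illustrative only: hypothetical names, simplified state, no reset/read-write locking.
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.atomic.AtomicLong;
    import java.util.function.Consumer;

    class DemoAckedPublisher {
        static final class Batch {
            final String id;          // stands in for BatchUniqueIdentifier
            boolean dispatched;       // set once the batch has been handed to onNext
            Batch(String id) { this.id = id; }
        }

        private final LinkedBlockingQueue<Batch> queue = new LinkedBlockingQueue<>(10);
        private final AtomicLong demand = new AtomicLong();
        private final Consumer<Batch> subscriber;

        DemoAckedPublisher(Consumer<Batch> subscriber) { this.subscriber = subscriber; }

        // Subscriber signals demand (Subscription.request(n)).
        void request(long n) {
            demand.addAndGet(n);
            drain();
        }

        // Prefetch thread adds a batch; deliberately not synchronized so that a full
        // queue can still be unblocked by ack() evicting the head.
        void offer(Batch b) throws InterruptedException {
            queue.put(b);
            drain();
        }

        // Subscriber acknowledges the batch it received; only a matching head is evicted.
        synchronized void ack(String ackedId) {
            Batch head = queue.peek();
            if (head != null && head.id.equals(ackedId)) {
                queue.poll();              // evict the acknowledged head
                demand.decrementAndGet();  // one unit of demand has been satisfied
                drain();                   // try to deliver the next batch, if any
            }                              // stale or unexpected acks are ignored
        }

        // Restart: drop demand first so a stale subscriber can no longer be handed events,
        // then clear the buffered batches.
        synchronized void reset() {
            demand.set(0);
            queue.clear();
        }

        // Deliver the head at most once, and only while there is outstanding demand.
        private synchronized void drain() {
            Batch head = queue.peek();
            if (head != null && !head.dispatched && demand.get() > 0) {
                head.dispatched = true;    // prevents a duplicate onNext for the same batch
                subscriber.accept(head);
            }
        }
    }

In the actual publisher the batch is a PrefetchRecordsRetrieved (ProcessRecordsInput, last batch sequence number, shard iterator, batch unique identifier), demand is requestedResponses, ack handling lives in notify()/handleRecordsDeliveryAck(), and the restartFrom path additionally rewinds the KinesisDataFetcher to the acknowledged batch's iterator and sequence number before prefetching resumes.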