Skip to content
Open
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.druid.testing.embedded.EmbeddedDruidCluster;
import org.apache.druid.testing.embedded.StreamIngestResource;
import org.apache.druid.testing.embedded.indexing.StreamIndexTestBase;
import org.apache.druid.testing.embedded.utils.ITRetryUtil;
import org.hamcrest.Matchers;
import org.jboss.netty.handler.codec.http.HttpMethod;
import org.joda.time.Duration;
Expand All @@ -46,7 +47,12 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import static org.apache.druid.indexing.seekablestream.supervisor.autoscaler.CostBasedAutoScaler.OPTIMAL_TASK_COUNT_METRIC;

Expand All @@ -59,6 +65,8 @@
public class CostBasedAutoScalerIntegrationTest extends StreamIndexTestBase
{
private static final int PARTITION_COUNT = 50;
private static final int MAX_SCALE_UP_RECORD_BATCHES = 30;
private static final long SCALE_UP_PUBLISH_INTERVAL_MILLIS = 100;

private String topic;
private final KafkaResource kafkaServer = new KafkaResource();
Expand Down Expand Up @@ -182,6 +190,7 @@ public void test_autoScaler_computesOptimalTaskCountAndProducesScaleUp()

@Test
public void test_autoScaler_scalesUpAndDown_withSlowPublish()
throws Exception
{
final String topic = EmbeddedClusterApis.createTestDatasourceName();
kafkaServer.createTopicWithPartitions(topic, 4);
Expand Down Expand Up @@ -215,34 +224,67 @@ public void test_autoScaler_scalesUpAndDown_withSlowPublish()
.build(dataSource, topic);
cluster.callApi().postSupervisor(supervisor);

// Ingest a large number of records to trigger a scale-up
// 10k records = 100 segments to publish * 100 rows per segment
int totalRecords = 0;
for (int i = 0; i < 10; ++i) {
totalRecords += publish1kRecords(topic, false);
}
overlord.latchableEmitter()
.waitForEvent(event -> event.hasMetricName("task/run/time")
.hasDimension(DruidMetrics.DATASOURCE, dataSource));

// Wait for tasks to scale up
overlord.latchableEmitter().waitForEvent(
event -> event.hasMetricName("task/autoScaler/updatedCount")
.hasDimension(DruidMetrics.SUPERVISOR_ID, supervisor.getId())
.hasValueMatching(Matchers.greaterThan(1L))
);
Assertions.assertEquals(4, getCurrentTaskCount(supervisor.getId()));
waitUntilPublishedRecordsAreIngested(totalRecords);
final AtomicBoolean keepPublishing = new AtomicBoolean(true);
final AtomicInteger totalRecords = new AtomicInteger();
final ExecutorService publisher = Executors.newSingleThreadExecutor();
final Future<?> publisherFuture = publisher.submit(() -> {
for (int i = 0; i < MAX_SCALE_UP_RECORD_BATCHES && keepPublishing.get(); ++i) {

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of publishing 10k records upfront and then waiting, the test keeps publishing records in the background while the autoscaler is running. This gives the autoscaler a
stable lag signal to observe.

totalRecords.addAndGet(publish1kRecords(topic, false));
try {
TimeUnit.MILLISECONDS.sleep(SCALE_UP_PUBLISH_INTERVAL_MILLIS);
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
return;
}
}
});

// Let the tasks work through the lag.
// Do not publish any more records so that the idleness causes scale-down
overlord.latchableEmitter().waitForEvent(
event -> event.hasMetricName("task/autoScaler/updatedCount")
.hasDimension(DruidMetrics.SUPERVISOR_ID, supervisor.getId())
.hasValueMatching(Matchers.equalTo(1L))
);
Assertions.assertEquals(1, getCurrentTaskCount(supervisor.getId()));
try {
overlord.latchableEmitter().waitForEvent(
event -> event.hasMetricName(OPTIMAL_TASK_COUNT_METRIC)
.hasDimension(DruidMetrics.SUPERVISOR_ID, supervisor.getId())
.hasValueMatching(Matchers.greaterThan(1L))

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wait until the cost-based autoscaler computes that the optimal task count is greater than 1. This is only the autoscaler's recommendation; the task count has not necessarily been changed yet.

);

overlord.latchableEmitter().waitForEvent(
event -> event.hasMetricName("task/autoScaler/updatedCount")
.hasDimension(DruidMetrics.SUPERVISOR_ID, supervisor.getId())
.hasValueMatching(Matchers.greaterThan(1L))

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the applied scale-up event.

);
keepPublishing.set(false);
publisherFuture.get(30, TimeUnit.SECONDS);

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[P2] Surface publisher failures before waiting for scaler metrics

The background publisher future is only observed after both autoscaler metric waits succeed. If publish1kRecords throws before creating enough lag, the test now waits for scaler metrics that may never arrive and then exits through finally without ever calling publisherFuture.get(), masking the real producer failure. The previous synchronous publish path failed immediately. Check the future while waiting, or observe it in the failure path so producer exceptions fail the test directly.

ITRetryUtil.retryUntilTrue(

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[P2] Avoid unbounded-length retries in this integration test

ITRetryUtil.retryUntilTrue uses the default 240 retries with 5 seconds between attempts, so this newly added check can add up to 20 minutes to a failing run, and the method itself has no @Timeout annotation. Since this test already uses 600-second latch waits, a regression can now occupy CI for much longer before failing. Use a bounded retry tuned for this test or add an explicit method timeout.

() -> getCurrentTaskCount(supervisor.getId()) > 1,
"supervisor task count to scale up"
);
waitUntilPublishedRecordsAreIngested(totalRecords.get());

// Let the tasks work through the lag.
// Do not publish any more records so that the idleness causes scale-down
overlord.latchableEmitter().waitForEvent(
event -> event.hasMetricName("task/autoScaler/updatedCount")
.hasDimension(DruidMetrics.SUPERVISOR_ID, supervisor.getId())
.hasValueMatching(Matchers.equalTo(1L))
);
ITRetryUtil.retryUntilEquals(
() -> getCurrentTaskCount(supervisor.getId()),
1,
"supervisor task count to scale down"
);
}
finally {
keepPublishing.set(false);

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Clean up the publisher and suspend the supervisor.

publisher.shutdownNow();
cluster.callApi().postSupervisor(supervisor.createSuspendedSpec());
}

cluster.callApi().postSupervisor(supervisor.createSuspendedSpec());
cluster.callApi().waitForAllSegmentsToBeAvailable(dataSource, coordinator, broker);
Assertions.assertEquals("10000", cluster.runSql("SELECT COUNT(*) FROM %s", dataSource));

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Assert final row count from the actual number of published records.

Assertions.assertEquals(String.valueOf(totalRecords.get()), cluster.runSql("SELECT COUNT(*) FROM %s", dataSource));

final List<TaskStatusPlus> tasks = cluster.callApi().getTasks(dataSource, "complete");
Assertions.assertFalse(tasks.isEmpty());
Expand Down
Loading