Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.function.Function;
import org.junit.After;
import org.junit.Before;
Expand All @@ -54,16 +55,6 @@ public class DefaultControllerBuilderTest {

@Rule public WireMockRule wireMockRule = new WireMockRule(PORT);

private final int stepCooldownIntervalInMillis = 500;

private void cooldown() {
try {
Thread.sleep(stepCooldownIntervalInMillis);
} catch (InterruptedException e) {
e.printStackTrace();
}
}

@Before
public void setUp() throws Exception {
client = new ClientBuilder().setBasePath("http://localhost:" + PORT).build();
Expand Down Expand Up @@ -134,7 +125,7 @@ public Result reconcile(Request request) {
}

@Test
public void testBuildWatchEventNotificationShouldWork() {
public void testBuildWatchEventNotificationShouldWork() throws InterruptedException {
V1PodList podList =
new V1PodList()
.metadata(new V1ListMeta().resourceVersion("0"))
Expand Down Expand Up @@ -183,13 +174,17 @@ public void testBuildWatchEventNotificationShouldWork() {
};

List<Request> controllerReceivingRequests = new ArrayList<>();
Controller testController =
final Semaphore latch = new Semaphore(1);
latch.acquire();

final Controller testController =
ControllerBuilder.defaultBuilder(informerFactory)
.withReconciler(
new Reconciler() {
@Override
public Result reconcile(Request request) {
controllerReceivingRequests.add(request);
latch.release();
return new Result(false);
}
})
Expand All @@ -203,10 +198,10 @@ public Result reconcile(Request request) {
controllerThead.submit(testController::run);
informerFactory.startAllRegisteredInformers();

Request expectedRequest = new Request("hostname1/test-pod1");

cooldown();
// Wait for the request to be processed.
latch.acquire(1);

Request expectedRequest = new Request("hostname1/test-pod1");
assertEquals(1, keyFuncReceivingRequests.size());
assertEquals(expectedRequest, keyFuncReceivingRequests.get(0));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,23 +52,37 @@ public void testSimpleDelayingQueue() throws Exception {

@Test
public void testDeduping() throws Exception {
final Instant staticTime = Instant.now();
DefaultDelayingQueue<String> queue = new DefaultDelayingQueue<>();
String item = "foo";

// Hold time still
queue.injectTimeSource(
() -> {
return staticTime;
});

queue.addAfter(item, Duration.ofMillis(50));
assertTrue(waitForWaitingQueueToFill(queue));
queue.addAfter(item, Duration.ofMillis(70));
assertTrue(waitForWaitingQueueToFill(queue));
assertTrue("should not have added", queue.length() == 0);

// step past the first block, we should receive now
Thread.sleep(60L);
// Advance time
queue.injectTimeSource(
() -> {
return staticTime.plusMillis(60);
});
assertTrue(waitForAdded(queue, 1));
item = queue.get();
queue.done(item);

// step past the second add
Thread.sleep(20L);
// Advance time
queue.injectTimeSource(
() -> {
return staticTime.plusMillis(90);
});
assertTrue("should not have added", queue.length() == 0);

// test again, but this time the earlier should override
Expand All @@ -77,19 +91,33 @@ public void testDeduping() throws Exception {
assertTrue(waitForWaitingQueueToFill(queue));
assertTrue("should not have added", queue.length() == 0);

Thread.sleep(40L);
// Advance time
queue.injectTimeSource(
() -> {
return staticTime.plusMillis(150);
});
assertTrue(waitForAdded(queue, 1));
item = queue.get();
queue.done(item);

// step past the second add
Thread.sleep(1L);
// Advance time
queue.injectTimeSource(
() -> {
return staticTime.plusMillis(190);
});
assertTrue("should not have added", queue.length() == 0);
}

@Test
public void testCopyShifting() throws Exception {
final Instant staticTime = Instant.now();
DefaultDelayingQueue<String> queue = new DefaultDelayingQueue<>();
queue.injectTimeSource(
() -> {
return staticTime;
});

final String first = "foo";
final String second = "bar";
final String third = "baz";
Expand All @@ -100,7 +128,10 @@ public void testCopyShifting() throws Exception {
assertTrue(waitForWaitingQueueToFill(queue));
assertTrue("should not have added", queue.length() == 0);

Thread.sleep(2000L);
queue.injectTimeSource(
() -> {
return staticTime.plusMillis(2000);
});
assertTrue(waitForAdded(queue, 3));
String actualFirst = queue.get();
assertEquals(actualFirst, third);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,15 @@
import static com.github.tomakehurst.wiremock.client.WireMock.urlMatching;
import static com.github.tomakehurst.wiremock.client.WireMock.urlPathEqualTo;
import static com.github.tomakehurst.wiremock.client.WireMock.verify;
import static com.github.tomakehurst.wiremock.core.WireMockConfiguration.options;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;

import com.github.tomakehurst.wiremock.core.Admin;
import com.github.tomakehurst.wiremock.extension.Parameters;
import com.github.tomakehurst.wiremock.extension.PostServeAction;
import com.github.tomakehurst.wiremock.junit.WireMockRule;
import com.github.tomakehurst.wiremock.stubbing.ServeEvent;
import com.google.gson.Gson;
import io.kubernetes.client.informer.SharedIndexInformer;
import io.kubernetes.client.informer.SharedInformerFactory;
Expand All @@ -37,7 +42,8 @@
import io.kubernetes.client.util.ClientBuilder;
import io.kubernetes.client.util.generic.GenericKubernetesApi;
import java.util.Arrays;
import org.junit.Rule;
import java.util.concurrent.Semaphore;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
Expand All @@ -50,14 +56,30 @@
@SpringBootTest
public class KubernetesInformerCreatorTest {

@Rule public WireMockRule wireMockRule = new WireMockRule(8188);
public static class CountRequestAction extends PostServeAction {
@Override
public String getName() {
return "semaphore";
}

@Override
public void doAction(ServeEvent serveEvent, Admin admin, Parameters parameters) {
Semaphore count = (Semaphore) parameters.get("semaphore");
count.release();
}
}

@ClassRule
public static WireMockRule wireMockRule =
new WireMockRule(options().dynamicPort().extensions(new CountRequestAction()));

@SpringBootApplication
static class App {

@Bean
public ApiClient testingApiClient() {
ApiClient apiClient = new ClientBuilder().setBasePath("http://localhost:" + 8188).build();
ApiClient apiClient =
new ClientBuilder().setBasePath("http://localhost:" + wireMockRule.port()).build();
return apiClient;
}

Expand Down Expand Up @@ -91,6 +113,13 @@ public void testInformerInjection() throws InterruptedException {
assertNotNull(podInformer);
assertNotNull(configMapInformer);

Semaphore getCount = new Semaphore(2);
Semaphore watchCount = new Semaphore(2);
Parameters getParams = new Parameters();
Parameters watchParams = new Parameters();
getParams.put("semaphore", getCount);
watchParams.put("semaphore", watchCount);

V1Pod foo1 =
new V1Pod().kind("Pod").metadata(new V1ObjectMeta().namespace("default").name("foo1"));
V1ConfigMap bar1 =
Expand All @@ -100,6 +129,7 @@ public void testInformerInjection() throws InterruptedException {

wireMockRule.stubFor(
get(urlMatching("^/api/v1/pods.*"))
.withPostServeAction("semaphore", getParams)
.withQueryParam("watch", equalTo("false"))
.willReturn(
aResponse()
Expand All @@ -112,11 +142,13 @@ public void testInformerInjection() throws InterruptedException {
.items(Arrays.asList(foo1))))));
wireMockRule.stubFor(
get(urlMatching("^/api/v1/pods.*"))
.withPostServeAction("semaphore", watchParams)
.withQueryParam("watch", equalTo("true"))
.willReturn(aResponse().withStatus(200).withBody("{}")));

wireMockRule.stubFor(
get(urlMatching("^/api/v1/namespaces/default/configmaps.*"))
.withPostServeAction("semaphore", getParams)
.withQueryParam("watch", equalTo("false"))
.willReturn(
aResponse()
Expand All @@ -129,12 +161,19 @@ public void testInformerInjection() throws InterruptedException {
.items(Arrays.asList(bar1))))));
wireMockRule.stubFor(
get(urlMatching("^/api/v1/namespaces/default/configmaps.*"))
.withPostServeAction("semaphore", watchParams)
.withQueryParam("watch", equalTo("true"))
.willReturn(aResponse().withStatus(200).withBody("{}")));

// These will be released for each web call above.
getCount.acquire(2);
watchCount.acquire(2);

informerFactory.startAllRegisteredInformers();

Thread.sleep(200);
// Wait for the GETs to complete and the watches to start.
getCount.acquire(2);
watchCount.acquire(2);

verify(
1,
Expand Down