From 94d2d4fafc234a672ba10508e0cc1538ee36e8d7 Mon Sep 17 00:00:00 2001 From: Michele Rastelli Date: Mon, 17 Jan 2022 11:30:33 +0100 Subject: [PATCH 1/6] using futures in queueTime tests --- src/test/java/com/arangodb/ArangoDBTest.java | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/src/test/java/com/arangodb/ArangoDBTest.java b/src/test/java/com/arangodb/ArangoDBTest.java index ca05e060f..ae951f34f 100644 --- a/src/test/java/com/arangodb/ArangoDBTest.java +++ b/src/test/java/com/arangodb/ArangoDBTest.java @@ -44,6 +44,9 @@ import java.util.Map; import java.util.Optional; import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -725,13 +728,15 @@ public void accessMultipleDatabases() { } @Test - public void queueTime() throws InterruptedException { - List threads = IntStream.range(0, 80) - .mapToObj(__ -> new Thread(() -> arangoDB.db().query("RETURN SLEEP(1)", Void.class))) + public void queueTime() throws InterruptedException, ExecutionException { + List> futures = IntStream.range(0, 80) + .mapToObj(i -> CompletableFuture.runAsync( + () -> arangoDB.db().query("RETURN SLEEP(1)", Void.class), + Executors.newFixedThreadPool(80)) + ) .collect(Collectors.toList()); - threads.forEach(Thread::start); - for (Thread it : threads) { - it.join(); + for (CompletableFuture f : futures) { + f.get(); } QueueTimeMetrics qt = arangoDB.metrics().getQueueTime(); From c7d92e4c5772820fc178119ad84af7594b4e5997 Mon Sep 17 00:00:00 2001 From: Michele Rastelli Date: Mon, 17 Jan 2022 11:59:25 +0100 Subject: [PATCH 2/6] added concurrency tests --- .../java/com/arangodb/ConcurrencyTests.java | 42 +++++++++++++++++++ .../com/arangodb/async/ConcurrencyTests.java | 24 +++++++++++ 2 files changed, 66 insertions(+) create mode 100644 src/test/java/com/arangodb/ConcurrencyTests.java create mode 100644 src/test/java/com/arangodb/async/ConcurrencyTests.java diff --git a/src/test/java/com/arangodb/ConcurrencyTests.java b/src/test/java/com/arangodb/ConcurrencyTests.java new file mode 100644 index 000000000..32e954571 --- /dev/null +++ b/src/test/java/com/arangodb/ConcurrencyTests.java @@ -0,0 +1,42 @@ +package com.arangodb; + +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +@RunWith(Parameterized.class) +public class ConcurrencyTests { + + final Protocol protocol; + + public ConcurrencyTests(Protocol protocol) { + this.protocol = protocol; + } + + @Parameterized.Parameters + public static Protocol[] protocols() { + return Protocol.values(); + } + + @Test + public void concurrentPendingRequests() throws ExecutionException, InterruptedException { + ArangoDB adb = new ArangoDB.Builder().useProtocol(protocol).build(); + List> futures = IntStream.range(0, 10) + .mapToObj(i -> CompletableFuture.runAsync( + () -> adb.db().query("RETURN SLEEP(1)", Void.class), + Executors.newFixedThreadPool(10)) + ) + .collect(Collectors.toList()); + for (CompletableFuture f : futures) { + f.get(); + } + } + +} diff --git a/src/test/java/com/arangodb/async/ConcurrencyTests.java b/src/test/java/com/arangodb/async/ConcurrencyTests.java new file mode 100644 index 000000000..f68cfe6bb --- /dev/null +++ b/src/test/java/com/arangodb/async/ConcurrencyTests.java @@ -0,0 +1,24 @@ +package com.arangodb.async; + +import org.junit.Test; + +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +public class ConcurrencyTests { + + @Test + public void concurrentPendingRequests() throws ExecutionException, InterruptedException { + ArangoDBAsync adb = new ArangoDBAsync.Builder().build(); + List>> reqs = IntStream.range(0, 10) + .mapToObj(__ -> adb.db().query("RETURN SLEEP(1)", Void.class)) + .collect(Collectors.toList()); + for (CompletableFuture> req : reqs) { + req.get(); + } + } + +} From 7b1900b45d6855c813c80a566b2e533918f118b8 Mon Sep 17 00:00:00 2001 From: Michele Rastelli Date: Mon, 17 Jan 2022 12:10:39 +0100 Subject: [PATCH 3/6] CI: print AF leader --- docker/start_db.sh | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/docker/start_db.sh b/docker/start_db.sh index 02f48db2e..9c01447d0 100755 --- a/docker/start_db.sh +++ b/docker/start_db.sh @@ -96,3 +96,9 @@ for a in ${COORDINATORS[*]} ; do echo "$SCHEME://$a" echo "" done + +if [ "$STARTER_MODE" == "activefailover" ]; then + LEADER=$("$LOCATION"/find_active_endpoint.sh) + echo "Leader: $SCHEME://$LEADER" + echo "" +fi From fec7a8a4d9678140dbea871cb4988415bf078d03 Mon Sep 17 00:00:00 2001 From: Michele Rastelli Date: Mon, 17 Jan 2022 10:44:30 +0100 Subject: [PATCH 4/6] AF: check host before closing --- .../internal/net/DirtyReadHostHandler.java | 10 ++++++++++ .../arangodb/internal/net/FallbackHostHandler.java | 14 ++++++++++++++ .../com/arangodb/internal/net/HostHandler.java | 4 ++++ .../arangodb/internal/net/RandomHostHandler.java | 14 ++++++++++++++ .../internal/net/RoundRobinHostHandler.java | 10 ++++++++++ .../velocystream/VstCommunicationSync.java | 4 ++-- 6 files changed, 54 insertions(+), 2 deletions(-) diff --git a/src/main/java/com/arangodb/internal/net/DirtyReadHostHandler.java b/src/main/java/com/arangodb/internal/net/DirtyReadHostHandler.java index d8e3f0033..db3879d80 100644 --- a/src/main/java/com/arangodb/internal/net/DirtyReadHostHandler.java +++ b/src/main/java/com/arangodb/internal/net/DirtyReadHostHandler.java @@ -60,6 +60,11 @@ public void fail(Exception exception) { determineHostHandler().fail(exception); } + @Override + public void failIfNotMatch(HostDescription host, Exception exception) { + determineHostHandler().failIfNotMatch(host, exception); + } + @Override public void reset() { determineHostHandler().reset(); @@ -81,6 +86,11 @@ public void closeCurrentOnError() { determineHostHandler().closeCurrentOnError(); } + @Override + public void closeCurrentOnErrorIfNotMatch(HostDescription host) { + determineHostHandler().closeCurrentOnErrorIfNotMatch(host); + } + @Override public void setJwt(String jwt) { master.setJwt(jwt); diff --git a/src/main/java/com/arangodb/internal/net/FallbackHostHandler.java b/src/main/java/com/arangodb/internal/net/FallbackHostHandler.java index a1427d426..241a1b270 100644 --- a/src/main/java/com/arangodb/internal/net/FallbackHostHandler.java +++ b/src/main/java/com/arangodb/internal/net/FallbackHostHandler.java @@ -79,6 +79,13 @@ public void fail(Exception exception) { lastFailExceptions.add(exception); } + @Override + public synchronized void failIfNotMatch(HostDescription host, Exception exception) { + if (!host.equals(current.getDescription())) { + fail(exception); + } + } + @Override public void reset() { iterations = 0; @@ -104,6 +111,13 @@ public void closeCurrentOnError() { current.closeOnError(); } + @Override + public synchronized void closeCurrentOnErrorIfNotMatch(HostDescription host) { + if (!host.equals(current.getDescription())) { + closeCurrentOnError(); + } + } + @Override public void setJwt(String jwt) { hosts.setJwt(jwt); diff --git a/src/main/java/com/arangodb/internal/net/HostHandler.java b/src/main/java/com/arangodb/internal/net/HostHandler.java index 42c1e7570..1a3ad8d92 100644 --- a/src/main/java/com/arangodb/internal/net/HostHandler.java +++ b/src/main/java/com/arangodb/internal/net/HostHandler.java @@ -33,6 +33,8 @@ public interface HostHandler { void fail(Exception exception); + void failIfNotMatch(HostDescription host, Exception exception); + void reset(); void confirm(); @@ -41,6 +43,8 @@ public interface HostHandler { void closeCurrentOnError(); + void closeCurrentOnErrorIfNotMatch(HostDescription host); + void setJwt(String jwt); } diff --git a/src/main/java/com/arangodb/internal/net/RandomHostHandler.java b/src/main/java/com/arangodb/internal/net/RandomHostHandler.java index 3ab1ce7da..7e7a4464b 100644 --- a/src/main/java/com/arangodb/internal/net/RandomHostHandler.java +++ b/src/main/java/com/arangodb/internal/net/RandomHostHandler.java @@ -59,6 +59,13 @@ public void fail(Exception exception) { current = fallback.get(null, null); } + @Override + public synchronized void failIfNotMatch(HostDescription host, Exception exception) { + if (!host.equals(current.getDescription())) { + fail(exception); + } + } + private Host getRandomHost(final boolean initial, final boolean closeConnections) { hosts = resolver.resolve(initial, closeConnections); final ArrayList hostList = new ArrayList<>(hosts.getHostsList()); @@ -85,6 +92,13 @@ public void closeCurrentOnError() { current.closeOnError(); } + @Override + public synchronized void closeCurrentOnErrorIfNotMatch(HostDescription host) { + if (!host.equals(current.getDescription())) { + closeCurrentOnError(); + } + } + @Override public void setJwt(String jwt) { fallback.setJwt(jwt); diff --git a/src/main/java/com/arangodb/internal/net/RoundRobinHostHandler.java b/src/main/java/com/arangodb/internal/net/RoundRobinHostHandler.java index 99f3c24eb..b79c85c14 100644 --- a/src/main/java/com/arangodb/internal/net/RoundRobinHostHandler.java +++ b/src/main/java/com/arangodb/internal/net/RoundRobinHostHandler.java @@ -89,6 +89,11 @@ public void fail(Exception exception) { lastFailExceptions.add(exception); } + @Override + public void failIfNotMatch(HostDescription host, Exception exception) { + fail(exception); + } + @Override public void reset() { fails = 0; @@ -109,6 +114,11 @@ public void closeCurrentOnError() { currentHost.closeOnError(); } + @Override + public void closeCurrentOnErrorIfNotMatch(HostDescription host) { + closeCurrentOnError(); + } + @Override public void setJwt(String jwt) { hosts.setJwt(jwt); diff --git a/src/main/java/com/arangodb/internal/velocystream/VstCommunicationSync.java b/src/main/java/com/arangodb/internal/velocystream/VstCommunicationSync.java index 59a03a551..85099bacd 100644 --- a/src/main/java/com/arangodb/internal/velocystream/VstCommunicationSync.java +++ b/src/main/java/com/arangodb/internal/velocystream/VstCommunicationSync.java @@ -151,8 +151,8 @@ protected Response execute(final Request request, final VstConnectionSync connec } final String location = e.getLocation(); final HostDescription redirectHost = HostUtils.createFromLocation(location); - hostHandler.closeCurrentOnError(); - hostHandler.fail(e); + hostHandler.closeCurrentOnErrorIfNotMatch(redirectHost); + hostHandler.failIfNotMatch(redirectHost, e); return execute(request, new HostHandle().setHost(redirectHost), attemptCount + 1); } } From b10a8a1c84e8ad1394e4de1ced702a285f1e0b02 Mon Sep 17 00:00:00 2001 From: Michele Rastelli Date: Mon, 17 Jan 2022 11:11:31 +0100 Subject: [PATCH 5/6] AF: keep failed host with pending requests --- .../async/internal/velocystream/VstCommunicationAsync.java | 3 +-- .../java/com/arangodb/internal/http/HttpCommunication.java | 3 +-- .../arangodb/internal/velocystream/VstCommunicationSync.java | 1 - 3 files changed, 2 insertions(+), 5 deletions(-) diff --git a/src/main/java/com/arangodb/async/internal/velocystream/VstCommunicationAsync.java b/src/main/java/com/arangodb/async/internal/velocystream/VstCommunicationAsync.java index f07eeba2a..38acb45be 100644 --- a/src/main/java/com/arangodb/async/internal/velocystream/VstCommunicationAsync.java +++ b/src/main/java/com/arangodb/async/internal/velocystream/VstCommunicationAsync.java @@ -85,8 +85,7 @@ protected CompletableFuture execute(final Request request, final VstCo } final String location = e.getLocation(); final HostDescription redirectHost = HostUtils.createFromLocation(location); - hostHandler.closeCurrentOnError(); - hostHandler.fail(e); + hostHandler.failIfNotMatch(redirectHost, e); execute(request, new HostHandle().setHost(redirectHost), attemptCount + 1) .whenComplete((v, err) -> { if (v != null) { diff --git a/src/main/java/com/arangodb/internal/http/HttpCommunication.java b/src/main/java/com/arangodb/internal/http/HttpCommunication.java index c40106b8a..5468bbbbb 100644 --- a/src/main/java/com/arangodb/internal/http/HttpCommunication.java +++ b/src/main/java/com/arangodb/internal/http/HttpCommunication.java @@ -106,8 +106,7 @@ private Response execute(final Request request, final HostHandle hostHandle, fin if (e instanceof ArangoDBRedirectException && attemptCount < 3) { final String location = ((ArangoDBRedirectException) e).getLocation(); final HostDescription redirectHost = HostUtils.createFromLocation(location); - hostHandler.closeCurrentOnError(); - hostHandler.fail(e); + hostHandler.failIfNotMatch(redirectHost, e); return execute(request, new HostHandle().setHost(redirectHost), attemptCount + 1); } else { throw e; diff --git a/src/main/java/com/arangodb/internal/velocystream/VstCommunicationSync.java b/src/main/java/com/arangodb/internal/velocystream/VstCommunicationSync.java index 85099bacd..e01e22bc2 100644 --- a/src/main/java/com/arangodb/internal/velocystream/VstCommunicationSync.java +++ b/src/main/java/com/arangodb/internal/velocystream/VstCommunicationSync.java @@ -151,7 +151,6 @@ protected Response execute(final Request request, final VstConnectionSync connec } final String location = e.getLocation(); final HostDescription redirectHost = HostUtils.createFromLocation(location); - hostHandler.closeCurrentOnErrorIfNotMatch(redirectHost); hostHandler.failIfNotMatch(redirectHost, e); return execute(request, new HostHandle().setHost(redirectHost), attemptCount + 1); } From 6d558cc1da43fc474e5e4197e6ac8a4f3560009f Mon Sep 17 00:00:00 2001 From: Michele Rastelli Date: Mon, 17 Jan 2022 12:14:44 +0100 Subject: [PATCH 6/6] tests shutdown --- src/test/java/com/arangodb/ConcurrencyTests.java | 1 + src/test/java/com/arangodb/async/ConcurrencyTests.java | 1 + 2 files changed, 2 insertions(+) diff --git a/src/test/java/com/arangodb/ConcurrencyTests.java b/src/test/java/com/arangodb/ConcurrencyTests.java index 32e954571..62645cb82 100644 --- a/src/test/java/com/arangodb/ConcurrencyTests.java +++ b/src/test/java/com/arangodb/ConcurrencyTests.java @@ -37,6 +37,7 @@ public void concurrentPendingRequests() throws ExecutionException, InterruptedEx for (CompletableFuture f : futures) { f.get(); } + adb.shutdown(); } } diff --git a/src/test/java/com/arangodb/async/ConcurrencyTests.java b/src/test/java/com/arangodb/async/ConcurrencyTests.java index f68cfe6bb..746ef2923 100644 --- a/src/test/java/com/arangodb/async/ConcurrencyTests.java +++ b/src/test/java/com/arangodb/async/ConcurrencyTests.java @@ -19,6 +19,7 @@ public void concurrentPendingRequests() throws ExecutionException, InterruptedEx for (CompletableFuture> req : reqs) { req.get(); } + adb.shutdown(); } }