Skip to content

Commit d0397cf

Browse files
committed
1 parent a7fe813 commit d0397cf

File tree

7 files changed

+127
-4
lines changed

7 files changed

+127
-4
lines changed

zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxn.java

Lines changed: 39 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -168,6 +168,8 @@ static class AuthData {
168168

169169
private final int sessionTimeout;
170170

171+
private final long newSessionTimeout;
172+
171173
private final ZKWatchManager watchManager;
172174

173175
private long sessionId;
@@ -398,6 +400,36 @@ public ClientCnxn(
398400
long sessionId,
399401
byte[] sessionPasswd,
400402
boolean canBeReadOnly
403+
) throws IOException {
404+
this(hostProvider, sessionTimeout, Long.MAX_VALUE, clientConfig, defaultWatcher, clientCnxnSocket, sessionId, sessionPasswd, canBeReadOnly);
405+
}
406+
407+
/**
408+
* Creates a connection object. The actual network connect doesn't get
409+
* established until needed. The start() instance method must be called
410+
* after construction.
411+
*
412+
* @param hostProvider the list of ZooKeeper servers to connect to
413+
* @param sessionTimeout the timeout for connections.
414+
* @param newSessionTimeout the timeout before giving up brand-new session establishment.
415+
* @param clientConfig the client configuration.
416+
* @param defaultWatcher default watcher for this connection
417+
* @param clientCnxnSocket the socket implementation used (e.g. NIO/Netty)
418+
* @param sessionId session id if re-establishing session
419+
* @param sessionPasswd session passwd if re-establishing session
420+
* @param canBeReadOnly whether the connection is allowed to go to read-only mode in case of partitioning
421+
* @throws IOException in cases of broken network
422+
*/
423+
public ClientCnxn(
424+
HostProvider hostProvider,
425+
int sessionTimeout,
426+
long newSessionTimeout,
427+
ZKClientConfig clientConfig,
428+
Watcher defaultWatcher,
429+
ClientCnxnSocket clientCnxnSocket,
430+
long sessionId,
431+
byte[] sessionPasswd,
432+
boolean canBeReadOnly
401433
) throws IOException {
402434
this.hostProvider = hostProvider;
403435
this.sessionTimeout = sessionTimeout;
@@ -413,6 +445,7 @@ public ClientCnxn(
413445
this.connectTimeout = sessionTimeout / hostProvider.size();
414446
this.readTimeout = sessionTimeout * 2 / 3;
415447
this.expirationTimeout = sessionTimeout * 4 / 3;
448+
this.newSessionTimeout = newSessionTimeout == 0 ? expirationTimeout : newSessionTimeout;
416449

417450
this.sendThread = new SendThread(clientCnxnSocket);
418451
this.eventThread = new EventThread();
@@ -1192,7 +1225,12 @@ public void run() {
11921225
to = connectTimeout - clientCnxnSocket.getIdleSend();
11931226
}
11941227

1195-
int expiration = sessionId == 0 ? Integer.MAX_VALUE : expirationTimeout - clientCnxnSocket.getIdleRecv();
1228+
long expiration;
1229+
if (sessionId == 0) {
1230+
expiration = newSessionTimeout - clientCnxnSocket.getIdleRecv();
1231+
} else {
1232+
expiration = expirationTimeout - clientCnxnSocket.getIdleRecv();
1233+
}
11961234
if (expiration <= 0) {
11971235
String warnInfo = String.format(
11981236
"Client session timed out, have not heard from server in %dms for session id 0x%s",

zookeeper-server/src/main/java/org/apache/zookeeper/ZooKeeper.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -697,6 +697,7 @@ public ZooKeeper(
697697
ClientCnxn createConnection(
698698
HostProvider hostProvider,
699699
int sessionTimeout,
700+
long newSessionTimeout,
700701
ZKClientConfig clientConfig,
701702
Watcher defaultWatcher,
702703
ClientCnxnSocket clientCnxnSocket,
@@ -707,6 +708,7 @@ ClientCnxn createConnection(
707708
return new ClientCnxn(
708709
hostProvider,
709710
sessionTimeout,
711+
newSessionTimeout,
710712
clientConfig,
711713
defaultWatcher,
712714
clientCnxnSocket,
@@ -1148,6 +1150,7 @@ public ZooKeeper(ZooKeeperOptions options) throws IOException {
11481150
cnxn = createConnection(
11491151
hostProvider,
11501152
sessionTimeout,
1153+
options.getNewSessionTimeoutMs(),
11511154
this.clientConfig,
11521155
watcher,
11531156
getClientCnxnSocket(),

zookeeper-server/src/main/java/org/apache/zookeeper/client/ZooKeeperBuilder.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
public class ZooKeeperBuilder {
4040
private final String connectString;
4141
private final Duration sessionTimeout;
42+
private Duration newSessionTimeout = Duration.ofSeconds(Long.MAX_VALUE, 999_999_999L);
4243
private Function<Collection<InetSocketAddress>, HostProvider> hostProvider;
4344
private Watcher defaultWatcher;
4445
private boolean canBeReadOnly = false;
@@ -117,6 +118,21 @@ public ZooKeeperBuilder withSession(long sessionId, byte[] sessionPasswd) {
117118
return this;
118119
}
119120

121+
/**
122+
* Specifies timeout to establish a brand-new session.
123+
*
124+
* @param timeout timeout to get {@link org.apache.zookeeper.Watcher.Event.KeeperState#Expired} in establishing a
125+
* brand-new session. {@code Duration.ofSeconds(Long.MAX_VALUE, 999_999_999L)}, which is the default,
126+
* means endless retry until connected. {@code Duration.ZERO} means a sensible value deduced from
127+
* specified session timeout, currently, it is approximate {@code sessionTimeout * 4 / 3}.
128+
* @return this
129+
* @since 3.10.0
130+
*/
131+
public ZooKeeperBuilder withNewSessionTimeout(Duration timeout) {
132+
this.newSessionTimeout = timeout;
133+
return this;
134+
}
135+
120136
/**
121137
* Specifies the client config used to construct ZooKeeper instances.
122138
*
@@ -143,6 +159,7 @@ public ZooKeeperOptions toOptions() {
143159
return new ZooKeeperOptions(
144160
connectString,
145161
sessionTimeout,
162+
newSessionTimeout,
146163
defaultWatcher,
147164
hostProvider,
148165
canBeReadOnly,

zookeeper-server/src/main/java/org/apache/zookeeper/client/ZooKeeperOptions.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
public class ZooKeeperOptions {
3434
private final String connectString;
3535
private final Duration sessionTimeout;
36+
private final Duration newSessionTimeout;
3637
private final Watcher defaultWatcher;
3738
private final Function<Collection<InetSocketAddress>, HostProvider> hostProvider;
3839
private final boolean canBeReadOnly;
@@ -42,6 +43,7 @@ public class ZooKeeperOptions {
4243

4344
ZooKeeperOptions(String connectString,
4445
Duration sessionTimeout,
46+
Duration newSessionTimeout,
4547
Watcher defaultWatcher,
4648
Function<Collection<InetSocketAddress>, HostProvider> hostProvider,
4749
boolean canBeReadOnly,
@@ -50,6 +52,7 @@ public class ZooKeeperOptions {
5052
ZKClientConfig clientConfig) {
5153
this.connectString = connectString;
5254
this.sessionTimeout = sessionTimeout;
55+
this.newSessionTimeout = newSessionTimeout;
5356
this.hostProvider = hostProvider;
5457
this.defaultWatcher = defaultWatcher;
5558
this.canBeReadOnly = canBeReadOnly;
@@ -66,6 +69,14 @@ public int getSessionTimeoutMs() {
6669
return (int) Long.min(Integer.MAX_VALUE, sessionTimeout.toMillis());
6770
}
6871

72+
public long getNewSessionTimeoutMs() {
73+
try {
74+
return newSessionTimeout.toMillis();
75+
} catch (ArithmeticException ignored) {
76+
return Long.MAX_VALUE;
77+
}
78+
}
79+
6980
public Watcher getDefaultWatcher() {
7081
return defaultWatcher;
7182
}

zookeeper-server/src/test/java/org/apache/zookeeper/ClientCnxnSocketFragilityTest.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ public void testSocketClosedAfterFailure() throws Exception {
9696
BusyServer server = new BusyServer();
9797
ZooKeeper zk = new ZooKeeper(server.getHostPort(), (int) sessionTimeout.toMillis(), null) {
9898
@Override
99-
ClientCnxn createConnection(HostProvider hostProvider, int sessionTimeout, ZKClientConfig clientConfig, Watcher defaultWatcher, ClientCnxnSocket clientCnxnSocket, long sessionId, byte[] sessionPasswd, boolean canBeReadOnly) throws IOException {
99+
ClientCnxn createConnection(HostProvider hostProvider, int sessionTimeout, long newSessionTimeout, ZKClientConfig clientConfig, Watcher defaultWatcher, ClientCnxnSocket clientCnxnSocket, long sessionId, byte[] sessionPasswd, boolean canBeReadOnly) throws IOException {
100100
ClientCnxnSocketNIO socket = spy((ClientCnxnSocketNIO) clientCnxnSocket);
101101

102102
doAnswer(mock -> {
@@ -110,7 +110,7 @@ ClientCnxn createConnection(HostProvider hostProvider, int sessionTimeout, ZKCli
110110
}).when(socket).createSock();
111111

112112
nioSelector.set(socket.getSelector());
113-
return super.createConnection(hostProvider, sessionTimeout, clientConfig, defaultWatcher, socket, sessionId, sessionPasswd, canBeReadOnly);
113+
return super.createConnection(hostProvider, sessionTimeout, newSessionTimeout, clientConfig, defaultWatcher, socket, sessionId, sessionPasswd, canBeReadOnly);
114114
}
115115
}) {
116116

@@ -328,6 +328,7 @@ class CustomClientCnxn extends ClientCnxn {
328328
public CustomClientCnxn(
329329
HostProvider hostProvider,
330330
int sessionTimeout,
331+
long newSessionTimeout,
331332
ZKClientConfig zkClientConfig,
332333
Watcher defaultWatcher,
333334
ClientCnxnSocket clientCnxnSocket,
@@ -338,6 +339,7 @@ public CustomClientCnxn(
338339
super(
339340
hostProvider,
340341
sessionTimeout,
342+
newSessionTimeout,
341343
zkClientConfig,
342344
defaultWatcher,
343345
clientCnxnSocket,
@@ -403,6 +405,7 @@ public boolean isAlive() {
403405
ClientCnxn createConnection(
404406
HostProvider hostProvider,
405407
int sessionTimeout,
408+
long newSessionTimeout,
406409
ZKClientConfig clientConfig,
407410
Watcher defaultWatcher,
408411
ClientCnxnSocket clientCnxnSocket,
@@ -415,6 +418,7 @@ ClientCnxn createConnection(
415418
ClientCnxnSocketFragilityTest.this.cnxn = new CustomClientCnxn(
416419
hostProvider,
417420
sessionTimeout,
421+
newSessionTimeout,
418422
clientConfig,
419423
defaultWatcher,
420424
clientCnxnSocket,
@@ -424,4 +428,4 @@ ClientCnxn createConnection(
424428
return ClientCnxnSocketFragilityTest.this.cnxn;
425429
}
426430
}
427-
}
431+
}

zookeeper-server/src/test/java/org/apache/zookeeper/ClientRequestTimeoutTest.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -225,6 +225,7 @@ class CustomClientCnxn extends ClientCnxn {
225225
CustomClientCnxn(
226226
HostProvider hostProvider,
227227
int sessionTimeout,
228+
long newSessionTimeout,
228229
ZKClientConfig clientConfig,
229230
Watcher defaultWatcher,
230231
ClientCnxnSocket clientCnxnSocket,
@@ -235,6 +236,7 @@ class CustomClientCnxn extends ClientCnxn {
235236
super(
236237
hostProvider,
237238
sessionTimeout,
239+
newSessionTimeout,
238240
clientConfig,
239241
defaultWatcher,
240242
clientCnxnSocket,
@@ -286,6 +288,7 @@ public CustomZooKeeper(String connectString, int sessionTimeout, Watcher watcher
286288
ClientCnxn createConnection(
287289
HostProvider hostProvider,
288290
int sessionTimeout,
291+
long newSessionTimeout,
289292
ZKClientConfig clientConfig,
290293
Watcher defaultWatcher,
291294
ClientCnxnSocket clientCnxnSocket,
@@ -296,6 +299,7 @@ ClientCnxn createConnection(
296299
return new CustomClientCnxn(
297300
hostProvider,
298301
sessionTimeout,
302+
newSessionTimeout,
299303
clientConfig,
300304
defaultWatcher,
301305
clientCnxnSocket,

zookeeper-server/src/test/java/org/apache/zookeeper/test/SessionTimeoutTest.java

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package org.apache.zookeeper.test;
2020

2121
import static org.hamcrest.MatcherAssert.assertThat;
22+
import static org.hamcrest.Matchers.greaterThan;
2223
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
2324
import static org.hamcrest.Matchers.lessThan;
2425
import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -27,6 +28,7 @@
2728
import static org.junit.jupiter.api.Assertions.assertTrue;
2829
import static org.junit.jupiter.api.Assertions.fail;
2930
import java.io.IOException;
31+
import java.time.Duration;
3032
import java.util.Arrays;
3133
import java.util.List;
3234
import java.util.concurrent.CompletableFuture;
@@ -40,6 +42,7 @@
4042
import org.apache.zookeeper.Watcher;
4143
import org.apache.zookeeper.ZooDefs;
4244
import org.apache.zookeeper.ZooKeeper;
45+
import org.apache.zookeeper.client.ZooKeeperBuilder;
4346
import org.apache.zookeeper.common.BusyServer;
4447
import org.apache.zookeeper.common.Time;
4548
import org.junit.jupiter.api.BeforeEach;
@@ -176,6 +179,49 @@ public void testSessionExpirationWhenNoServerUp() throws Exception {
176179
assertThrows(TimeoutException.class, () -> watcher.expired.get(3 * sessionTimeout, TimeUnit.MILLISECONDS));
177180
assertThrows(KeeperException.ConnectionLossException.class, () -> zk.exists("/", null));
178181
}
182+
183+
// when: try to establish a brand-new session using builder with default newSessionTimeout
184+
watcher.reset();
185+
try (ZooKeeper zk = new ZooKeeperBuilder(hostPort, Duration.ofMillis(sessionTimeout))
186+
.withDefaultWatcher(watcher)
187+
.build()) {
188+
// then: never Expired
189+
assertThrows(TimeoutException.class, () -> watcher.expired.get(3 * sessionTimeout, TimeUnit.MILLISECONDS));
190+
assertThrows(KeeperException.ConnectionLossException.class, () -> zk.exists("/", null));
191+
}
192+
193+
// when: try to establish a brand-new session using builder with Duration.ZERO newSessionTimeout
194+
watcher.reset();
195+
long start = Time.currentElapsedTime();
196+
try (ZooKeeper zk = new ZooKeeperBuilder(hostPort, Duration.ofMillis(sessionTimeout))
197+
.withDefaultWatcher(watcher)
198+
.withNewSessionTimeout(Duration.ZERO)
199+
.build()) {
200+
// then: get Expired after some delay
201+
watcher.expired.join();
202+
long elapsed = Time.currentElapsedTime() - start;
203+
assertThat(elapsed, greaterThan((long) sessionTimeout));
204+
assertThat(elapsed, lessThan(sessionTimeout * 10L));
205+
// then: future request will get SessionExpiredException
206+
assertThrows(KeeperException.SessionExpiredException.class, () -> zk.exists("/", null));
207+
}
208+
209+
// when: try to establish a brand-new session using builder with custom newSessionTimeout
210+
watcher.reset();
211+
start = Time.currentElapsedTime();
212+
Duration newSessionTimeout = Duration.ofMillis(300);
213+
try (ZooKeeper zk = new ZooKeeperBuilder(hostPort, Duration.ofMillis(30000))
214+
.withDefaultWatcher(watcher)
215+
.withNewSessionTimeout(newSessionTimeout)
216+
.build()) {
217+
// then: get Expired after newSessionTimeout
218+
watcher.expired.join();
219+
long elapsed = Time.currentElapsedTime() - start;
220+
assertThat(elapsed, greaterThanOrEqualTo(newSessionTimeout.toMillis()));
221+
assertThat(elapsed, lessThan(newSessionTimeout.toMillis() * 10));
222+
// then: future request will get SessionExpiredException
223+
assertThrows(KeeperException.SessionExpiredException.class, () -> zk.exists("/", null));
224+
}
179225
}
180226

181227
@Test

0 commit comments

Comments
 (0)