Skip to content

Commit 91871b6

Browse files
committed
Support for StreamingCredentials
This enables use cases like credential rotation and token based auth without client disconnect. Especially with Pub/Sub clients will reduce the chnance of missing events.
1 parent 28a4154 commit 91871b6

9 files changed

+534
-0
lines changed
Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
package io.lettuce.core;
2+
3+
import io.lettuce.core.codec.StringCodec;
4+
import io.lettuce.core.protocol.AsyncCommand;
5+
import io.lettuce.core.protocol.RedisCommand;
6+
import io.netty.util.internal.logging.InternalLogger;
7+
import io.netty.util.internal.logging.InternalLoggerFactory;
8+
import reactor.core.Disposable;
9+
import reactor.core.publisher.Flux;
10+
11+
import java.nio.CharBuffer;
12+
import java.util.concurrent.atomic.AtomicReference;
13+
14+
public abstract class BaseRedisAuthenticationHandler<T extends RedisChannelHandler<?, ?>> {
15+
16+
private static final InternalLogger log = InternalLoggerFactory.getInstance(BaseRedisAuthenticationHandler.class);
17+
18+
protected final T connection;
19+
20+
private final RedisCommandBuilder<String, String> commandBuilder = new RedisCommandBuilder<>(StringCodec.UTF8);
21+
22+
private final AtomicReference<Disposable> credentialsSubscription = new AtomicReference<>();
23+
24+
public BaseRedisAuthenticationHandler(T connection) {
25+
this.connection = connection;
26+
}
27+
28+
/**
29+
* Subscribes to the provided `Flux` of credentials if the given `RedisCredentialsProvider` supports streaming credentials.
30+
* <p>
31+
* This method subscribes to a stream of credentials provided by the `StreamingCredentialsProvider`. Each time new
32+
* credentials are received, the client is reauthenticated. If the connection is not supported, the method returns without
33+
* subscribing.
34+
* <p>
35+
* The previous subscription, if any, is disposed of before setting the new subscription.
36+
*
37+
* @param credentialsProvider the credentials provider to subscribe to
38+
*/
39+
public void subscribe(RedisCredentialsProvider credentialsProvider) {
40+
if (credentialsProvider == null) {
41+
return;
42+
}
43+
44+
if (credentialsProvider instanceof StreamingCredentialsProvider) {
45+
if (!isSupportedConnection()) {
46+
return;
47+
}
48+
49+
Flux<RedisCredentials> credentialsFlux = ((StreamingCredentialsProvider) credentialsProvider).credentials();
50+
51+
Disposable subscription = credentialsFlux.subscribe(this::onNext, this::onError, this::complete);
52+
53+
Disposable oldSubscription = credentialsSubscription.getAndSet(subscription);
54+
if (oldSubscription != null && !oldSubscription.isDisposed()) {
55+
oldSubscription.dispose();
56+
}
57+
}
58+
}
59+
60+
/**
61+
* Unsubscribes from the current credentials stream.
62+
*/
63+
public void unsubscribe() {
64+
Disposable subscription = credentialsSubscription.getAndSet(null);
65+
if (subscription != null && !subscription.isDisposed()) {
66+
subscription.dispose();
67+
}
68+
}
69+
70+
protected void complete() {
71+
log.debug("Credentials stream completed");
72+
}
73+
74+
protected void onNext(RedisCredentials credentials) {
75+
reauthenticate(credentials);
76+
}
77+
78+
protected void onError(Throwable e) {
79+
log.error("Credentials renew failed.", e);
80+
}
81+
82+
/**
83+
* Performs re-authentication with the provided credentials.
84+
*
85+
* @param credentials the new credentials
86+
*/
87+
private void reauthenticate(RedisCredentials credentials) {
88+
CharSequence password = CharBuffer.wrap(credentials.getPassword());
89+
90+
AsyncCommand<String, String, String> authCmd;
91+
if (credentials.hasUsername()) {
92+
authCmd = new AsyncCommand<>(commandBuilder.auth(credentials.getUsername(), password));
93+
} else {
94+
authCmd = new AsyncCommand<>(commandBuilder.auth(password));
95+
}
96+
97+
dispatchAuth(authCmd).exceptionally(throwable -> {
98+
log.error("Re-authentication {} failed.", credentials.hasUsername() ? "with username" : "without username",
99+
throwable);
100+
return null;
101+
});
102+
}
103+
104+
protected boolean isSupportedConnection() {
105+
return true;
106+
}
107+
108+
private AsyncCommand<String, String, String> dispatchAuth(RedisCommand<String, String, String> authCommand) {
109+
AsyncCommand<String, String, String> asyncCommand = new AsyncCommand<>(authCommand);
110+
RedisCommand<String, String, String> dispatched = connection.getChannelWriter().write(asyncCommand);
111+
if (dispatched instanceof AsyncCommand) {
112+
return (AsyncCommand<String, String, String>) dispatched;
113+
}
114+
return asyncCommand;
115+
}
116+
117+
}
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
/*
2+
* Copyright 2019-Present, Redis Ltd. and Contributors
3+
* All rights reserved.
4+
*
5+
* Licensed under the MIT License.
6+
*
7+
* This file contains contributions from third-party contributors
8+
* licensed under the Apache License, Version 2.0 (the "License");
9+
* you may not use this file except in compliance with the License.
10+
* You may obtain a copy of the License at
11+
*
12+
* https://www.apache.org/licenses/LICENSE-2.0
13+
*
14+
* Unless required by applicable law or agreed to in writing, software
15+
* distributed under the License is distributed on an "AS IS" BASIS,
16+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17+
* See the License for the specific language governing permissions and
18+
* limitations under the License.
19+
*/
20+
package io.lettuce.core;
21+
22+
import io.lettuce.core.protocol.ProtocolVersion;
23+
import io.lettuce.core.pubsub.StatefulRedisPubSubConnection;
24+
import io.netty.util.internal.logging.InternalLogger;
25+
import io.netty.util.internal.logging.InternalLoggerFactory;
26+
27+
class RedisAuthenticationHandler extends BaseRedisAuthenticationHandler<StatefulRedisConnectionImpl<?, ?>> {
28+
29+
private static final InternalLogger logger = InternalLoggerFactory.getInstance(RedisAuthenticationHandler.class);
30+
31+
public RedisAuthenticationHandler(StatefulRedisConnectionImpl<?, ?> connection) {
32+
super(connection);
33+
}
34+
35+
protected boolean isSupportedConnection() {
36+
if (connection instanceof StatefulRedisPubSubConnection
37+
&& ProtocolVersion.RESP2 == connection.getConnectionState().getNegotiatedProtocolVersion()) {
38+
logger.warn("Renewable credentials are not supported with RESP2 protocol on a pub/sub connection.");
39+
return false;
40+
}
41+
return true;
42+
}
43+
44+
}

src/main/java/io/lettuce/core/StatefulRedisConnectionImpl.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,8 @@ public class StatefulRedisConnectionImpl<K, V> extends RedisChannelHandler<K, V>
6767

6868
private final PushHandler pushHandler;
6969

70+
private final RedisAuthenticationHandler authHandler;
71+
7072
private final Mono<JsonParser> parser;
7173

7274
protected MultiOutput<K, V> multi;
@@ -104,6 +106,8 @@ public StatefulRedisConnectionImpl(RedisChannelWriter writer, PushHandler pushHa
104106
this.async = newRedisAsyncCommandsImpl();
105107
this.sync = newRedisSyncCommandsImpl();
106108
this.reactive = newRedisReactiveCommandsImpl();
109+
110+
this.authHandler = new RedisAuthenticationHandler(this);
107111
}
108112

109113
public RedisCodec<K, V> getCodec() {
@@ -315,4 +319,16 @@ public ConnectionState getConnectionState() {
315319
return state;
316320
}
317321

322+
@Override
323+
public void activated() {
324+
super.activated();
325+
authHandler.subscribe(state.getCredentialsProvider());
326+
}
327+
328+
@Override
329+
public void deactivated() {
330+
authHandler.unsubscribe();
331+
super.deactivated();
332+
}
333+
318334
}
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
package io.lettuce.core;
2+
3+
import reactor.core.publisher.Flux;
4+
5+
public interface StreamingCredentialsProvider extends RedisCredentialsProvider {
6+
7+
/**
8+
* Returns a {@link Flux} emitting {@link RedisCredentials} that can be used to authorize a Redis connection. This
9+
* credential provider supports streaming credentials, meaning that it can emit multiple credentials over time.
10+
*
11+
* @return
12+
*/
13+
Flux<RedisCredentials> credentials();
14+
15+
}
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
/*
2+
* Copyright 2019-Present, Redis Ltd. and Contributors
3+
* All rights reserved.
4+
*
5+
* Licensed under the MIT License.
6+
*
7+
* This file contains contributions from third-party contributors
8+
* licensed under the Apache License, Version 2.0 (the "License");
9+
* you may not use this file except in compliance with the License.
10+
* You may obtain a copy of the License at
11+
*
12+
* https://www.apache.org/licenses/LICENSE-2.0
13+
*
14+
* Unless required by applicable law or agreed to in writing, software
15+
* distributed under the License is distributed on an "AS IS" BASIS,
16+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17+
* See the License for the specific language governing permissions and
18+
* limitations under the License.
19+
*/
20+
package io.lettuce.core.cluster;
21+
22+
import io.lettuce.core.BaseRedisAuthenticationHandler;
23+
import io.lettuce.core.cluster.pubsub.StatefulRedisClusterPubSubConnection;
24+
import io.lettuce.core.protocol.ProtocolVersion;
25+
import io.netty.util.internal.logging.InternalLogger;
26+
import io.netty.util.internal.logging.InternalLoggerFactory;
27+
28+
class RedisClusterAuthenticationHandler extends BaseRedisAuthenticationHandler<StatefulRedisClusterConnectionImpl<?, ?>> {
29+
30+
private static final InternalLogger logger = InternalLoggerFactory.getInstance(RedisClusterAuthenticationHandler.class);
31+
32+
public RedisClusterAuthenticationHandler(StatefulRedisClusterConnectionImpl<?, ?> connection) {
33+
super(connection);
34+
}
35+
36+
protected boolean isSupportedConnection() {
37+
if (connection instanceof StatefulRedisClusterPubSubConnection
38+
&& ProtocolVersion.RESP2 == connection.getConnectionState().getNegotiatedProtocolVersion()) {
39+
logger.warn("Renewable credentials are not supported with RESP2 protocol on a pub/sub connection.");
40+
return false;
41+
}
42+
return true;
43+
}
44+
45+
}

src/main/java/io/lettuce/core/cluster/StatefulRedisClusterConnectionImpl.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,8 @@ public class StatefulRedisClusterConnectionImpl<K, V> extends RedisChannelHandle
8989

9090
private volatile Partitions partitions;
9191

92+
private final RedisClusterAuthenticationHandler authHandler;
93+
9294
/**
9395
* Initialize a new connection.
9496
*
@@ -123,6 +125,8 @@ public StatefulRedisClusterConnectionImpl(RedisChannelWriter writer, ClusterPush
123125
this.async = newRedisAdvancedClusterAsyncCommandsImpl();
124126
this.sync = newRedisAdvancedClusterCommandsImpl();
125127
this.reactive = newRedisAdvancedClusterReactiveCommandsImpl();
128+
129+
this.authHandler = new RedisClusterAuthenticationHandler(this);
126130
}
127131

128132
protected RedisAdvancedClusterReactiveCommandsImpl<K, V> newRedisAdvancedClusterReactiveCommandsImpl() {
@@ -230,6 +234,12 @@ public void activated() {
230234
super.activated();
231235

232236
async.clusterMyId().thenAccept(connectionState::setNodeId);
237+
authHandler.subscribe(connectionState.getCredentialsProvider());
238+
}
239+
240+
@Override
241+
public void deactivated() {
242+
authHandler.unsubscribe();
233243
}
234244

235245
ClusterDistributionChannelWriter getClusterDistributionChannelWriter() {

0 commit comments

Comments
 (0)