5
5
6
6
import javax .inject .Inject ;
7
7
8
- import io .lettuce .authx .TokenBasedRedisCredentialsProvider ;
9
8
import io .lettuce .authx .TokenBasedRedisCredentialsProvider ;
10
9
import io .lettuce .core .event .command .CommandListener ;
11
10
import io .lettuce .core .event .command .CommandSucceededEvent ;
12
11
import io .lettuce .core .protocol .RedisCommand ;
13
- import io .lettuce .test .Delay ;
14
- import io .lettuce .test .Delay ;
15
12
import org .awaitility .Awaitility ;
16
13
import org .junit .jupiter .api .BeforeEach ;
17
14
import org .junit .jupiter .api .Tag ;
27
24
import io .lettuce .test .WithPassword ;
28
25
import io .lettuce .test .condition .EnabledOnCommand ;
29
26
import io .lettuce .test .settings .TestSettings ;
30
- import reactor .core .publisher .Flux ;
31
27
import reactor .core .publisher .Mono ;
28
+ import redis .clients .authentication .core .SimpleToken ;
32
29
33
30
import java .time .Duration ;
31
+ import java .time .Instant ;
34
32
import java .util .ArrayList ;
33
+ import java .util .Collections ;
35
34
import java .util .List ;
36
35
37
36
/**
@@ -126,6 +125,43 @@ void streamingCredentialProvider(RedisClient client) {
126
125
client .getOptions ().mutate ().reauthenticateBehavior (ClientOptions .ReauthenticateBehavior .DEFAULT ).build ());
127
126
}
128
127
128
+ @ Test
129
+ @ Inject
130
+ void tokenBasedCredentialProvider (RedisClient client ) {
131
+
132
+ TestCommandListener listener = new TestCommandListener ();
133
+ client .addListener (listener );
134
+ client .setOptions (client .getOptions ().mutate ()
135
+ .reauthenticateBehavior (ClientOptions .ReauthenticateBehavior .ON_NEW_CREDENTIALS ).build ());
136
+
137
+ TestTokenManager tokenManager = new TestTokenManager (null , null );
138
+ TokenBasedRedisCredentialsProvider credentialsProvider = new TokenBasedRedisCredentialsProvider (tokenManager );
139
+
140
+ // Build RedisURI with streaming credentials provider
141
+ RedisURI uri = RedisURI .builder ().withHost (TestSettings .host ()).withPort (TestSettings .port ())
142
+ .withClientName ("streaming_cred_test" ).withAuthentication (credentialsProvider )
143
+ .withTimeout (Duration .ofSeconds (5 )).build ();
144
+ tokenManager .emitToken (testToken (TestSettings .username (), TestSettings .password ().toString ().toCharArray ()));
145
+
146
+ StatefulRedisConnection <String , String > connection = client .connect (StringCodec .UTF8 , uri );
147
+ assertThat (connection .sync ().aclWhoami ()).isEqualTo (TestSettings .username ());
148
+
149
+ // rotate the credentials
150
+ tokenManager .emitToken (testToken ("steave" , "foobared" .toCharArray ()));
151
+
152
+ Awaitility .await ().atMost (Duration .ofSeconds (1 )).until (() -> listener .succeeded .stream ()
153
+ .anyMatch (command -> isAuthCommandWithCredentials (command , "steave" , "foobared" .toCharArray ())));
154
+
155
+ // verify that the connection is re-authenticated with the new user credentials
156
+ assertThat (connection .sync ().aclWhoami ()).isEqualTo ("steave" );
157
+
158
+ credentialsProvider .shutdown ();
159
+ connection .close ();
160
+ client .removeListener (listener );
161
+ client .setOptions (
162
+ client .getOptions ().mutate ().reauthenticateBehavior (ClientOptions .ReauthenticateBehavior .DEFAULT ).build ());
163
+ }
164
+
129
165
static class TestCommandListener implements CommandListener {
130
166
131
167
final List <RedisCommand <?, ?, ?>> succeeded = new ArrayList <>();
@@ -147,52 +183,9 @@ private boolean isAuthCommandWithCredentials(RedisCommand<?, ?, ?> command, Stri
147
183
return false ;
148
184
}
149
185
150
- }
151
-
152
- @ Test
153
- @ Inject
154
- void tokenBasedCredentialProvider (RedisClient client ) {
155
-
156
- ClientOptions clientOptions = ClientOptions .builder ()
157
- .disconnectedBehavior (ClientOptions .DisconnectedBehavior .REJECT_COMMANDS ).build ();
158
- client .setOptions (clientOptions );
159
- // Connection used to simulate test user credential rotation
160
- StatefulRedisConnection <String , String > defaultConnection = client .connect ();
161
-
162
- String testUser = "streaming_cred_test_user" ;
163
- char [] testPassword1 = "token_1" .toCharArray ();
164
- char [] testPassword2 = "token_2" .toCharArray ();
165
-
166
- TestTokenManager tokenManager = new TestTokenManager (null , null );
167
-
168
- // streaming credentials provider that emits redis credentials which will trigger connection re-authentication
169
- // token manager is used to emit updated credentials
170
- TokenBasedRedisCredentialsProvider credentialsProvider = new TokenBasedRedisCredentialsProvider (tokenManager );
171
-
172
- RedisURI uri = RedisURI .builder ().withTimeout (Duration .ofSeconds (1 )).withClientName ("streaming_cred_test" )
173
- .withHost (TestSettings .host ()).withPort (TestSettings .port ()).withAuthentication (credentialsProvider ).build ();
174
-
175
- // create test user with initial credentials set to 'testPassword1'
176
- createTestUser (defaultConnection , testUser , testPassword1 );
177
- tokenManager .emitToken (testToken (testUser , testPassword1 ));
178
-
179
- StatefulRedisConnection <String , String > connection = client .connect (StringCodec .UTF8 , uri );
180
- assertThat (connection .sync ().aclWhoami ()).isEqualTo (testUser );
181
-
182
- // update test user credentials in Redis server (password changed to testPassword2)
183
- // then emit updated credentials trough streaming credentials provider
184
- // and trigger re-connect to force re-authentication
185
- // updated credentials should be used for re-authentication
186
- updateTestUser (defaultConnection , testUser , testPassword2 );
187
- tokenManager .emitToken (testToken (testUser , testPassword2 ));
188
- connection .sync ().quit ();
189
-
190
- Delay .delay (Duration .ofMillis (100 ));
191
- assertThat (connection .sync ().ping ()).isEqualTo ("PONG" );
192
-
193
- String res = connection .sync ().aclWhoami ();
194
- assertThat (res ).isEqualTo (testUser );
186
+ private SimpleToken testToken (String username , char [] password ) {
187
+ return new SimpleToken (String .valueOf (password ), Instant .now ().plusMillis (500 ).toEpochMilli (),
188
+ Instant .now ().toEpochMilli (), Collections .singletonMap ("oid" , username ));
189
+ }
195
190
196
- defaultConnection .close ();
197
- connection .close ();
198
191
}
0 commit comments