Skip to content

Commit a72270e

Browse files
committed
Support shared OAuth token
1 parent 8691c7a commit a72270e

File tree

13 files changed

+269
-246
lines changed

13 files changed

+269
-246
lines changed

pom.xml

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,15 +45,16 @@
4545
<netty4.version>4.1.115.Final</netty4.version>
4646
<netty4.iouring.version>0.0.25.Final</netty4.iouring.version>
4747
<micrometer.version>1.14.1</micrometer.version>
48+
<gson.version>2.11.0</gson.version>
4849
<logback.version>1.2.13</logback.version>
4950
<junit.jupiter.version>5.11.3</junit.jupiter.version>
5051
<assertj.version>3.26.3</assertj.version>
5152
<mockito.version>5.14.2</mockito.version>
5253
<jqwik.version>1.9.2</jqwik.version>
53-
<amqp-client.version>5.22.0</amqp-client.version>
5454
<micrometer-tracing-test.version>1.4.0</micrometer-tracing-test.version>
5555
<micrometer-docs-generator.version>1.0.4</micrometer-docs-generator.version>
5656
<jose4j.version>0.9.6</jose4j.version>
57+
<commons-lang3.version>3.17.0</commons-lang3.version>
5758
<maven.compiler.plugin.version>3.13.0</maven.compiler.plugin.version>
5859
<maven.dependency.plugin.version>3.8.1</maven.dependency.plugin.version>
5960
<maven-surefire-plugin.version>3.5.2</maven-surefire-plugin.version>
@@ -76,7 +77,6 @@
7677
<gpg.skip>true</gpg.skip>
7778
<gpg.keyname>6026DFCA</gpg.keyname>
7879
<spotbugs.skip>false</spotbugs.skip>
79-
<gson.version>2.11.0</gson.version>
8080
</properties>
8181

8282
<dependencies>
@@ -251,6 +251,11 @@
251251
<scope>test</scope>
252252
</dependency>
253253

254+
<dependency>
255+
<groupId>org.apache.commons</groupId>
256+
<artifactId>commons-lang3</artifactId>
257+
<version>${commons-lang3.version}</version>
258+
</dependency>
254259

255260
</dependencies>
256261

src/main/java/com/rabbitmq/client/amqp/OAuthSettings.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,5 +29,7 @@ public interface OAuthSettings<T> {
2929

3030
OAuthSettings<T> parameter(String name, String value);
3131

32+
OAuthSettings<T> shared(boolean shared);
33+
3234
T connection();
3335
}

src/main/java/com/rabbitmq/client/amqp/impl/AmqpConnection.java

Lines changed: 25 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ final class AmqpConnection extends ResourceBase implements Connection {
9898
this.topologyListener = createTopologyListener(builder);
9999

100100
if (recoveryConfiguration.activated()) {
101-
disconnectHandler = recoveryDisconnectHandler(recoveryConfiguration, builder.name());
101+
disconnectHandler = recoveryDisconnectHandler(recoveryConfiguration, this.name());
102102
} else {
103103
disconnectHandler =
104104
(c, e) -> {
@@ -121,21 +121,30 @@ final class AmqpConnection extends ResourceBase implements Connection {
121121
Credentials credentials = builder.credentials();
122122
this.credentialsRegistration =
123123
credentials.register(
124+
this.name(),
124125
(username, password) -> {
125-
LOGGER.debug("Setting new token for connection {}", this.name);
126-
long start = nanoTime();
127-
((AmqpManagement) management()).setToken(password);
128-
LOGGER.debug(
129-
"Set new token for connection {} in {} ms",
130-
this.name,
131-
ofNanos(nanoTime() - start).toMillis());
126+
State state = this.state();
127+
if (state == OPEN) {
128+
LOGGER.debug("Setting new token for connection {}", this.name);
129+
long start = nanoTime();
130+
((AmqpManagement) management()).setToken(password);
131+
LOGGER.debug(
132+
"Set new token for connection {} in {} ms",
133+
this.name,
134+
ofNanos(nanoTime() - start).toMillis());
135+
} else {
136+
LOGGER.debug(
137+
"Could not set new token for connection {} because its state is {}",
138+
this.name(),
139+
state);
140+
}
132141
});
133142
LOGGER.debug("Opening native connection for connection '{}'...", this.name());
134143
NativeConnectionWrapper ncw =
135144
ConnectionUtils.enforceAffinity(
136145
addrs -> {
137146
NativeConnectionWrapper wrapper =
138-
connect(this.connectionSettings, builder.name(), disconnectHandler, addrs);
147+
connect(this.connectionSettings, this.name(), disconnectHandler, addrs);
139148
this.nativeConnection = wrapper.connection();
140149
return wrapper;
141150
},
@@ -419,7 +428,10 @@ private void recoverAfterConnectionFailure(
419428
() -> {
420429
if (!this.recoveringConnection.get()) {
421430
recoverAfterConnectionFailure(
422-
recoveryConfiguration, name, ex, disconnectedHandlerReference);
431+
recoveryConfiguration,
432+
this.name(),
433+
ex,
434+
disconnectedHandlerReference);
423435
}
424436
});
425437
}
@@ -822,7 +834,7 @@ public int hashCode() {
822834
return Objects.hashCode(id);
823835
}
824836

825-
private static class TokenConnectionCallback implements Credentials.ConnectionCallback {
837+
private static class TokenConnectionCallback implements Credentials.AuthenticationCallback {
826838

827839
private final ConnectionOptions options;
828840

@@ -831,15 +843,8 @@ private TokenConnectionCallback(ConnectionOptions options) {
831843
}
832844

833845
@Override
834-
public Credentials.ConnectionCallback username(String username) {
835-
options.user(username);
836-
return this;
837-
}
838-
839-
@Override
840-
public Credentials.ConnectionCallback password(String password) {
841-
options.password(password);
842-
return this;
846+
public void authenticate(String username, String password) {
847+
options.user(username).password(password);
843848
}
844849
}
845850
}

src/main/java/com/rabbitmq/client/amqp/impl/Credentials.java

Lines changed: 6 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -21,39 +21,32 @@ interface Credentials {
2121

2222
Credentials NO_OP = new NoOpCredentials();
2323

24-
Registration register(RefreshCallback refreshCallback);
24+
Registration register(String name, AuthenticationCallback refreshCallback);
2525

2626
interface Registration {
2727

28-
void connect(ConnectionCallback callback);
28+
void connect(AuthenticationCallback callback);
2929

3030
void unregister();
3131
}
3232

33-
interface ConnectionCallback {
33+
interface AuthenticationCallback {
3434

35-
ConnectionCallback username(String username);
36-
37-
ConnectionCallback password(String password);
38-
}
39-
40-
interface RefreshCallback {
41-
42-
void refresh(String username, String password);
35+
void authenticate(String username, String password);
4336
}
4437

4538
class NoOpCredentials implements Credentials {
4639

4740
@Override
48-
public Registration register(RefreshCallback refreshCallback) {
41+
public Registration register(String name, AuthenticationCallback refreshCallback) {
4942
return new NoOpRegistration();
5043
}
5144
}
5245

5346
class NoOpRegistration implements Registration {
5447

5548
@Override
56-
public void connect(ConnectionCallback callback) {}
49+
public void connect(AuthenticationCallback callback) {}
5750

5851
@Override
5952
public void unregister() {}

src/main/java/com/rabbitmq/client/amqp/impl/CredentialsFactory.java

Lines changed: 30 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626

2727
final class CredentialsFactory {
2828

29-
private volatile Credentials oauthCredentials;
29+
private volatile Credentials globalOAuthCredentials;
3030
private final Lock oauthCredentialsLock = new ReentrantLock();
3131
private final AmqpEnvironment environment;
3232

@@ -38,8 +38,11 @@ Credentials credentials(DefaultConnectionSettings<?> settings) {
3838
CredentialsProvider provider = settings.credentialsProvider();
3939
Credentials credentials;
4040
if (settings.oauth().enabled()) {
41-
// TODO consider OAuth credentials are not shared
42-
credentials = oauthCredentials(settings);
41+
if (settings.oauth().shared()) {
42+
credentials = globalOAuthCredentials(settings);
43+
} else {
44+
credentials = createOAuthCredentials(settings);
45+
}
4346
} else {
4447
if (provider instanceof UsernamePasswordCredentialsProvider) {
4548
UsernamePasswordCredentialsProvider credentialsProvider =
@@ -52,36 +55,39 @@ Credentials credentials(DefaultConnectionSettings<?> settings) {
5255
return credentials;
5356
}
5457

55-
private Credentials oauthCredentials(DefaultConnectionSettings<?> connectionSettings) {
56-
Credentials result = this.oauthCredentials;
58+
private Credentials globalOAuthCredentials(DefaultConnectionSettings<?> connectionSettings) {
59+
Credentials result = this.globalOAuthCredentials;
5760
if (result != null) {
5861
return result;
5962
}
6063

6164
this.oauthCredentialsLock.lock();
6265
try {
63-
if (this.oauthCredentials == null) {
64-
DefaultConnectionSettings.DefaultOAuthSettings<?> settings = connectionSettings.oauth();
65-
// TODO set TLS configuration on TLS requester
66-
// TODO use pre-configured token requester if any
67-
HttpTokenRequester tokenRequester =
68-
new HttpTokenRequester(
69-
settings.tokenEndpointUri(),
70-
settings.clientId(),
71-
settings.clientSecret(),
72-
settings.grantType(),
73-
settings.parameters(),
74-
null,
75-
null,
76-
null,
77-
null,
78-
new GsonTokenParser());
79-
this.oauthCredentials =
80-
new TokenCredentials(tokenRequester, environment.scheduledExecutorService());
66+
if (this.globalOAuthCredentials == null) {
67+
this.globalOAuthCredentials = createOAuthCredentials(connectionSettings);
8168
}
82-
return this.oauthCredentials;
69+
return this.globalOAuthCredentials;
8370
} finally {
8471
this.oauthCredentialsLock.unlock();
8572
}
8673
}
74+
75+
private Credentials createOAuthCredentials(DefaultConnectionSettings<?> connectionSettings) {
76+
DefaultConnectionSettings.DefaultOAuthSettings<?> settings = connectionSettings.oauth();
77+
// TODO set TLS configuration on TLS requester
78+
// TODO use pre-configured token requester if any
79+
HttpTokenRequester tokenRequester =
80+
new HttpTokenRequester(
81+
settings.tokenEndpointUri(),
82+
settings.clientId(),
83+
settings.clientSecret(),
84+
settings.grantType(),
85+
settings.parameters(),
86+
null,
87+
null,
88+
null,
89+
null,
90+
new GsonTokenParser());
91+
return new TokenCredentials(tokenRequester, environment.scheduledExecutorService());
92+
}
8793
}

src/main/java/com/rabbitmq/client/amqp/impl/DefaultConnectionSettings.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -507,13 +507,15 @@ static class DefaultOAuthSettings<T> implements OAuthSettings<T> {
507507
private String clientId;
508508
private String clientSecret;
509509
private String grantType = "client_credentials";
510+
private boolean shared = true;
510511

511512
DefaultOAuthSettings(DefaultConnectionSettings<T> connectionSettings) {
512513
this.connectionSettings = connectionSettings;
513514
}
514515

515516
@Override
516517
public OAuthSettings<T> tokenEndpointUri(String uri) {
518+
this.connectionSettings.saslMechanism(SASL_MECHANISM_PLAIN);
517519
this.tokenEndpointUri = uri;
518520
return this;
519521
}
@@ -546,6 +548,12 @@ public OAuthSettings<T> parameter(String name, String value) {
546548
return this;
547549
}
548550

551+
@Override
552+
public OAuthSettings<T> shared(boolean shared) {
553+
this.shared = shared;
554+
return this;
555+
}
556+
549557
@Override
550558
public T connection() {
551559
return this.connectionSettings.toReturn();
@@ -556,6 +564,7 @@ void copyTo(DefaultOAuthSettings<?> copy) {
556564
copy.clientId(this.clientId);
557565
copy.clientSecret(this.clientSecret);
558566
copy.grantType(this.grantType);
567+
copy.shared(this.shared);
559568
this.parameters.forEach(copy::parameter);
560569
}
561570

@@ -579,6 +588,10 @@ Map<String, String> parameters() {
579588
return Map.copyOf(this.parameters);
580589
}
581590

591+
boolean shared() {
592+
return this.shared;
593+
}
594+
582595
boolean enabled() {
583596
return this.tokenEndpointUri != null;
584597
}

src/main/java/com/rabbitmq/client/amqp/impl/OAuthCredentialsProvider.java

Lines changed: 0 additions & 53 deletions
This file was deleted.

0 commit comments

Comments
 (0)