From bde18b8817f469e0223941ceb8b29a928b82703d Mon Sep 17 00:00:00 2001 From: jim0987795064 Date: Fri, 2 May 2025 23:30:21 +0800 Subject: [PATCH 1/3] test --- .../java/org/apache/kafka/clients/admin/KafkaAdminClient.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java index 5cb5cc292ea2a..117ec94600f52 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java @@ -2531,8 +2531,7 @@ void handleResponse(AbstractResponse abstractResponse) { DescribeClusterResponse response = (DescribeClusterResponse) abstractResponse; Errors error = Errors.forCode(response.data().errorCode()); if (error != Errors.NONE) { - ApiError apiError = new ApiError(error, response.data().errorMessage()); - handleFailure(apiError.exception()); + handleFailure(error.exception(response.data().errorMessage())); return; } From 42cab5181b4bc10d81db929fa5c84f87c85c51a8 Mon Sep 17 00:00:00 2001 From: jim0987795064 Date: Tue, 27 May 2025 18:19:35 +0200 Subject: [PATCH 2/3] Sync KafkaAdminClient.java from upstream/trunk --- .../kafka/clients/admin/KafkaAdminClient.java | 768 +++++++++--------- 1 file changed, 397 insertions(+), 371 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java index ec2c4603ed4fa..b283d65cbee06 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java @@ -159,7 +159,7 @@ import org.apache.kafka.common.message.DescribeUserScramCredentialsResponseData; import org.apache.kafka.common.message.ExpireDelegationTokenRequestData; import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity; -import org.apache.kafka.common.message.ListClientMetricsResourcesRequestData; +import org.apache.kafka.common.message.ListConfigResourcesRequestData; import org.apache.kafka.common.message.ListGroupsRequestData; import org.apache.kafka.common.message.ListGroupsResponseData; import org.apache.kafka.common.message.ListPartitionReassignmentsRequestData; @@ -233,8 +233,8 @@ import org.apache.kafka.common.requests.IncrementalAlterConfigsRequest; import org.apache.kafka.common.requests.IncrementalAlterConfigsResponse; import org.apache.kafka.common.requests.JoinGroupRequest; -import org.apache.kafka.common.requests.ListClientMetricsResourcesRequest; -import org.apache.kafka.common.requests.ListClientMetricsResourcesResponse; +import org.apache.kafka.common.requests.ListConfigResourcesRequest; +import org.apache.kafka.common.requests.ListConfigResourcesResponse; import org.apache.kafka.common.requests.ListGroupsRequest; import org.apache.kafka.common.requests.ListGroupsResponse; import org.apache.kafka.common.requests.ListOffsetsRequest; @@ -419,11 +419,11 @@ public class KafkaAdminClient extends AdminClient { /** * Get or create a list value from a map. * - * @param map The map to get or create the element from. - * @param key The key. - * @param The key type. - * @param The value type. - * @return The list value. + * @param map The map to get or create the element from. + * @param key The key. + * @param The key type. + * @param The value type. + * @return The list value. 
*/ static List getOrCreateListValue(Map> map, K key) { return map.computeIfAbsent(key, k -> new LinkedList<>()); @@ -432,9 +432,9 @@ static List getOrCreateListValue(Map> map, K key) { /** * Send an exception to every element in a collection of KafkaFutureImpls. * - * @param futures The collection of KafkaFutureImpl objects. - * @param exc The exception - * @param The KafkaFutureImpl result type. + * @param futures The collection of KafkaFutureImpl objects. + * @param exc The exception + * @param The KafkaFutureImpl result type. */ private static void completeAllExceptionally(Collection> futures, Throwable exc) { completeAllExceptionally(futures.stream(), exc); @@ -443,9 +443,9 @@ private static void completeAllExceptionally(Collection> /** * Send an exception to all futures in the provided stream * - * @param futures The stream of KafkaFutureImpl objects. - * @param exc The exception - * @param The KafkaFutureImpl result type. + * @param futures The stream of KafkaFutureImpl objects. + * @param exc The exception + * @param The KafkaFutureImpl result type. */ private static void completeAllExceptionally(Stream> futures, Throwable exc) { futures.forEach(future -> future.completeExceptionally(exc)); @@ -454,9 +454,9 @@ private static void completeAllExceptionally(Stream> futu /** * Get the current time remaining before a deadline as an integer. * - * @param now The current time in milliseconds. - * @param deadlineMs The deadline time in milliseconds. - * @return The time delta in milliseconds. + * @param now The current time in milliseconds. + * @param deadlineMs The deadline time in milliseconds. + * @return The time delta in milliseconds. */ static int calcTimeoutMsRemainingAsInt(long now, long deadlineMs) { long deltaMs = deadlineMs - now; @@ -470,9 +470,8 @@ else if (deltaMs < Integer.MIN_VALUE) /** * Generate the client id based on the configuration. * - * @param config The configuration - * - * @return The client id + * @param config The configuration + * @return The client id */ static String generateClientId(AdminClientConfig config) { String clientId = config.getString(AdminClientConfig.CLIENT_ID_CONFIG); @@ -488,10 +487,9 @@ String getClientId() { /** * Get the deadline for a particular call. * - * @param now The current time in milliseconds. - * @param optionTimeoutMs The timeout option given by the user. - * - * @return The deadline in milliseconds. + * @param now The current time in milliseconds. + * @param optionTimeoutMs The timeout option given by the user. + * @return The deadline in milliseconds. */ private long calcDeadlineMs(long now, Integer optionTimeoutMs) { if (optionTimeoutMs != null) @@ -502,9 +500,8 @@ private long calcDeadlineMs(long now, Integer optionTimeoutMs) { /** * Pretty-print an exception. * - * @param throwable The exception. - * - * @return A compact human-readable string. + * @param throwable The exception. + * @return A compact human-readable string. 
*/ static String prettyPrintException(Throwable throwable) { if (throwable == null) @@ -550,7 +547,7 @@ static KafkaAdminClient createInternal( .recordLevel(Sensor.RecordingLevel.forName(config.getString(AdminClientConfig.METRICS_RECORDING_LEVEL_CONFIG))) .tags(metricTags); MetricsContext metricsContext = new KafkaMetricsContext(JMX_PREFIX, - config.originalsWithPrefix(CommonClientConfigs.METRICS_CONTEXT_PREFIX)); + config.originalsWithPrefix(CommonClientConfigs.METRICS_CONTEXT_PREFIX)); metrics = new Metrics(metricConfig, reporters, time, metricsContext); networkClient = ClientUtils.createNetworkClient(config, clientId, @@ -656,11 +653,11 @@ private int configureDefaultApiTimeoutMs(AdminClientConfig config) { if (defaultApiTimeoutMs < requestTimeoutMs) { if (config.originals().containsKey(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG)) { throw new ConfigException("The specified value of " + AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG + - " must be no smaller than the value of " + AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG + "."); + " must be no smaller than the value of " + AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG + "."); } else { log.warn("Overriding the default value for {} ({}) with the explicitly configured request timeout {}", - AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, this.defaultApiTimeoutMs, - requestTimeoutMs); + AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, this.defaultApiTimeoutMs, + requestTimeoutMs); return requestTimeoutMs; } } @@ -718,6 +715,7 @@ public void close(Duration timeout) { */ private interface NodeProvider { Node provide(); + boolean supportsUseControllers(); } @@ -727,7 +725,7 @@ public Node provide() { long now = time.milliseconds(); LeastLoadedNode leastLoadedNode = client.leastLoadedNode(now); if (metadataRecoveryStrategy == MetadataRecoveryStrategy.REBOOTSTRAP - && !leastLoadedNode.hasNodeAvailableOrConnectionReady()) { + && !leastLoadedNode.hasNodeAvailableOrConnectionReady()) { metadataManager.rebootstrap(now); } @@ -757,7 +755,7 @@ private class ConstantNodeIdProvider implements NodeProvider { @Override public Node provide() { if (metadataManager.isReady() && - (metadataManager.nodeById(nodeId) != null)) { + (metadataManager.nodeById(nodeId) != null)) { return metadataManager.nodeById(nodeId); } // If we can't find the node with the given constant ID, we schedule a @@ -791,7 +789,7 @@ private class ControllerNodeProvider implements NodeProvider { @Override public Node provide() { if (metadataManager.isReady() && - (metadataManager.controller() != null)) { + (metadataManager.controller() != null)) { return metadataManager.controller(); } metadataManager.requestUpdate(); @@ -825,36 +823,6 @@ public boolean supportsUseControllers() { } } - /** - * Provides the least loaded broker, or the active kcontroller if we're using - * bootstrap.controllers. 
- */ - private class ConstantBrokerOrActiveKController implements NodeProvider { - private final int nodeId; - - ConstantBrokerOrActiveKController(int nodeId) { - this.nodeId = nodeId; - } - - @Override - public Node provide() { - if (metadataManager.isReady()) { - if (metadataManager.usingBootstrapControllers()) { - return metadataManager.controller(); - } else if (metadataManager.nodeById(nodeId) != null) { - return metadataManager.nodeById(nodeId); - } - } - metadataManager.requestUpdate(); - return null; - } - - @Override - public boolean supportsUseControllers() { - return true; - } - } - /** * Provides the least loaded broker, or the active kcontroller if we're using * bootstrap.controllers. @@ -923,13 +891,13 @@ protected Node curNode() { /** * Handle a failure. - * + *
+ * <p>
* Depending on what the exception is and how many times we have already tried, we may choose to * fail the Call, or retry it. It is important to print the stack traces here in some cases, * since they are not necessarily preserved in ApiVersionException objects. * - * @param now The current time in milliseconds. - * @param throwable The failure exception. + * @param now The current time in milliseconds. + * @param throwable The failure exception. */ final void fail(long now, Throwable throwable) { if (curNode != null) { @@ -945,7 +913,7 @@ final void fail(long now, Throwable throwable) { // protocol downgrade will not count against the total number of retries we get for // this RPC. That is why 'tries' is not incremented. if ((throwable instanceof UnsupportedVersionException) && - handleUnsupportedVersionException((UnsupportedVersionException) throwable)) { + handleUnsupportedVersionException((UnsupportedVersionException) throwable)) { log.debug("{} attempting protocol downgrade and then retry.", this); runnable.pendingCalls.add(this); return; @@ -999,16 +967,14 @@ private void handleTimeoutFailure(long now, Throwable cause) { * Create an AbstractRequest.Builder for this Call. * * @param timeoutMs The timeout in milliseconds. - * - * @return The AbstractRequest builder. + * @return The AbstractRequest builder. */ abstract AbstractRequest.Builder createRequest(int timeoutMs); /** * Process the call response. * - * @param abstractResponse The AbstractResponse. - * + * @param abstractResponse The AbstractResponse. */ abstract void handleResponse(AbstractResponse abstractResponse); @@ -1016,16 +982,15 @@ private void handleTimeoutFailure(long now, Throwable cause) { * Handle a failure. This will only be called if the failure exception was not * retriable, or if we hit a timeout. * - * @param throwable The exception. + * @param throwable The exception. */ abstract void handleFailure(Throwable throwable); /** * Handle an UnsupportedVersionException. * - * @param exception The exception. - * - * @return True if the exception can be handled; false otherwise. + * @param exception The exception. + * @return True if the exception can be handled; false otherwise. */ boolean handleUnsupportedVersionException(UnsupportedVersionException exception) { return false; @@ -1062,7 +1027,7 @@ static class TimeoutProcessor { /** * Create a new timeout processor. * - * @param now The current time in milliseconds since the epoch. + * @param now The current time in milliseconds since the epoch. */ TimeoutProcessor(long now) { this.now = now; @@ -1074,9 +1039,8 @@ static class TimeoutProcessor { * Timed out calls will be removed and failed. * The remaining milliseconds until the next timeout will be updated. * - * @param calls The collection of calls. - * - * @return The number of calls which were timed out. + * @param calls The collection of calls. + * @return The number of calls which were timed out. */ int handleTimeouts(Collection calls, String msg) { int numTimedOut = 0; @@ -1098,9 +1062,8 @@ int handleTimeouts(Collection calls, String msg) { * Check whether a call should be timed out. * The remaining milliseconds until the next timeout will be updated. * - * @param call The call. - * - * @return True if the call should be timed out. + * @param call The call. + * @return True if the call should be timed out. 
*/ boolean callHasExpired(Call call) { int remainingMs = calcTimeoutMsRemainingAsInt(now, call.deadlineMs); @@ -1160,7 +1123,7 @@ private final class AdminClientRunnable implements Runnable { /** * Time out the elements in the pendingCalls list which are expired. * - * @param processor The timeout processor. + * @param processor The timeout processor. */ private void timeoutPendingCalls(TimeoutProcessor processor) { int numTimedOut = processor.handleTimeouts(pendingCalls, "Timed out waiting for a node assignment."); @@ -1171,7 +1134,7 @@ private void timeoutPendingCalls(TimeoutProcessor processor) { /** * Time out calls which have been assigned to nodes. * - * @param processor The timeout processor. + * @param processor The timeout processor. */ private int timeoutCallsToSend(TimeoutProcessor processor) { int numTimedOut = 0; @@ -1186,7 +1149,7 @@ private int timeoutCallsToSend(TimeoutProcessor processor) { /** * Drain all the calls from newCalls into pendingCalls. - * + *
+ * <p>
* This function holds the lock for the minimum amount of time, to avoid blocking * users of AdminClient who will also take the lock to add new calls. */ @@ -1198,7 +1161,7 @@ private synchronized void drainNewCalls() { * Add some calls to pendingCalls, and then clear the input list. * Also clears Call#curNode. * - * @param calls The calls to add. + * @param calls The calls to add. */ private void transitionToPendingAndClearList(List calls) { for (Call call : calls) { @@ -1211,9 +1174,9 @@ private void transitionToPendingAndClearList(List calls) { /** * Choose nodes for the calls in the pendingCalls list. * - * @param now The current time in milliseconds. - * @return The minimum time until a call is ready to be retried if any of the pending - * calls are backing off after a failure + * @param now The current time in milliseconds. + * @return The minimum time until a call is ready to be retried if any of the pending + * calls are backing off after a failure */ private long maybeDrainPendingCalls(long now) { long pollTimeout = Long.MAX_VALUE; @@ -1271,8 +1234,8 @@ private boolean maybeDrainPendingCall(Call call, long now) { /** * Send the calls which are ready. * - * @param now The current time in milliseconds. - * @return The minimum timeout we need for poll(). + * @param now The current time in milliseconds. + * @return The minimum timeout we need for poll(). */ private long sendEligibleCalls(long now) { long pollTimeout = Long.MAX_VALUE; @@ -1294,7 +1257,7 @@ private long sendEligibleCalls(long now) { if (deadline != null) { if (now >= deadline) { log.info("Disconnecting from {} and revoking {} node assignment(s) " + - "because the node is taking too long to become ready.", + "because the node is taking too long to become ready.", node.idString(), calls.size()); transitionToPendingAndClearList(calls); client.disconnect(node.idString()); @@ -1347,12 +1310,12 @@ private long sendEligibleCalls(long now) { /** * Time out expired calls that are in flight. - * + *
+ * <p>
* Calls that are in flight may have been partially or completely sent over the wire. They may * even be in the process of being processed by the remote server. At the moment, our only option * to time them out is to close the entire connection. * - * @param processor The timeout processor. + * @param processor The timeout processor. */ private void timeoutCallsInFlight(TimeoutProcessor processor) { int numTimedOut = 0; @@ -1375,8 +1338,8 @@ private void timeoutCallsInFlight(TimeoutProcessor processor) { /** * Handle responses from the server. * - * @param now The current time in milliseconds. - * @param responses The latest responses from KafkaClient. + * @param now The current time in milliseconds. + * @param responses The latest responses from KafkaClient. */ private void handleResponses(long now, List responses) { for (ClientResponse response : responses) { @@ -1387,7 +1350,7 @@ private void handleResponses(long now, List responses) { // If the server returns information about a correlation ID we didn't use yet, // an internal server error has occurred. Close the connection and log an error message. log.error("Internal server error on {}: server returned information about unknown " + - "correlation ID {}, requestHeader = {}", response.destination(), correlationId, + "correlation ID {}, requestHeader = {}", response.destination(), correlationId, response.requestHeader()); client.disconnect(response.destination()); continue; @@ -1506,7 +1469,7 @@ public void run() { numTimedOut += timeoutProcessor.handleTimeouts(pendingCalls, "The AdminClient thread has exited."); numTimedOut += timeoutCallsToSend(timeoutProcessor); numTimedOut += timeoutProcessor.handleTimeouts(correlationIdToCalls.values(), - "The AdminClient thread has exited."); + "The AdminClient thread has exited."); if (numTimedOut > 0) { log.info("Timed out {} remaining operation(s) during close.", numTimedOut); } @@ -1576,13 +1539,13 @@ private void processRequests() { /** * Queue a call for sending. - * + *
+ * <p>
* If the AdminClient thread has exited, this will fail. Otherwise, it will succeed (even * if the AdminClient is shutting down). This function should called when retrying an * existing call. * - * @param call The new call object. - * @param now The current time in milliseconds. + * @param call The new call object. + * @param now The current time in milliseconds. */ void enqueue(Call call, long now) { if (call.tries > maxRetries) { @@ -1613,18 +1576,18 @@ void enqueue(Call call, long now) { /** * Initiate a new call. - * + *
+ * <p>
* This will fail if the AdminClient is scheduled to shut down. * - * @param call The new call object. - * @param now The current time in milliseconds. + * @param call The new call object. + * @param now The current time in milliseconds. */ void call(Call call, long now) { if (hardShutdownTimeMs.get() != INVALID_SHUTDOWN_TIME) { log.debug("Cannot accept new call {} when AdminClient is closing.", call); call.handleFailure(new IllegalStateException("Cannot accept new calls when AdminClient is closing.")); } else if (metadataManager.usingBootstrapControllers() && - (!call.nodeProvider.supportsUseControllers())) { + (!call.nodeProvider.supportsUseControllers())) { call.fail(now, new UnsupportedEndpointTypeException("This Admin API is not " + "yet supported when communicating directly with the controller quorum.")); } else { @@ -1646,7 +1609,7 @@ private Call makeMetadataCall(long now) { private Call makeControllerMetadataCall(long now) { // Use DescribeCluster here, as specified by KIP-919. return new Call(true, "describeCluster", calcDeadlineMs(now, requestTimeoutMs), - new MetadataUpdateNodeIdProvider()) { + new MetadataUpdateNodeIdProvider()) { @Override public DescribeClusterRequest.Builder createRequest(int timeoutMs) { return new DescribeClusterRequest.Builder(new DescribeClusterRequestData() @@ -1689,7 +1652,7 @@ private Call makeBrokerMetadataCall(long now) { // We use MetadataRequest here so that we can continue to support brokers that are too // old to handle DescribeCluster. return new Call(true, "fetchMetadata", calcDeadlineMs(now, requestTimeoutMs), - new MetadataUpdateNodeIdProvider()) { + new MetadataUpdateNodeIdProvider()) { @Override public MetadataRequest.Builder createRequest(int timeoutMs) { // Since this only requests node information, it's safe to pass true @@ -1778,10 +1741,10 @@ int numPendingCalls() { * Used when a response handler expected a result for some entity but no result was present. */ private static void completeUnrealizedFutures( - Stream>> futures, - Function messageFormatter) { + Stream>> futures, + Function messageFormatter) { futures.filter(entry -> !entry.getValue().isDone()).forEach(entry -> - entry.getValue().completeExceptionally(new ApiException(messageFormatter.apply(entry.getKey())))); + entry.getValue().completeExceptionally(new ApiException(messageFormatter.apply(entry.getKey())))); } /** @@ -1789,11 +1752,11 @@ private static void completeUnrealizedFutures( * the initial error back to the caller if the request timed out. 
*/ private static void maybeCompleteQuotaExceededException( - boolean shouldRetryOnQuotaViolation, - Throwable throwable, - Map> futures, - Map quotaExceededExceptions, - int throttleTimeDelta) { + boolean shouldRetryOnQuotaViolation, + Throwable throwable, + Map> futures, + Map quotaExceededExceptions, + int throttleTimeDelta) { if (shouldRetryOnQuotaViolation && throwable instanceof TimeoutException) { quotaExceededExceptions.forEach((key, value) -> futures.get(key).completeExceptionally( new ThrottlingQuotaExceededException( @@ -2072,10 +2035,10 @@ private Call getDeleteTopicsWithIdsCall(final DeleteTopicsOptions options, @Override DeleteTopicsRequest.Builder createRequest(int timeoutMs) { return new DeleteTopicsRequest.Builder( - new DeleteTopicsRequestData() - .setTopics(topicIds.stream().map( - topic -> new DeleteTopicState().setTopicId(topic)).collect(Collectors.toList())) - .setTimeoutMs(timeoutMs)); + new DeleteTopicsRequestData() + .setTopics(topicIds.stream().map( + topic -> new DeleteTopicState().setTopicId(topic)).collect(Collectors.toList())) + .setTimeoutMs(timeoutMs)); } @Override @@ -2095,7 +2058,7 @@ void handleResponse(AbstractResponse abstractResponse) { if (error.isFailure()) { if (error.is(Errors.THROTTLING_QUOTA_EXCEEDED)) { ThrottlingQuotaExceededException quotaExceededException = new ThrottlingQuotaExceededException( - response.throttleTimeMs(), error.messageWithFallback()); + response.throttleTimeMs(), error.messageWithFallback()); if (options.shouldRetryOnQuotaViolation()) { retryTopics.add(result.topicId()); retryTopicQuotaExceededExceptions.put(result.topicId(), quotaExceededException); @@ -2118,7 +2081,7 @@ void handleResponse(AbstractResponse abstractResponse) { } else { final long now = time.milliseconds(); final Call call = getDeleteTopicsWithIdsCall(options, futures, retryTopics, - retryTopicQuotaExceededExceptions, now, deadline); + retryTopicQuotaExceededExceptions, now, deadline); runnable.call(call, now); } } @@ -2128,7 +2091,7 @@ void handleFailure(Throwable throwable) { // If there were any topics retries due to a quota exceeded exception, we propagate // the initial error back to the caller if the request timed out. maybeCompleteQuotaExceededException(options.shouldRetryOnQuotaViolation(), - throwable, futures, quotaExceededExceptions, (int) (time.milliseconds() - now)); + throwable, futures, quotaExceededExceptions, (int) (time.milliseconds() - now)); // Fail all the other remaining futures completeAllExceptionally(futures.values(), throwable); } @@ -2315,7 +2278,7 @@ void handleResponse(AbstractResponse abstractResponse) { } if (partiallyFinishedTopicDescription != null && - (responseCursor == null || !responseCursor.topicName().equals(partiallyFinishedTopicDescription.name()))) { + (responseCursor == null || !responseCursor.topicName().equals(partiallyFinishedTopicDescription.name()))) { // We can't simply check nextTopicDescription != null here to close the partiallyFinishedTopicDescription. // Because the responseCursor topic may not show in the response. 
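// A single topic's partitions can span multiple DescribeTopicPartitions responses, so the
// partially finished description is kept open while the cursor still points at its topic
// and is only completed once the cursor has moved past it (or is absent).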
String topicName = partiallyFinishedTopicDescription.name(); @@ -2398,7 +2361,7 @@ private Map> handleDescribeTopicsByIds(Colle if (topicIdIsUnrepresentable(topicId)) { KafkaFutureImpl future = new KafkaFutureImpl<>(); future.completeExceptionally(new InvalidTopicException("The given topic id '" + - topicId + "' cannot be represented in a request.")); + topicId + "' cannot be represented in a request.")); topicFutures.put(topicId, future); } else if (!topicFutures.containsKey(topicId)) { topicFutures.put(topicId, new KafkaFutureImpl<>()); @@ -2407,14 +2370,14 @@ private Map> handleDescribeTopicsByIds(Colle } final long now = time.milliseconds(); Call call = new Call("describeTopicsWithIds", calcDeadlineMs(now, options.timeoutMs()), - new LeastLoadedNodeProvider()) { + new LeastLoadedNodeProvider()) { @Override MetadataRequest.Builder createRequest(int timeoutMs) { return new MetadataRequest.Builder(new MetadataRequestData() - .setTopics(convertTopicIdsToMetadataRequestTopic(topicIdsList)) - .setAllowAutoTopicCreation(false) - .setIncludeTopicAuthorizedOperations(options.includeAuthorizedOperations())); + .setTopics(convertTopicIdsToMetadataRequestTopic(topicIdsList)) + .setAllowAutoTopicCreation(false) + .setIncludeTopicAuthorizedOperations(options.includeAuthorizedOperations())); } @Override @@ -2476,8 +2439,8 @@ private TopicDescription getTopicDescriptionFromCluster(Cluster cluster, String List partitions = new ArrayList<>(partitionInfos.size()); for (PartitionInfo partitionInfo : partitionInfos) { TopicPartitionInfo topicPartitionInfo = new TopicPartitionInfo( - partitionInfo.partition(), leader(partitionInfo), Arrays.asList(partitionInfo.replicas()), - Arrays.asList(partitionInfo.inSyncReplicas())); + partitionInfo.partition(), leader(partitionInfo), Arrays.asList(partitionInfo.replicas()), + Arrays.asList(partitionInfo.inSyncReplicas())); partitions.add(topicPartitionInfo); } partitions.sort(Comparator.comparingInt(TopicPartitionInfo::partition)); @@ -2512,7 +2475,7 @@ AbstractRequest.Builder createRequest(int timeoutMs) { return new DescribeClusterRequest.Builder(new DescribeClusterRequestData() .setIncludeClusterAuthorizedOperations(options.includeAuthorizedOperations()) .setEndpointType(metadataManager.usingBootstrapControllers() ? 
- EndpointType.CONTROLLER.id() : EndpointType.BROKER.id()) + EndpointType.CONTROLLER.id() : EndpointType.BROKER.id()) .setIncludeFencedBrokers(options.includeFencedBrokers())); } else { // Since this only requests node information, it's safe to pass true for allowAutoTopicCreation (and it @@ -2531,7 +2494,8 @@ void handleResponse(AbstractResponse abstractResponse) { DescribeClusterResponse response = (DescribeClusterResponse) abstractResponse; Errors error = Errors.forCode(response.data().errorCode()); if (error != Errors.NONE) { - handleFailure(error.exception(response.data().errorMessage())); + ApiError apiError = new ApiError(error, response.data().errorMessage()); + handleFailure(apiError.exception()); return; } @@ -2595,7 +2559,7 @@ public DescribeAclsResult describeAcls(final AclBindingFilter filter, DescribeAc if (filter.isUnknown()) { KafkaFutureImpl> future = new KafkaFutureImpl<>(); future.completeExceptionally(new InvalidRequestException("The AclBindingFilter " + - "must not contain UNKNOWN elements.")); + "must not contain UNKNOWN elements.")); return new DescribeAclsResult(future); } final long now = time.milliseconds(); @@ -2795,15 +2759,15 @@ void handleResponse(AbstractResponse abstractResponse) { if (future == null) { if (node != null) { log.warn("The config {} in the response from node {} is not in the request", - configResource, node); + configResource, node); } else { log.warn("The config {} in the response from the least loaded broker is not in the request", - configResource); + configResource); } } else { if (describeConfigsResult.errorCode() != Errors.NONE.code()) { future.completeExceptionally(Errors.forCode(describeConfigsResult.errorCode()) - .exception(describeConfigsResult.errorMessage())); + .exception(describeConfigsResult.errorMessage())); } else { future.complete(describeConfigResult(describeConfigsResult)); } @@ -2839,15 +2803,15 @@ void handleFailure(Throwable throwable) { private Config describeConfigResult(DescribeConfigsResponseData.DescribeConfigsResult describeConfigsResult) { return new Config(describeConfigsResult.configs().stream().map(config -> new ConfigEntry( - config.name(), - config.value(), - DescribeConfigsResponse.ConfigSource.forId(config.configSource()).source(), - config.isSensitive(), - config.readOnly(), - (config.synonyms().stream().map(synonym -> new ConfigEntry.ConfigSynonym(synonym.name(), synonym.value(), - DescribeConfigsResponse.ConfigSource.forId(synonym.source()).source()))).collect(Collectors.toList()), - DescribeConfigsResponse.ConfigType.forId(config.configType()).type(), - config.documentation() + config.name(), + config.value(), + DescribeConfigsResponse.ConfigSource.forId(config.configSource()).source(), + config.isSensitive(), + config.readOnly(), + (config.synonyms().stream().map(synonym -> new ConfigEntry.ConfigSynonym(synonym.name(), synonym.value(), + DescribeConfigsResponse.ConfigSource.forId(synonym.source()).source()))).collect(Collectors.toList()), + DescribeConfigsResponse.ConfigType.forId(config.configType()).type(), + config.documentation() )).collect(Collectors.toList())); } @@ -2959,7 +2923,7 @@ public AlterReplicaLogDirsResult alterReplicaLogDirs(Map()); Map replicaAssignmentByBroker = new HashMap<>(); - for (Map.Entry entry: replicaAssignment.entrySet()) { + for (Map.Entry entry : replicaAssignment.entrySet()) { TopicPartitionReplica replica = entry.getKey(); String logDir = entry.getValue(); int brokerId = replica.brokerId(); @@ -2980,7 +2944,7 @@ public AlterReplicaLogDirsResult alterReplicaLogDirs(Map 
entry: replicaAssignmentByBroker.entrySet()) { + for (Map.Entry entry : replicaAssignmentByBroker.entrySet()) { final int brokerId = entry.getKey(); final AlterReplicaLogDirsRequestData assignment = entry.getValue(); @@ -2995,15 +2959,15 @@ public AlterReplicaLogDirsRequest.Builder createRequest(int timeoutMs) { @Override public void handleResponse(AbstractResponse abstractResponse) { AlterReplicaLogDirsResponse response = (AlterReplicaLogDirsResponse) abstractResponse; - for (AlterReplicaLogDirTopicResult topicResult: response.data().results()) { - for (AlterReplicaLogDirPartitionResult partitionResult: topicResult.partitions()) { + for (AlterReplicaLogDirTopicResult topicResult : response.data().results()) { + for (AlterReplicaLogDirPartitionResult partitionResult : topicResult.partitions()) { TopicPartitionReplica replica = new TopicPartitionReplica( - topicResult.topicName(), partitionResult.partitionIndex(), brokerId); + topicResult.topicName(), partitionResult.partitionIndex(), brokerId); KafkaFutureImpl future = futures.get(replica); if (future == null) { log.warn("The partition {} in the response from broker {} is not in the request", - new TopicPartition(topicResult.topicName(), partitionResult.partitionIndex()), - brokerId); + new TopicPartition(topicResult.topicName(), partitionResult.partitionIndex()), + brokerId); } else if (partitionResult.errorCode() == Errors.NONE.code()) { future.complete(null); } else { @@ -3015,8 +2979,9 @@ public void handleResponse(AbstractResponse abstractResponse) { completeUnrealizedFutures( futures.entrySet().stream().filter(entry -> entry.getKey().brokerId() == brokerId), replica -> "The response from broker " + brokerId + - " did not contain a result for replica " + replica); + " did not contain a result for replica " + replica); } + @Override void handleFailure(Throwable throwable) { // Only completes the futures of brokerId @@ -3059,11 +3024,12 @@ public void handleResponse(AbstractResponse abstractResponse) { } else { // Up to v3 DescribeLogDirsResponse did not have an error code field, hence it defaults to None Errors error = response.data().errorCode() == Errors.NONE.code() - ? Errors.CLUSTER_AUTHORIZATION_FAILED - : Errors.forCode(response.data().errorCode()); + ? 
Errors.CLUSTER_AUTHORIZATION_FAILED + : Errors.forCode(response.data().errorCode()); future.completeExceptionally(error.exception()); } } + @Override void handleFailure(Throwable throwable) { future.completeExceptionally(throwable); @@ -3081,15 +3047,15 @@ private static Map logDirDescriptions(DescribeLogDirs for (DescribeLogDirsResponseData.DescribeLogDirsTopic t : logDirResult.topics()) { for (DescribeLogDirsResponseData.DescribeLogDirsPartition p : t.partitions()) { replicaInfoMap.put( - new TopicPartition(t.name(), p.partitionIndex()), - new ReplicaInfo(p.partitionSize(), p.offsetLag(), p.isFutureKey())); + new TopicPartition(t.name(), p.partitionIndex()), + new ReplicaInfo(p.partitionSize(), p.offsetLag(), p.isFutureKey())); } } result.put(logDirResult.logDir(), new LogDirDescription( - Errors.forCode(logDirResult.errorCode()).exception(), - replicaInfoMap, - logDirResult.totalBytes(), - logDirResult.usableBytes())); + Errors.forCode(logDirResult.errorCode()).exception(), + replicaInfoMap, + logDirResult.totalBytes(), + logDirResult.usableBytes())); } return result; } @@ -3104,7 +3070,7 @@ public DescribeReplicaLogDirsResult describeReplicaLogDirs(Collection partitionsByBroker = new HashMap<>(); - for (TopicPartitionReplica replica: replicas) { + for (TopicPartitionReplica replica : replicas) { DescribeLogDirsRequestData requestData = partitionsByBroker.computeIfAbsent(replica.brokerId(), brokerId -> new DescribeLogDirsRequestData()); DescribableLogDirTopic describableLogDirTopic = requestData.topics().find(replica.topic()); @@ -3112,7 +3078,7 @@ public DescribeReplicaLogDirsResult describeReplicaLogDirs(Collection partitions = new ArrayList<>(); partitions.add(replica.partition()); describableLogDirTopic = new DescribableLogDirTopic().setTopic(replica.topic()) - .setPartitions(partitions); + .setPartitions(partitions); requestData.topics().add(describableLogDirTopic); } else { describableLogDirTopic.partitions().add(replica.partition()); @@ -3120,11 +3086,11 @@ public DescribeReplicaLogDirsResult describeReplicaLogDirs(Collection entry: partitionsByBroker.entrySet()) { + for (Map.Entry entry : partitionsByBroker.entrySet()) { final int brokerId = entry.getKey(); final DescribeLogDirsRequestData topicPartitions = entry.getValue(); final Map replicaDirInfoByPartition = new HashMap<>(); - for (DescribableLogDirTopic topicPartition: topicPartitions.topics()) { + for (DescribableLogDirTopic topicPartition : topicPartitions.topics()) { for (Integer partitionId : topicPartition.partitions()) { replicaDirInfoByPartition.put(new TopicPartition(topicPartition.topic(), partitionId), new ReplicaLogDirInfo()); } @@ -3142,7 +3108,7 @@ public DescribeLogDirsRequest.Builder createRequest(int timeoutMs) { @Override public void handleResponse(AbstractResponse abstractResponse) { DescribeLogDirsResponse response = (DescribeLogDirsResponse) abstractResponse; - for (Map.Entry responseEntry: logDirDescriptions(response).entrySet()) { + for (Map.Entry responseEntry : logDirDescriptions(response).entrySet()) { String logDir = responseEntry.getKey(); LogDirDescription logDirInfo = responseEntry.getValue(); @@ -3153,7 +3119,7 @@ public void handleResponse(AbstractResponse abstractResponse) { handleFailure(new IllegalStateException( "The error " + logDirInfo.error().getClass().getName() + " for log directory " + logDir + " in the response from broker " + brokerId + " is illegal")); - for (Map.Entry replicaInfoEntry: logDirInfo.replicaInfos().entrySet()) { + for (Map.Entry replicaInfoEntry : 
logDirInfo.replicaInfos().entrySet()) { TopicPartition tp = replicaInfoEntry.getKey(); ReplicaInfo replicaInfo = replicaInfoEntry.getValue(); ReplicaLogDirInfo replicaLogDirInfo = replicaDirInfoByPartition.get(tp); @@ -3161,24 +3127,25 @@ public void handleResponse(AbstractResponse abstractResponse) { log.warn("Server response from broker {} mentioned unknown partition {}", brokerId, tp); } else if (replicaInfo.isFuture()) { replicaDirInfoByPartition.put(tp, new ReplicaLogDirInfo(replicaLogDirInfo.getCurrentReplicaLogDir(), - replicaLogDirInfo.getCurrentReplicaOffsetLag(), - logDir, - replicaInfo.offsetLag())); + replicaLogDirInfo.getCurrentReplicaOffsetLag(), + logDir, + replicaInfo.offsetLag())); } else { replicaDirInfoByPartition.put(tp, new ReplicaLogDirInfo(logDir, - replicaInfo.offsetLag(), - replicaLogDirInfo.getFutureReplicaLogDir(), - replicaLogDirInfo.getFutureReplicaOffsetLag())); + replicaInfo.offsetLag(), + replicaLogDirInfo.getFutureReplicaLogDir(), + replicaLogDirInfo.getFutureReplicaOffsetLag())); } } } - for (Map.Entry entry: replicaDirInfoByPartition.entrySet()) { + for (Map.Entry entry : replicaDirInfoByPartition.entrySet()) { TopicPartition tp = entry.getKey(); KafkaFutureImpl future = futures.get(new TopicPartitionReplica(tp.topic(), tp.partition(), brokerId)); future.complete(entry.getValue()); } } + @Override void handleFailure(Throwable throwable) { completeAllExceptionally(futures.values(), throwable); @@ -3314,8 +3281,8 @@ public CreateDelegationTokenResult createDelegationToken(final CreateDelegationT List renewers = new ArrayList<>(); for (KafkaPrincipal principal : options.renewers()) { renewers.add(new CreatableRenewers() - .setPrincipalName(principal.getName()) - .setPrincipalType(principal.getPrincipalType())); + .setPrincipalName(principal.getName()) + .setPrincipalType(principal.getPrincipalType())); } runnable.call(new Call("createDelegationToken", calcDeadlineMs(now, options.timeoutMs()), new LeastLoadedNodeProvider()) { @@ -3339,7 +3306,7 @@ void handleResponse(AbstractResponse abstractResponse) { delegationTokenFuture.completeExceptionally(response.error().exception()); } else { CreateDelegationTokenResponseData data = response.data(); - TokenInformation tokenInfo = new TokenInformation(data.tokenId(), new KafkaPrincipal(data.principalType(), data.principalName()), + TokenInformation tokenInfo = new TokenInformation(data.tokenId(), new KafkaPrincipal(data.principalType(), data.principalName()), new KafkaPrincipal(data.tokenRequesterPrincipalType(), data.tokenRequesterPrincipalName()), options.renewers(), data.issueTimestampMs(), data.maxTimestampMs(), data.expiryTimestampMs()); DelegationToken token = new DelegationToken(tokenInfo, data.hmac()); @@ -3358,7 +3325,7 @@ void handleFailure(Throwable throwable) { @Override public RenewDelegationTokenResult renewDelegationToken(final byte[] hmac, final RenewDelegationTokenOptions options) { - final KafkaFutureImpl expiryTimeFuture = new KafkaFutureImpl<>(); + final KafkaFutureImpl expiryTimeFuture = new KafkaFutureImpl<>(); final long now = time.milliseconds(); runnable.call(new Call("renewDelegationToken", calcDeadlineMs(now, options.timeoutMs()), new LeastLoadedNodeProvider()) { @@ -3366,7 +3333,7 @@ public RenewDelegationTokenResult renewDelegationToken(final byte[] hmac, final @Override RenewDelegationTokenRequest.Builder createRequest(int timeoutMs) { return new RenewDelegationTokenRequest.Builder( - new RenewDelegationTokenRequestData() + new RenewDelegationTokenRequestData() .setHmac(hmac) 
.setRenewPeriodMs(options.renewTimePeriodMs())); } @@ -3392,7 +3359,7 @@ void handleFailure(Throwable throwable) { @Override public ExpireDelegationTokenResult expireDelegationToken(final byte[] hmac, final ExpireDelegationTokenOptions options) { - final KafkaFutureImpl expiryTimeFuture = new KafkaFutureImpl<>(); + final KafkaFutureImpl expiryTimeFuture = new KafkaFutureImpl<>(); final long now = time.milliseconds(); runnable.call(new Call("expireDelegationToken", calcDeadlineMs(now, options.timeoutMs()), new LeastLoadedNodeProvider()) { @@ -3400,9 +3367,9 @@ public ExpireDelegationTokenResult expireDelegationToken(final byte[] hmac, fina @Override ExpireDelegationTokenRequest.Builder createRequest(int timeoutMs) { return new ExpireDelegationTokenRequest.Builder( - new ExpireDelegationTokenRequestData() - .setHmac(hmac) - .setExpiryTimePeriodMs(options.expiryTimePeriodMs())); + new ExpireDelegationTokenRequestData() + .setHmac(hmac) + .setExpiryTimePeriodMs(options.expiryTimePeriodMs())); } @Override @@ -3426,7 +3393,7 @@ void handleFailure(Throwable throwable) { @Override public DescribeDelegationTokenResult describeDelegationToken(final DescribeDelegationTokenOptions options) { - final KafkaFutureImpl> tokensFuture = new KafkaFutureImpl<>(); + final KafkaFutureImpl> tokensFuture = new KafkaFutureImpl<>(); final long now = time.milliseconds(); runnable.call(new Call("describeDelegationToken", calcDeadlineMs(now, options.timeoutMs()), new LeastLoadedNodeProvider()) { @@ -3540,27 +3507,29 @@ ListGroupsRequest.Builder createRequest(int timeoutMs) { } private void maybeAddGroup(ListGroupsResponseData.ListedGroup group) { - final String groupId = group.groupId(); - final Optional type; - if (group.groupType() == null || group.groupType().isEmpty()) { - type = Optional.empty(); - } else { - type = Optional.of(GroupType.parse(group.groupType())); - } - final String protocolType = group.protocolType(); - final Optional groupState; - if (group.groupState() == null || group.groupState().isEmpty()) { - groupState = Optional.empty(); - } else { - groupState = Optional.of(GroupState.parse(group.groupState())); + String protocolType = group.protocolType(); + if (options.protocolTypes().isEmpty() || options.protocolTypes().contains(protocolType)) { + final String groupId = group.groupId(); + final Optional type; + if (group.groupType() == null || group.groupType().isEmpty()) { + type = Optional.empty(); + } else { + type = Optional.of(GroupType.parse(group.groupType())); + } + final Optional groupState; + if (group.groupState() == null || group.groupState().isEmpty()) { + groupState = Optional.empty(); + } else { + groupState = Optional.of(GroupState.parse(group.groupState())); + } + final GroupListing groupListing = new GroupListing( + groupId, + type, + protocolType, + groupState + ); + results.addListing(groupListing); } - final GroupListing groupListing = new GroupListing( - groupId, - type, - protocolType, - groupState - ); - results.addListing(groupListing); } @Override @@ -3606,11 +3575,11 @@ void handleFailure(Throwable throwable) { public DescribeConsumerGroupsResult describeConsumerGroups(final Collection groupIds, final DescribeConsumerGroupsOptions options) { SimpleAdminApiFuture future = - DescribeConsumerGroupsHandler.newFuture(groupIds); + DescribeConsumerGroupsHandler.newFuture(groupIds); DescribeConsumerGroupsHandler handler = new DescribeConsumerGroupsHandler(options.includeAuthorizedOperations(), logContext); invokeDriver(handler, future, options.timeoutMs); return new 
DescribeConsumerGroupsResult(future.all().entrySet().stream() - .collect(Collectors.toMap(entry -> entry.getKey().idValue, Map.Entry::getValue))); + .collect(Collectors.toMap(entry -> entry.getKey().idValue, Map.Entry::getValue))); } @Deprecated @@ -3687,13 +3656,13 @@ void handleResponse(AbstractResponse abstractResponse) { @Override ListGroupsRequest.Builder createRequest(int timeoutMs) { List states = options.groupStates() - .stream() - .map(GroupState::toString) - .collect(Collectors.toList()); + .stream() + .map(GroupState::toString) + .collect(Collectors.toList()); List groupTypes = options.types() - .stream() - .map(GroupType::toString) - .collect(Collectors.toList()); + .stream() + .map(GroupType::toString) + .collect(Collectors.toList()); return new ListGroupsRequest.Builder(new ListGroupsRequestData() .setStatesFilter(states) .setTypesFilter(groupTypes) @@ -3705,17 +3674,17 @@ private void maybeAddConsumerGroup(ListGroupsResponseData.ListedGroup group) { if (protocolType.equals(ConsumerProtocol.PROTOCOL_TYPE) || protocolType.isEmpty()) { final String groupId = group.groupId(); final Optional groupState = group.groupState().isEmpty() - ? Optional.empty() - : Optional.of(GroupState.parse(group.groupState())); + ? Optional.empty() + : Optional.of(GroupState.parse(group.groupState())); final Optional type = group.groupType().isEmpty() - ? Optional.empty() - : Optional.of(GroupType.parse(group.groupType())); + ? Optional.empty() + : Optional.of(GroupType.parse(group.groupType())); final ConsumerGroupListing groupListing = new ConsumerGroupListing( - groupId, - groupState, - type, - protocolType.isEmpty() - ); + groupId, + groupState, + type, + protocolType.isEmpty() + ); results.addListing(groupListing); } } @@ -3763,7 +3732,7 @@ void handleFailure(Throwable throwable) { public ListConsumerGroupOffsetsResult listConsumerGroupOffsets(Map groupSpecs, ListConsumerGroupOffsetsOptions options) { SimpleAdminApiFuture> future = - ListConsumerGroupOffsetsHandler.newFuture(groupSpecs.keySet()); + ListConsumerGroupOffsetsHandler.newFuture(groupSpecs.keySet()); ListConsumerGroupOffsetsHandler handler = new ListConsumerGroupOffsetsHandler(groupSpecs, options.requireStable(), logContext); invokeDriver(handler, future, options.timeoutMs); @@ -3772,37 +3741,42 @@ public ListConsumerGroupOffsetsResult listConsumerGroupOffsets(Map groupSpecs, - ListStreamsGroupOffsetsOptions options) { + ListStreamsGroupOffsetsOptions options) { Map consumerGroupSpecs = groupSpecs.entrySet().stream() .collect(Collectors.toMap( Map.Entry::getKey, entry -> new ListConsumerGroupOffsetsSpec().topicPartitions(entry.getValue().topicPartitions()) )); - return new ListStreamsGroupOffsetsResult(listConsumerGroupOffsets(consumerGroupSpecs, new ListConsumerGroupOffsetsOptions())); + ListConsumerGroupOffsetsOptions consumerGroupOptions = new ListConsumerGroupOffsetsOptions() + .requireStable(options.requireStable()) + .timeoutMs(options.timeoutMs()); + return new ListStreamsGroupOffsetsResult(listConsumerGroupOffsets(consumerGroupSpecs, consumerGroupOptions)); } @Override public DeleteConsumerGroupsResult deleteConsumerGroups(Collection groupIds, DeleteConsumerGroupsOptions options) { SimpleAdminApiFuture future = - DeleteConsumerGroupsHandler.newFuture(groupIds); + DeleteConsumerGroupsHandler.newFuture(groupIds); DeleteConsumerGroupsHandler handler = new DeleteConsumerGroupsHandler(logContext); invokeDriver(handler, future, options.timeoutMs); return new DeleteConsumerGroupsResult(future.all().entrySet().stream() - 
.collect(Collectors.toMap(entry -> entry.getKey().idValue, Map.Entry::getValue))); + .collect(Collectors.toMap(entry -> entry.getKey().idValue, Map.Entry::getValue))); } @Override public DeleteStreamsGroupsResult deleteStreamsGroups(Collection groupIds, DeleteStreamsGroupsOptions options) { - return new DeleteStreamsGroupsResult(deleteConsumerGroups(groupIds, new DeleteConsumerGroupsOptions())); + DeleteConsumerGroupsOptions consumerGroupOptions = new DeleteConsumerGroupsOptions() + .timeoutMs(options.timeoutMs()); + return new DeleteStreamsGroupsResult(deleteConsumerGroups(groupIds, consumerGroupOptions)); } @Override public DeleteConsumerGroupOffsetsResult deleteConsumerGroupOffsets( - String groupId, - Set partitions, - DeleteConsumerGroupOffsetsOptions options) { + String groupId, + Set partitions, + DeleteConsumerGroupOffsetsOptions options) { SimpleAdminApiFuture> future = - DeleteConsumerGroupOffsetsHandler.newFuture(groupId); + DeleteConsumerGroupOffsetsHandler.newFuture(groupId); DeleteConsumerGroupOffsetsHandler handler = new DeleteConsumerGroupOffsetsHandler(groupId, partitions, logContext); invokeDriver(handler, future, options.timeoutMs); return new DeleteConsumerGroupOffsetsResult(future.get(CoordinatorKey.byGroupId(groupId)), partitions); @@ -3813,18 +3787,20 @@ public DeleteStreamsGroupOffsetsResult deleteStreamsGroupOffsets( String groupId, Set partitions, DeleteStreamsGroupOffsetsOptions options) { - return new DeleteStreamsGroupOffsetsResult(deleteConsumerGroupOffsets(groupId, partitions, new DeleteConsumerGroupOffsetsOptions())); + DeleteConsumerGroupOffsetsOptions consumerGroupOptions = new DeleteConsumerGroupOffsetsOptions() + .timeoutMs(options.timeoutMs()); + return new DeleteStreamsGroupOffsetsResult(deleteConsumerGroupOffsets(groupId, partitions, consumerGroupOptions)); } @Override public DescribeShareGroupsResult describeShareGroups(final Collection groupIds, final DescribeShareGroupsOptions options) { SimpleAdminApiFuture future = - DescribeShareGroupsHandler.newFuture(groupIds); + DescribeShareGroupsHandler.newFuture(groupIds); DescribeShareGroupsHandler handler = new DescribeShareGroupsHandler(options.includeAuthorizedOperations(), logContext); invokeDriver(handler, future, options.timeoutMs); return new DescribeShareGroupsResult(future.all().entrySet().stream() - .collect(Collectors.toMap(entry -> entry.getKey().idValue, Map.Entry::getValue))); + .collect(Collectors.toMap(entry -> entry.getKey().idValue, Map.Entry::getValue))); } @Override @@ -3838,7 +3814,7 @@ public AlterShareGroupOffsetsResult alterShareGroupOffsets(String groupId, Map groupSpecs, final ListShareGroupOffsetsOptions options) { - SimpleAdminApiFuture> future = ListShareGroupOffsetsHandler.newFuture(groupSpecs.keySet()); + SimpleAdminApiFuture> future = ListShareGroupOffsetsHandler.newFuture(groupSpecs.keySet()); ListShareGroupOffsetsHandler handler = new ListShareGroupOffsetsHandler(groupSpecs, logContext); invokeDriver(handler, future, options.timeoutMs); return new ListShareGroupOffsetsResult(future.all()); @@ -3891,13 +3867,13 @@ public DeleteShareGroupsResult deleteShareGroups(Collection groupIds, De @Override public ElectLeadersResult electLeaders( - final ElectionType electionType, - final Set topicPartitions, - ElectLeadersOptions options) { + final ElectionType electionType, + final Set topicPartitions, + ElectLeadersOptions options) { final KafkaFutureImpl>> electionFuture = new KafkaFutureImpl<>(); final long now = time.milliseconds(); runnable.call(new Call("electLeaders", 
calcDeadlineMs(now, options.timeoutMs()), - new ControllerNodeProvider()) { + new ControllerNodeProvider()) { @Override public ElectLeadersRequest.Builder createRequest(int timeoutMs) { @@ -3930,8 +3906,8 @@ void handleFailure(Throwable throwable) { @Override public AlterPartitionReassignmentsResult alterPartitionReassignments( - Map> reassignments, - AlterPartitionReassignmentsOptions options) { + Map> reassignments, + AlterPartitionReassignmentsOptions options) { final Map> futures = new HashMap<>(); final Map>> topicsToReassignments = new TreeMap<>(); for (Map.Entry> entry : reassignments.entrySet()) { @@ -3944,13 +3920,13 @@ public AlterPartitionReassignmentsResult alterPartitionReassignments( if (topicNameIsUnrepresentable(topic)) { future.completeExceptionally(new InvalidTopicException("The given topic name '" + - topic + "' cannot be represented in a request.")); + topic + "' cannot be represented in a request.")); } else if (topicPartition.partition() < 0) { future.completeExceptionally(new InvalidTopicException("The given partition index " + - topicPartition.partition() + " is not valid.")); + topicPartition.partition() + " is not valid.")); } else { Map> partitionReassignments = - topicsToReassignments.get(topicPartition.topic()); + topicsToReassignments.get(topicPartition.topic()); if (partitionReassignments == null) { partitionReassignments = new TreeMap<>(); topicsToReassignments.put(topic, partitionReassignments); @@ -3962,32 +3938,32 @@ public AlterPartitionReassignmentsResult alterPartitionReassignments( final long now = time.milliseconds(); Call call = new Call("alterPartitionReassignments", calcDeadlineMs(now, options.timeoutMs()), - new ControllerNodeProvider(true)) { + new ControllerNodeProvider(true)) { @Override public AlterPartitionReassignmentsRequest.Builder createRequest(int timeoutMs) { AlterPartitionReassignmentsRequestData data = - new AlterPartitionReassignmentsRequestData(); + new AlterPartitionReassignmentsRequestData(); for (Map.Entry>> entry : - topicsToReassignments.entrySet()) { + topicsToReassignments.entrySet()) { String topicName = entry.getKey(); Map> partitionsToReassignments = entry.getValue(); List reassignablePartitions = new ArrayList<>(); for (Map.Entry> partitionEntry : - partitionsToReassignments.entrySet()) { + partitionsToReassignments.entrySet()) { int partitionIndex = partitionEntry.getKey(); Optional reassignment = partitionEntry.getValue(); ReassignablePartition reassignablePartition = new ReassignablePartition() - .setPartitionIndex(partitionIndex) - .setReplicas(reassignment.map(NewPartitionReassignment::targetReplicas).orElse(null)); + .setPartitionIndex(partitionIndex) + .setReplicas(reassignment.map(NewPartitionReassignment::targetReplicas).orElse(null)); reassignablePartitions.add(reassignablePartition); } ReassignableTopic reassignableTopic = new ReassignableTopic() - .setName(topicName) - .setPartitions(reassignablePartitions); + .setName(topicName) + .setPartitions(reassignablePartitions); data.topics().add(reassignableTopic); } data.setTimeoutMs(timeoutMs); @@ -4014,8 +3990,8 @@ public void handleResponse(AbstractResponse abstractResponse) { String topicName = topicResponse.name(); for (ReassignablePartitionResponse partition : topicResponse.partitions()) { errors.put( - new TopicPartition(topicName, partition.partitionIndex()), - new ApiError(topLevelError, response.data().errorMessage()).exception() + new TopicPartition(topicName, partition.partitionIndex()), + new ApiError(topLevelError, 
response.data().errorMessage()).exception() ); receivedResponsesCount += 1; } @@ -4087,10 +4063,10 @@ public ListPartitionReassignmentsResult listPartitionReassignments(Optional> getMembersFromGroup(String groupId } else { List membersToRemove = res.members().stream().map(member -> member.groupInstanceId().map(id -> new MemberIdentity().setGroupInstanceId(id)) - .orElseGet(() -> new MemberIdentity().setMemberId(member.consumerId())) - .setReason(reason) + .orElseGet(() -> new MemberIdentity().setMemberId(member.consumerId())) + .setReason(reason) ).collect(Collectors.toList()); future.complete(membersToRemove); @@ -4229,7 +4205,7 @@ public RemoveMembersFromConsumerGroupResult removeMembersFromConsumerGroup(Strin DEFAULT_LEAVE_GROUP_REASON : JoinGroupRequest.maybeTruncateReason(options.reason()); final SimpleAdminApiFuture> adminFuture = - RemoveMembersFromConsumerGroupHandler.newFuture(groupId); + RemoveMembersFromConsumerGroupHandler.newFuture(groupId); KafkaFutureImpl> memFuture; if (options.removeAll()) { @@ -4237,8 +4213,8 @@ public RemoveMembersFromConsumerGroupResult removeMembersFromConsumerGroup(Strin } else { memFuture = new KafkaFutureImpl<>(); memFuture.complete(options.members().stream() - .map(m -> m.toMemberIdentity().setReason(reason)) - .collect(Collectors.toList())); + .map(m -> m.toMemberIdentity().setReason(reason)) + .collect(Collectors.toList())); } memFuture.whenComplete((members, ex) -> { @@ -4260,7 +4236,7 @@ public AlterConsumerGroupOffsetsResult alterConsumerGroupOffsets( AlterConsumerGroupOffsetsOptions options ) { SimpleAdminApiFuture> future = - AlterConsumerGroupOffsetsHandler.newFuture(groupId); + AlterConsumerGroupOffsetsHandler.newFuture(groupId); AlterConsumerGroupOffsetsHandler handler = new AlterConsumerGroupOffsetsHandler(groupId, offsets, logContext); invokeDriver(handler, future, options.timeoutMs); return new AlterConsumerGroupOffsetsResult(future.get(CoordinatorKey.byGroupId(groupId))); @@ -4272,7 +4248,9 @@ public AlterStreamsGroupOffsetsResult alterStreamsGroupOffsets( Map offsets, AlterStreamsGroupOffsetsOptions options ) { - return new AlterStreamsGroupOffsetsResult(alterConsumerGroupOffsets(groupId, offsets, new AlterConsumerGroupOffsetsOptions())); + AlterConsumerGroupOffsetsOptions consumerGroupOptions = new AlterConsumerGroupOffsetsOptions() + .timeoutMs(options.timeoutMs()); + return new AlterStreamsGroupOffsetsResult(alterConsumerGroupOffsets(groupId, offsets, consumerGroupOptions)); } @Override @@ -4293,24 +4271,24 @@ public DescribeClientQuotasResult describeClientQuotas(ClientQuotaFilter filter, final long now = time.milliseconds(); runnable.call(new Call("describeClientQuotas", calcDeadlineMs(now, options.timeoutMs()), - new LeastLoadedNodeProvider()) { + new LeastLoadedNodeProvider()) { - @Override - DescribeClientQuotasRequest.Builder createRequest(int timeoutMs) { - return new DescribeClientQuotasRequest.Builder(filter); - } + @Override + DescribeClientQuotasRequest.Builder createRequest(int timeoutMs) { + return new DescribeClientQuotasRequest.Builder(filter); + } - @Override - void handleResponse(AbstractResponse abstractResponse) { - DescribeClientQuotasResponse response = (DescribeClientQuotasResponse) abstractResponse; - response.complete(future); - } + @Override + void handleResponse(AbstractResponse abstractResponse) { + DescribeClientQuotasResponse response = (DescribeClientQuotasResponse) abstractResponse; + response.complete(future); + } - @Override - void handleFailure(Throwable throwable) { - 
future.completeExceptionally(throwable); - } - }, now); + @Override + void handleFailure(Throwable throwable) { + future.completeExceptionally(throwable); + } + }, now); return new DescribeClientQuotasResult(future); } @@ -4324,24 +4302,24 @@ public AlterClientQuotasResult alterClientQuotas(Collection dataFuture = new KafkaFutureImpl<>(); final long now = time.milliseconds(); Call call = new Call("describeUserScramCredentials", calcDeadlineMs(now, options.timeoutMs()), - new LeastLoadedNodeProvider()) { + new LeastLoadedNodeProvider()) { @Override public DescribeUserScramCredentialsRequest.Builder createRequest(final int timeoutMs) { final DescribeUserScramCredentialsRequestData requestData = new DescribeUserScramCredentialsRequestData(); @@ -4397,7 +4375,7 @@ public AlterUserScramCredentialsResult alterUserScramCredentials(List> futures = new HashMap<>(); - for (UserScramCredentialAlteration alteration: alterations) { + for (UserScramCredentialAlteration alteration : alterations) { futures.put(alteration.user(), new KafkaFutureImpl<>()); } final Map userIllegalAlterationExceptions = new HashMap<>(); @@ -4421,55 +4399,55 @@ public AlterUserScramCredentialsResult alterUserScramCredentials(List> userInsertions = new HashMap<>(); alterations.stream().filter(a -> a instanceof UserScramCredentialUpsertion) - .filter(alteration -> !userIllegalAlterationExceptions.containsKey(alteration.user())) - .forEach(alteration -> { - final String user = alteration.user(); - if (user == null || user.isEmpty()) { - userIllegalAlterationExceptions.put(alteration.user(), new UnacceptableCredentialException(usernameMustNotBeEmptyMsg)); - } else { - UserScramCredentialUpsertion upsertion = (UserScramCredentialUpsertion) alteration; - try { - byte[] password = upsertion.password(); - if (password == null || password.length == 0) { - userIllegalAlterationExceptions.put(user, new UnacceptableCredentialException(passwordMustNotBeEmptyMsg)); + .filter(alteration -> !userIllegalAlterationExceptions.containsKey(alteration.user())) + .forEach(alteration -> { + final String user = alteration.user(); + if (user == null || user.isEmpty()) { + userIllegalAlterationExceptions.put(alteration.user(), new UnacceptableCredentialException(usernameMustNotBeEmptyMsg)); + } else { + UserScramCredentialUpsertion upsertion = (UserScramCredentialUpsertion) alteration; + try { + byte[] password = upsertion.password(); + if (password == null || password.length == 0) { + userIllegalAlterationExceptions.put(user, new UnacceptableCredentialException(passwordMustNotBeEmptyMsg)); + } else { + ScramMechanism mechanism = upsertion.credentialInfo().mechanism(); + if (mechanism == null || mechanism == ScramMechanism.UNKNOWN) { + userIllegalAlterationExceptions.put(user, new UnsupportedSaslMechanismException(unknownScramMechanismMsg)); } else { - ScramMechanism mechanism = upsertion.credentialInfo().mechanism(); - if (mechanism == null || mechanism == ScramMechanism.UNKNOWN) { - userIllegalAlterationExceptions.put(user, new UnsupportedSaslMechanismException(unknownScramMechanismMsg)); - } else { - userInsertions.putIfAbsent(user, new HashMap<>()); - userInsertions.get(user).put(mechanism, getScramCredentialUpsertion(upsertion)); - } + userInsertions.putIfAbsent(user, new HashMap<>()); + userInsertions.get(user).put(mechanism, getScramCredentialUpsertion(upsertion)); } - } catch (NoSuchAlgorithmException e) { - // we might overwrite an exception from a previous alteration, but we don't really care - // since we just need to mark this user as having 
at least one illegal alteration - // and make an exception instance available for completing the corresponding future exceptionally - userIllegalAlterationExceptions.put(user, new UnsupportedSaslMechanismException(unknownScramMechanismMsg)); - } catch (InvalidKeyException e) { - // generally shouldn't happen since we deal with the empty password case above, - // but we still need to catch/handle it - userIllegalAlterationExceptions.put(user, new UnacceptableCredentialException(e.getMessage(), e)); } + } catch (NoSuchAlgorithmException e) { + // we might overwrite an exception from a previous alteration, but we don't really care + // since we just need to mark this user as having at least one illegal alteration + // and make an exception instance available for completing the corresponding future exceptionally + userIllegalAlterationExceptions.put(user, new UnsupportedSaslMechanismException(unknownScramMechanismMsg)); + } catch (InvalidKeyException e) { + // generally shouldn't happen since we deal with the empty password case above, + // but we still need to catch/handle it + userIllegalAlterationExceptions.put(user, new UnacceptableCredentialException(e.getMessage(), e)); } - }); + } + }); // submit alterations only for users that do not have an illegal alteration as identified above Call call = new Call("alterUserScramCredentials", calcDeadlineMs(now, options.timeoutMs()), - new ControllerNodeProvider()) { + new ControllerNodeProvider()) { @Override public AlterUserScramCredentialsRequest.Builder createRequest(int timeoutMs) { return new AlterUserScramCredentialsRequest.Builder( - new AlterUserScramCredentialsRequestData().setUpsertions(alterations.stream() - .filter(a -> a instanceof UserScramCredentialUpsertion) - .filter(a -> !userIllegalAlterationExceptions.containsKey(a.user())) - .map(a -> userInsertions.get(a.user()).get(((UserScramCredentialUpsertion) a).credentialInfo().mechanism())) - .collect(Collectors.toList())) + new AlterUserScramCredentialsRequestData().setUpsertions(alterations.stream() + .filter(a -> a instanceof UserScramCredentialUpsertion) + .filter(a -> !userIllegalAlterationExceptions.containsKey(a.user())) + .map(a -> userInsertions.get(a.user()).get(((UserScramCredentialUpsertion) a).credentialInfo().mechanism())) + .collect(Collectors.toList())) .setDeletions(alterations.stream() - .filter(a -> a instanceof UserScramCredentialDeletion) - .filter(a -> !userIllegalAlterationExceptions.containsKey(a.user())) - .map(d -> getScramCredentialDeletion((UserScramCredentialDeletion) d)) - .collect(Collectors.toList()))); + .filter(a -> a instanceof UserScramCredentialDeletion) + .filter(a -> !userIllegalAlterationExceptions.containsKey(a.user())) + .map(d -> getScramCredentialDeletion((UserScramCredentialDeletion) d)) + .collect(Collectors.toList()))); } @Override @@ -4519,10 +4497,10 @@ void handleFailure(Throwable throwable) { private static AlterUserScramCredentialsRequestData.ScramCredentialUpsertion getScramCredentialUpsertion(UserScramCredentialUpsertion u) throws InvalidKeyException, NoSuchAlgorithmException { AlterUserScramCredentialsRequestData.ScramCredentialUpsertion retval = new AlterUserScramCredentialsRequestData.ScramCredentialUpsertion(); return retval.setName(u.user()) - .setMechanism(u.credentialInfo().mechanism().type()) - .setIterations(u.credentialInfo().iterations()) - .setSalt(u.salt()) - .setSaltedPassword(getSaltedPassword(u.credentialInfo().mechanism(), u.password(), u.salt(), u.credentialInfo().iterations())); + 
.setMechanism(u.credentialInfo().mechanism().type()) + .setIterations(u.credentialInfo().iterations()) + .setSalt(u.salt()) + .setSaltedPassword(getSaltedPassword(u.credentialInfo().mechanism(), u.password(), u.salt(), u.credentialInfo().iterations())); } private static AlterUserScramCredentialsRequestData.ScramCredentialDeletion getScramCredentialDeletion(UserScramCredentialDeletion d) { @@ -4531,7 +4509,7 @@ private static AlterUserScramCredentialsRequestData.ScramCredentialDeletion getS private static byte[] getSaltedPassword(ScramMechanism publicScramMechanism, byte[] password, byte[] salt, int iterations) throws NoSuchAlgorithmException, InvalidKeyException { return new ScramFormatter(org.apache.kafka.common.security.scram.internals.ScramMechanism.forMechanismName(publicScramMechanism.mechanismName())) - .hi(password, salt, iterations); + .hi(password, salt, iterations); } @Override @@ -4657,7 +4635,7 @@ void handleResponse(AbstractResponse abstractResponse) { } // The server should send back a response for every feature, but we do a sanity check anyway. completeUnrealizedFutures(updateFutures.entrySet().stream(), - feature -> "The controller response did not contain a result for feature " + feature); + feature -> "The controller response did not contain a result for feature " + feature); } break; case NOT_CONTROLLER: @@ -4688,15 +4666,15 @@ public DescribeMetadataQuorumResult describeMetadataQuorum(DescribeMetadataQuoru final KafkaFutureImpl future = new KafkaFutureImpl<>(); final long now = time.milliseconds(); final Call call = new Call( - "describeMetadataQuorum", calcDeadlineMs(now, options.timeoutMs()), provider) { + "describeMetadataQuorum", calcDeadlineMs(now, options.timeoutMs()), provider) { private QuorumInfo.ReplicaState translateReplicaState(DescribeQuorumResponseData.ReplicaState replica) { return new QuorumInfo.ReplicaState( - replica.replicaId(), - replica.replicaDirectoryId() == null ? Uuid.ZERO_UUID : replica.replicaDirectoryId(), - replica.logEndOffset(), - replica.lastFetchTimestamp() == -1 ? OptionalLong.empty() : OptionalLong.of(replica.lastFetchTimestamp()), - replica.lastCaughtUpTimestamp() == -1 ? OptionalLong.empty() : OptionalLong.of(replica.lastCaughtUpTimestamp())); + replica.replicaId(), + replica.replicaDirectoryId() == null ? Uuid.ZERO_UUID : replica.replicaDirectoryId(), + replica.logEndOffset(), + replica.lastFetchTimestamp() == -1 ? OptionalLong.empty() : OptionalLong.of(replica.lastFetchTimestamp()), + replica.lastCaughtUpTimestamp() == -1 ? 
OptionalLong.empty() : OptionalLong.of(replica.lastCaughtUpTimestamp())); } private QuorumInfo createQuorumResult(final DescribeQuorumResponseData.PartitionData partition, DescribeQuorumResponseData.NodeCollection nodeCollection) { @@ -4729,7 +4707,7 @@ private QuorumInfo createQuorumResult(final DescribeQuorumResponseData.Partition @Override DescribeQuorumRequest.Builder createRequest(int timeoutMs) { return new Builder(DescribeQuorumRequest.singletonRequest( - new TopicPartition(CLUSTER_METADATA_TOPIC_NAME, CLUSTER_METADATA_TOPIC_PARTITION.partition()))); + new TopicPartition(CLUSTER_METADATA_TOPIC_NAME, CLUSTER_METADATA_TOPIC_PARTITION.partition()))); } @Override @@ -4741,27 +4719,27 @@ void handleResponse(AbstractResponse response) { } if (quorumResponse.data().topics().size() != 1) { String msg = String.format("DescribeMetadataQuorum received %d topics when 1 was expected", - quorumResponse.data().topics().size()); + quorumResponse.data().topics().size()); log.debug(msg); throw new UnknownServerException(msg); } DescribeQuorumResponseData.TopicData topic = quorumResponse.data().topics().get(0); if (!topic.topicName().equals(CLUSTER_METADATA_TOPIC_NAME)) { String msg = String.format("DescribeMetadataQuorum received a topic with name %s when %s was expected", - topic.topicName(), CLUSTER_METADATA_TOPIC_NAME); + topic.topicName(), CLUSTER_METADATA_TOPIC_NAME); log.debug(msg); throw new UnknownServerException(msg); } if (topic.partitions().size() != 1) { String msg = String.format("DescribeMetadataQuorum received a topic %s with %d partitions when 1 was expected", - topic.topicName(), topic.partitions().size()); + topic.topicName(), topic.partitions().size()); log.debug(msg); throw new UnknownServerException(msg); } DescribeQuorumResponseData.PartitionData partition = topic.partitions().get(0); if (partition.partitionIndex() != CLUSTER_METADATA_TOPIC_PARTITION.partition()) { String msg = String.format("DescribeMetadataQuorum received a single partition with index %d when %d was expected", - partition.partitionIndex(), CLUSTER_METADATA_TOPIC_PARTITION.partition()); + partition.partitionIndex(), CLUSTER_METADATA_TOPIC_PARTITION.partition()); log.debug(msg); throw new UnknownServerException(msg); } @@ -4786,19 +4764,19 @@ public UnregisterBrokerResult unregisterBroker(int brokerId, UnregisterBrokerOpt final KafkaFutureImpl future = new KafkaFutureImpl<>(); final long now = time.milliseconds(); final Call call = new Call("unregisterBroker", calcDeadlineMs(now, options.timeoutMs()), - new LeastLoadedBrokerOrActiveKController()) { + new LeastLoadedBrokerOrActiveKController()) { @Override UnregisterBrokerRequest.Builder createRequest(int timeoutMs) { UnregisterBrokerRequestData data = - new UnregisterBrokerRequestData().setBrokerId(brokerId); + new UnregisterBrokerRequestData().setBrokerId(brokerId); return new UnregisterBrokerRequest.Builder(data); } @Override void handleResponse(AbstractResponse abstractResponse) { final UnregisterBrokerResponse response = - (UnregisterBrokerResponse) abstractResponse; + (UnregisterBrokerResponse) abstractResponse; Errors error = Errors.forCode(response.data().errorCode()); switch (error) { case NONE: @@ -4858,7 +4836,7 @@ public AbortTransactionResult abortTransaction(AbortTransactionSpec spec, AbortT * where a coordinator may need to unilaterally terminate a participant transaction that hasn't completed. *

 *
- * @param transactionalId The transactional ID whose active transaction should be forcefully terminated.
+ * @param transactionalId  The transactional ID whose active transaction should be forcefully terminated.
 * @return a {@link TerminateTransactionResult} that can be used to await the operation result.
 */
@Override
@@ -4897,6 +4875,45 @@ public FenceProducersResult fenceProducers(Collection<String> transactionalIds,
         return new FenceProducersResult(future.all());
     }
 
+    @Override
+    public ListConfigResourcesResult listConfigResources(Set<ConfigResource.Type> configResourceTypes, ListConfigResourcesOptions options) {
+        final long now = time.milliseconds();
+        final KafkaFutureImpl<Collection<ConfigResource>> future = new KafkaFutureImpl<>();
+        final Call call = new Call("listConfigResources", calcDeadlineMs(now, options.timeoutMs()), new LeastLoadedNodeProvider()) {
+
+            @Override
+            ListConfigResourcesRequest.Builder createRequest(int timeoutMs) {
+                return new ListConfigResourcesRequest.Builder(
+                    new ListConfigResourcesRequestData()
+                        .setResourceTypes(
+                            configResourceTypes
+                                .stream()
+                                .map(ConfigResource.Type::id)
+                                .collect(Collectors.toList())
+                        )
+                );
+            }
+
+            @Override
+            void handleResponse(AbstractResponse abstractResponse) {
+                ListConfigResourcesResponse response = (ListConfigResourcesResponse) abstractResponse;
+                if (response.error().isFailure()) {
+                    future.completeExceptionally(response.error().exception());
+                } else {
+                    future.complete(response.configResources());
+                }
+            }
+
+            @Override
+            void handleFailure(Throwable throwable) {
+                future.completeExceptionally(throwable);
+            }
+        };
+        runnable.call(call, now);
+        return new ListConfigResourcesResult(future);
+    }
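+
+    // Usage sketch for the new listConfigResources API (hypothetical caller code; the
+    // bootstrap address and the use of ListConfigResourcesResult#all() are assumptions,
+    // not part of this patch):
+    //
+    //   Properties props = new Properties();
+    //   props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+    //   try (Admin admin = Admin.create(props)) {
+    //       Collection<ConfigResource> resources = admin
+    //           .listConfigResources(Set.of(ConfigResource.Type.CLIENT_METRICS), new ListConfigResourcesOptions())
+    //           .all()
+    //           .get();
+    //       resources.forEach(r -> System.out.println(r.type() + " -> " + r.name()));
+    //   }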
+
+    @SuppressWarnings({"deprecation", "removal"})
     @Override
     public ListClientMetricsResourcesResult listClientMetricsResources(ListClientMetricsResourcesOptions options) {
         final long now = time.milliseconds();
@@ -4905,17 +4922,26 @@ public ListClientMetricsResourcesResult listClientMetricsResources(ListClientMet
             new LeastLoadedNodeProvider()) {
 
             @Override
-            ListClientMetricsResourcesRequest.Builder createRequest(int timeoutMs) {
-                return new ListClientMetricsResourcesRequest.Builder(new ListClientMetricsResourcesRequestData());
+            ListConfigResourcesRequest.Builder createRequest(int timeoutMs) {
+                return new ListConfigResourcesRequest.Builder(
+                    new ListConfigResourcesRequestData()
+                        .setResourceTypes(List.of(ConfigResource.Type.CLIENT_METRICS.id()))
+                );
             }
 
             @Override
             void handleResponse(AbstractResponse abstractResponse) {
-                ListClientMetricsResourcesResponse response = (ListClientMetricsResourcesResponse) abstractResponse;
+                ListConfigResourcesResponse response = (ListConfigResourcesResponse) abstractResponse;
                 if (response.error().isFailure()) {
                     future.completeExceptionally(response.error().exception());
                 } else {
-                    future.complete(response.clientMetricsResources());
+                    future.complete(response
+                        .data()
+                        .configResources()
+                        .stream()
+                        .filter(entry -> entry.resourceType() == ConfigResource.Type.CLIENT_METRICS.id())
+                        .map(entry -> new ClientMetricsResourceListing(entry.resourceName()))
+                        .collect(Collectors.toList()));
                 }
             }
 
@@ -4939,7 +4965,7 @@ public AddRaftVoterResult addRaftVoter(
         final KafkaFutureImpl<Void> future = new KafkaFutureImpl<>();
         final long now = time.milliseconds();
         final Call call = new Call(
-            "addRaftVoter", calcDeadlineMs(now, options.timeoutMs()), provider) {
+                "addRaftVoter", calcDeadlineMs(now, options.timeoutMs()), provider) {
 
             @Override
@@ -4951,12 +4977,12 @@ AddRaftVoterRequest.Builder createRequest(int timeoutMs) {
                         setHost(endpoint.host()).
                         setPort(endpoint.port())));
                 return new AddRaftVoterRequest.Builder(
-                    new AddRaftVoterRequestData().
-                        setClusterId(options.clusterId().orElse(null)).
-                        setTimeoutMs(timeoutMs).
-                        setVoterId(voterId) .
-                        setVoterDirectoryId(voterDirectoryId).
-                        setListeners(listeners));
+                        new AddRaftVoterRequestData().
+                                setClusterId(options.clusterId().orElse(null)).
+                                setTimeoutMs(timeoutMs).
+                                setVoterId(voterId).
+                                setVoterDirectoryId(voterDirectoryId).
+                                setListeners(listeners));
             }
 
             @Override
@@ -4993,14 +5019,14 @@ public RemoveRaftVoterResult removeRaftVoter(
         final KafkaFutureImpl<Void> future = new KafkaFutureImpl<>();
         final long now = time.milliseconds();
         final Call call = new Call(
-            "removeRaftVoter", calcDeadlineMs(now, options.timeoutMs()), provider) {
+                "removeRaftVoter", calcDeadlineMs(now, options.timeoutMs()), provider) {
 
             @Override
             RemoveRaftVoterRequest.Builder createRequest(int timeoutMs) {
                 return new RemoveRaftVoterRequest.Builder(
                     new RemoveRaftVoterRequestData().
                         setClusterId(options.clusterId().orElse(null)).
-                        setVoterId(voterId) .
+                        setVoterId(voterId).
                         setVoterDirectoryId(voterDirectoryId));
             }
 
@@ -5010,8 +5036,8 @@ void handleResponse(AbstractResponse response) {
             RemoveRaftVoterResponse addResponse = (RemoveRaftVoterResponse) response;
             if (addResponse.data().errorCode() != Errors.NONE.code()) {
                 ApiError error = new ApiError(
-                    addResponse.data().errorCode(),
-                    addResponse.data().errorMessage());
+                        addResponse.data().errorCode(),
+                        addResponse.data().errorMessage());
                 future.completeExceptionally(error.exception());
             } else {
                 future.complete(null);

From 2330800985af91f707d9284dd5be324fdf043cf4 Mon Sep 17 00:00:00 2001
From: jim0987795064
Date: Sat, 21 Jun 2025 22:42:34 +0200
Subject: [PATCH 3/3] refactor: Extract and move ConnectionQuotas to server module

---
 .../scala/kafka/network/SocketServer.scala    | 436 +-------------
 .../kafka/network/ConnectionQuotas.java       | 534 ++++++++++++++++++
 .../server/config/AbstractKafkaConfig.java    |  65 ++-
 3 files changed, 600 insertions(+), 435 deletions(-)
 create mode 100644 server/src/main/java/org/apache/kafka/network/ConnectionQuotas.java

diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala
index 884c00002c5b5..8c2367820d4ce 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -25,6 +25,7 @@ import java.util
 import java.util.Optional
 import java.util.concurrent._
 import java.util.concurrent.atomic._
+import org.apache.kafka.network.ConnectionQuotas
 import kafka.network.Processor._
 import kafka.network.RequestChannel.{CloseConnectionResponse, EndThrottlingResponse, NoOpResponse, SendResponse, StartThrottlingResponse}
 import kafka.network.SocketServer._
@@ -1279,437 +1280,4 @@ private[kafka] class Processor(
     metrics.removeMetric(expiredConnectionsKilledCountMetricName)
   }
 }
-}
-
-class ConnectionQuotas(config: KafkaConfig, time: Time, metrics: Metrics) extends Logging with AutoCloseable {
-
-  @volatile private var defaultMaxConnectionsPerIp: Int = config.maxConnectionsPerIp
-  @volatile private var maxConnectionsPerIpOverrides = config.maxConnectionsPerIpOverrides.map { case (host, count) => (InetAddress.getByName(host), count) }
-  @volatile private var brokerMaxConnections = config.maxConnections
-  private val interBrokerListenerName = config.interBrokerListenerName
-  private val counts = mutable.Map[InetAddress, Int]()
-
-  // Listener counts and configs are synchronized
on `counts` - private val listenerCounts = mutable.Map[ListenerName, Int]() - private[network] val maxConnectionsPerListener = mutable.Map[ListenerName, ListenerConnectionQuota]() - @volatile private var totalCount = 0 - // updates to defaultConnectionRatePerIp or connectionRatePerIp must be synchronized on `counts` - @volatile private var defaultConnectionRatePerIp = QuotaConfig.IP_CONNECTION_RATE_DEFAULT.intValue() - private val connectionRatePerIp = new ConcurrentHashMap[InetAddress, Int]() - // sensor that tracks broker-wide connection creation rate and limit (quota) - private val brokerConnectionRateSensor = getOrCreateConnectionRateQuotaSensor(config.maxConnectionCreationRate, ConnectionQuotaEntity.brokerQuotaEntity()) - private val maxThrottleTimeMs = TimeUnit.SECONDS.toMillis(config.quotaConfig.quotaWindowSizeSeconds.toLong) - - def inc(listenerName: ListenerName, address: InetAddress, acceptorBlockedPercentMeter: com.yammer.metrics.core.Meter): Unit = { - counts.synchronized { - waitForConnectionSlot(listenerName, acceptorBlockedPercentMeter) - - recordIpConnectionMaybeThrottle(listenerName, address) - val count = counts.getOrElseUpdate(address, 0) - counts.put(address, count + 1) - totalCount += 1 - if (listenerCounts.contains(listenerName)) { - listenerCounts.put(listenerName, listenerCounts(listenerName) + 1) - } - val max = maxConnectionsPerIpOverrides.getOrElse(address, defaultMaxConnectionsPerIp) - if (count >= max) - throw new TooManyConnectionsException(address, max) - } - } - - private[network] def updateMaxConnectionsPerIp(maxConnectionsPerIp: Int): Unit = { - defaultMaxConnectionsPerIp = maxConnectionsPerIp - } - - private[network] def updateMaxConnectionsPerIpOverride(overrideQuotas: Map[String, Int]): Unit = { - maxConnectionsPerIpOverrides = overrideQuotas.map { case (host, count) => (InetAddress.getByName(host), count) } - } - - private[network] def updateBrokerMaxConnections(maxConnections: Int): Unit = { - counts.synchronized { - brokerMaxConnections = maxConnections - counts.notifyAll() - } - } - - private[network] def updateBrokerMaxConnectionRate(maxConnectionRate: Int): Unit = { - // if there is a connection waiting on the rate throttle delay, we will let it wait the original delay even if - // the rate limit increases, because it is just one connection per listener and the code is simpler that way - updateConnectionRateQuota(maxConnectionRate, ConnectionQuotaEntity.brokerQuotaEntity()) - } - - /** - * Update the connection rate quota for a given IP and updates quota configs for updated IPs. - * If an IP is given, metric config will be updated only for the given IP, otherwise - * all metric configs will be checked and updated if required. 
- * - * @param ip ip to update or default if None - * @param maxConnectionRate new connection rate, or resets entity to default if None - */ - def updateIpConnectionRateQuota(ip: Option[InetAddress], maxConnectionRate: Option[Int]): Unit = synchronized { - def isIpConnectionRateMetric(metricName: MetricName) = { - metricName.name == ConnectionQuotaEntity.CONNECTION_RATE_METRIC_NAME && - metricName.group == MetricsGroup && - metricName.tags.containsKey(ConnectionQuotaEntity.IP_METRIC_TAG) - } - - def shouldUpdateQuota(metric: KafkaMetric, quotaLimit: Int) = { - quotaLimit != metric.config.quota.bound - } - - ip match { - case Some(address) => - // synchronize on counts to ensure reading an IP connection rate quota and creating a quota config is atomic - counts.synchronized { - maxConnectionRate match { - case Some(rate) => - info(s"Updating max connection rate override for $address to $rate") - connectionRatePerIp.put(address, rate) - case None => - info(s"Removing max connection rate override for $address") - connectionRatePerIp.remove(address) - } - } - updateConnectionRateQuota(connectionRateForIp(address), ConnectionQuotaEntity.ipQuotaEntity(address)) - case None => - // synchronize on counts to ensure reading an IP connection rate quota and creating a quota config is atomic - counts.synchronized { - defaultConnectionRatePerIp = maxConnectionRate.getOrElse(QuotaConfig.IP_CONNECTION_RATE_DEFAULT.intValue()) - } - info(s"Updated default max IP connection rate to $defaultConnectionRatePerIp") - metrics.metrics.forEach { (metricName, metric) => - if (isIpConnectionRateMetric(metricName)) { - val quota = connectionRateForIp(InetAddress.getByName(metricName.tags.get(ConnectionQuotaEntity.IP_METRIC_TAG))) - if (shouldUpdateQuota(metric, quota)) { - debug(s"Updating existing connection rate quota config for ${metricName.tags} to $quota") - metric.config(rateQuotaMetricConfig(quota)) - } - } - } - } - } - - // Visible for testing - def connectionRateForIp(ip: InetAddress): Int = { - connectionRatePerIp.getOrDefault(ip, defaultConnectionRatePerIp) - } - - private[network] def addListener(config: KafkaConfig, listenerName: ListenerName): Unit = { - counts.synchronized { - if (!maxConnectionsPerListener.contains(listenerName)) { - val newListenerQuota = new ListenerConnectionQuota(counts, listenerName) - maxConnectionsPerListener.put(listenerName, newListenerQuota) - listenerCounts.put(listenerName, 0) - config.addReconfigurable(newListenerQuota) - newListenerQuota.configure(config.valuesWithPrefixOverride(listenerName.configPrefix)) - } - counts.notifyAll() - } - } - - private[network] def removeListener(config: KafkaConfig, listenerName: ListenerName): Unit = { - counts.synchronized { - maxConnectionsPerListener.remove(listenerName).foreach { listenerQuota => - listenerCounts.remove(listenerName) - // once listener is removed from maxConnectionsPerListener, no metrics will be recorded into listener's sensor - // so it is safe to remove sensor here - listenerQuota.close() - counts.notifyAll() // wake up any waiting acceptors to close cleanly - config.removeReconfigurable(listenerQuota) - } - } - } - - def dec(listenerName: ListenerName, address: InetAddress): Unit = { - counts.synchronized { - val count = counts.getOrElse(address, - throw new IllegalArgumentException(s"Attempted to decrease connection count for address with no connections, address: $address")) - if (count == 1) - counts.remove(address) - else - counts.put(address, count - 1) - - if (totalCount <= 0) - error(s"Attempted to decrease 
total connection count for broker with no connections") - totalCount -= 1 - - if (maxConnectionsPerListener.contains(listenerName)) { - val listenerCount = listenerCounts(listenerName) - if (listenerCount == 0) - error(s"Attempted to decrease connection count for listener $listenerName with no connections") - else - listenerCounts.put(listenerName, listenerCount - 1) - } - counts.notifyAll() // wake up any acceptors waiting to process a new connection since listener connection limit was reached - } - } - - def get(address: InetAddress): Int = counts.synchronized { - counts.getOrElse(address, 0) - } - - private def waitForConnectionSlot(listenerName: ListenerName, - acceptorBlockedPercentMeter: com.yammer.metrics.core.Meter): Unit = { - counts.synchronized { - val startThrottleTimeMs = time.milliseconds - val throttleTimeMs = math.max(recordConnectionAndGetThrottleTimeMs(listenerName, startThrottleTimeMs), 0) - - if (throttleTimeMs > 0 || !connectionSlotAvailable(listenerName)) { - val startNs = time.nanoseconds - val endThrottleTimeMs = startThrottleTimeMs + throttleTimeMs - var remainingThrottleTimeMs = throttleTimeMs - do { - counts.wait(remainingThrottleTimeMs) - remainingThrottleTimeMs = math.max(endThrottleTimeMs - time.milliseconds, 0) - } while (remainingThrottleTimeMs > 0 || !connectionSlotAvailable(listenerName)) - acceptorBlockedPercentMeter.mark(time.nanoseconds - startNs) - } - } - } - - // This is invoked in every poll iteration and we close one LRU connection in an iteration - // if necessary - def maxConnectionsExceeded(listenerName: ListenerName): Boolean = { - totalCount > brokerMaxConnections && !protectedListener(listenerName) - } - - private def connectionSlotAvailable(listenerName: ListenerName): Boolean = { - if (listenerCounts(listenerName) >= maxListenerConnections(listenerName)) - false - else if (protectedListener(listenerName)) - true - else - totalCount < brokerMaxConnections - } - - private def protectedListener(listenerName: ListenerName): Boolean = - interBrokerListenerName == listenerName && listenerCounts.size > 1 - - private def maxListenerConnections(listenerName: ListenerName): Int = - maxConnectionsPerListener.get(listenerName).map(_.maxConnections).getOrElse(Int.MaxValue) - - /** - * Calculates the delay needed to bring the observed connection creation rate to listener-level limit or to broker-wide - * limit, whichever the longest. 
The delay is capped to the quota window size defined by QuotaWindowSizeSecondsProp - * - * @param listenerName listener for which calculate the delay - * @param timeMs current time in milliseconds - * @return delay in milliseconds - */ - private def recordConnectionAndGetThrottleTimeMs(listenerName: ListenerName, timeMs: Long): Long = { - def recordAndGetListenerThrottleTime(minThrottleTimeMs: Int): Int = { - maxConnectionsPerListener - .get(listenerName) - .map { listenerQuota => - val listenerThrottleTimeMs = recordAndGetThrottleTimeMs(listenerQuota.connectionRateSensor, timeMs) - val throttleTimeMs = math.max(minThrottleTimeMs, listenerThrottleTimeMs) - // record throttle time due to hitting connection rate quota - if (throttleTimeMs > 0) { - listenerQuota.listenerConnectionRateThrottleSensor.record(throttleTimeMs.toDouble, timeMs) - } - throttleTimeMs - } - .getOrElse(0) - } - - if (protectedListener(listenerName)) { - recordAndGetListenerThrottleTime(0) - } else { - val brokerThrottleTimeMs = recordAndGetThrottleTimeMs(brokerConnectionRateSensor, timeMs) - recordAndGetListenerThrottleTime(brokerThrottleTimeMs) - } - } - - /** - * Record IP throttle time on the corresponding listener. To avoid over-recording listener/broker connection rate, we - * also un-record the listener and broker connection if the IP gets throttled. - * - * @param listenerName listener to un-record connection - * @param throttleMs IP throttle time to record for listener - * @param timeMs current time in milliseconds - */ - private def updateListenerMetrics(listenerName: ListenerName, throttleMs: Long, timeMs: Long): Unit = { - if (!protectedListener(listenerName)) { - brokerConnectionRateSensor.record(-1.0, timeMs, false) - } - maxConnectionsPerListener - .get(listenerName) - .foreach { listenerQuota => - listenerQuota.ipConnectionRateThrottleSensor.record(throttleMs.toDouble, timeMs) - listenerQuota.connectionRateSensor.record(-1.0, timeMs, false) - } - } - - /** - * Calculates the delay needed to bring the observed connection creation rate to the IP limit. - * If the connection would cause an IP quota violation, un-record the connection for both IP, - * listener, and broker connection rate and throw a ConnectionThrottledException. Calls to - * this function must be performed with the counts lock to ensure that reading the IP - * connection rate quota and creating the sensor's metric config is atomic. 
- * - * @param listenerName listener to unrecord connection if throttled - * @param address ip address to record connection - */ - private def recordIpConnectionMaybeThrottle(listenerName: ListenerName, address: InetAddress): Unit = { - val connectionRateQuota = connectionRateForIp(address) - val quotaEnabled = connectionRateQuota != QuotaConfig.IP_CONNECTION_RATE_DEFAULT - if (quotaEnabled) { - val sensor = getOrCreateConnectionRateQuotaSensor(connectionRateQuota, ConnectionQuotaEntity.ipQuotaEntity(address)) - val timeMs = time.milliseconds - val throttleMs = recordAndGetThrottleTimeMs(sensor, timeMs) - if (throttleMs > 0) { - trace(s"Throttling $address for $throttleMs ms") - // unrecord the connection since we won't accept the connection - sensor.record(-1.0, timeMs, false) - updateListenerMetrics(listenerName, throttleMs, timeMs) - throw new ConnectionThrottledException(address, timeMs, throttleMs) - } - } - } - - /** - * Records a new connection into a given connection acceptance rate sensor 'sensor' and returns throttle time - * in milliseconds if quota got violated - * @param sensor sensor to record connection - * @param timeMs current time in milliseconds - * @return throttle time in milliseconds if quota got violated, otherwise 0 - */ - private def recordAndGetThrottleTimeMs(sensor: Sensor, timeMs: Long): Int = { - try { - sensor.record(1.0, timeMs) - 0 - } catch { - case e: QuotaViolationException => - val throttleTimeMs = QuotaUtils.boundedThrottleTime(e, maxThrottleTimeMs, timeMs).toInt - debug(s"Quota violated for sensor (${sensor.name}). Delay time: $throttleTimeMs ms") - throttleTimeMs - } - } - - /** - * Creates sensor for tracking the connection creation rate and corresponding connection rate quota for a given - * listener or broker-wide, if listener is not provided. 
- * @param quotaLimit connection creation rate quota - * @param connectionQuotaEntity entity to create the sensor for - */ - private def getOrCreateConnectionRateQuotaSensor(quotaLimit: Int, connectionQuotaEntity: ConnectionQuotaEntity): Sensor = { - Option(metrics.getSensor(connectionQuotaEntity.sensorName)).getOrElse { - val sensor = metrics.sensor( - connectionQuotaEntity.sensorName, - rateQuotaMetricConfig(quotaLimit), - connectionQuotaEntity.sensorExpiration - ) - sensor.add(connectionRateMetricName(connectionQuotaEntity), new Rate, null) - sensor - } - } - - /** - * Updates quota configuration for a given connection quota entity - */ - private def updateConnectionRateQuota(quotaLimit: Int, connectionQuotaEntity: ConnectionQuotaEntity): Unit = { - Option(metrics.metric(connectionRateMetricName(connectionQuotaEntity))).foreach { metric => - metric.config(rateQuotaMetricConfig(quotaLimit)) - info(s"Updated ${connectionQuotaEntity.metricName} max connection creation rate to $quotaLimit") - } - } - - private def connectionRateMetricName(connectionQuotaEntity: ConnectionQuotaEntity): MetricName = { - metrics.metricName( - connectionQuotaEntity.metricName, - MetricsGroup, - s"Tracking rate of accepting new connections (per second)", - connectionQuotaEntity.metricTags) - } - - private def rateQuotaMetricConfig(quotaLimit: Int): MetricConfig = { - new MetricConfig() - .timeWindow(config.quotaConfig.quotaWindowSizeSeconds.toLong, TimeUnit.SECONDS) - .samples(config.quotaConfig.numQuotaSamples) - .quota(new Quota(quotaLimit, true)) - } - - def close(): Unit = { - metrics.removeSensor(brokerConnectionRateSensor.name) - maxConnectionsPerListener.values.foreach(_.close()) - } - - class ListenerConnectionQuota(lock: Object, listener: ListenerName) extends ListenerReconfigurable with AutoCloseable { - @volatile private var _maxConnections = Int.MaxValue - private[network] val connectionRateSensor = getOrCreateConnectionRateQuotaSensor(Int.MaxValue, ConnectionQuotaEntity.listenerQuotaEntity(listener.value)) - private[network] val listenerConnectionRateThrottleSensor = createConnectionRateThrottleSensor(ConnectionQuotaEntity.LISTENER_THROTTLE_PREFIX) - private[network] val ipConnectionRateThrottleSensor = createConnectionRateThrottleSensor(ConnectionQuotaEntity.IP_THROTTLE_PREFIX) - - def maxConnections: Int = _maxConnections - - override def listenerName(): ListenerName = listener - - override def configure(configs: util.Map[String, _]): Unit = { - _maxConnections = maxConnections(configs) - updateConnectionRateQuota(maxConnectionCreationRate(configs), ConnectionQuotaEntity.listenerQuotaEntity(listener.value)) - } - - override def reconfigurableConfigs(): util.Set[String] = { - SocketServer.ListenerReconfigurableConfigs.asJava - } - - override def validateReconfiguration(configs: util.Map[String, _]): Unit = { - val value = maxConnections(configs) - if (value <= 0) - throw new ConfigException(s"Invalid ${SocketServerConfigs.MAX_CONNECTIONS_CONFIG} $value") - - val rate = maxConnectionCreationRate(configs) - if (rate <= 0) - throw new ConfigException(s"Invalid ${SocketServerConfigs.MAX_CONNECTION_CREATION_RATE_CONFIG} $rate") - } - - override def reconfigure(configs: util.Map[String, _]): Unit = { - lock.synchronized { - _maxConnections = maxConnections(configs) - updateConnectionRateQuota(maxConnectionCreationRate(configs), ConnectionQuotaEntity.listenerQuotaEntity(listener.value)) - lock.notifyAll() - } - } - - def close(): Unit = { - metrics.removeSensor(connectionRateSensor.name) - 
metrics.removeSensor(listenerConnectionRateThrottleSensor.name) - metrics.removeSensor(ipConnectionRateThrottleSensor.name) - } - - private def maxConnections(configs: util.Map[String, _]): Int = { - Option(configs.get(SocketServerConfigs.MAX_CONNECTIONS_CONFIG)).map(_.toString.toInt).getOrElse(Int.MaxValue) - } - - private def maxConnectionCreationRate(configs: util.Map[String, _]): Int = { - Option(configs.get(SocketServerConfigs.MAX_CONNECTION_CREATION_RATE_CONFIG)).map(_.toString.toInt).getOrElse(Int.MaxValue) - } - - /** - * Creates sensor for tracking the average throttle time on this listener due to hitting broker/listener connection - * rate or IP connection rate quota. The average is out of all throttle times > 0, which is consistent with the - * bandwidth and request quota throttle time metrics. - */ - private def createConnectionRateThrottleSensor(throttlePrefix: String): Sensor = { - val sensor = metrics.sensor(s"${throttlePrefix}ConnectionRateThrottleTime-${listener.value}") - val metricName = metrics.metricName(s"${throttlePrefix}connection-accept-throttle-time", - MetricsGroup, - "Tracking average throttle-time, out of non-zero throttle times, per listener", - Map(ListenerMetricTag -> listener.value).asJava) - sensor.add(metricName, new Avg) - sensor - } - } - - /** - * Close `channel` and decrement the connection count. - */ - def closeChannel(log: Logging, listenerName: ListenerName, channel: SocketChannel): Unit = { - if (channel != null) { - log.debug(s"Closing connection from ${channel.socket.getRemoteSocketAddress}") - dec(listenerName, channel.socket.getInetAddress) - closeSocket(channel) - } - } - -} +} \ No newline at end of file diff --git a/server/src/main/java/org/apache/kafka/network/ConnectionQuotas.java b/server/src/main/java/org/apache/kafka/network/ConnectionQuotas.java new file mode 100644 index 0000000000000..f2728e08edb42 --- /dev/null +++ b/server/src/main/java/org/apache/kafka/network/ConnectionQuotas.java @@ -0,0 +1,534 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.kafka.network;
+
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.metrics.KafkaMetric;
+import org.apache.kafka.common.metrics.MetricConfig;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Quota;
+import org.apache.kafka.common.metrics.QuotaViolationException;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.metrics.stats.Avg;
+import org.apache.kafka.common.metrics.stats.Rate;
+import org.apache.kafka.common.network.ListenerName;
+import org.apache.kafka.common.network.ListenerReconfigurable;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.config.AbstractKafkaConfig;
+import org.apache.kafka.server.config.QuotaConfig;
+import org.apache.kafka.server.quota.QuotaUtils;
+
+import com.yammer.metrics.core.Meter;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.nio.channels.SocketChannel;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+public class ConnectionQuotas implements AutoCloseable {
+    private static final Logger log = LoggerFactory.getLogger(ConnectionQuotas.class);
+    private static final String METRICS_GROUP = "socket-server-metrics";
+    private static final String LISTENER_METRIC_TAG = "listener";
+
+    private final AbstractKafkaConfig config;
+    private final Time time;
+    private final Metrics metrics;
+    private volatile int defaultMaxConnectionsPerIp;
+    private volatile Map<InetAddress, Integer> maxConnectionsPerIpOverrides;
+    private volatile int brokerMaxConnections;
+    private final ListenerName interBrokerListenerName;
+    private final Map<InetAddress, Integer> counts = new HashMap<>();
+
+    // Listener counts and configs are synchronized on `counts`
+    private final Map<ListenerName, Integer> listenerCounts = new HashMap<>();
+    private final Map<ListenerName, ListenerConnectionQuota> maxConnectionsPerListener = new HashMap<>();
+    private volatile int totalCount = 0;
+    // updates to defaultConnectionRatePerIp or connectionRatePerIp must be synchronized on `counts`
+    private volatile int defaultConnectionRatePerIp = QuotaConfig.IP_CONNECTION_RATE_DEFAULT.intValue();
+    private final ConcurrentHashMap<InetAddress, Integer> connectionRatePerIp = new ConcurrentHashMap<>();
+    // sensor that tracks broker-wide connection creation rate and limit (quota)
+    private final Sensor brokerConnectionRateSensor;
+    private final long maxThrottleTimeMs;
+
+    public ConnectionQuotas(AbstractKafkaConfig config, Time time, Metrics metrics) {
+        this.config = config;
+        this.time = time;
+        this.metrics = metrics;
+        this.defaultMaxConnectionsPerIp = config.maxConnectionsPerIp();
+        this.maxConnectionsPerIpOverrides = resolveIpOverrides(config.maxConnectionsPerIpOverrides());
+        this.brokerMaxConnections = config.maxConnections();
+        this.interBrokerListenerName = config.interBrokerListenerName();
+        this.brokerConnectionRateSensor = getOrCreateConnectionRateQuotaSensor(
+            config.maxConnectionCreationRate(),
+            ConnectionQuotaEntity.brokerQuotaEntity());
+        this.maxThrottleTimeMs = TimeUnit.SECONDS.toMillis(config.quotaConfig().quotaWindowSizeSeconds());
+    }
+
+    // resolves host names to addresses once, wrapping the checked UnknownHostException so
+    // the lookup can run inside the stream lambda (shared with updateMaxConnectionsPerIpOverride)
+    private static Map<InetAddress, Integer> resolveIpOverrides(Map<String, Integer> overrideQuotas) {
+        return overrideQuotas
+            .entrySet()
+            .stream()
+            .collect(Collectors.toMap(entry -> {
+                try {
+                    return InetAddress.getByName(entry.getKey());
+                } catch (UnknownHostException e) {
+                    throw new RuntimeException(e);
+                }
+            }, Map.Entry::getValue));
+    }
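+
+    // Rough usage sketch (hypothetical acceptor code; `quotas`, `listenerName`, `channel`
+    // and `blockedMeter` are assumptions, not part of this class): an acceptor calls inc()
+    // before handing a socket to a processor and dec() when the connection closes; inc()
+    // blocks while no slot is free and throws once the per-IP limit is exceeded. Because
+    // inc() counts the connection before throwing, the catch block must release it again:
+    //
+    //   try {
+    //       quotas.inc(listenerName, channel.socket().getInetAddress(), blockedMeter);
+    //   } catch (TooManyConnectionsException e) {
+    //       quotas.closeChannel(log, listenerName, channel);  // closeChannel() calls dec()
+    //   }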
+
+    public void inc(ListenerName listenerName, InetAddress address, Meter acceptorBlockedPercentMeter) {
+        synchronized (counts) {
+            waitForConnectionSlot(listenerName, acceptorBlockedPercentMeter);
+            recordIpConnectionMaybeThrottle(listenerName, address);
+            int count = counts.getOrDefault(address, 0);
+            counts.put(address, count + 1);
+            totalCount += 1;
+            if (listenerCounts.containsKey(listenerName)) {
+                listenerCounts.put(listenerName, listenerCounts.get(listenerName) + 1);
+            }
+            int max = maxConnectionsPerIpOverrides.getOrDefault(address, defaultMaxConnectionsPerIp);
+            if (count >= max)
+                throw new TooManyConnectionsException(address, max);
+        }
+    }
+
+    void updateMaxConnectionsPerIp(int maxConnectionsPerIp) {
+        this.defaultMaxConnectionsPerIp = maxConnectionsPerIp;
+    }
+
+    void updateMaxConnectionsPerIpOverride(Map<String, Integer> overrideQuotas) {
+        // replace the override map atomically (the field is volatile)
+        this.maxConnectionsPerIpOverrides = resolveIpOverrides(overrideQuotas);
+    }
+
+    void updateBrokerMaxConnections(int maxConnections) {
+        synchronized (counts) {
+            brokerMaxConnections = maxConnections;
+            counts.notifyAll();
+        }
+    }
+
+    void updateBrokerMaxConnectionRate(int maxConnectionRate) {
+        // if there is a connection waiting on the rate throttle delay, we will let it wait the original delay even if
+        // the rate limit increases, because it is just one connection per listener and the code is simpler that way
+        updateConnectionRateQuota(maxConnectionRate, ConnectionQuotaEntity.brokerQuotaEntity());
+    }
+
+    /**
+     * Update the connection rate quota for a given IP and updates quota configs for updated IPs.
+     * If an IP is given, metric config will be updated only for the given IP, otherwise
+     * all metric configs will be checked and updated if required.
+     *
+     * @param ip                ip to update or default if Optional.empty()
+     * @param maxConnectionRate new connection rate, or resets entity to default if Optional.empty()
+     */
+    public synchronized void updateIpConnectionRateQuota(Optional<InetAddress> ip, Optional<Integer> maxConnectionRate) {
+        if (ip.isPresent()) {
+            InetAddress address = ip.get();
+            // synchronize on counts to ensure reading an IP connection rate quota and creating a quota config is atomic
+            synchronized (counts) {
+                if (maxConnectionRate.isPresent()) {
+                    int rate = maxConnectionRate.get();
+                    log.info("Updating max connection rate override for {} to {}", address, rate);
+                    connectionRatePerIp.put(address, rate);
+                } else {
+                    log.info("Removing max connection rate override for {}", address);
+                    connectionRatePerIp.remove(address);
+                }
+            }
+            updateConnectionRateQuota(connectionRateForIp(address), ConnectionQuotaEntity.ipQuotaEntity(address));
+        } else {
+            // synchronize on counts to ensure reading an IP connection rate quota and creating a quota config is atomic
+            synchronized (counts) {
+                defaultConnectionRatePerIp = maxConnectionRate.orElse(QuotaConfig.IP_CONNECTION_RATE_DEFAULT.intValue());
+            }
+            log.info("Updated default max IP connection rate to {}", defaultConnectionRatePerIp);
+            metrics.metrics().forEach((metricName, metric) -> {
+                if (isIpConnectionRateMetric(metricName)) {
+                    try {
+                        int quota = connectionRateForIp(InetAddress.getByName(metricName.tags().get(ConnectionQuotaEntity.IP_METRIC_TAG)));
+                        if (shouldUpdateQuota(metric, quota)) {
+                            log.debug("Updating existing connection rate quota config for {} to {}", metricName.tags(), quota);
+                            metric.config(rateQuotaMetricConfig(quota));
+                        }
+                    } catch (UnknownHostException e) {
+                        throw new RuntimeException(e);
+                    }
+                }
+            });
+        }
+    }
+
+    private boolean isIpConnectionRateMetric(MetricName metricName) {
+        return metricName.name().equals(ConnectionQuotaEntity.CONNECTION_RATE_METRIC_NAME) &&
+            metricName.group().equals(METRICS_GROUP) &&
+            metricName.tags().containsKey(ConnectionQuotaEntity.IP_METRIC_TAG);
+    }
+
+    private boolean shouldUpdateQuota(KafkaMetric metric, int quotaLimit) {
+        return quotaLimit != metric.config().quota().bound();
+    }
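+
+    // Usage sketch for updateIpConnectionRateQuota (hypothetical values; `quotas` is an
+    // assumed instance): cap a noisy client IP at 50 new connections per second, then
+    // later remove the override so the IP falls back to the default:
+    //
+    //   quotas.updateIpConnectionRateQuota(Optional.of(InetAddress.getByName("10.0.0.7")), Optional.of(50));
+    //   quotas.updateIpConnectionRateQuota(Optional.of(InetAddress.getByName("10.0.0.7")), Optional.empty());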
+
+    // Visible for testing
+    public int connectionRateForIp(InetAddress ip) {
+        return connectionRatePerIp.getOrDefault(ip, defaultConnectionRatePerIp);
+    }
+
+    void addListener(AbstractKafkaConfig config, ListenerName listenerName) {
+        synchronized (counts) {
+            if (!maxConnectionsPerListener.containsKey(listenerName)) {
+                ListenerConnectionQuota newListenerQuota = new ListenerConnectionQuota(counts, listenerName);
+                maxConnectionsPerListener.put(listenerName, newListenerQuota);
+                listenerCounts.put(listenerName, 0);
+                config.addReconfigurable(newListenerQuota);
+                newListenerQuota.configure(config.valuesWithPrefixOverride(listenerName.configPrefix()));
+            }
+            counts.notifyAll();
+        }
+    }
+
+    void removeListener(AbstractKafkaConfig config, ListenerName listenerName) {
+        synchronized (counts) {
+            ListenerConnectionQuota listenerQuota = maxConnectionsPerListener.remove(listenerName);
+            if (listenerQuota != null) {
+                listenerCounts.remove(listenerName);
+                // once listener is removed from maxConnectionsPerListener, no metrics will be recorded into listener's sensor
+                // so it is safe to remove sensor here
+                listenerQuota.close();
+                counts.notifyAll(); // wake up any waiting acceptors to close cleanly
+                config.removeReconfigurable(listenerQuota);
+            }
+        }
+    }
+
+    public void dec(ListenerName listenerName, InetAddress address) {
+        synchronized (counts) {
+            Integer count = counts.get(address);
+            if (count == null)
+                throw new IllegalArgumentException("Attempted to decrease connection count for address with no connections, address: " + address);
+            if (count == 1)
+                counts.remove(address);
+            else
+                counts.put(address, count - 1);
+
+            if (totalCount <= 0)
+                log.error("Attempted to decrease total connection count for broker with no connections");
+            totalCount -= 1;
+
+            if (maxConnectionsPerListener.containsKey(listenerName)) {
+                int listenerCount = listenerCounts.get(listenerName);
+                if (listenerCount == 0)
+                    log.error("Attempted to decrease connection count for listener {} with no connections", listenerName);
+                else
+                    listenerCounts.put(listenerName, listenerCount - 1);
+            }
+            counts.notifyAll(); // wake up any acceptors waiting to process a new connection since listener connection limit was reached
+        }
+    }
+
+    public int get(InetAddress address) {
+        synchronized (counts) {
+            return counts.getOrDefault(address, 0);
+        }
+    }
+
+    private void waitForConnectionSlot(ListenerName listenerName, Meter acceptorBlockedPercentMeter) {
+        synchronized (counts) {
+            long startThrottleTimeMs = time.milliseconds();
+            long throttleTimeMs = Math.max(recordConnectionAndGetThrottleTimeMs(listenerName, startThrottleTimeMs), 0);
+
+            if (throttleTimeMs > 0 || !connectionSlotAvailable(listenerName)) {
+                long startNs = time.nanoseconds();
+                long endThrottleTimeMs = startThrottleTimeMs + throttleTimeMs;
+                long remainingThrottleTimeMs = throttleTimeMs;
+                try {
+                    do {
+                        // unlike the Scala original, Object.wait declares the checked InterruptedException in Java
+                        counts.wait(remainingThrottleTimeMs);
+                        remainingThrottleTimeMs = Math.max(endThrottleTimeMs - time.milliseconds(), 0);
+                    } while (remainingThrottleTimeMs > 0 || !connectionSlotAvailable(listenerName));
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                    throw new RuntimeException(e);
+                }
+                acceptorBlockedPercentMeter.mark(time.nanoseconds() - startNs);
+            }
+        }
+    }
+
+    // This is invoked in every poll iteration and we close one LRU connection in an iteration
+    // if necessary
+    public boolean maxConnectionsExceeded(ListenerName listenerName) {
+        return totalCount > brokerMaxConnections && !protectedListener(listenerName);
+    }
+
+    private boolean connectionSlotAvailable(ListenerName listenerName) {
+        if (listenerCounts.get(listenerName) >= maxListenerConnections(listenerName))
+            return false;
+        else if (protectedListener(listenerName))
+            return true;
+        else
+            return totalCount < brokerMaxConnections;
+    }
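+
+    // Worked example (hypothetical numbers): with max.connections=1000 and a dedicated
+    // inter-broker listener, a flood of client connections cannot starve replication:
+    // protectedListener() keeps the inter-broker listener's slots available, and
+    // maxConnectionsExceeded() lets processors close one LRU client connection per poll
+    // until the broker is back under its limit.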
+
+    private boolean protectedListener(ListenerName listenerName) {
+        return interBrokerListenerName.equals(listenerName) && listenerCounts.size() > 1;
+    }
+
+    private int maxListenerConnections(ListenerName listenerName) {
+        ListenerConnectionQuota quota = maxConnectionsPerListener.get(listenerName);
+        return quota != null ? quota.maxConnections() : Integer.MAX_VALUE;
+    }
+
+    /**
+     * Calculates the delay needed to bring the observed connection creation rate to listener-level limit or to broker-wide
+     * limit, whichever the longest. The delay is capped to the quota window size defined by QuotaWindowSizeSecondsProp
+     *
+     * @param listenerName listener for which to calculate the delay
+     * @param timeMs       current time in milliseconds
+     * @return delay in milliseconds
+     */
+    private long recordConnectionAndGetThrottleTimeMs(ListenerName listenerName, long timeMs) {
+        if (protectedListener(listenerName)) {
+            return recordAndGetListenerThrottleTime(listenerName, 0, timeMs);
+        } else {
+            long brokerThrottleTimeMs = recordAndGetThrottleTimeMs(brokerConnectionRateSensor, timeMs);
+            return recordAndGetListenerThrottleTime(listenerName, brokerThrottleTimeMs, timeMs);
+        }
+    }
+
+    private long recordAndGetListenerThrottleTime(ListenerName listenerName, long minThrottleTimeMs, long timeMs) {
+        ListenerConnectionQuota listenerQuota = maxConnectionsPerListener.get(listenerName);
+        if (listenerQuota == null)
+            return 0;
+        long listenerThrottleTimeMs = recordAndGetThrottleTimeMs(listenerQuota.connectionRateSensor, timeMs);
+        long throttleTimeMs = Math.max(minThrottleTimeMs, listenerThrottleTimeMs);
+        // record throttle time due to hitting connection rate quota
+        if (throttleTimeMs > 0) {
+            listenerQuota.listenerConnectionRateThrottleSensor.record((double) throttleTimeMs, timeMs);
+        }
+        return throttleTimeMs;
+    }
+
+    /**
+     * Record IP throttle time on the corresponding listener. To avoid over-recording listener/broker connection rate, we
+     * also un-record the listener and broker connection if the IP gets throttled.
+     *
+     * @param listenerName listener to un-record connection
+     * @param throttleMs   IP throttle time to record for listener
+     * @param timeMs       current time in milliseconds
+     */
+    private void updateListenerMetrics(ListenerName listenerName, long throttleMs, long timeMs) {
+        if (!protectedListener(listenerName)) {
+            brokerConnectionRateSensor.record(-1.0, timeMs, false);
+        }
+
+        ListenerConnectionQuota listenerQuota = maxConnectionsPerListener.get(listenerName);
+        if (listenerQuota != null) {
+            listenerQuota.ipConnectionRateThrottleSensor.record((double) throttleMs, timeMs);
+            listenerQuota.connectionRateSensor.record(-1.0, timeMs, false);
+        }
+    }
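+
+    // Bookkeeping illustration (sequence as implemented above): a connection attempt is
+    // first recorded on the broker and listener rate sensors; if the per-IP check in
+    // recordIpConnectionMaybeThrottle() then rejects it, updateListenerMetrics() records
+    // -1.0 on those same sensors so the throttled attempt is not double-counted against
+    // the broker/listener connection rate quotas.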
+
+    /**
+     * Calculates the delay needed to bring the observed connection creation rate to the IP limit.
+     * If the connection would cause an IP quota violation, un-record the connection for both IP,
+     * listener, and broker connection rate and throw a ConnectionThrottledException. Calls to
+     * this function must be performed with the counts lock to ensure that reading the IP
+     * connection rate quota and creating the sensor's metric config is atomic.
+     *
+     * @param listenerName listener to unrecord connection if throttled
+     * @param address      ip address to record connection
+     */
+    private void recordIpConnectionMaybeThrottle(ListenerName listenerName, InetAddress address) {
+        int connectionRateQuota = connectionRateForIp(address);
+        boolean quotaEnabled = connectionRateQuota != QuotaConfig.IP_CONNECTION_RATE_DEFAULT;
+        if (quotaEnabled) {
+            Sensor sensor = getOrCreateConnectionRateQuotaSensor(connectionRateQuota, ConnectionQuotaEntity.ipQuotaEntity(address));
+            long timeMs = time.milliseconds();
+            long throttleMs = recordAndGetThrottleTimeMs(sensor, timeMs);
+            if (throttleMs > 0) {
+                log.trace("Throttling {} for {} ms", address, throttleMs);
+                // unrecord the connection since we won't accept the connection
+                sensor.record(-1.0, timeMs, false);
+                updateListenerMetrics(listenerName, throttleMs, timeMs);
+                throw new ConnectionThrottledException(address, timeMs, throttleMs);
+            }
+        }
+    }
+
+    /**
+     * Records a new connection into a given connection acceptance rate sensor 'sensor' and returns throttle time
+     * in milliseconds if quota got violated
+     *
+     * @param sensor sensor to record connection
+     * @param timeMs current time in milliseconds
+     * @return throttle time in milliseconds if quota got violated, otherwise 0
+     */
+    private long recordAndGetThrottleTimeMs(Sensor sensor, long timeMs) {
+        try {
+            sensor.record(1.0, timeMs);
+            return 0;
+        } catch (QuotaViolationException e) {
+            long throttleTimeMs = QuotaUtils.boundedThrottleTime(e, maxThrottleTimeMs, timeMs);
+            log.debug("Quota violated for sensor ({}). Delay time: {} ms", sensor.name(), throttleTimeMs);
+            return throttleTimeMs;
+        }
+    }
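+
+    // Worked example (hypothetical numbers): with an IP quota of 10 connections/sec, the
+    // 11th attempt inside the quota window makes sensor.record() throw
+    // QuotaViolationException; boundedThrottleTime() then returns the delay, capped at
+    // maxThrottleTimeMs, needed to bring the observed rate back under 10/sec.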
+
+    /**
+     * Creates sensor for tracking the connection creation rate and corresponding connection rate quota for a given
+     * listener or broker-wide, if listener is not provided.
+     *
+     * @param quotaLimit            connection creation rate quota
+     * @param connectionQuotaEntity entity to create the sensor for
+     */
+    private Sensor getOrCreateConnectionRateQuotaSensor(int quotaLimit, ConnectionQuotaEntity connectionQuotaEntity) {
+        Sensor sensor = metrics.getSensor(connectionQuotaEntity.sensorName());
+        if (sensor == null) {
+            sensor = metrics.sensor(
+                connectionQuotaEntity.sensorName(),
+                rateQuotaMetricConfig(quotaLimit),
+                connectionQuotaEntity.sensorExpiration()
+            );
+            sensor.add(connectionRateMetricName(connectionQuotaEntity), new Rate(), null);
+        }
+        return sensor;
+    }
+
+    /**
+     * Updates quota configuration for a given connection quota entity
+     */
+    private void updateConnectionRateQuota(int quotaLimit, ConnectionQuotaEntity connectionQuotaEntity) {
+        KafkaMetric metric = metrics.metric(connectionRateMetricName(connectionQuotaEntity));
+        if (metric != null) {
+            metric.config(rateQuotaMetricConfig(quotaLimit));
+            log.info("Updated {} max connection creation rate to {}", connectionQuotaEntity.metricName(), quotaLimit);
+        }
+    }
+
+    private MetricName connectionRateMetricName(ConnectionQuotaEntity connectionQuotaEntity) {
+        return metrics.metricName(
+            connectionQuotaEntity.metricName(),
+            METRICS_GROUP,
+            "Tracking rate of accepting new connections (per second)",
+            connectionQuotaEntity.metricTags());
+    }
+
+    private MetricConfig rateQuotaMetricConfig(int quotaLimit) {
+        return new MetricConfig()
+            .timeWindow(config.quotaConfig().quotaWindowSizeSeconds(), TimeUnit.SECONDS)
+            .samples(config.quotaConfig().numQuotaSamples())
+            .quota(new Quota(quotaLimit, true));
+    }
+
+    @Override
+    public void close() {
+        metrics.removeSensor(brokerConnectionRateSensor.name());
+        for (ListenerConnectionQuota quota : maxConnectionsPerListener.values()) {
+            quota.close();
+        }
+    }
+    /**
+     * Close `channel` and decrement the connection count.
+     */
+    public void closeChannel(Logging log, ListenerName listenerName, SocketChannel channel) {
+        if (channel != null) {
+            log.debug("Closing connection from " + channel.socket().getRemoteSocketAddress());
+            dec(listenerName, channel.socket().getInetAddress());
+            SocketServer.closeSocket(channel);
+        }
+    }
+
+    public class ListenerConnectionQuota implements AutoCloseable, ListenerReconfigurable {
+        private final Object lock;
+        private final ListenerName listener;
+        private volatile int _maxConnections = Integer.MAX_VALUE;
+        protected final Sensor connectionRateSensor;
+        protected final Sensor listenerConnectionRateThrottleSensor;
+        protected final Sensor ipConnectionRateThrottleSensor;
+
+        public ListenerConnectionQuota(Object lock, ListenerName listener) {
+            this.lock = lock;
+            this.listener = listener;
+            this.connectionRateSensor = getOrCreateConnectionRateQuotaSensor(Integer.MAX_VALUE, ConnectionQuotaEntity.listenerQuotaEntity(listener.value()));
+            this.listenerConnectionRateThrottleSensor = createConnectionRateThrottleSensor(ConnectionQuotaEntity.LISTENER_THROTTLE_PREFIX);
+            this.ipConnectionRateThrottleSensor = createConnectionRateThrottleSensor(ConnectionQuotaEntity.IP_THROTTLE_PREFIX);
+        }
+
+        public int maxConnections() {
+            return _maxConnections;
+        }
+
+        @Override
+        public ListenerName listenerName() {
+            return listener;
+        }
+
+        @Override
+        public void configure(Map<String, ?> configs) {
+            _maxConnections = maxConnections(configs);
+            updateConnectionRateQuota(maxConnectionCreationRate(configs), ConnectionQuotaEntity.listenerQuotaEntity(listener.value()));
+        }
+
+        @Override
+        public Set<String> reconfigurableConfigs() {
+            return Set.of(SocketServerConfigs.MAX_CONNECTIONS_CONFIG, SocketServerConfigs.MAX_CONNECTION_CREATION_RATE_CONFIG);
+        }
+
+        @Override
+        public void validateReconfiguration(Map<String, ?> configs) {
+            int value = maxConnections(configs);
+            if (value <= 0)
+                throw new ConfigException("Invalid " + SocketServerConfigs.MAX_CONNECTIONS_CONFIG + " " + value);
+            int rate = maxConnectionCreationRate(configs);
+            if (rate <= 0)
+                throw new ConfigException("Invalid " + SocketServerConfigs.MAX_CONNECTION_CREATION_RATE_CONFIG + " " + rate);
+        }
+
+        @Override
+        public void reconfigure(Map<String, ?> configs) {
+            synchronized (lock) {
+                _maxConnections = maxConnections(configs);
+                updateConnectionRateQuota(maxConnectionCreationRate(configs), ConnectionQuotaEntity.listenerQuotaEntity(listener.value()));
+                lock.notifyAll();
+            }
+        }
+
+        @Override
+        public void close() {
+            metrics.removeSensor(connectionRateSensor.name());
+            metrics.removeSensor(listenerConnectionRateThrottleSensor.name());
+            metrics.removeSensor(ipConnectionRateThrottleSensor.name());
+        }
+
+        private int maxConnections(Map<String, ?> configs) {
+            Object value = configs.get(SocketServerConfigs.MAX_CONNECTIONS_CONFIG);
+            return value != null ? Integer.parseInt(value.toString()) : Integer.MAX_VALUE;
+        }
+
+        private int maxConnectionCreationRate(Map<String, ?> configs) {
+            Object value = configs.get(SocketServerConfigs.MAX_CONNECTION_CREATION_RATE_CONFIG);
+            return value != null ? Integer.parseInt(value.toString()) : Integer.MAX_VALUE;
+        }
+
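Since `ListenerConnectionQuota` is a `ListenerReconfigurable`, dynamic config updates arrive as a validate-then-apply pair. A minimal sketch of the calling convention, not part of this patch; the literal values are illustrative:

    // `quota` is a ListenerConnectionQuota registered with the dynamic config machinery
    Map<String, String> updated = Map.of(
        SocketServerConfigs.MAX_CONNECTIONS_CONFIG, "512",
        SocketServerConfigs.MAX_CONNECTION_CREATION_RATE_CONFIG, "20");
    quota.validateReconfiguration(updated);   // rejects non-positive values with ConfigException
    quota.reconfigure(updated);               // swaps the limits under `lock` and wakes blocked acceptors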
+         */
+        private Sensor createConnectionRateThrottleSensor(String throttlePrefix) {
+            Sensor sensor = metrics.sensor(throttlePrefix + "ConnectionRateThrottleTime-" + listener.value());
+            Map<String, String> tags = new HashMap<>();
+            tags.put(ListenerMetricTag, listener.value());
+            MetricName metricName = metrics.metricName(
+                throttlePrefix + "connection-accept-throttle-time",
+                MetricsGroup,
+                "Tracking average throttle-time, out of non-zero throttle times, per listener",
+                tags);
+            sensor.add(metricName, new Avg());
+            return sensor;
+        }
+    }
+}
diff --git a/server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java b/server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java
index 16d617227273e..09c3ef2518314 100644
--- a/server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java
+++ b/server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java
@@ -16,6 +16,10 @@
  */
 package org.apache.kafka.server.config;
 
+import kafka.server.DynamicBrokerConfig;
+
+import org.apache.kafka.common.Reconfigurable;
+import org.apache.kafka.common.network.ListenerName;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.common.config.ConfigDef;
@@ -52,6 +56,7 @@
  * For more details check KAFKA-15853
  */
 public abstract class AbstractKafkaConfig extends AbstractConfig {
+    private volatile AbstractKafkaConfig currentConfig = this;
     public static final ConfigDef CONFIG_DEF = Utils.mergeConfigs(List.of(
         RemoteLogManagerConfig.configDef(),
         ServerConfigs.CONFIG_DEF,
@@ -98,6 +103,71 @@ public int backgroundThreads() {
         return getInt(ServerConfigs.BACKGROUND_THREADS_CONFIG);
     }
 
+    public Map<String, Integer> maxConnectionsPerIpOverrides() {
+        return getMap(SocketServerConfigs.MAX_CONNECTIONS_PER_IP_OVERRIDES_CONFIG,
+                getString(SocketServerConfigs.MAX_CONNECTIONS_PER_IP_OVERRIDES_CONFIG))
+            .entrySet()
+            .stream()
+            .collect(Collectors.toMap(
+                Map.Entry::getKey,
+                entry -> Integer.valueOf(entry.getValue())
+            ));
+    }
+
+    public Integer maxConnections() {
+        return getInt(SocketServerConfigs.MAX_CONNECTIONS_CONFIG);
+    }
+
+    public Integer maxConnectionsPerIp() {
+        return getInt(SocketServerConfigs.MAX_CONNECTIONS_PER_IP_CONFIG);
+    }
+
+    public ListenerName interBrokerListenerName() {
+        // Mirrors KafkaConfig's Scala helper: fall back to the inter-broker security
+        // protocol name when inter.broker.listener.name is not set.
+        String name = getString(ReplicationConfigs.INTER_BROKER_LISTENER_NAME_CONFIG);
+        if (name == null)
+            name = getString(ReplicationConfigs.INTER_BROKER_SECURITY_PROTOCOL_CONFIG);
+        return ListenerName.normalised(name);
+    }
+
+    public Integer maxConnectionCreationRate() {
+        return getInt(SocketServerConfigs.MAX_CONNECTION_CREATION_RATE_CONFIG);
+    }
+
+    // Wiring of these two fields is left to the concrete broker config in this patch.
+    private volatile DynamicBrokerConfig dynamicConfig;
+    private volatile QuotaConfig _quotaConfig;
+
+    public QuotaConfig quotaConfig() {
+        return _quotaConfig;
+    }
+
+    public void addReconfigurable(Reconfigurable reconfigurable) {
+        dynamicConfig.addReconfigurable(reconfigurable);
+    }
+
+    public void removeReconfigurable(Reconfigurable reconfigurable) {
+        dynamicConfig.removeReconfigurable(reconfigurable);
+    }
+
+    @Override
+    public Map<String, Object> valuesWithPrefixOverride(String prefix) {
+        if (this == currentConfig) {
+            return super.valuesWithPrefixOverride(prefix);
+        } else {
+            return currentConfig.valuesWithPrefixOverride(prefix);
+        }
+    }
+
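`valuesWithPrefixOverride` is what lets a per-listener setting shadow the broker-wide one. A sketch of the semantics with made-up values (`config` stands in for any AbstractKafkaConfig instance):

    // Given:  max.connection.creation.rate = 30
    //         listener.name.internal.max.connection.creation.rate = 10
    Map<String, Object> effective = config.valuesWithPrefixOverride("listener.name.internal.");
    // effective.get("max.connection.creation.rate") -> 10 (the prefixed value wins)
    // Keys without a prefixed variant fall through to their base values.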
+    private Map<String, String> getMap(String propName, String propValue) {
+        try {
+            return Csv.parseCsvMap(propValue);
+        } catch (Exception e) {
+            throw new IllegalArgumentException(String.format("Error parsing configuration property '%s': %s", propName, e.getMessage()), e);
+        }
+    }
+
     public int brokerId() {
         return getInt(ServerConfigs.BROKER_ID_CONFIG);
     }
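For reference, the value handed to `getMap` is the usual `key:value` CSV form used by `max.connections.per.ip.overrides`; a short sketch of the round trip (the addresses are examples):

    import java.util.Map;
    import java.util.stream.Collectors;
    import org.apache.kafka.server.util.Csv;

    Map<String, String> raw = Csv.parseCsvMap("192.168.1.1:100,broker2.example.com:200");
    Map<String, Integer> overrides = raw.entrySet().stream()
        .collect(Collectors.toMap(Map.Entry::getKey, e -> Integer.valueOf(e.getValue())));
    // overrides.get("192.168.1.1") -> 100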