diff --git a/metadata/src/main/java/org/apache/kafka/controller/AclControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/AclControlManager.java index a2c8a717a603a..1a500e1a77e64 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/AclControlManager.java +++ b/metadata/src/main/java/org/apache/kafka/controller/AclControlManager.java @@ -184,6 +184,10 @@ ControllerResult> deleteAcls(List filter validateFilter(filter); AclDeleteResult result = deleteAclsForFilter(filter, records); results.add(result); + } catch (BoundedListTooLongException e) { + // we do not return partial results here because the fact that only a portion of the deletions + // succeeded can be easily missed due to response size. instead fail the entire response + throw new InvalidRequestException(e.getMessage(), e); } catch (Throwable e) { results.add(new AclDeleteResult(ApiError.fromThrowable(e).exception())); } @@ -199,13 +203,14 @@ AclDeleteResult deleteAclsForFilter(AclBindingFilter filter, StandardAcl acl = entry.getValue(); AclBinding binding = acl.toBinding(); if (filter.matches(binding)) { - deleted.add(new AclBindingDeleteResult(binding)); - records.add(new ApiMessageAndVersion( - new RemoveAccessControlEntryRecord().setId(id), (short) 0)); - if (records.size() > MAX_RECORDS_PER_USER_OP) { + // check size limitation first before adding additional records + if (records.size() >= MAX_RECORDS_PER_USER_OP) { throw new BoundedListTooLongException("Cannot remove more than " + MAX_RECORDS_PER_USER_OP + " acls in a single delete operation."); } + deleted.add(new AclBindingDeleteResult(binding)); + records.add(new ApiMessageAndVersion( + new RemoveAccessControlEntryRecord().setId(id), (short) 0)); } } return new AclDeleteResult(deleted); diff --git a/metadata/src/test/java/org/apache/kafka/controller/AclControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/AclControlManagerTest.java index 15abdba14b2f2..8fdead891d3c2 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/AclControlManagerTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/AclControlManagerTest.java @@ -48,6 +48,7 @@ import org.apache.kafka.server.authorizer.AuthorizationResult; import org.apache.kafka.server.authorizer.AuthorizerServerInfo; import org.apache.kafka.server.common.ApiMessageAndVersion; +import org.apache.kafka.server.mutable.BoundedListTooLongException; import org.apache.kafka.timeline.SnapshotRegistry; import org.junit.jupiter.api.Test; @@ -69,6 +70,7 @@ import static org.apache.kafka.common.resource.PatternType.LITERAL; import static org.apache.kafka.common.resource.PatternType.MATCH; import static org.apache.kafka.common.resource.ResourceType.TOPIC; +import static org.apache.kafka.controller.QuorumController.MAX_RECORDS_PER_USER_OP; import static org.apache.kafka.metadata.authorizer.StandardAclWithIdTest.TEST_ACLS; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; @@ -380,4 +382,60 @@ public void testDeleteDedupe() { assertEquals(id, ((RemoveAccessControlEntryRecord) deleteAclResultsBothFilters.records().get(0).message()).id()); assertEquals(2, deleteAclResultsBothFilters.response().size()); } + + @Test + public void testDeleteExceedsMaxRecords() { + AclControlManager manager = new AclControlManager.Builder().build(); + MockClusterMetadataAuthorizer authorizer = new MockClusterMetadataAuthorizer(); + authorizer.loadSnapshot(manager.idToAcl()); + + List firstCreate = new ArrayList<>(); + List secondCreate = new ArrayList<>(); + + // create MAX_RECORDS_PER_USER_OP + 2 ACLs + for (int i = 0; i < MAX_RECORDS_PER_USER_OP + 2; i++) { + StandardAclWithId acl = new StandardAclWithId(Uuid.randomUuid(), + new StandardAcl( + ResourceType.TOPIC, + "mytopic_" + i, + PatternType.LITERAL, + "User:alice", + "127.0.0.1", + AclOperation.READ, + AclPermissionType.ALLOW)); + + // split acl creations between two create requests + if (i % 2 == 0) { + firstCreate.add(acl.toBinding()); + } else { + secondCreate.add(acl.toBinding()); + } + } + ControllerResult> firstCreateResult = manager.createAcls(firstCreate); + assertEquals((MAX_RECORDS_PER_USER_OP / 2) + 1, firstCreateResult.response().size()); + for (AclCreateResult result : firstCreateResult.response()) { + assertTrue(result.exception().isEmpty()); + } + + ControllerResult> secondCreateResult = manager.createAcls(secondCreate); + assertEquals((MAX_RECORDS_PER_USER_OP / 2) + 1, secondCreateResult.response().size()); + for (AclCreateResult result : secondCreateResult.response()) { + assertTrue(result.exception().isEmpty()); + } + + RecordTestUtils.replayAll(manager, firstCreateResult.records()); + RecordTestUtils.replayAll(manager, secondCreateResult.records()); + assertFalse(manager.idToAcl().isEmpty()); + + ArrayList filters = new ArrayList<>(); + for (int i = 0; i < MAX_RECORDS_PER_USER_OP + 2; i++) { + filters.add(new AclBindingFilter( + new ResourcePatternFilter(ResourceType.TOPIC, "mytopic_" + i, PatternType.LITERAL), + AccessControlEntryFilter.ANY)); + } + + Exception exception = assertThrows(InvalidRequestException.class, () -> manager.deleteAcls(filters)); + assertEquals(BoundedListTooLongException.class, exception.getCause().getClass()); + assertEquals("Cannot remove more than " + MAX_RECORDS_PER_USER_OP + " acls in a single delete operation.", exception.getCause().getMessage()); + } }