Skip to content

KAFKA-19400: Update AddRaftVoterRequest RPC to version 1 #19982

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: trunk
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@
"type": "request",
"listeners": ["controller", "broker"],
"name": "AddRaftVoterRequest",
"validVersions": "0",
// Version 1 adds the AckWhenCommitted field.
"validVersions": "0-1",
"flexibleVersions": "0+",
"fields": [
{ "name": "ClusterId", "type": "string", "versions": "0+", "nullableVersions": "0+",
Expand All @@ -37,6 +38,8 @@
"about": "The hostname." },
{ "name": "Port", "type": "uint16", "versions": "0+",
"about": "The port." }
]}
]},
{ "name": "AckWhenCommitted", "type": "bool", "versions": "1+", "default": "true",
"about": "When true, return a response after the new voter set is committed. Otherwise, return after the leader writes the changes locally." }
]
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@
"apiKey": 80,
"type": "response",
"name": "AddRaftVoterResponse",
"validVersions": "0",
// Version 1 is the same as version 0.
"validVersions": "0-1",
"flexibleVersions": "0+",
"fields": [
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2263,7 +2263,8 @@ private CompletableFuture<AddRaftVoterResponseData> handleAddVoterRequest(
quorum.leaderStateOrThrow(),
newVoter.get(),
newVoterEndpoints,
currentTimeMs
currentTimeMs,
data.ackWhenCommitted()
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,8 @@ public CompletableFuture<AddRaftVoterResponseData> handleAddVoterRequest(
LeaderState<?> leaderState,
ReplicaKey voterKey,
Endpoints voterEndpoints,
long currentTimeMs
long currentTimeMs,
boolean ackWhenCommitted
) {
// Check if there are any pending voter change requests
if (leaderState.isOperationPending(currentTimeMs)) {
Expand Down Expand Up @@ -184,7 +185,8 @@ public CompletableFuture<AddRaftVoterResponseData> handleAddVoterRequest(
AddVoterHandlerState state = new AddVoterHandlerState(
voterKey,
voterEndpoints,
time.timer(timeout.getAsLong())
time.timer(timeout.getAsLong()),
ackWhenCommitted
);
leaderState.resetAddVoterHandlerState(
Errors.UNKNOWN_SERVER_ERROR,
Expand Down Expand Up @@ -321,7 +323,16 @@ public boolean handleApiVersionsResponse(
)
);
current.setLastOffset(leaderState.appendVotersRecord(newVoters, currentTimeMs));

if (!current.ackWhenCommitted()) {
// complete the future to send response, but do not reset the state,
// since the new voter set is not yet committed
current.future().complete(
RaftUtil.addVoterResponse(
Errors.NONE,
null
)
);
}
return true;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,18 +29,21 @@ public final class AddVoterHandlerState {
private final ReplicaKey voterKey;
private final Endpoints voterEndpoints;
private final Timer timeout;
private final boolean ackWhenCommitted;
private final CompletableFuture<AddRaftVoterResponseData> future = new CompletableFuture<>();

private OptionalLong lastOffset = OptionalLong.empty();

AddVoterHandlerState(
ReplicaKey voterKey,
Endpoints voterEndpoints,
Timer timeout
Timer timeout,
boolean ackWhenCommitted
) {
this.voterKey = voterKey;
this.voterEndpoints = voterEndpoints;
this.timeout = timeout;
this.ackWhenCommitted = ackWhenCommitted;
}

public long timeUntilOperationExpiration(long currentTimeMs) {
Expand Down Expand Up @@ -76,6 +79,10 @@ public Endpoints voterEndpoints() {
return voterEndpoints;
}

/**
 * Returns the acknowledgement mode supplied when this state was constructed.
 *
 * @return {@code true} if the add voter response should be sent only after the new voter set
 *         is committed; {@code false} if it may be sent as soon as the leader has appended
 *         the new voter set to its log
 */
public boolean ackWhenCommitted() {
return ackWhenCommitted;
}

public OptionalLong lastOffset() {
return lastOffset;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.compress.Compression;
import org.apache.kafka.common.errors.InvalidUpdateVersionException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.feature.SupportedVersionRange;
import org.apache.kafka.common.message.ApiVersionsResponseData;
import org.apache.kafka.common.message.EndQuorumEpochResponseData;
Expand Down Expand Up @@ -339,27 +340,80 @@ public void testAddVoter() throws Exception {
Map.of(context.channel.listenerName(), newAddress)
);

// Show that the new voter is not currently a voter
assertFalse(context.client.quorum().isVoter(newVoter));
prepareLeaderToReceiveAddVoter(context, epoch, local, follower, newVoter);

// Establish a HWM and fence previous leaders
context.deliverRequest(
context.fetchRequest(epoch, follower, context.log.endOffset().offset(), epoch, 0)
);
// Attempt to add new voter to the quorum
context.deliverRequest(context.addVoterRequest(Integer.MAX_VALUE, newVoter, newListeners));

completeApiVersionsForAddVoter(context, newVoter, newAddress);

// Handle the API_VERSIONS response
context.client.poll();
// Append new VotersRecord to log
context.client.poll();

commitNewVoterSetForAddVoter(context, local, follower, newVoter, epoch);

// Expect reply for AddVoter request
context.pollUntilResponse();
context.assertSentFetchPartitionResponse(Errors.NONE, epoch, OptionalInt.of(local.id()));
context.assertSentAddVoterResponse(Errors.NONE);
}

// Catch up the new voter to the leader's LEO, the new voter is still an observer at this point
@Test
void testAddVoterCompletesEarlyWithAckWhenCommittedFalse() throws Exception {
ReplicaKey local = replicaKey(randomReplicaId(), true);
ReplicaKey follower = replicaKey(local.id() + 1, true);

VoterSet voters = VoterSetTest.voterSet(Stream.of(local, follower));

RaftClientTestContext context = new RaftClientTestContext.Builder(local.id(), local.directoryId().get())
.withRaftProtocol(RaftProtocol.KIP_1186_PROTOCOL)
.withBootstrapSnapshot(Optional.of(voters))
.withUnknownLeader(3)
.build();

context.unattachedToLeader();
int epoch = context.currentEpoch();

checkLeaderMetricValues(2, 0, 0, context);

ReplicaKey newVoter = replicaKey(local.id() + 2, true);
InetSocketAddress newAddress = InetSocketAddress.createUnresolved(
"localhost",
9990 + newVoter.id()
);
Endpoints newListeners = Endpoints.fromInetSocketAddresses(
Map.of(context.channel.listenerName(), newAddress)
);

prepareLeaderToReceiveAddVoter(context, epoch, local, follower, newVoter);

// Attempt to add new voter to the quorum
context.deliverRequest(
context.fetchRequest(epoch, newVoter, context.log.endOffset().offset(), epoch, 0)
context.addVoterRequest(
Integer.MAX_VALUE,
newVoter,
newListeners
).setAckWhenCommitted(false)
);

completeApiVersionsForAddVoter(context, newVoter, newAddress);

// Handle the API_VERSIONS response
context.client.poll();
// Append new VotersRecord to log
// Expect a response for AddVoter request before committing the new voter
context.pollUntilResponse();
context.assertSentFetchPartitionResponse(Errors.NONE, epoch, OptionalInt.of(local.id()));
checkLeaderMetricValues(2, 1, 0, context);
context.assertSentAddVoterResponse(Errors.NONE);

// Attempt to add new voter to the quorum
context.deliverRequest(context.addVoterRequest(Integer.MAX_VALUE, newVoter, newListeners));
commitNewVoterSetForAddVoter(context, local, follower, newVoter, epoch);
}

private void completeApiVersionsForAddVoter(
RaftClientTestContext context,
ReplicaKey newVoter,
InetSocketAddress newAddress
) throws Exception {
// Leader should send an API_VERSIONS request to the new voter's endpoint
context.pollUntilRequest();
RaftRequest.Outbound apiVersionRequest = context.assertSentApiVersionsRequest();
Expand All @@ -374,26 +428,105 @@ public void testAddVoter() throws Exception {
apiVersionRequest.destination(),
apiVersionsResponse(Errors.NONE)
);
}

// Handle the API_VERSIONS response
context.client.poll();
// Append new VotersRecord to log
context.client.poll();
private void commitNewVoterSetForAddVoter(
RaftClientTestContext context,
ReplicaKey leader,
ReplicaKey follower,
ReplicaKey newVoter,
int epoch
) throws Exception {
// The new voter is now a voter after writing the VotersRecord to the log
assertTrue(context.client.quorum().isVoter(newVoter));
checkLeaderMetricValues(3, 0, 1, context);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This doesn't seem very extensible; maybe the checks above belong outside of this method.


// Send a FETCH to increase the HWM and commit the new voter set
context.deliverRequest(
context.fetchRequest(epoch, follower, context.log.endOffset().offset(), epoch, 0)
context.fetchRequest(
epoch,
follower,
context.log.endOffset().offset(),
epoch,
0
)
);
context.pollUntilResponse();
context.assertSentFetchPartitionResponse(Errors.NONE, epoch, OptionalInt.of(local.id()));
context.assertSentFetchPartitionResponse(Errors.NONE, epoch, OptionalInt.of(leader.id()));
checkLeaderMetricValues(3, 0, 0, context);
}

// Expect reply for AddVoter request
/**
 * Verifies that an AddVoter request with AckWhenCommitted set to false is rejected with an
 * UnsupportedVersionException when the test context runs one of the older raft protocols
 * listed in the EnumSource below.
 */
@ParameterizedTest
@EnumSource(value = RaftProtocol.class, names = {
"KIP_853_PROTOCOL",
"KIP_996_PROTOCOL",
"KIP_1166_PROTOCOL"
})
void testAddVoterAckWhenCommittedUnsupported(RaftProtocol protocol) throws Exception {
ReplicaKey local = replicaKey(randomReplicaId(), true);
ReplicaKey follower = replicaKey(local.id() + 1, true);

// Start from a two-voter quorum made of the local replica and one follower
VoterSet voters = VoterSetTest.voterSet(Stream.of(local, follower));

RaftClientTestContext context = new RaftClientTestContext.Builder(local.id(), local.directoryId().get())
.withRaftProtocol(protocol)
.withBootstrapSnapshot(Optional.of(voters))
.withUnknownLeader(3)
.build();

// Make the local replica the leader
context.unattachedToLeader();
int epoch = context.currentEpoch();

// The replica to be added; it listens on its own unresolved localhost endpoint
ReplicaKey newVoter = replicaKey(local.id() + 2, true);
InetSocketAddress newAddress = InetSocketAddress.createUnresolved(
"localhost",
9990 + newVoter.id()
);
Endpoints newListeners = Endpoints.fromInetSocketAddresses(
Map.of(context.channel.listenerName(), newAddress)
);

prepareLeaderToReceiveAddVoter(context, epoch, local, follower, newVoter);

// Attempt to add new voter to the quorum
// NOTE(review): presumably the exception is raised because these protocols use version 0 of
// the AddRaftVoter RPC, which cannot carry a non-default AckWhenCommitted value - confirm
// against addVoterRpcVersion() in the test context.
assertThrows(
UnsupportedVersionException.class,
() -> context.deliverRequest(
context.addVoterRequest(
Integer.MAX_VALUE,
newVoter,
newListeners
).setAckWhenCommitted(false)
)
);
}

// Sets up the context so that a test can send an AddVoter request as soon as this
// method returns.
private void prepareLeaderToReceiveAddVoter(
RaftClientTestContext context,
int epoch,
ReplicaKey leader,
ReplicaKey follower,
ReplicaKey observer
) throws Exception {
// Show that the observer is not currently a voter
assertFalse(context.client.quorum().isVoter(observer));

// Establish a HWM and fence previous leaders
context.deliverRequest(
context.fetchRequest(epoch, follower, context.log.endOffset().offset(), epoch, 0)
);
context.pollUntilResponse();
context.assertSentAddVoterResponse(Errors.NONE);
context.assertSentFetchPartitionResponse(Errors.NONE, epoch, OptionalInt.of(leader.id()));

// Catch up the new voter to the leader's LEO, the new voter is still an observer at this point
context.deliverRequest(
context.fetchRequest(epoch, observer, context.log.endOffset().offset(), epoch, 0)
);
context.pollUntilResponse();
context.assertSentFetchPartitionResponse(Errors.NONE, epoch, OptionalInt.of(leader.id()));
checkLeaderMetricValues(2, 1, 0, context);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2018,7 +2018,9 @@ private short describeQuorumRpcVersion() {
}

private short addVoterRpcVersion() {
if (raftProtocol.isReconfigSupported()) {
if (raftProtocol.isAutoJoinSupported()) {
return 1;
} else if (raftProtocol.isReconfigSupported()) {
return 0;
} else {
throw new IllegalStateException("Reconfiguration must be enabled by calling withRaftProtocol(KIP_853_PROTOCOL)");
Expand Down Expand Up @@ -2264,7 +2266,7 @@ public void handleLoadSnapshot(SnapshotReader<String> reader) {

/**
* Determines what versions of RPCs are in use. Note, these are ordered from oldest to newest, and are
* cumulative. E.g. KIP_996_PROTOCOL includes KIP_853_PROTOCOL and KIP_595_PROTOCOL changes
* cumulative. E.g. KIP_1186_PROTOCOL includes KIP_996_PROTOCOL, KIP_853_PROTOCOL, and KIP_595_PROTOCOL changes
*/
enum RaftProtocol {
// kraft support
Expand All @@ -2274,7 +2276,9 @@ enum RaftProtocol {
// preVote support
KIP_996_PROTOCOL,
// HWM in FETCH request support
KIP_1166_PROTOCOL;
KIP_1166_PROTOCOL,
// autoJoin support
KIP_1186_PROTOCOL;

boolean isKRaftSupported() {
return isAtLeast(KIP_595_PROTOCOL);
Expand All @@ -2291,6 +2295,10 @@ boolean isPreVoteSupported() {
boolean isHwmInFetchSupported() {
return isAtLeast(KIP_1166_PROTOCOL);
}

/**
 * Reports whether this protocol level includes the auto-join changes.
 *
 * @return {@code true} if this protocol is KIP_1186_PROTOCOL or newer
 */
boolean isAutoJoinSupported() {
return isAtLeast(KIP_1186_PROTOCOL);
}

private boolean isAtLeast(RaftProtocol otherRpc) {
return this.compareTo(otherRpc) >= 0;
Expand Down