Skip to content

Commit 6bceb18

Browse files
committed
Scope methods to Project and Cluster
1 parent e37d6f7 commit 6bceb18

File tree

19 files changed

+193
-158
lines changed

19 files changed

+193
-158
lines changed

server/src/main/java/org/elasticsearch/health/node/selection/HealthNodeTaskExecutor.java

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,12 @@
1616
import org.elasticsearch.cluster.ClusterChangedEvent;
1717
import org.elasticsearch.cluster.ClusterState;
1818
import org.elasticsearch.cluster.ClusterStateListener;
19-
import org.elasticsearch.cluster.metadata.ProjectId;
2019
import org.elasticsearch.cluster.node.DiscoveryNode;
2120
import org.elasticsearch.cluster.service.ClusterService;
2221
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
2322
import org.elasticsearch.common.settings.ClusterSettings;
2423
import org.elasticsearch.common.settings.Setting;
2524
import org.elasticsearch.common.settings.Settings;
26-
import org.elasticsearch.core.Nullable;
2725
import org.elasticsearch.core.TimeValue;
2826
import org.elasticsearch.node.NodeClosedException;
2927
import org.elasticsearch.persistent.AllocatedPersistentTask;
@@ -135,11 +133,10 @@ protected HealthNode createTask(
135133
* Returns the node id from the eligible health nodes
136134
*/
137135
@Override
138-
public PersistentTasksCustomMetadata.Assignment getAssignment(
136+
public PersistentTasksCustomMetadata.Assignment getClusterScopedAssignment(
139137
HealthNodeTaskParams params,
140138
Collection<DiscoveryNode> candidateNodes,
141-
ClusterState clusterState,
142-
@Nullable ProjectId projectId
139+
ClusterState clusterState
143140
) {
144141
DiscoveryNode discoveryNode = selectLeastLoadedNode(clusterState, candidateNodes, DiscoveryNode::canContainData);
145142
if (discoveryNode == null) {

server/src/main/java/org/elasticsearch/persistent/PersistentTasksClusterService.java

Lines changed: 21 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -168,10 +168,15 @@ public ClusterState execute(ClusterState currentState) {
168168
}
169169

170170
PersistentTasksExecutor<Params> taskExecutor = registry.getPersistentTaskExecutorSafe(taskName);
171-
assert (projectId == null && taskExecutor.scope() == PersistentTasksExecutor.Scope.CLUSTER)
172-
|| (projectId != null && taskExecutor.scope() == PersistentTasksExecutor.Scope.PROJECT)
173-
: "inconsistent project-id [" + projectId + "] and task scope [" + taskExecutor.scope() + "]";
174-
taskExecutor.validate(taskParams, currentState, projectId);
171+
if (taskExecutor.scope() == PersistentTasksExecutor.Scope.CLUSTER) {
172+
assert projectId == null : "project id must be null when persistent task is cluster scoped";
173+
taskExecutor.validateCluster(taskParams, currentState);
174+
} else if (taskExecutor.scope() == PersistentTasksExecutor.Scope.PROJECT) {
175+
assert projectId != null : "project id must not be null when persistent task is project scoped";
176+
taskExecutor.validateProject(taskParams, currentState.projectState(projectId));
177+
} else {
178+
assert false : "unknown scope " + taskExecutor.scope();
179+
}
175180

176181
Assignment assignment = createAssignment(taskName, taskParams, currentState, projectId);
177182
logger.debug("creating {} persistent task [{}] with assignment [{}]", taskTypeString(projectId), taskName, assignment);
@@ -469,7 +474,16 @@ private <Params extends PersistentTaskParams> Assignment createAssignment(
469474
// Task assignment should not rely on node order
470475
Randomness.shuffle(candidateNodes);
471476

472-
final Assignment assignment = persistentTasksExecutor.getAssignment(taskParams, candidateNodes, currentState, projectId);
477+
final Assignment assignment = switch (persistentTasksExecutor.scope()) {
478+
case PROJECT -> {
479+
assert projectId != null : "project id must not be null when persistent task is project scoped";
480+
yield persistentTasksExecutor.getProjectScopedAssignment(taskParams, candidateNodes, currentState.projectState(projectId));
481+
}
482+
case CLUSTER -> {
483+
assert projectId == null : "project id must be null when persistent task is cluster scoped";
484+
yield persistentTasksExecutor.getClusterScopedAssignment(taskParams, candidateNodes, currentState);
485+
}
486+
};
473487
assert assignment != null : "getAssignment() should always return an Assignment object, containing a node or a reason why not";
474488
assert (assignment.getExecutorNode() == null
475489
|| currentState.metadata().nodeShutdowns().contains(assignment.getExecutorNode()) == false)
@@ -541,7 +555,7 @@ public void clusterStateProcessed(ClusterState oldState, ClusterState newState)
541555
* persistent tasks changed.
542556
*/
543557
boolean shouldReassignPersistentTasks(final ClusterChangedEvent event) {
544-
var projectIdToTasksIterator = PersistentTasks.getAllTasks(event.state()).iterator();
558+
var projectIdToTasksIterator = getAllTasks(event.state()).iterator();
545559
if (projectIdToTasksIterator.hasNext() == false) {
546560
return false;
547561
}
@@ -814,7 +828,7 @@ public void runInternal() {
814828
// TODO just run on the elected master?
815829
final ClusterState state = clusterService.state();
816830
logger.trace("periodic persistent task assignment check running for cluster state {}", state.getVersion());
817-
if (isAnyTaskUnassigned(PersistentTasks.getAllTasks(state))) {
831+
if (isAnyTaskUnassigned(getAllTasks(state))) {
818832
reassignPersistentTasks();
819833
}
820834
}

server/src/main/java/org/elasticsearch/persistent/PersistentTasksExecutor.java

Lines changed: 31 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
package org.elasticsearch.persistent;
1111

1212
import org.elasticsearch.cluster.ClusterState;
13-
import org.elasticsearch.cluster.metadata.ProjectId;
13+
import org.elasticsearch.cluster.ProjectState;
1414
import org.elasticsearch.cluster.node.DiscoveryNode;
1515
import org.elasticsearch.core.Nullable;
1616
import org.elasticsearch.core.Tuple;
@@ -60,16 +60,26 @@ public Scope scope() {
6060
public static final Assignment NO_NODE_FOUND = new Assignment(null, "no appropriate nodes found for the assignment");
6161

6262
/**
63-
* Returns the node id where the params has to be executed,
63+
* Returns the node id where the params has to be executed based on the Project scope,
6464
* <p>
65-
* The default implementation returns the least loaded data node from amongst the collection of candidate nodes
65+
* The default implementation returns the least loaded data node from amongst the collection of candidate nodes.
66+
* <p>
67+
* Implement this method if your {@link #scope()} returns PROJECT, or {@link #getClusterScopedAssignment} if your {@link #scope()}
68+
* returns CLUSTER.
6669
*/
67-
public Assignment getAssignment(
68-
Params params,
69-
Collection<DiscoveryNode> candidateNodes,
70-
ClusterState clusterState,
71-
@Nullable ProjectId projectId
72-
) {
70+
public Assignment getProjectScopedAssignment(Params params, Collection<DiscoveryNode> candidateNodes, ProjectState projectState) {
71+
return getClusterScopedAssignment(params, candidateNodes, projectState.cluster());
72+
}
73+
74+
/**
75+
* Returns the node id where the params has to be executed based on the Cluster scope,
76+
* <p>
77+
* The default implementation returns the least loaded data node from amongst the collection of candidate nodes.
78+
* <p>
79+
* Implement this method if your {@link #scope()} returns CLUSTER, or {@link #getProjectScopedAssignment} if your {@link #scope()}
80+
* returns PROJECT.
81+
*/
82+
public Assignment getClusterScopedAssignment(Params params, Collection<DiscoveryNode> candidateNodes, ClusterState clusterState) {
7383
DiscoveryNode discoveryNode = selectLeastLoadedNode(clusterState, candidateNodes, DiscoveryNode::canContainData);
7484
if (discoveryNode == null) {
7585
return NO_NODE_FOUND;
@@ -110,8 +120,19 @@ protected DiscoveryNode selectLeastLoadedNode(
110120
* Checks the current cluster state for compatibility with the params
111121
* <p>
112122
* Throws an exception if the supplied params cannot be executed on the cluster in the current state.
123+
* <p>
124+
* Implement this method if your {@link #scope()} returns CLUSTER, or {@link #validateProject} if your {@link #scope()} returns PROJECT.
125+
*/
126+
public void validateCluster(Params params, ClusterState clusterState) {}
127+
128+
/**
129+
* Checks the current project state for compatibility with the params
130+
* <p>
131+
* Throws an exception if the supplied params cannot be executed on the cluster in the current state.
132+
* <p>
133+
* Implement this method if your {@link #scope()} returns PROJECT, or {@link #validateCluster} if your {@link #scope()} returns CLUSTER.
113134
*/
114-
public void validate(Params params, ClusterState clusterState, @Nullable ProjectId projectId) {}
135+
public void validateProject(Params params, ProjectState projectState) {}
115136

116137
/**
117138
* Creates a AllocatedPersistentTask for communicating with task manager

server/src/test/java/org/elasticsearch/persistent/PersistentTasksClusterServiceTests.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
import org.elasticsearch.cluster.metadata.IndexMetadata;
2121
import org.elasticsearch.cluster.metadata.Metadata;
2222
import org.elasticsearch.cluster.metadata.NodesShutdownMetadata;
23-
import org.elasticsearch.cluster.metadata.ProjectId;
2423
import org.elasticsearch.cluster.metadata.ProjectMetadata;
2524
import org.elasticsearch.cluster.metadata.SingleNodeShutdownMetadata;
2625
import org.elasticsearch.cluster.node.DiscoveryNode;
@@ -1088,11 +1087,10 @@ public Scope scope() {
10881087
}
10891088

10901089
@Override
1091-
public Assignment getAssignment(
1090+
public Assignment getClusterScopedAssignment(
10921091
P params,
10931092
Collection<DiscoveryNode> candidateNodes,
1094-
ClusterState clusterState,
1095-
ProjectId projectId
1093+
ClusterState clusterState
10961094
) {
10971095
return fn.apply(params, candidateNodes, clusterState);
10981096
}

server/src/test/java/org/elasticsearch/persistent/TestPersistentTasksPlugin.java

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,8 @@
2424
import org.elasticsearch.client.internal.Client;
2525
import org.elasticsearch.client.internal.ElasticsearchClient;
2626
import org.elasticsearch.cluster.ClusterState;
27+
import org.elasticsearch.cluster.ProjectState;
2728
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
28-
import org.elasticsearch.cluster.metadata.ProjectId;
2929
import org.elasticsearch.cluster.node.DiscoveryNode;
3030
import org.elasticsearch.cluster.service.ClusterService;
3131
import org.elasticsearch.common.Strings;
@@ -327,17 +327,25 @@ public static void setNonClusterStateCondition(boolean nonClusterStateCondition)
327327
}
328328

329329
@Override
330-
public Assignment getAssignment(
330+
public Assignment getProjectScopedAssignment(
331331
TestParams params,
332332
Collection<DiscoveryNode> candidateNodes,
333-
ClusterState clusterState,
334-
ProjectId projectId
333+
ProjectState projectState
334+
) {
335+
return getClusterScopedAssignment(params, candidateNodes, projectState.cluster());
336+
}
337+
338+
@Override
339+
public Assignment getClusterScopedAssignment(
340+
TestParams params,
341+
Collection<DiscoveryNode> candidateNodes,
342+
ClusterState clusterState
335343
) {
336344
if (nonClusterStateCondition == false) {
337345
return new Assignment(null, "non cluster state condition prevents assignment");
338346
}
339347
if (params == null || params.getExecutorNodeAttr() == null) {
340-
return super.getAssignment(params, candidateNodes, clusterState, projectId);
348+
return super.getClusterScopedAssignment(params, candidateNodes, clusterState);
341349
} else {
342350
DiscoveryNode executorNode = selectLeastLoadedNode(
343351
clusterState,

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -28,11 +28,10 @@
2828
import org.elasticsearch.action.support.master.AcknowledgedResponse;
2929
import org.elasticsearch.client.internal.Client;
3030
import org.elasticsearch.client.internal.RemoteClusterClient;
31-
import org.elasticsearch.cluster.ClusterState;
31+
import org.elasticsearch.cluster.ProjectState;
3232
import org.elasticsearch.cluster.metadata.AliasMetadata;
3333
import org.elasticsearch.cluster.metadata.IndexMetadata;
3434
import org.elasticsearch.cluster.metadata.MappingMetadata;
35-
import org.elasticsearch.cluster.metadata.ProjectId;
3635
import org.elasticsearch.cluster.node.DiscoveryNode;
3736
import org.elasticsearch.cluster.routing.IndexRoutingTable;
3837
import org.elasticsearch.cluster.routing.ShardRouting;
@@ -44,7 +43,6 @@
4443
import org.elasticsearch.common.util.concurrent.EsExecutors;
4544
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
4645
import org.elasticsearch.core.CheckedConsumer;
47-
import org.elasticsearch.core.Nullable;
4846
import org.elasticsearch.core.TimeValue;
4947
import org.elasticsearch.index.Index;
5048
import org.elasticsearch.index.IndexNotFoundException;
@@ -120,8 +118,8 @@ public ShardFollowTasksExecutor(Client client, ThreadPool threadPool, ClusterSer
120118
}
121119

122120
@Override
123-
public void validate(ShardFollowTask params, ClusterState clusterState, @Nullable ProjectId projectId) {
124-
final IndexRoutingTable routingTable = clusterState.getRoutingTable().index(params.getFollowShardId().getIndex());
121+
public void validateProject(ShardFollowTask params, ProjectState projectState) {
122+
final IndexRoutingTable routingTable = projectState.cluster().getRoutingTable().index(params.getFollowShardId().getIndex());
125123
final ShardRouting primaryShard = routingTable.shard(params.getFollowShardId().id()).primaryShard();
126124
if (primaryShard.active() == false) {
127125
throw new IllegalArgumentException("The primary shard of a follower index " + primaryShard + " is not active");
@@ -131,14 +129,13 @@ public void validate(ShardFollowTask params, ClusterState clusterState, @Nullabl
131129
private static final Assignment NO_ASSIGNMENT = new Assignment(null, "no nodes found with data and remote cluster client roles");
132130

133131
@Override
134-
public Assignment getAssignment(
132+
public Assignment getProjectScopedAssignment(
135133
final ShardFollowTask params,
136134
final Collection<DiscoveryNode> candidateNodes,
137-
final ClusterState clusterState,
138-
@Nullable final ProjectId projectId
135+
final ProjectState projectState
139136
) {
140137
final DiscoveryNode node = selectLeastLoadedNode(
141-
clusterState,
138+
projectState.cluster(),
142139
candidateNodes,
143140
((Predicate<DiscoveryNode>) DiscoveryNode::canContainData).and(DiscoveryNode::isRemoteClusterClient)
144141
);

x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutorAssignmentTests.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import org.elasticsearch.client.internal.Client;
1111
import org.elasticsearch.cluster.ClusterName;
1212
import org.elasticsearch.cluster.ClusterState;
13+
import org.elasticsearch.cluster.metadata.ProjectId;
1314
import org.elasticsearch.cluster.node.DiscoveryNode;
1415
import org.elasticsearch.cluster.node.DiscoveryNodeRole;
1516
import org.elasticsearch.cluster.node.DiscoveryNodeUtils;
@@ -90,11 +91,10 @@ private void runAssignmentTest(
9091
nodesBuilder.add(newNode(otherNodesRolesSupplier.get()));
9192
}
9293
clusterStateBuilder.nodes(nodesBuilder);
93-
final Assignment assignment = executor.getAssignment(
94+
final Assignment assignment = executor.getProjectScopedAssignment(
9495
mock(ShardFollowTask.class),
9596
clusterStateBuilder.nodes().getAllNodes(),
96-
clusterStateBuilder.build(),
97-
null
97+
clusterStateBuilder.build().projectState(ProjectId.DEFAULT)
9898
);
9999
consumer.accept(theSpecial, assignment);
100100
}

x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/DownsampleShardPersistentTaskExecutor.java

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -22,15 +22,14 @@
2222
import org.elasticsearch.action.support.TransportAction;
2323
import org.elasticsearch.client.internal.Client;
2424
import org.elasticsearch.cluster.ClusterState;
25-
import org.elasticsearch.cluster.metadata.ProjectId;
25+
import org.elasticsearch.cluster.ProjectState;
2626
import org.elasticsearch.cluster.node.DiscoveryNode;
2727
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
2828
import org.elasticsearch.cluster.routing.ShardRouting;
2929
import org.elasticsearch.common.io.stream.StreamOutput;
3030
import org.elasticsearch.common.settings.Settings;
3131
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
3232
import org.elasticsearch.common.util.concurrent.EsExecutors;
33-
import org.elasticsearch.core.Nullable;
3433
import org.elasticsearch.index.IndexNotFoundException;
3534
import org.elasticsearch.index.mapper.TimeSeriesIdFieldMapper;
3635
import org.elasticsearch.index.shard.ShardId;
@@ -118,21 +117,20 @@ protected AllocatedPersistentTask createTask(
118117
}
119118

120119
@Override
121-
public void validate(DownsampleShardTaskParams params, ClusterState clusterState, @Nullable ProjectId projectId) {
120+
public void validateProject(DownsampleShardTaskParams params, ProjectState projectState) {
122121
// This is just a pre-check, but doesn't prevent from avoiding from aborting the task when source index disappeared
123122
// after initial creation of the persistent task.
124-
var indexShardRouting = findShardRoutingTable(params.shardId(), clusterState);
123+
var indexShardRouting = findShardRoutingTable(params.shardId(), projectState.cluster());
125124
if (indexShardRouting == null) {
126125
throw new ShardNotFoundException(params.shardId());
127126
}
128127
}
129128

130129
@Override
131-
public PersistentTasksCustomMetadata.Assignment getAssignment(
130+
public PersistentTasksCustomMetadata.Assignment getProjectScopedAssignment(
132131
final DownsampleShardTaskParams params,
133132
final Collection<DiscoveryNode> candidateNodes,
134-
final ClusterState clusterState,
135-
@Nullable final ProjectId projectId
133+
final ProjectState projectState
136134
) {
137135
// NOTE: downsampling works by running a task per each shard of the source index.
138136
// Here we make sure we assign the task to the actual node holding the shard identified by
@@ -142,9 +140,9 @@ public PersistentTasksCustomMetadata.Assignment getAssignment(
142140
// If during re-assignment the source index was deleted, then we need to break out.
143141
// Returning NO_NODE_FOUND just keeps the persistent task until the source index appears again (which would never happen)
144142
// So let's return a node and then in the node operation we would just fail and stop this persistent task
145-
var indexShardRouting = findShardRoutingTable(shardId, clusterState);
143+
var indexShardRouting = findShardRoutingTable(shardId, projectState.cluster());
146144
if (indexShardRouting == null) {
147-
var node = selectLeastLoadedNode(clusterState, candidateNodes, DiscoveryNode::canContainData);
145+
var node = selectLeastLoadedNode(projectState.cluster(), candidateNodes, DiscoveryNode::canContainData);
148146
return new PersistentTasksCustomMetadata.Assignment(node.getId(), "a node to fail and stop this persistent task");
149147
}
150148

0 commit comments

Comments
 (0)