Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/developer/designs/topology-awareness/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,7 @@ The implementation will proceed in phases:
3. Implementation of Approach 1 in a topology plugin (can be the same plugin from step 2.)



## Alternatives Considered

- Using existing Kubernetes topology mechanisms like topology spread constraints, pod affinity
Expand Down
5 changes: 3 additions & 2 deletions pkg/scheduler/plugins/scores/scores.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ const (
ResourceType = 10
Availability = 100
GpuSharing = 1000
K8sPlugins = 10000
NominatedNode = 100000
Topology = 10000
K8sPlugins = 100000
NominatedNode = 1000000
)
83 changes: 27 additions & 56 deletions pkg/scheduler/plugins/topology/topology_plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,23 @@
package topology

import (
"github.com/NVIDIA/KAI-scheduler/pkg/scheduler/api/common_info"
"github.com/NVIDIA/KAI-scheduler/pkg/scheduler/api/node_info"
"github.com/NVIDIA/KAI-scheduler/pkg/scheduler/api/pod_info"
"github.com/NVIDIA/KAI-scheduler/pkg/scheduler/framework"
"github.com/NVIDIA/KAI-scheduler/pkg/scheduler/k8s_internal"
kueuev1alpha1 "sigs.k8s.io/kueue/apis/kueue/v1alpha1"
)

const (
topologyPluginName = "topology"
noNodeName = ""
)

type topologyPlugin struct {
enabled bool
TopologyTrees map[string]*TopologyInfo
enabled bool
taskOrderFunc common_info.LessFn
sessionStateGetter k8s_internal.SessionStateProvider
nodesInfos map[string]*node_info.NodeInfo
TopologyTrees map[string]*TopologyInfo
}

func New(pluginArgs map[string]string) framework.Plugin {
Expand All @@ -33,19 +36,25 @@ func (t *topologyPlugin) Name() string {

func (t *topologyPlugin) OnSessionOpen(ssn *framework.Session) {
topologies := ssn.Topologies
t.taskOrderFunc = ssn.TaskOrderFn
t.sessionStateGetter = ssn
t.nodesInfos = ssn.Nodes
t.initializeTopologyTree(topologies, ssn)

ssn.AddEventHandler(&framework.EventHandler{
AllocateFunc: t.handleAllocate(ssn),
DeallocateFunc: t.handleDeallocate(ssn),
})
//pre-predicate to generate the whole topology tree and store per workload
ssn.AddPrePredicateFn(t.prePredicateFn)
//predicate to filter nodes that are related to parts of the tree that cannot accommodate the workload - this is for "required" use only
ssn.AddPredicateFn(t.predicateFn)
//node order to sort the nodes according to topology nodes score - this is for "prefer" use only
ssn.AddNodeOrderFn(t.nodeOrderFn)
}

func (t *topologyPlugin) initializeTopologyTree(topologies []*kueuev1alpha1.Topology, ssn *framework.Session) {
for _, singleTopology := range topologies {
topologyTree := &TopologyInfo{
Name: singleTopology.Name,
Domains: map[TopologyDomainID]*TopologyDomainInfo{},
Name: singleTopology.Name,
//Domains: map[TopologyDomainID]*TopologyDomainInfo{},
DomainsByLevel: map[string]map[TopologyDomainID]*TopologyDomainInfo{},
Root: NewTopologyDomainInfo(TopologyDomainID("root"), "datacenter", "cluster", 0),
TopologyResource: singleTopology,
}
Expand All @@ -69,10 +78,16 @@ func (*topologyPlugin) addNodeDataToTopology(topologyTree *TopologyInfo, singleT
}

domainId := calcDomainId(levelIndex, singleTopology.Spec.Levels, nodeInfo.Node.Labels)
domainInfo, foundLevelLabel := topologyTree.Domains[domainId]
domainLevel := level.NodeLabel
domainsForLevel, foundLevelLabel := topologyTree.DomainsByLevel[domainLevel]
if !foundLevelLabel {
topologyTree.DomainsByLevel[level.NodeLabel] = map[TopologyDomainID]*TopologyDomainInfo{}
domainsForLevel = topologyTree.DomainsByLevel[level.NodeLabel]
}
domainInfo, foundDomain := domainsForLevel[domainId]
if !foundDomain {
domainInfo = NewTopologyDomainInfo(domainId, domainName, level.NodeLabel, levelIndex+1)
topologyTree.Domains[domainId] = domainInfo
domainsForLevel[domainId] = domainInfo
}
domainInfo.AddNode(nodeInfo)

Expand All @@ -86,48 +101,4 @@ func (*topologyPlugin) addNodeDataToTopology(topologyTree *TopologyInfo, singleT
topologyTree.Root.AddNode(nodeInfo)
}

func (t *topologyPlugin) handleAllocate(ssn *framework.Session) func(event *framework.Event) {
return t.updateTopologyGivenPodEvent(ssn, func(domainInfo *TopologyDomainInfo, podInfo *pod_info.PodInfo) {
domainInfo.AllocatedResources.AddResourceRequirements(podInfo.AcceptedResource)
domainInfo.AllocatedResources.BaseResource.ScalarResources()["pods"] =
domainInfo.AllocatedResources.BaseResource.ScalarResources()["pods"] + 1
})
}

func (t *topologyPlugin) handleDeallocate(ssn *framework.Session) func(event *framework.Event) {
return t.updateTopologyGivenPodEvent(ssn, func(domainInfo *TopologyDomainInfo, podInfo *pod_info.PodInfo) {
domainInfo.AllocatedResources.SubResourceRequirements(podInfo.AcceptedResource)
domainInfo.AllocatedResources.BaseResource.ScalarResources()["pods"] =
domainInfo.AllocatedResources.BaseResource.ScalarResources()["pods"] - 1
})
}

func (t *topologyPlugin) updateTopologyGivenPodEvent(
ssn *framework.Session,
domainUpdater func(domainInfo *TopologyDomainInfo, podInfo *pod_info.PodInfo),
) func(event *framework.Event) {
return func(event *framework.Event) {
pod := event.Task.Pod
nodeName := event.Task.NodeName
if nodeName == noNodeName {
return
}
node := ssn.Nodes[nodeName].Node
podInfo := ssn.Nodes[nodeName].PodInfos[pod_info.PodKey(pod)]

for _, topologyTree := range t.TopologyTrees {
leafDomainId := calcLeafDomainId(topologyTree.TopologyResource, node.Labels)
domainInfo := topologyTree.Domains[leafDomainId]
for domainInfo != nil {
domainUpdater(domainInfo, podInfo)

if domainInfo.Nodes[nodeName] != nil {
break
}
domainInfo = domainInfo.Parent
}
}
}
}

func (t *topologyPlugin) OnSessionClose(ssn *framework.Session) {}
Loading
Loading