Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 10 additions & 20 deletions broker/bucketbroker/server/bucket_create.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,14 @@ import (

"github.com/go-logr/logr"
storagev1alpha1 "github.com/ironcore-dev/ironcore/api/storage/v1alpha1"
brokerutils "github.com/ironcore-dev/ironcore/broker/common/utils"

bucketbrokerv1alpha1 "github.com/ironcore-dev/ironcore/broker/bucketbroker/api/v1alpha1"
"github.com/ironcore-dev/ironcore/broker/bucketbroker/apiutils"
iri "github.com/ironcore-dev/ironcore/iri/apis/bucket/v1alpha1"
bucketpoolletv1alpha1 "github.com/ironcore-dev/ironcore/poollet/bucketpoollet/api/v1alpha1"
poolletutils "github.com/ironcore-dev/ironcore/poollet/common/utils"
"github.com/ironcore-dev/ironcore/utils/maps"

utilsmaps "github.com/ironcore-dev/ironcore/utils/maps"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand All @@ -25,35 +27,23 @@ type AggregateIronCoreBucket struct {
AccessSecret *corev1.Secret
}

func (s *Server) prepareIronCoreBucketLabels(bucket *iri.Bucket) map[string]string {
labels := make(map[string]string)

for downwardAPILabelName, defaultLabelName := range s.brokerDownwardAPILabels {
value := bucket.GetMetadata().GetLabels()[poolletutils.DownwardAPILabel(bucketpoolletv1alpha1.BucketDownwardAPIPrefix, downwardAPILabelName)]
if value == "" {
value = bucket.GetMetadata().GetLabels()[defaultLabelName]
}
if value != "" {
labels[poolletutils.DownwardAPILabel(bucketpoolletv1alpha1.BucketDownwardAPIPrefix, downwardAPILabelName)] = value
}
}

return labels
}

func (s *Server) getIronCoreBucketConfig(_ context.Context, bucket *iri.Bucket) (*AggregateIronCoreBucket, error) {
var bucketPoolRef *corev1.LocalObjectReference
if s.bucketPoolName != "" {
bucketPoolRef = &corev1.LocalObjectReference{
Name: s.bucketPoolName,
}
}
labels := s.prepareIronCoreBucketLabels(bucket)
labels := brokerutils.PrepareDownwardAPILabels(
bucket.GetMetadata().GetLabels(),
s.brokerDownwardAPILabels,
bucketpoolletv1alpha1.BucketDownwardAPIPrefix,
)
ironcoreBucket := &storagev1alpha1.Bucket{
ObjectMeta: metav1.ObjectMeta{
Namespace: s.namespace,
Name: s.generateID(),
Labels: maps.AppendMap(labels, map[string]string{
Labels: utilsmaps.AppendMap(labels, map[string]string{
bucketbrokerv1alpha1.ManagerLabel: bucketbrokerv1alpha1.BucketBrokerManager,
}),
},
Expand Down
21 changes: 21 additions & 0 deletions broker/common/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ package utils
import (
"context"

poolletutils "github.com/ironcore-dev/ironcore/poollet/common/utils"

apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime/schema"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand Down Expand Up @@ -55,3 +57,22 @@ func ClientObjectGetter[ObjPtr ObjectPtr[Obj], Obj any](
return objPtr, nil
}
}

func PrepareDownwardAPILabels(
labelSource map[string]string,
downwardAPILabels map[string]string,
prefix string,
) map[string]string {
preparedLabels := make(map[string]string)
for downwardAPILabelName, defaultLabelName := range downwardAPILabels {
key := poolletutils.DownwardAPILabel(prefix, downwardAPILabelName)
value := labelSource[key]
if value == "" {
value = labelSource[defaultLabelName]
}
if value != "" {
preparedLabels[key] = value
}
}
return preparedLabels
}
8 changes: 8 additions & 0 deletions broker/machinebroker/apiutils/apiutils.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,3 +167,11 @@ func IsManagedBy(o metav1.Object, manager string) bool {
actual, ok := o.GetLabels()[machinebrokerv1alpha1.ManagerLabel]
return ok && actual == manager
}

func MustMarshalJSON(v interface{}) string {
data, err := json.Marshal(v)
if err != nil {
panic(err)
}
return string(data)
}
37 changes: 24 additions & 13 deletions broker/machinebroker/networks/networks.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
networkingv1alpha1 "github.com/ironcore-dev/ironcore/api/networking/v1alpha1"
machinebrokerv1alpha1 "github.com/ironcore-dev/ironcore/broker/machinebroker/api/v1alpha1"
"github.com/ironcore-dev/ironcore/broker/machinebroker/cluster"
utilsmaps "github.com/ironcore-dev/ironcore/utils/maps"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/rand"
"k8s.io/client-go/util/workqueue"
Expand Down Expand Up @@ -42,9 +43,10 @@ func NewManager(cluster cluster.Cluster) *Manager {
}

type waiter struct {
network *networkingv1alpha1.Network
error error
done chan struct{}
network *networkingv1alpha1.Network
error error
done chan struct{}
networkLabels map[string]string
}

func (e *Manager) getNetworkForProviderID(ctx context.Context, providerID string) (*networkingv1alpha1.Network, error) {
Expand Down Expand Up @@ -81,7 +83,7 @@ func (e *Manager) providerIDHash(providerID string) string {
return rand.SafeEncodeString(fmt.Sprint(h.Sum32()))
}

func (e *Manager) getOrCreateNetworkForProviderID(ctx context.Context, log logr.Logger, providerID string) (*networkingv1alpha1.Network, error) {
func (e *Manager) getOrCreateNetworkForProviderID(ctx context.Context, log logr.Logger, providerID string, labels map[string]string) (*networkingv1alpha1.Network, error) {
network, err := e.getNetworkForProviderID(ctx, providerID)
if err != nil {
return nil, fmt.Errorf("error getting network for providerID: %w", err)
Expand All @@ -102,9 +104,9 @@ func (e *Manager) getOrCreateNetworkForProviderID(ctx context.Context, log logr.
Annotations: map[string]string{
commonv1alpha1.ManagedByAnnotation: machinebrokerv1alpha1.MachineBrokerManager,
},
Labels: map[string]string{
Labels: utilsmaps.AppendMap(labels, map[string]string{
machinebrokerv1alpha1.ManagerLabel: machinebrokerv1alpha1.MachineBrokerManager,
},
}),
},
Spec: networkingv1alpha1.NetworkSpec{
ProviderID: providerID,
Expand All @@ -125,9 +127,9 @@ func (e *Manager) setNetworkAsAvailable(ctx context.Context, network *networking
return nil
}

func (e *Manager) doWork(ctx context.Context, providerID string) (*networkingv1alpha1.Network, error) {
func (e *Manager) doWork(ctx context.Context, providerID string, labels map[string]string) (*networkingv1alpha1.Network, error) {
log := ctrl.LoggerFrom(ctx)
network, err := e.getOrCreateNetworkForProviderID(ctx, log, providerID)
network, err := e.getOrCreateNetworkForProviderID(ctx, log, providerID, labels)
if err != nil {
return nil, fmt.Errorf("error getting / creating network for providerID: %w", err)
}
Expand All @@ -150,7 +152,14 @@ func (e *Manager) processNextWorkItem(ctx context.Context) bool {
}
defer e.queue.Done(providerID)

network, err := e.doWork(ctx, providerID)
var labels map[string]string
e.waitersByProviderIDMu.Lock()
if w, ok := e.waitersByProviderID[providerID]; ok {
labels = w.networkLabels
}
e.waitersByProviderIDMu.Unlock()

network, err := e.doWork(ctx, providerID, labels)
e.emit(providerID, network, err)
e.queue.Forget(providerID)
return true
Expand Down Expand Up @@ -188,7 +197,7 @@ func (e *Manager) Start(ctx context.Context) error {
return nil
}

func (e *Manager) getOrCreateWaiter(providerID string) *waiter {
func (e *Manager) getOrCreateWaiter(providerID string, labels map[string]string) *waiter {
e.waitersByProviderIDMu.Lock()
defer e.waitersByProviderIDMu.Unlock()

Expand All @@ -197,7 +206,9 @@ func (e *Manager) getOrCreateWaiter(providerID string) *waiter {
return w
}

w = &waiter{done: make(chan struct{})}
w = &waiter{
done: make(chan struct{}),
networkLabels: labels}
e.waitersByProviderID[providerID] = w
e.queue.Add(providerID)
return w
Expand All @@ -220,8 +231,8 @@ func (e *Manager) emit(providerID string, network *networkingv1alpha1.Network, e
delete(e.waitersByProviderID, providerID)
}

func (e *Manager) GetNetwork(ctx context.Context, providerID string) (*networkingv1alpha1.Network, error) {
w := e.getOrCreateWaiter(providerID)
func (e *Manager) GetNetwork(ctx context.Context, providerID string, labels map[string]string) (*networkingv1alpha1.Network, error) {
w := e.getOrCreateWaiter(providerID, labels)
select {
case <-ctx.Done():
return nil, ctx.Err()
Expand Down
6 changes: 0 additions & 6 deletions broker/machinebroker/server/machine.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
computev1alpha1 "github.com/ironcore-dev/ironcore/api/compute/v1alpha1"
networkingv1alpha1 "github.com/ironcore-dev/ironcore/api/networking/v1alpha1"
storagev1alpha1 "github.com/ironcore-dev/ironcore/api/storage/v1alpha1"
machinebrokerv1alpha1 "github.com/ironcore-dev/ironcore/broker/machinebroker/api/v1alpha1"
"github.com/ironcore-dev/ironcore/broker/machinebroker/apiutils"
iri "github.com/ironcore-dev/ironcore/iri/apis/machine/v1alpha1"
corev1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -169,16 +168,11 @@ func (s *Server) convertIronCoreNetworkInterfaceAttachment(
return nil, err
}

labels := ironcoreNic.NetworkInterface.GetLabels()
//TODO Remove once labels are not part of API
delete(labels, machinebrokerv1alpha1.ManagerLabel)

return &iri.NetworkInterface{
Name: ironcoreMachineNic.Name,
NetworkId: ironcoreNic.Network.Spec.ProviderID,
Ips: ips,
Attributes: ironcoreNic.NetworkInterface.Spec.Attributes,
Labels: labels,
}, nil
default:
return nil, fmt.Errorf("unrecognized ironcore machine network interface %#v", ironcoreMachineNic)
Expand Down
30 changes: 9 additions & 21 deletions broker/machinebroker/server/machine_create.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,14 @@ import (
commonv1alpha1 "github.com/ironcore-dev/ironcore/api/common/v1alpha1"
computev1alpha1 "github.com/ironcore-dev/ironcore/api/compute/v1alpha1"
"github.com/ironcore-dev/ironcore/broker/common/cleaner"
brokerutils "github.com/ironcore-dev/ironcore/broker/common/utils"
machinebrokerv1alpha1 "github.com/ironcore-dev/ironcore/broker/machinebroker/api/v1alpha1"
utilsmaps "github.com/ironcore-dev/ironcore/utils/maps"

"github.com/ironcore-dev/ironcore/broker/machinebroker/apiutils"
machinepoolletv1alpha1 "github.com/ironcore-dev/ironcore/poollet/machinepoollet/api/v1alpha1"

poolletutils "github.com/ironcore-dev/ironcore/poollet/common/utils"

iri "github.com/ironcore-dev/ironcore/iri/apis/machine/v1alpha1"
"github.com/ironcore-dev/ironcore/utils/maps"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
Expand Down Expand Up @@ -52,22 +52,6 @@ func (s *Server) prepareIronCoreMachinePower(power iri.Power) (computev1alpha1.P
}
}

func (s *Server) prepareIronCoreMachineLabels(machine *iri.Machine) map[string]string {
labels := make(map[string]string)

for downwardAPILabelName, defaultLabelName := range s.brokerDownwardAPILabels {
value := machine.GetMetadata().GetLabels()[poolletutils.DownwardAPILabel(machinepoolletv1alpha1.MachineDownwardAPIPrefix, downwardAPILabelName)]
if value == "" {
value = machine.GetMetadata().GetLabels()[defaultLabelName]
}
if value != "" {
labels[poolletutils.DownwardAPILabel(machinepoolletv1alpha1.MachineDownwardAPIPrefix, downwardAPILabelName)] = value
}
}

return labels
}

func (s *Server) prepareIronCoreMachineAnnotations(machine *iri.Machine) (map[string]string, error) {
annotationsValue, err := apiutils.EncodeAnnotationsAnnotation(machine.GetMetadata().GetAnnotations())
if err != nil {
Expand Down Expand Up @@ -116,7 +100,11 @@ func (s *Server) getIronCoreMachineConfig(machine *iri.Machine) (*IronCoreMachin
ironcoreVolumeCfgs[i] = ironcoreVolumeCfg
}

labels := s.prepareIronCoreMachineLabels(machine)
labels := brokerutils.PrepareDownwardAPILabels(
machine.GetMetadata().GetLabels(),
s.brokerDownwardAPILabels,
machinepoolletv1alpha1.MachineDownwardAPIPrefix,
)
annotations, err := s.prepareIronCoreMachineAnnotations(machine)
if err != nil {
return nil, fmt.Errorf("error preparing ironcore machine annotations: %w", err)
Expand Down Expand Up @@ -201,7 +189,7 @@ func (s *Server) createIronCoreMachine(
Namespace: s.cluster.Namespace(),
Name: s.cluster.IDGen().Generate(),
Annotations: cfg.Annotations,
Labels: maps.AppendMap(cfg.Labels, map[string]string{
Labels: utilsmaps.AppendMap(cfg.Labels, map[string]string{
machinebrokerv1alpha1.ManagerLabel: machinebrokerv1alpha1.MachineBrokerManager,
}),
},
Expand Down
2 changes: 1 addition & 1 deletion broker/machinebroker/server/machine_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func (s *Server) aggregateIronCoreMachine(

aggIronCoreNic, err := s.aggregateIronCoreNetworkInterface(ctx, rd, ironcoreNic)
if err != nil {
return nil, fmt.Errorf("error aggregating network interface: %w", err)
return nil, fmt.Errorf("error aggregating ironcore network interface %s: %w", ironcoreNic.Name, err)
}

aggIronCoreNics[machineNic.Name] = aggIronCoreNic
Expand Down
Loading
Loading