/* Copyright 2025. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ package controller import ( "context" "fmt" "sort" "strings" "time" corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/sets" apivalidation "k8s.io/apimachinery/pkg/util/validation" "k8s.io/client-go/tools/record" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/controller-runtime/pkg/reconcile" configv1alpha1 "git.vendetti.ru/andy/operator/api/v1alpha1" ) const ( GlobalTaintConfigName = "global-taint-rules" // Condition Types ConditionTypeReady = "Ready" ConditionReasonConfigParsingError = "ConfigParsingError" ConditionReasonConfigNotFound = "ConfigNotFound" ConditionReasonReady = "Ready" ) // NodeTainterConfigReconciler reconciles a NodeTainterConfig object type NodeTainterConfigReconciler struct { client.Client Scheme *runtime.Scheme Recorder record.EventRecorder } // kubebuilder:rbac:groups=operator.andy.vendetti.ru,resources=nodetainterconfigs,verbs=get;list;watch;create;update;patch;delete // +kubebuilder:rbac:groups=operator.andy.vendetti.ru,resources=nodetainterconfigs,verbs=get;list;watch // +kubebuilder:rbac:groups=operator.andy.vendetti.ru,resources=nodetainterconfigs/status,verbs=get;update;patch // kubebuilder:rbac:groups=operator.andy.vendetti.ru,resources=nodetainterconfigs/finalizers,verbs=update // For nodes control // +kubebuilder:rbac:groups=core,resources=nodes,verbs=get;list;watch;update;patch // +kubebuilder:rbac:groups="",resources=events,verbs=create;patch // Reconcile is part of the main kubernetes reconciliation loop which aims to // move the current state of the cluster closer to the desired state. // TODO(user): Modify the Reconcile function to compare the state specified by // the NodeTainterConfig object against the actual cluster state, and then // perform operations to make the cluster state reflect the state specified by // the user. // // For more details, check Reconcile and its Result here: // - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.20.2/pkg/reconcile func (r *NodeTainterConfigReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { // _ = log.FromContext(ctx) log := log.FromContext(ctx) // 1. Getting current Node var node corev1.Node if err := r.Get(ctx, req.NamespacedName, &node); err != nil { if apierrors.IsNotFound(err) { // Node deleted, nothing to do log.Info("Node not found. Ignoring.", "node", req.NamespacedName) return ctrl.Result{}, nil } // Other error with getting Node entity log.Error(err, "Failed to get Node", "node", req.NamespacedName) return ctrl.Result{}, err // Repeat } log = log.WithValues("node", node.Name) // Now logger knows Node name // 2. Getting global NodeTainterConfig var config configv1alpha1.NodeTainterConfig configKey := types.NamespacedName{Name: GlobalTaintConfigName} // Namespace is empty for Cluster scope if err := r.Get(ctx, configKey, &config); err != nil { nodeUpdateErr := r.updateNodeTaintStatus(ctx, &node, nil, fmt.Sprintf("Failed to get NodeTainterConfig %s: %v", GlobalTaintConfigName, err)) if nodeUpdateErr != nil { log.Error(nodeUpdateErr, "Failed to update node status after config fetch error") } if apierrors.IsNotFound(err) { log.Error(err, "Global NodeTainterConfig not found", "configName", GlobalTaintConfigName) r.Recorder.Eventf(&node, corev1.EventTypeWarning, "ConfigMissing", "Required NodeTainterConfig '%s' not found", GlobalTaintConfigName) return ctrl.Result{RequeueAfter: 1 * time.Minute}, nil } // Other error with getting CRD log.Error(err, "Failed to get NodeTainterConfig", "configName", GlobalTaintConfigName) return ctrl.Result{}, err // Repeat } log = log.WithValues("node", node.Name, "config", config.Name, "configGeneration", config.Generation) // log.Info("Processing node using rules from NodeTainterConfig", "configName", config.Name) // 3. Parse rules from CRD (this is just a field access now) // Use config.Spec.Rules straight specLabelRules := config.Spec.LabelRules if specLabelRules == nil { specLabelRules = make(map[string]string) // If the field is empty log.Info("Taint rules map is nil in config, assuming no rules", "configName", config.Name) } // Converting map[string]string into map[string]corev1.Taint for convenience parsedRules, parseErrs := parseLabelRulesFromSpec(specLabelRules) if len(parseErrs) > 0 { parsingError := fmt.Errorf("invalid rules found in NodeTainterConfig %s: %v", config.Name, parseErrs) log.Error(parsingError, "Rule parsing failed", "configName", config.Name, "parsingErrors", parseErrs) r.Recorder.Eventf(&config, corev1.EventTypeWarning, "InvalidConfig", parsingError.Error()) _ = r.updateCRDStatus(ctx, &config, metav1.ConditionFalse, ConditionReasonConfigParsingError, parsingError.Error()) _ = r.updateNodeTaintStatus(ctx, &node, nil, parsingError.Error()) return ctrl.Result{}, nil } // log.V(1).Info("Parsed taint rules from CRD", "rulesCount", len(parsedRules)) // 4. Define desired Taints for THAT Node desiredTaints := calculateDesiredTaints(node.Labels, parsedRules) log.V(1).Info("Calculated desired taints", "taints", desiredTaints) // 5. Get current Taints and compare/update originalTaints := node.Spec.Taints // Passing parsedRules for the func to know which Taints are controlled by this operator needsUpdate, newTaints := mergeAndCheckTaints(originalTaints, desiredTaints, parsedRules) // Function var updateErr error if needsUpdate { log.Info("Taints require update", "old", originalTaints, "new", newTaints) nodeCopy := node.DeepCopy() nodeCopy.Spec.Taints = newTaints // Using Patch for atomicity and less conflict risk if err := r.Patch(ctx, nodeCopy, client.MergeFrom(&node)); err != nil { log.Error(err, "Failed to patch Node taints") r.Recorder.Eventf(&node, corev1.EventTypeWarning, "UpdateFailed", "Failed to patch taints: %v", err) updateErr = err } else { log.Info("Successfully patched Node taints") r.Recorder.Eventf(&node, corev1.EventTypeNormal, "TaintsUpdated", "Taints updated based on rules from %s", config.Name) updateErr = r.updateNodeTaintStatus(ctx, &node, newTaints, "") } } else { log.Info("Node taints are already up-to-date") updateErr = r.updateNodeTaintStatus(ctx, &node, originalTaints, "") } if updateErr != nil { return ctrl.Result{}, updateErr } return ctrl.Result{}, nil } // Map function for NodeTainterConfig: Trigger reconcile for ALL nodes when the specific config changes func (r *NodeTainterConfigReconciler) mapConfigToNodes(ctx context.Context, obj client.Object) []reconcile.Request { config, ok := obj.(*configv1alpha1.NodeTainterConfig) log := log.FromContext(ctx) // Interested only in global config if !ok || config.Name != GlobalTaintConfigName { // log.V(1).Info("Ignoring unrelated config change", "object", client.ObjectKeyFromObject(obj)) return nil } log.Info("Global NodeTainterConfig changed, queuing reconciliation for all nodes", "configName", config.Name) var nodeList corev1.NodeList if err := r.List(ctx, &nodeList); err != nil { log.Error(err, "Failed to list nodes for config change") return nil } requests := make([]reconcile.Request, len(nodeList.Items)) for i, node := range nodeList.Items { requests[i] = reconcile.Request{NamespacedName: types.NamespacedName{Name: node.Name}} } log.Info("Queued node reconcile requests", "count", len(requests)) return requests } // SetupWithManager sets up the controller with the Manager. func (r *NodeTainterConfigReconciler) SetupWithManager(mgr ctrl.Manager) error { r.Recorder = mgr.GetEventRecorderFor("nodetainter-controller") return ctrl.NewControllerManagedBy(mgr). // Main resource we are watching is Node For(&corev1.Node{}). // Watching for changes of our global CRD config. // If it changes, queuing ALL Nodes. Watches( &configv1alpha1.NodeTainterConfig{}, handler.EnqueueRequestsFromMapFunc(r.mapConfigToNodes), // For(&operatorv1alpha1.NodeTainterConfig{}). // Named("nodetainterconfig"). ). Complete(r) } // --- UTILS FUNCTIONS --- // Converts Taint to string for status/logs func TaintToString(taint *corev1.Taint) string { return fmt.Sprintf("%s=%s:%s", taint.Key, taint.Value, taint.Effect) } // Converts Taints slice to string slice func TaintsToStrings(taints []corev1.Taint) []string { res := make([]string, len(taints)) for i, t := range taints { res[i] = TaintToString(&t) } sort.Strings(res) return res } // Parses rules from CRD Spec // returns map["labelKey=labelValue"]corev1.Taint and errors func parseLabelRulesFromSpec(specLabelRules map[string]string) (map[string]corev1.Taint, []error) { parsed := make(map[string]corev1.Taint) var errs []error for ruleSelector, taintString := range specLabelRules { ruleSelector = strings.TrimSpace(ruleSelector) taintString = strings.TrimSpace(taintString) if ruleSelector == "" || taintString == "" { errs = append(errs, fmt.Errorf("rule has empty selector ('%s') or taint string ('%s')", ruleSelector, taintString)) continue } partsSelector := strings.SplitN(ruleSelector, "=", 2) if len(partsSelector) != 2 { errs = append(errs, fmt.Errorf("invalid rule selector format '%s': missing '='", ruleSelector)) continue } labelKey := strings.TrimSpace(partsSelector[0]) labelValue := strings.TrimSpace(partsSelector[1]) if labelKey == "" { errs = append(errs, fmt.Errorf("invalid rule selector format '%s': empty label key", ruleSelector)) continue } if msgs := apivalidation.IsQualifiedName(labelKey); len(msgs) > 0 { errs = append(errs, fmt.Errorf("invalid label key in selector '%s': %v", ruleSelector, msgs)) continue } if labelValue != "" { if msgs := apivalidation.IsValidLabelValue(labelValue); len(msgs) > 0 { errs = append(errs, fmt.Errorf("invalid label value in selector '%s': %v", ruleSelector, msgs)) continue } } partsEffect := strings.SplitN(taintString, ":", 2) if len(partsEffect) != 2 || partsEffect[1] == "" { errs = append(errs, fmt.Errorf("invalid taint format for rule '%s': '%s' (missing effect)", ruleSelector, taintString)) continue } keyAndValue := partsEffect[0] effectString := partsEffect[1] partsKeyValue := strings.SplitN(keyAndValue, "=", 2) if len(partsKeyValue) != 2 || partsKeyValue[0] == "" || partsKeyValue[1] == "" { errs = append(errs, fmt.Errorf("invalid taint format for rule '%s': '%s' (invalid key/value)", ruleSelector, taintString)) continue } taintKey := partsKeyValue[0] taintValue := partsKeyValue[1] if msgs := apivalidation.IsQualifiedName(taintKey); len(msgs) > 0 { if simpleMsgs := apivalidation.IsDNS1123Label(taintKey); len(simpleMsgs) > 0 { errs = append(errs, fmt.Errorf("invalid taint key for rule '%s': '%s' in '%s' (%v / %v)", ruleSelector, taintKey, taintString, msgs, simpleMsgs)) continue } } var effect corev1.TaintEffect switch effectString { case string(corev1.TaintEffectNoSchedule): effect = corev1.TaintEffectNoSchedule case string(corev1.TaintEffectPreferNoSchedule): effect = corev1.TaintEffectPreferNoSchedule case string(corev1.TaintEffectNoExecute): effect = corev1.TaintEffectNoExecute default: errs = append(errs, fmt.Errorf("invalid taint effect for rule '%s': '%s' in '%s'", ruleSelector, effectString, taintString)) continue } taint := corev1.Taint{Key: taintKey, Value: taintValue, Effect: effect} parsed[ruleSelector] = taint // Key = "labelKey=labelValue" } return parsed, errs } // Defines Taints depending on node labels and rules func calculateDesiredTaints(nodeLabels map[string]string, parsedLabelRules map[string]corev1.Taint) []corev1.Taint { desired := []corev1.Taint{} foundTaints := make(map[string]bool) if nodeLabels == nil { nodeLabels = make(map[string]string) } for ruleSelector, taint := range parsedLabelRules { parts := strings.SplitN(ruleSelector, "=", 2) if len(parts) != 2 { continue } ruleKey := parts[0] ruleValue := parts[1] actualValue, exists := nodeLabels[ruleKey] if exists && actualValue == ruleValue { taintKeyEffect := fmt.Sprintf("%s:%s", taint.Key, taint.Effect) if !foundTaints[taintKeyEffect] { desired = append(desired, taint) foundTaints[taintKeyEffect] = true } } } return desired } // Creates unique string for Taint (Key:Effect) func TaintKeyEffect(taint *corev1.Taint) string { return fmt.Sprintf("%s:%s", taint.Key, taint.Effect) } // Compares current and desired controlled Taints func mergeAndCheckTaints(currentTaints []corev1.Taint, desiredTaints []corev1.Taint, parsedLabelRules map[string]corev1.Taint) (bool, []corev1.Taint) { managedTaintTypes := sets.NewString() for _, ruleTaint := range parsedLabelRules { managedTaintTypes.Insert(TaintKeyEffect(&ruleTaint)) } currentManagedTaints := make(map[string]corev1.Taint) unmanagedTaints := []corev1.Taint{} for _, taint := range currentTaints { ke := TaintKeyEffect(&taint) if managedTaintTypes.Has(ke) { currentManagedTaints[ke] = taint } else { unmanagedTaints = append(unmanagedTaints, taint) } } desiredTaintsMap := make(map[string]corev1.Taint) // key:Effect -> Taint for _, taint := range desiredTaints { ke := TaintKeyEffect(&taint) if managedTaintTypes.Has(ke) { desiredTaintsMap[ke] = taint } } needsUpdate := false if len(currentManagedTaints) != len(desiredTaintsMap) { needsUpdate = true } else { for ke, desiredTaint := range desiredTaintsMap { currentTaint, exists := currentManagedTaints[ke] if !exists || currentTaint.Value != desiredTaint.Value { needsUpdate = true break } } } if needsUpdate { newTaints := make([]corev1.Taint, 0, len(unmanagedTaints)+len(desiredTaintsMap)) newTaints = append(newTaints, unmanagedTaints...) desiredKeys := make([]string, 0, len(desiredTaintsMap)) for ke := range desiredTaintsMap { desiredKeys = append(desiredKeys, ke) } sort.Strings(desiredKeys) for _, ke := range desiredKeys { newTaints = append(newTaints, desiredTaintsMap[ke]) } return true, newTaints } return false, currentTaints } // Updates NodeTainterConfig status // TODO: Call this function on CRD updates or controller start/errors func (r *NodeTainterConfigReconciler) updateCRDStatus(ctx context.Context, config *configv1alpha1.NodeTainterConfig, status metav1.ConditionStatus, reason, message string) error { log := log.FromContext(ctx).WithValues("config", config.Name) configCopy := config.DeepCopy() configCopy.Status.ObservedGeneration = config.Generation newCondition := metav1.Condition{ Type: ConditionTypeReady, Status: status, Reason: reason, Message: message, LastTransitionTime: metav1.Now(), } // TODO: Use 'meta.SetStatusCondition' from 'k8s.io/apimachinery/pkg/api/meta' for correct conditions updates configCopy.Status.Conditions = []metav1.Condition{newCondition} // TODO: Update NodeTaintStatus based on data from all nodes // configCopy.Status.NodeTaintStatus = ... if err := r.Status().Patch(ctx, configCopy, client.MergeFrom(config)); err != nil { log.Error(err, "Failed to patch NodeTainterConfig status") return err } log.Info("NodeTainterConfig status updated", "reason", reason, "message", message) return nil } // Updates info about Taints for correct Node in CRD status func (r *NodeTainterConfigReconciler) updateNodeTaintStatus(ctx context.Context, node *corev1.Node, appliedTaints []corev1.Taint, errorMsg string) error { log := log.FromContext(ctx).WithValues("node", node.Name) var config configv1alpha1.NodeTainterConfig configKey := types.NamespacedName{Name: GlobalTaintConfigName} if err := r.Get(ctx, configKey, &config); err != nil { log.Error(err, "Failed to get NodeTainterConfig for status update", "configName", GlobalTaintConfigName) return fmt.Errorf("failed to get config %s for status update: %w", GlobalTaintConfigName, err) } configCopy := config.DeepCopy() found := false nodeStatus := configv1alpha1.NodeTaintInfo{ NodeName: node.Name, AppliedTaints: TaintsToStrings(appliedTaints), Error: errorMsg, } for i := range configCopy.Status.NodeTaintStatus { if configCopy.Status.NodeTaintStatus[i].NodeName == node.Name { configCopy.Status.NodeTaintStatus[i] = nodeStatus found = true break } } if !found { configCopy.Status.NodeTaintStatus = append(configCopy.Status.NodeTaintStatus, nodeStatus) } sort.Slice(configCopy.Status.NodeTaintStatus, func(i, j int) bool { return configCopy.Status.NodeTaintStatus[i].NodeName < configCopy.Status.NodeTaintStatus[j].NodeName }) if err := r.Status().Patch(ctx, configCopy, client.MergeFrom(&config)); err != nil { log.Error(err, "Failed to patch NodeTainterConfig status with node info", "node", node.Name) return err } log.V(1).Info("Updated node status in CRD", "applied", nodeStatus.AppliedTaints, "error", nodeStatus.Error) return nil }