/* Copyright 2025. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ package controller import ( "context" "fmt" "sort" "strings" "time" corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/sets" apivalidation "k8s.io/apimachinery/pkg/util/validation" "k8s.io/client-go/tools/record" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/controller-runtime/pkg/reconcile" configv1alpha1 "git.vendetti.ru/andy/operator/api/v1alpha1" ) const ( GlobalTaintConfigName = "global-taint-rules" // Condition Types ConditionTypeReady = "Ready" ConditionReasonConfigParsingError = "ConfigParsingError" ConditionReasonConfigNotFound = "ConfigNotFound" ConditionReasonReady = "Ready" ) // NodeTainterConfigReconciler reconciles a NodeTainterConfig object type NodeTainterConfigReconciler struct { client.Client Scheme *runtime.Scheme Recorder record.EventRecorder } // kubebuilder:rbac:groups=operator.andy.vendetti.ru,resources=nodetainterconfigs,verbs=get;list;watch;create;update;patch;delete // +kubebuilder:rbac:groups=operator.andy.vendetti.ru,resources=nodetainterconfigs,verbs=get;list;watch // +kubebuilder:rbac:groups=operator.andy.vendetti.ru,resources=nodetainterconfigs/status,verbs=get;update;patch // kubebuilder:rbac:groups=operator.andy.vendetti.ru,resources=nodetainterconfigs/finalizers,verbs=update // For nodes control // +kubebuilder:rbac:groups=core,resources=nodes,verbs=get;list;watch;update;patch // +kubebuilder:rbac:groups="",resources=events,verbs=create;patch // Reconcile is part of the main kubernetes reconciliation loop which aims to // move the current state of the cluster closer to the desired state. // TODO(user): Modify the Reconcile function to compare the state specified by // the NodeTainterConfig object against the actual cluster state, and then // perform operations to make the cluster state reflect the state specified by // the user. // // For more details, check Reconcile and its Result here: // - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.20.2/pkg/reconcile func (r *NodeTainterConfigReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { // _ = log.FromContext(ctx) log := log.FromContext(ctx) // 1. Getting current Node var node corev1.Node if err := r.Get(ctx, req.NamespacedName, &node); err != nil { if apierrors.IsNotFound(err) { // Node deleted, nothing to do log.Info("Node not found. Ignoring.", "node", req.NamespacedName) return ctrl.Result{}, nil } // Other error with getting Node entity log.Error(err, "Failed to get Node", "node", req.NamespacedName) return ctrl.Result{}, err // Repeat } log = log.WithValues("node", node.Name) // Now logger knows Node name // 2. Getting global NodeTainterConfig var config configv1alpha1.NodeTainterConfig configKey := types.NamespacedName{Name: GlobalTaintConfigName} // Namespace is empty for Cluster scope if err := r.Get(ctx, configKey, &config); err != nil { nodeUpdateErr := r.updateNodeTaintStatus(ctx, &node, nil, fmt.Sprintf("Failed to get NodeTainterConfig %s: %v", GlobalTaintConfigName, err)) if nodeUpdateErr != nil { log.Error(nodeUpdateErr, "Failed to update node status after config fetch error") } if apierrors.IsNotFound(err) { log.Error(err, "Global NodeTainterConfig not found", "configName", GlobalTaintConfigName) r.Recorder.Eventf(&node, corev1.EventTypeWarning, "ConfigMissing", "Required NodeTainterConfig '%s' not found", GlobalTaintConfigName) return ctrl.Result{RequeueAfter: 1 * time.Minute}, nil } // Other error with getting CRD log.Error(err, "Failed to get NodeTainterConfig", "configName", GlobalTaintConfigName) return ctrl.Result{}, err // Repeat } log = log.WithValues("node", node.Name, "config", config.Name, "configGeneration", config.Generation) // log.Info("Processing node using rules from NodeTainterConfig", "configName", config.Name) // 3. Parse rules from CRD (this is just a field access now) // Use config.Spec.Rules straight specLabelRules := config.Spec.LabelRules if specLabelRules == nil { specLabelRules = make(map[string]string) // If the field is empty log.Info("Taint rules map is nil in config, assuming no rules", "configName", config.Name) } // Converting map[string]string into map[string]corev1.Taint for convenience parsedRules, parseErrs := parseLabelRulesFromSpec(specLabelRules) if len(parseErrs) > 0 { parsingError := fmt.Errorf("invalid rules found in NodeTainterConfig %s: %v", config.Name, parseErrs) log.Error(parsingError, "Rule parsing failed", "configName", config.Name, "parsingErrors", parseErrs) r.Recorder.Eventf(&config, corev1.EventTypeWarning, "InvalidConfig", parsingError.Error()) _ = r.updateCRDStatus(ctx, &config, metav1.ConditionFalse, ConditionReasonConfigParsingError, parsingError.Error()) _ = r.updateNodeTaintStatus(ctx, &node, nil, parsingError.Error()) return ctrl.Result{}, nil } // log.V(1).Info("Parsed taint rules from CRD", "rulesCount", len(parsedRules)) // 4. Define desired Taints for THAT Node desiredTaints := calculateDesiredTaints(node.Labels, parsedRules) log.V(1).Info("Calculated desired taints", "taints", desiredTaints) // 5. Get current Taints and compare/update originalTaints := node.Spec.Taints // Passing parsedRules for the func to know which Taints are controlled by this operator needsUpdate, newTaints := mergeAndCheckTaints(originalTaints, desiredTaints, parsedRules) // Function var updateErr error if needsUpdate { log.Info("Taints require update", "old", originalTaints, "new", newTaints) nodeCopy := node.DeepCopy() nodeCopy.Spec.Taints = newTaints // Using Patch for atomicity and less conflict risk if err := r.Patch(ctx, nodeCopy, client.MergeFrom(&node)); err != nil { log.Error(err, "Failed to patch Node taints") r.Recorder.Eventf(&node, corev1.EventTypeWarning, "UpdateFailed", "Failed to patch taints: %v", err) updateErr = err } else { log.Info("Successfully patched Node taints") r.Recorder.Eventf(&node, corev1.EventTypeNormal, "TaintsUpdated", "Taints updated based on rules from %s", config.Name) updateErr = r.updateNodeTaintStatus(ctx, &node, newTaints, "") } } else { log.Info("Node taints are already up-to-date") updateErr = r.updateNodeTaintStatus(ctx, &node, originalTaints, "") } if updateErr != nil { return ctrl.Result{}, updateErr } return ctrl.Result{}, nil } // Map function for NodeTainterConfig: Trigger reconcile for ALL nodes when the specific config changes func (r *NodeTainterConfigReconciler) mapConfigToNodes(ctx context.Context, obj client.Object) []reconcile.Request { config, ok := obj.(*configv1alpha1.NodeTainterConfig) log := log.FromContext(ctx) // Interested only in global config if !ok || config.Name != GlobalTaintConfigName { // log.V(1).Info("Ignoring unrelated config change", "object", client.ObjectKeyFromObject(obj)) return nil } log.Info("Global NodeTainterConfig changed, queuing reconciliation for all nodes", "configName", config.Name) var nodeList corev1.NodeList if err := r.List(ctx, &nodeList); err != nil { log.Error(err, "Failed to list nodes for config change") return nil } requests := make([]reconcile.Request, len(nodeList.Items)) for i, node := range nodeList.Items { requests[i] = reconcile.Request{NamespacedName: types.NamespacedName{Name: node.Name}} } log.Info("Queued node reconcile requests", "count", len(requests)) return requests } // SetupWithManager sets up the controller with the Manager. func (r *NodeTainterConfigReconciler) SetupWithManager(mgr ctrl.Manager) error { r.Recorder = mgr.GetEventRecorderFor("nodetainter-controller") return ctrl.NewControllerManagedBy(mgr). // Main resource we are watching is Node For(&corev1.Node{}). // Watching for changes of our global CRD config. // If it changes, queuing ALL Nodes. Watches( &configv1alpha1.NodeTainterConfig{}, handler.EnqueueRequestsFromMapFunc(r.mapConfigToNodes), // For(&operatorv1alpha1.NodeTainterConfig{}). // Named("nodetainterconfig"). ). Complete(r) } // --- UTILS FUNCTIONS --- // TaintToString конвертирует тейнт в строку для статуса/логов func TaintToString(taint *corev1.Taint) string { return fmt.Sprintf("%s=%s:%s", taint.Key, taint.Value, taint.Effect) } // TaintsToString конвертирует слайс тейнтов в слайс строк func TaintsToStrings(taints []corev1.Taint) []string { res := make([]string, len(taints)) for i, t := range taints { res[i] = TaintToString(&t) } sort.Strings(res) // Сортируем для консистентности статуса return res } // parseLabelRulesFromSpec парсит правила из CRD Spec // Возвращает map["labelKey=labelValue"]corev1.Taint и ошибки func parseLabelRulesFromSpec(specLabelRules map[string]string) (map[string]corev1.Taint, []error) { parsed := make(map[string]corev1.Taint) var errs []error for ruleSelector, taintString := range specLabelRules { ruleSelector = strings.TrimSpace(ruleSelector) taintString = strings.TrimSpace(taintString) if ruleSelector == "" || taintString == "" { errs = append(errs, fmt.Errorf("rule has empty selector ('%s') or taint string ('%s')", ruleSelector, taintString)) continue } // Парсим селектор "key=value" partsSelector := strings.SplitN(ruleSelector, "=", 2) if len(partsSelector) != 2 { // Должен быть знак = errs = append(errs, fmt.Errorf("invalid rule selector format '%s': missing '='", ruleSelector)) continue } labelKey := strings.TrimSpace(partsSelector[0]) labelValue := strings.TrimSpace(partsSelector[1]) // Может быть пустым! if labelKey == "" { errs = append(errs, fmt.Errorf("invalid rule selector format '%s': empty label key", ruleSelector)) continue } // Валидируем ключ лейбла if msgs := apivalidation.IsQualifiedName(labelKey); len(msgs) > 0 { errs = append(errs, fmt.Errorf("invalid label key in selector '%s': %v", ruleSelector, msgs)) continue } // Валидируем значение лейбла (если не пустое) if labelValue != "" { if msgs := apivalidation.IsValidLabelValue(labelValue); len(msgs) > 0 { errs = append(errs, fmt.Errorf("invalid label value in selector '%s': %v", ruleSelector, msgs)) continue } } // Парсим строку тейнта "key=value:Effect" (используем улучшенную логику из прошлого ответа) partsEffect := strings.SplitN(taintString, ":", 2) if len(partsEffect) != 2 || partsEffect[1] == "" { errs = append(errs, fmt.Errorf("invalid taint format for rule '%s': '%s' (missing effect)", ruleSelector, taintString)) continue } keyAndValue := partsEffect[0] effectString := partsEffect[1] partsKeyValue := strings.SplitN(keyAndValue, "=", 2) if len(partsKeyValue) != 2 || partsKeyValue[0] == "" || partsKeyValue[1] == "" { errs = append(errs, fmt.Errorf("invalid taint format for rule '%s': '%s' (invalid key/value)", ruleSelector, taintString)) continue } taintKey := partsKeyValue[0] taintValue := partsKeyValue[1] if msgs := apivalidation.IsQualifiedName(taintKey); len(msgs) > 0 { if simpleMsgs := apivalidation.IsDNS1123Label(taintKey); len(simpleMsgs) > 0 { errs = append(errs, fmt.Errorf("invalid taint key for rule '%s': '%s' in '%s' (%v / %v)", ruleSelector, taintKey, taintString, msgs, simpleMsgs)) continue } } var effect corev1.TaintEffect switch effectString { case string(corev1.TaintEffectNoSchedule): effect = corev1.TaintEffectNoSchedule case string(corev1.TaintEffectPreferNoSchedule): effect = corev1.TaintEffectPreferNoSchedule case string(corev1.TaintEffectNoExecute): effect = corev1.TaintEffectNoExecute default: errs = append(errs, fmt.Errorf("invalid taint effect for rule '%s': '%s' in '%s'", ruleSelector, effectString, taintString)) continue } // Все ок taint := corev1.Taint{Key: taintKey, Value: taintValue, Effect: effect} parsed[ruleSelector] = taint // Ключ = "labelKey=labelValue" } return parsed, errs } // calculateDesiredTaints определяет тейнты на основе лейблов ноды и правил func calculateDesiredTaints(nodeLabels map[string]string, parsedLabelRules map[string]corev1.Taint) []corev1.Taint { desired := []corev1.Taint{} foundTaints := make(map[string]bool) // Для уникальности Key:Effect if nodeLabels == nil { nodeLabels = make(map[string]string) // Безопасность } for ruleSelector, taint := range parsedLabelRules { parts := strings.SplitN(ruleSelector, "=", 2) if len(parts) != 2 { continue } // Уже должно быть отва лидировано ruleKey := parts[0] ruleValue := parts[1] // Может быть пустой actualValue, exists := nodeLabels[ruleKey] // Логика сравнения: // 1. Ключ лейбла должен существовать на ноде. // 2. Значение лейбла на ноде должно ТОЧНО совпадать со значением в правиле (включая пустую строку). if exists && actualValue == ruleValue { taintKeyEffect := fmt.Sprintf("%s:%s", taint.Key, taint.Effect) if !foundTaints[taintKeyEffect] { desired = append(desired, taint) foundTaints[taintKeyEffect] = true } } } return desired } // TaintKeyEffect создает уникальную строку для тейнта (Key:Effect) func TaintKeyEffect(taint *corev1.Taint) string { return fmt.Sprintf("%s:%s", taint.Key, taint.Effect) } // mergeAndCheckTaints сравнивает текущие и желаемые тейнты, управляемые оператором. // parsedLabelRules: map["labelKey=labelValue"]corev1.Taint - содержит ВСЕ валидные правила из конфига. func mergeAndCheckTaints(currentTaints []corev1.Taint, desiredTaints []corev1.Taint, parsedLabelRules map[string]corev1.Taint) (bool, []corev1.Taint) { // 1. Определяем, какие типы тейнтов (Key:Effect) управляются нами по всем правилам managedTaintTypes := sets.NewString() for _, ruleTaint := range parsedLabelRules { // Итерируем по значениям (Taint объектам) managedTaintTypes.Insert(TaintKeyEffect(&ruleTaint)) } // 2. Разделяем текущие тейнты на управляемые и неуправляемые currentManagedTaints := make(map[string]corev1.Taint) // key:Effect -> Taint unmanagedTaints := []corev1.Taint{} for _, taint := range currentTaints { ke := TaintKeyEffect(&taint) if managedTaintTypes.Has(ke) { currentManagedTaints[ke] = taint } else { unmanagedTaints = append(unmanagedTaints, taint) } } // 3. Создаем map желаемых тейнтов для быстрого поиска desiredTaintsMap := make(map[string]corev1.Taint) // key:Effect -> Taint for _, taint := range desiredTaints { // Проверка, что желаемый тейнт действительно определен в правилах (на всякий случай) ke := TaintKeyEffect(&taint) if managedTaintTypes.Has(ke) { desiredTaintsMap[ke] = taint } } // 4. Сравниваем управляемые текущие и желаемые needsUpdate := false if len(currentManagedTaints) != len(desiredTaintsMap) { needsUpdate = true } else { for ke, desiredTaint := range desiredTaintsMap { currentTaint, exists := currentManagedTaints[ke] if !exists || currentTaint.Value != desiredTaint.Value { // Сравниваем и значения needsUpdate = true break } } } // 5. Собираем новый список тейнтов, если нужно обновление if needsUpdate { newTaints := make([]corev1.Taint, 0, len(unmanagedTaints)+len(desiredTaintsMap)) newTaints = append(newTaints, unmanagedTaints...) desiredKeys := make([]string, 0, len(desiredTaintsMap)) for ke := range desiredTaintsMap { desiredKeys = append(desiredKeys, ke) } sort.Strings(desiredKeys) // Сортируем для консистентности for _, ke := range desiredKeys { newTaints = append(newTaints, desiredTaintsMap[ke]) } return true, newTaints } return false, currentTaints } // updateCRDStatus обновляет статус ресурса NodeTainterConfig // TODO: Вызывать эту функцию при изменении CRD или при старте/ошибках контроллера. func (r *NodeTainterConfigReconciler) updateCRDStatus(ctx context.Context, config *configv1alpha1.NodeTainterConfig, status metav1.ConditionStatus, reason, message string) error { log := log.FromContext(ctx).WithValues("config", config.Name) configCopy := config.DeepCopy() // Устанавливаем observedGeneration configCopy.Status.ObservedGeneration = config.Generation // Обновляем Condition newCondition := metav1.Condition{ Type: ConditionTypeReady, Status: status, Reason: reason, Message: message, LastTransitionTime: metav1.Now(), } // TODO: Использовать 'meta.SetStatusCondition' из 'k8s.io/apimachinery/pkg/api/meta' для правильного обновления conditions // Примерно так: // meta.SetStatusCondition(&configCopy.Status.Conditions, newCondition) // Пока просто заменяем для простоты configCopy.Status.Conditions = []metav1.Condition{newCondition} // TODO: Обновить NodeTaintStatus на основе данных со всех нод (может быть сложно и затратно) // configCopy.Status.NodeTaintStatus = ... // Используем Patch для обновления статуса if err := r.Status().Patch(ctx, configCopy, client.MergeFrom(config)); err != nil { log.Error(err, "Failed to patch NodeTainterConfig status") return err } log.Info("NodeTainterConfig status updated", "reason", reason, "message", message) return nil } // updateNodeTaintStatus обновляет информацию о тейнтах для конкретной ноды в статусе CRD // TODO: Эта функция в текущем виде будет вызывать конфликты, т.к. каждый Reconcile ноды // будет пытаться перезаписать весь Status.NodeTaintStatus. // Правильный подход: читать текущий статус CRD, обновлять только запись для текущей ноды, патчить. // Это усложняет код, пока оставим так для демонстрации, но ЭТО НУЖНО ИСПРАВИТЬ для production. func (r *NodeTainterConfigReconciler) updateNodeTaintStatus(ctx context.Context, node *corev1.Node, appliedTaints []corev1.Taint, errorMsg string) error { log := log.FromContext(ctx).WithValues("node", node.Name) var config configv1alpha1.NodeTainterConfig configKey := types.NamespacedName{Name: GlobalTaintConfigName} // Получаем CRD еще раз, чтобы обновить его статус if err := r.Get(ctx, configKey, &config); err != nil { log.Error(err, "Failed to get NodeTainterConfig for status update", "configName", GlobalTaintConfigName) // Не можем обновить статус, если не получили CRD return fmt.Errorf("failed to get config %s for status update: %w", GlobalTaintConfigName, err) } configCopy := config.DeepCopy() // Ищем статус для текущей ноды found := false nodeStatus := configv1alpha1.NodeTaintInfo{ NodeName: node.Name, AppliedTaints: TaintsToStrings(appliedTaints), Error: errorMsg, } for i := range configCopy.Status.NodeTaintStatus { if configCopy.Status.NodeTaintStatus[i].NodeName == node.Name { configCopy.Status.NodeTaintStatus[i] = nodeStatus found = true break } } if !found { configCopy.Status.NodeTaintStatus = append(configCopy.Status.NodeTaintStatus, nodeStatus) } // Сортируем для консистентности sort.Slice(configCopy.Status.NodeTaintStatus, func(i, j int) bool { return configCopy.Status.NodeTaintStatus[i].NodeName < configCopy.Status.NodeTaintStatus[j].NodeName }) // Патчим статус if err := r.Status().Patch(ctx, configCopy, client.MergeFrom(&config)); err != nil { log.Error(err, "Failed to patch NodeTainterConfig status with node info", "node", node.Name) return err } log.V(1).Info("Updated node status in CRD", "applied", nodeStatus.AppliedTaints, "error", nodeStatus.Error) return nil }