This repository has been archived on 2025-08-14. You can view files and clone it, but cannot push or open issues or pull requests.
Files
operator/internal/controller/nodetainterconfig_controller.go

509 lines
21 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

/*
Copyright 2025.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package controller
import (
"context"
"fmt"
"sort"
"strings"
"time"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
apivalidation "k8s.io/apimachinery/pkg/util/validation"
"k8s.io/client-go/tools/record"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
configv1alpha1 "git.vendetti.ru/andy/operator/api/v1alpha1"
)
const (
GlobalTaintConfigName = "global-taint-rules"
// Condition Types
ConditionTypeReady = "Ready"
ConditionReasonConfigParsingError = "ConfigParsingError"
ConditionReasonConfigNotFound = "ConfigNotFound"
ConditionReasonReady = "Ready"
)
// NodeTainterConfigReconciler reconciles a NodeTainterConfig object
type NodeTainterConfigReconciler struct {
client.Client
Scheme *runtime.Scheme
Recorder record.EventRecorder
}
// kubebuilder:rbac:groups=operator.andy.vendetti.ru,resources=nodetainterconfigs,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=operator.andy.vendetti.ru,resources=nodetainterconfigs,verbs=get;list;watch
// +kubebuilder:rbac:groups=operator.andy.vendetti.ru,resources=nodetainterconfigs/status,verbs=get;update;patch
// kubebuilder:rbac:groups=operator.andy.vendetti.ru,resources=nodetainterconfigs/finalizers,verbs=update
// For nodes control
// +kubebuilder:rbac:groups=core,resources=nodes,verbs=get;list;watch;update;patch
// +kubebuilder:rbac:groups="",resources=events,verbs=create;patch
// Reconcile is part of the main kubernetes reconciliation loop which aims to
// move the current state of the cluster closer to the desired state.
// TODO(user): Modify the Reconcile function to compare the state specified by
// the NodeTainterConfig object against the actual cluster state, and then
// perform operations to make the cluster state reflect the state specified by
// the user.
//
// For more details, check Reconcile and its Result here:
// - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.20.2/pkg/reconcile
func (r *NodeTainterConfigReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
// _ = log.FromContext(ctx)
log := log.FromContext(ctx)
// 1. Getting current Node
var node corev1.Node
if err := r.Get(ctx, req.NamespacedName, &node); err != nil {
if apierrors.IsNotFound(err) {
// Node deleted, nothing to do
log.Info("Node not found. Ignoring.", "node", req.NamespacedName)
return ctrl.Result{}, nil
}
// Other error with getting Node entity
log.Error(err, "Failed to get Node", "node", req.NamespacedName)
return ctrl.Result{}, err // Repeat
}
log = log.WithValues("node", node.Name) // Now logger knows Node name
// 2. Getting global NodeTainterConfig
var config configv1alpha1.NodeTainterConfig
configKey := types.NamespacedName{Name: GlobalTaintConfigName} // Namespace is empty for Cluster scope
if err := r.Get(ctx, configKey, &config); err != nil {
nodeUpdateErr := r.updateNodeTaintStatus(ctx, &node, nil, fmt.Sprintf("Failed to get NodeTainterConfig %s: %v", GlobalTaintConfigName, err))
if nodeUpdateErr != nil {
log.Error(nodeUpdateErr, "Failed to update node status after config fetch error")
}
if apierrors.IsNotFound(err) {
log.Error(err, "Global NodeTainterConfig not found", "configName", GlobalTaintConfigName)
r.Recorder.Eventf(&node, corev1.EventTypeWarning, "ConfigMissing", "Required NodeTainterConfig '%s' not found", GlobalTaintConfigName)
return ctrl.Result{RequeueAfter: 1 * time.Minute}, nil
}
// Other error with getting CRD
log.Error(err, "Failed to get NodeTainterConfig", "configName", GlobalTaintConfigName)
return ctrl.Result{}, err // Repeat
}
log = log.WithValues("node", node.Name, "config", config.Name, "configGeneration", config.Generation)
// log.Info("Processing node using rules from NodeTainterConfig", "configName", config.Name)
// 3. Parse rules from CRD (this is just a field access now)
// Use config.Spec.Rules straight
specLabelRules := config.Spec.LabelRules
if specLabelRules == nil {
specLabelRules = make(map[string]string) // If the field is empty
log.Info("Taint rules map is nil in config, assuming no rules", "configName", config.Name)
}
// Converting map[string]string into map[string]corev1.Taint for convenience
parsedRules, parseErrs := parseLabelRulesFromSpec(specLabelRules)
if len(parseErrs) > 0 {
errMsg := fmt.Sprintf("Invalid rules found in NodeTainterConfig %s: %v", config.Name, parseErrs)
log.Error(fmt.Errorf(errMsg), "Rule parsing failed")
r.Recorder.Eventf(&config, corev1.EventTypeWarning, "InvalidConfig", errMsg)
_ = r.updateCRDStatus(ctx, &config, metav1.ConditionFalse, ConditionReasonConfigParsingError, errMsg)
_ = r.updateNodeTaintStatus(ctx, &node, nil, errMsg)
return ctrl.Result{}, nil
}
//log.V(1).Info("Parsed taint rules from CRD", "rulesCount", len(parsedRules))
// 4. Define desired Taints for THAT Node
desiredTaints := calculateDesiredTaints(node.Labels, parsedRules)
log.V(1).Info("Calculated desired taints", "taints", desiredTaints)
// 5. Get current Taints and compare/update
originalTaints := node.Spec.Taints
// Passing parsedRules for the func to know which Taints are controlled by this operator
needsUpdate, newTaints := mergeAndCheckTaints(originalTaints, desiredTaints, parsedRules) // Function
var updateErr error
if needsUpdate {
log.Info("Taints require update", "old", originalTaints, "new", newTaints)
nodeCopy := node.DeepCopy()
nodeCopy.Spec.Taints = newTaints
// Using Patch for atomicity and less conflict risk
if err := r.Patch(ctx, nodeCopy, client.MergeFrom(&node)); err != nil {
log.Error(err, "Failed to patch Node taints")
r.Recorder.Eventf(&node, corev1.EventTypeWarning, "UpdateFailed", "Failed to patch taints: %v", err)
updateErr = err
} else {
log.Info("Successfully patched Node taints")
r.Recorder.Eventf(&node, corev1.EventTypeNormal, "TaintsUpdated", "Taints updated based on rules from %s", config.Name)
updateErr = r.updateNodeTaintStatus(ctx, &node, newTaints, "")
}
} else {
log.Info("Node taints are already up-to-date")
updateErr = r.updateNodeTaintStatus(ctx, &node, originalTaints, "")
}
if updateErr != nil {
return ctrl.Result{}, updateErr
}
return ctrl.Result{}, nil
}
// Map function for NodeTainterConfig: Trigger reconcile for ALL nodes when the specific config changes
func (r *NodeTainterConfigReconciler) mapConfigToNodes(ctx context.Context, obj client.Object) []reconcile.Request {
config, ok := obj.(*configv1alpha1.NodeTainterConfig)
log := log.FromContext(ctx)
// Interested only in global config
if !ok || config.Name != GlobalTaintConfigName {
// log.V(1).Info("Ignoring unrelated config change", "object", client.ObjectKeyFromObject(obj))
return nil
}
log.Info("Global NodeTainterConfig changed, queuing reconciliation for all nodes", "configName", config.Name)
var nodeList corev1.NodeList
if err := r.List(ctx, &nodeList); err != nil {
log.Error(err, "Failed to list nodes for config change")
return nil
}
requests := make([]reconcile.Request, len(nodeList.Items))
for i, node := range nodeList.Items {
requests[i] = reconcile.Request{NamespacedName: types.NamespacedName{Name: node.Name}}
}
log.Info("Queued node reconcile requests", "count", len(requests))
return requests
}
// SetupWithManager sets up the controller with the Manager.
func (r *NodeTainterConfigReconciler) SetupWithManager(mgr ctrl.Manager) error {
r.Recorder = mgr.GetEventRecorderFor("nodetainter-controller")
return ctrl.NewControllerManagedBy(mgr).
// Main resource we are watching is Node
For(&corev1.Node{}).
// Watching for changes of our global CRD config.
// If it changes, queuing ALL Nodes.
Watches(
&configv1alpha1.NodeTainterConfig{},
handler.EnqueueRequestsFromMapFunc(r.mapConfigToNodes),
// For(&operatorv1alpha1.NodeTainterConfig{}).
// Named("nodetainterconfig").
).
Complete(r)
}
// --- UTILS FUNCTIONS ---
// TaintToString конвертирует тейнт в строку для статуса/логов
func TaintToString(taint *corev1.Taint) string {
return fmt.Sprintf("%s=%s:%s", taint.Key, taint.Value, taint.Effect)
}
// TaintsToString конвертирует слайс тейнтов в слайс строк
func TaintsToStrings(taints []corev1.Taint) []string {
res := make([]string, len(taints))
for i, t := range taints {
res[i] = TaintToString(&t)
}
sort.Strings(res) // Сортируем для консистентности статуса
return res
}
// parseLabelRulesFromSpec парсит правила из CRD Spec
// Возвращает map["labelKey=labelValue"]corev1.Taint и ошибки
func parseLabelRulesFromSpec(specLabelRules map[string]string) (map[string]corev1.Taint, []error) {
parsed := make(map[string]corev1.Taint)
var errs []error
for ruleSelector, taintString := range specLabelRules {
ruleSelector = strings.TrimSpace(ruleSelector)
taintString = strings.TrimSpace(taintString)
if ruleSelector == "" || taintString == "" {
errs = append(errs, fmt.Errorf("rule has empty selector ('%s') or taint string ('%s')", ruleSelector, taintString))
continue
}
// Парсим селектор "key=value"
partsSelector := strings.SplitN(ruleSelector, "=", 2)
if len(partsSelector) != 2 { // Должен быть знак =
errs = append(errs, fmt.Errorf("invalid rule selector format '%s': missing '='", ruleSelector))
continue
}
labelKey := strings.TrimSpace(partsSelector[0])
labelValue := strings.TrimSpace(partsSelector[1]) // Может быть пустым!
if labelKey == "" {
errs = append(errs, fmt.Errorf("invalid rule selector format '%s': empty label key", ruleSelector))
continue
}
// Валидируем ключ лейбла
if msgs := apivalidation.IsQualifiedName(labelKey); len(msgs) > 0 {
errs = append(errs, fmt.Errorf("invalid label key in selector '%s': %v", ruleSelector, msgs))
continue
}
// Валидируем значение лейбла (если не пустое)
if labelValue != "" {
if msgs := apivalidation.IsValidLabelValue(labelValue); len(msgs) > 0 {
errs = append(errs, fmt.Errorf("invalid label value in selector '%s': %v", ruleSelector, msgs))
continue
}
}
// Парсим строку тейнта "key=value:Effect" (используем улучшенную логику из прошлого ответа)
partsEffect := strings.SplitN(taintString, ":", 2)
if len(partsEffect) != 2 || partsEffect[1] == "" {
errs = append(errs, fmt.Errorf("invalid taint format for rule '%s': '%s' (missing effect)", ruleSelector, taintString))
continue
}
keyAndValue := partsEffect[0]
effectString := partsEffect[1]
partsKeyValue := strings.SplitN(keyAndValue, "=", 2)
if len(partsKeyValue) != 2 || partsKeyValue[0] == "" || partsKeyValue[1] == "" {
errs = append(errs, fmt.Errorf("invalid taint format for rule '%s': '%s' (invalid key/value)", ruleSelector, taintString))
continue
}
taintKey := partsKeyValue[0]
taintValue := partsKeyValue[1]
if msgs := apivalidation.IsQualifiedName(taintKey); len(msgs) > 0 {
if simpleMsgs := apivalidation.IsDNS1123Label(taintKey); len(simpleMsgs) > 0 {
errs = append(errs, fmt.Errorf("invalid taint key for rule '%s': '%s' in '%s' (%v / %v)", ruleSelector, taintKey, taintString, msgs, simpleMsgs))
continue
}
}
var effect corev1.TaintEffect
switch effectString {
case string(corev1.TaintEffectNoSchedule):
effect = corev1.TaintEffectNoSchedule
case string(corev1.TaintEffectPreferNoSchedule):
effect = corev1.TaintEffectPreferNoSchedule
case string(corev1.TaintEffectNoExecute):
effect = corev1.TaintEffectNoExecute
default:
errs = append(errs, fmt.Errorf("invalid taint effect for rule '%s': '%s' in '%s'", ruleSelector, effectString, taintString))
continue
}
// Все ок
taint := corev1.Taint{Key: taintKey, Value: taintValue, Effect: effect}
parsed[ruleSelector] = taint // Ключ = "labelKey=labelValue"
}
return parsed, errs
}
// calculateDesiredTaints определяет тейнты на основе лейблов ноды и правил
func calculateDesiredTaints(nodeLabels map[string]string, parsedLabelRules map[string]corev1.Taint) []corev1.Taint {
desired := []corev1.Taint{}
foundTaints := make(map[string]bool) // Для уникальности Key:Effect
if nodeLabels == nil {
nodeLabels = make(map[string]string) // Безопасность
}
for ruleSelector, taint := range parsedLabelRules {
parts := strings.SplitN(ruleSelector, "=", 2)
if len(parts) != 2 {
continue
} // Уже должно быть отва лидировано
ruleKey := parts[0]
ruleValue := parts[1] // Может быть пустой
actualValue, exists := nodeLabels[ruleKey]
// Логика сравнения:
// 1. Ключ лейбла должен существовать на ноде.
// 2. Значение лейбла на ноде должно ТОЧНО совпадать со значением в правиле (включая пустую строку).
if exists && actualValue == ruleValue {
taintKeyEffect := fmt.Sprintf("%s:%s", taint.Key, taint.Effect)
if !foundTaints[taintKeyEffect] {
desired = append(desired, taint)
foundTaints[taintKeyEffect] = true
}
}
}
return desired
}
// TaintKeyEffect создает уникальную строку для тейнта (Key:Effect)
func TaintKeyEffect(taint *corev1.Taint) string {
return fmt.Sprintf("%s:%s", taint.Key, taint.Effect)
}
// mergeAndCheckTaints сравнивает текущие и желаемые тейнты, управляемые оператором.
// parsedLabelRules: map["labelKey=labelValue"]corev1.Taint - содержит ВСЕ валидные правила из конфига.
func mergeAndCheckTaints(currentTaints []corev1.Taint, desiredTaints []corev1.Taint, parsedLabelRules map[string]corev1.Taint) (bool, []corev1.Taint) {
// 1. Определяем, какие типы тейнтов (Key:Effect) управляются нами по всем правилам
managedTaintTypes := sets.NewString()
for _, ruleTaint := range parsedLabelRules { // Итерируем по значениям (Taint объектам)
managedTaintTypes.Insert(TaintKeyEffect(&ruleTaint))
}
// 2. Разделяем текущие тейнты на управляемые и неуправляемые
currentManagedTaints := make(map[string]corev1.Taint) // key:Effect -> Taint
unmanagedTaints := []corev1.Taint{}
for _, taint := range currentTaints {
ke := TaintKeyEffect(&taint)
if managedTaintTypes.Has(ke) {
currentManagedTaints[ke] = taint
} else {
unmanagedTaints = append(unmanagedTaints, taint)
}
}
// 3. Создаем map желаемых тейнтов для быстрого поиска
desiredTaintsMap := make(map[string]corev1.Taint) // key:Effect -> Taint
for _, taint := range desiredTaints {
// Проверка, что желаемый тейнт действительно определен в правилах (на всякий случай)
ke := TaintKeyEffect(&taint)
if managedTaintTypes.Has(ke) {
desiredTaintsMap[ke] = taint
}
}
// 4. Сравниваем управляемые текущие и желаемые
needsUpdate := false
if len(currentManagedTaints) != len(desiredTaintsMap) {
needsUpdate = true
} else {
for ke, desiredTaint := range desiredTaintsMap {
currentTaint, exists := currentManagedTaints[ke]
if !exists || currentTaint.Value != desiredTaint.Value { // Сравниваем и значения
needsUpdate = true
break
}
}
}
// 5. Собираем новый список тейнтов, если нужно обновление
if needsUpdate {
newTaints := make([]corev1.Taint, 0, len(unmanagedTaints)+len(desiredTaintsMap))
newTaints = append(newTaints, unmanagedTaints...)
desiredKeys := make([]string, 0, len(desiredTaintsMap))
for ke := range desiredTaintsMap {
desiredKeys = append(desiredKeys, ke)
}
sort.Strings(desiredKeys) // Сортируем для консистентности
for _, ke := range desiredKeys {
newTaints = append(newTaints, desiredTaintsMap[ke])
}
return true, newTaints
}
return false, currentTaints
}
// updateCRDStatus обновляет статус ресурса NodeTainterConfig
// TODO: Вызывать эту функцию при изменении CRD или при старте/ошибках контроллера.
func (r *NodeTainterConfigReconciler) updateCRDStatus(ctx context.Context, config *configv1alpha1.NodeTainterConfig, status metav1.ConditionStatus, reason, message string) error {
log := log.FromContext(ctx).WithValues("config", config.Name)
configCopy := config.DeepCopy()
// Устанавливаем observedGeneration
configCopy.Status.ObservedGeneration = config.Generation
// Обновляем Condition
newCondition := metav1.Condition{
Type: ConditionTypeReady,
Status: status,
Reason: reason,
Message: message,
LastTransitionTime: metav1.Now(),
}
// TODO: Использовать 'meta.SetStatusCondition' из 'k8s.io/apimachinery/pkg/api/meta' для правильного обновления conditions
// Примерно так:
// meta.SetStatusCondition(&configCopy.Status.Conditions, newCondition)
// Пока просто заменяем для простоты
configCopy.Status.Conditions = []metav1.Condition{newCondition}
// TODO: Обновить NodeTaintStatus на основе данных со всех нод (может быть сложно и затратно)
// configCopy.Status.NodeTaintStatus = ...
// Используем Patch для обновления статуса
if err := r.Status().Patch(ctx, configCopy, client.MergeFrom(config)); err != nil {
log.Error(err, "Failed to patch NodeTainterConfig status")
return err
}
log.Info("NodeTainterConfig status updated", "reason", reason, "message", message)
return nil
}
// updateNodeTaintStatus обновляет информацию о тейнтах для конкретной ноды в статусе CRD
// TODO: Эта функция в текущем виде будет вызывать конфликты, т.к. каждый Reconcile ноды
// будет пытаться перезаписать весь Status.NodeTaintStatus.
// Правильный подход: читать текущий статус CRD, обновлять только запись для текущей ноды, патчить.
// Это усложняет код, пока оставим так для демонстрации, но ЭТО НУЖНО ИСПРАВИТЬ для production.
func (r *NodeTainterConfigReconciler) updateNodeTaintStatus(ctx context.Context, node *corev1.Node, appliedTaints []corev1.Taint, errorMsg string) error {
log := log.FromContext(ctx).WithValues("node", node.Name)
var config configv1alpha1.NodeTainterConfig
configKey := types.NamespacedName{Name: GlobalTaintConfigName}
// Получаем CRD еще раз, чтобы обновить его статус
if err := r.Get(ctx, configKey, &config); err != nil {
log.Error(err, "Failed to get NodeTainterConfig for status update", "configName", GlobalTaintConfigName)
// Не можем обновить статус, если не получили CRD
return fmt.Errorf("failed to get config %s for status update: %w", GlobalTaintConfigName, err)
}
configCopy := config.DeepCopy()
// Ищем статус для текущей ноды
found := false
nodeStatus := configv1alpha1.NodeTaintInfo{
NodeName: node.Name,
AppliedTaints: TaintsToStrings(appliedTaints),
Error: errorMsg,
}
for i := range configCopy.Status.NodeTaintStatus {
if configCopy.Status.NodeTaintStatus[i].NodeName == node.Name {
configCopy.Status.NodeTaintStatus[i] = nodeStatus
found = true
break
}
}
if !found {
configCopy.Status.NodeTaintStatus = append(configCopy.Status.NodeTaintStatus, nodeStatus)
}
// Сортируем для консистентности
sort.Slice(configCopy.Status.NodeTaintStatus, func(i, j int) bool {
return configCopy.Status.NodeTaintStatus[i].NodeName < configCopy.Status.NodeTaintStatus[j].NodeName
})
// Патчим статус
if err := r.Status().Patch(ctx, configCopy, client.MergeFrom(&config)); err != nil {
log.Error(err, "Failed to patch NodeTainterConfig status with node info", "node", node.Name)
return err
}
log.V(1).Info("Updated node status in CRD", "applied", nodeStatus.AppliedTaints, "error", nodeStatus.Error)
return nil
}