mvp of Node Tainter

This commit is contained in:
2025-04-29 22:54:55 +05:00
parent ac36dd70b6
commit 01b75983e3
8 changed files with 700 additions and 21 deletions

View File

@@ -18,24 +18,54 @@ package controller
import (
"context"
"fmt"
"sort"
"strings"
"time"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
apivalidation "k8s.io/apimachinery/pkg/util/validation"
"k8s.io/client-go/tools/record"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
operatorv1alpha1 "git.vendetti.ru/andy/operator/api/v1alpha1"
configv1alpha1 "git.vendetti.ru/andy/operator/api/v1alpha1"
)
const (
GlobalTaintConfigName = "global-taint-rules"
// Condition Types
ConditionTypeReady = "Ready"
ConditionReasonConfigParsingError = "ConfigParsingError"
ConditionReasonConfigNotFound = "ConfigNotFound"
ConditionReasonReady = "Ready"
)
// NodeTainterConfigReconciler reconciles a NodeTainterConfig object
type NodeTainterConfigReconciler struct {
client.Client
Scheme *runtime.Scheme
Scheme *runtime.Scheme
Recorder record.EventRecorder
}
// +kubebuilder:rbac:groups=operator.andy.vendetti.ru,resources=nodetainterconfigs,verbs=get;list;watch;create;update;patch;delete
// kubebuilder:rbac:groups=operator.andy.vendetti.ru,resources=nodetainterconfigs,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=operator.andy.vendetti.ru,resources=nodetainterconfigs,verbs=get;list;watch
// +kubebuilder:rbac:groups=operator.andy.vendetti.ru,resources=nodetainterconfigs/status,verbs=get;update;patch
// +kubebuilder:rbac:groups=operator.andy.vendetti.ru,resources=nodetainterconfigs/finalizers,verbs=update
// kubebuilder:rbac:groups=operator.andy.vendetti.ru,resources=nodetainterconfigs/finalizers,verbs=update
// For nodes control
// +kubebuilder:rbac:groups=core,resources=nodes,verbs=get;list;watch;update;patch
// +kubebuilder:rbac:groups="",resources=events,verbs=create;patch
// Reconcile is part of the main kubernetes reconciliation loop which aims to
// move the current state of the cluster closer to the desired state.
@@ -47,17 +77,432 @@ type NodeTainterConfigReconciler struct {
// For more details, check Reconcile and its Result here:
// - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.20.2/pkg/reconcile
func (r *NodeTainterConfigReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
_ = log.FromContext(ctx)
// _ = log.FromContext(ctx)
log := log.FromContext(ctx)
// TODO(user): your logic here
// 1. Getting current Node
var node corev1.Node
if err := r.Get(ctx, req.NamespacedName, &node); err != nil {
if apierrors.IsNotFound(err) {
// Node deleted, nothing to do
log.Info("Node not found. Ignoring.", "node", req.NamespacedName)
return ctrl.Result{}, nil
}
// Other error with getting Node entity
log.Error(err, "Failed to get Node", "node", req.NamespacedName)
return ctrl.Result{}, err // Repeat
}
log = log.WithValues("node", node.Name) // Now logger knows Node name
// 2. Getting global NodeTainterConfig
var config configv1alpha1.NodeTainterConfig
configKey := types.NamespacedName{Name: GlobalTaintConfigName} // Namespace is empty for Cluster scope
if err := r.Get(ctx, configKey, &config); err != nil {
nodeUpdateErr := r.updateNodeTaintStatus(ctx, &node, nil, fmt.Sprintf("Failed to get NodeTainterConfig %s: %v", GlobalTaintConfigName, err))
if nodeUpdateErr != nil {
log.Error(nodeUpdateErr, "Failed to update node status after config fetch error")
}
if apierrors.IsNotFound(err) {
log.Error(err, "Global NodeTainterConfig not found", "configName", GlobalTaintConfigName)
r.Recorder.Eventf(&node, corev1.EventTypeWarning, "ConfigMissing", "Required NodeTainterConfig '%s' not found", GlobalTaintConfigName)
return ctrl.Result{RequeueAfter: 1 * time.Minute}, nil
}
// Other error with getting CRD
log.Error(err, "Failed to get NodeTainterConfig", "configName", GlobalTaintConfigName)
return ctrl.Result{}, err // Repeat
}
log = log.WithValues("node", node.Name, "config", config.Name, "configGeneration", config.Generation)
// log.Info("Processing node using rules from NodeTainterConfig", "configName", config.Name)
// 3. Parse rules from CRD (this is just a field access now)
// Use config.Spec.Rules straight
specLabelRules := config.Spec.LabelRules
if specLabelRules == nil {
specLabelRules = make(map[string]string) // If the field is empty
log.Info("Taint rules map is nil in config, assuming no rules", "configName", config.Name)
}
// Converting map[string]string into map[string]corev1.Taint for convenience
parsedRules, parseErrs := parseLabelRulesFromSpec(specLabelRules)
if len(parseErrs) > 0 {
errMsg := fmt.Sprintf("Invalid rules found in NodeTainterConfig %s: %v", config.Name, parseErrs)
log.Error(fmt.Errorf(errMsg), "Rule parsing failed")
r.Recorder.Eventf(&config, corev1.EventTypeWarning, "InvalidConfig", errMsg)
_ = r.updateCRDStatus(ctx, &config, metav1.ConditionFalse, ConditionReasonConfigParsingError, errMsg)
_ = r.updateNodeTaintStatus(ctx, &node, nil, errMsg)
return ctrl.Result{}, nil
}
//log.V(1).Info("Parsed taint rules from CRD", "rulesCount", len(parsedRules))
// 4. Define desired Taints for THAT Node
desiredTaints := calculateDesiredTaints(node.Labels, parsedRules)
log.V(1).Info("Calculated desired taints", "taints", desiredTaints)
// 5. Get current Taints and compare/update
originalTaints := node.Spec.Taints
// Passing parsedRules for the func to know which Taints are controlled by this operator
needsUpdate, newTaints := mergeAndCheckTaints(originalTaints, desiredTaints, parsedRules) // Function
var updateErr error
if needsUpdate {
log.Info("Taints require update", "old", originalTaints, "new", newTaints)
nodeCopy := node.DeepCopy()
nodeCopy.Spec.Taints = newTaints
// Using Patch for atomicity and less conflict risk
if err := r.Patch(ctx, nodeCopy, client.MergeFrom(&node)); err != nil {
log.Error(err, "Failed to patch Node taints")
r.Recorder.Eventf(&node, corev1.EventTypeWarning, "UpdateFailed", "Failed to patch taints: %v", err)
updateErr = err
} else {
log.Info("Successfully patched Node taints")
r.Recorder.Eventf(&node, corev1.EventTypeNormal, "TaintsUpdated", "Taints updated based on rules from %s", config.Name)
updateErr = r.updateNodeTaintStatus(ctx, &node, newTaints, "")
}
} else {
log.Info("Node taints are already up-to-date")
updateErr = r.updateNodeTaintStatus(ctx, &node, originalTaints, "")
}
if updateErr != nil {
return ctrl.Result{}, updateErr
}
return ctrl.Result{}, nil
}
// Map function for NodeTainterConfig: Trigger reconcile for ALL nodes when the specific config changes
func (r *NodeTainterConfigReconciler) mapConfigToNodes(ctx context.Context, obj client.Object) []reconcile.Request {
config, ok := obj.(*configv1alpha1.NodeTainterConfig)
log := log.FromContext(ctx)
// Interested only in global config
if !ok || config.Name != GlobalTaintConfigName {
// log.V(1).Info("Ignoring unrelated config change", "object", client.ObjectKeyFromObject(obj))
return nil
}
log.Info("Global NodeTainterConfig changed, queuing reconciliation for all nodes", "configName", config.Name)
var nodeList corev1.NodeList
if err := r.List(ctx, &nodeList); err != nil {
log.Error(err, "Failed to list nodes for config change")
return nil
}
requests := make([]reconcile.Request, len(nodeList.Items))
for i, node := range nodeList.Items {
requests[i] = reconcile.Request{NamespacedName: types.NamespacedName{Name: node.Name}}
}
log.Info("Queued node reconcile requests", "count", len(requests))
return requests
}
// SetupWithManager sets up the controller with the Manager.
func (r *NodeTainterConfigReconciler) SetupWithManager(mgr ctrl.Manager) error {
r.Recorder = mgr.GetEventRecorderFor("nodetainter-controller")
return ctrl.NewControllerManagedBy(mgr).
For(&operatorv1alpha1.NodeTainterConfig{}).
Named("nodetainterconfig").
// Main resource we are watching is Node
For(&corev1.Node{}).
// Watching for changes of our global CRD config.
// If it changes, queuing ALL Nodes.
Watches(
&configv1alpha1.NodeTainterConfig{},
handler.EnqueueRequestsFromMapFunc(r.mapConfigToNodes),
// For(&operatorv1alpha1.NodeTainterConfig{}).
// Named("nodetainterconfig").
).
Complete(r)
}
// --- UTILS FUNCTIONS ---
// TaintToString конвертирует тейнт в строку для статуса/логов
func TaintToString(taint *corev1.Taint) string {
return fmt.Sprintf("%s=%s:%s", taint.Key, taint.Value, taint.Effect)
}
// TaintsToString конвертирует слайс тейнтов в слайс строк
func TaintsToStrings(taints []corev1.Taint) []string {
res := make([]string, len(taints))
for i, t := range taints {
res[i] = TaintToString(&t)
}
sort.Strings(res) // Сортируем для консистентности статуса
return res
}
// parseLabelRulesFromSpec парсит правила из CRD Spec
// Возвращает map["labelKey=labelValue"]corev1.Taint и ошибки
func parseLabelRulesFromSpec(specLabelRules map[string]string) (map[string]corev1.Taint, []error) {
parsed := make(map[string]corev1.Taint)
var errs []error
for ruleSelector, taintString := range specLabelRules {
ruleSelector = strings.TrimSpace(ruleSelector)
taintString = strings.TrimSpace(taintString)
if ruleSelector == "" || taintString == "" {
errs = append(errs, fmt.Errorf("rule has empty selector ('%s') or taint string ('%s')", ruleSelector, taintString))
continue
}
// Парсим селектор "key=value"
partsSelector := strings.SplitN(ruleSelector, "=", 2)
if len(partsSelector) != 2 { // Должен быть знак =
errs = append(errs, fmt.Errorf("invalid rule selector format '%s': missing '='", ruleSelector))
continue
}
labelKey := strings.TrimSpace(partsSelector[0])
labelValue := strings.TrimSpace(partsSelector[1]) // Может быть пустым!
if labelKey == "" {
errs = append(errs, fmt.Errorf("invalid rule selector format '%s': empty label key", ruleSelector))
continue
}
// Валидируем ключ лейбла
if msgs := apivalidation.IsQualifiedName(labelKey); len(msgs) > 0 {
errs = append(errs, fmt.Errorf("invalid label key in selector '%s': %v", ruleSelector, msgs))
continue
}
// Валидируем значение лейбла (если не пустое)
if labelValue != "" {
if msgs := apivalidation.IsValidLabelValue(labelValue); len(msgs) > 0 {
errs = append(errs, fmt.Errorf("invalid label value in selector '%s': %v", ruleSelector, msgs))
continue
}
}
// Парсим строку тейнта "key=value:Effect" (используем улучшенную логику из прошлого ответа)
partsEffect := strings.SplitN(taintString, ":", 2)
if len(partsEffect) != 2 || partsEffect[1] == "" {
errs = append(errs, fmt.Errorf("invalid taint format for rule '%s': '%s' (missing effect)", ruleSelector, taintString))
continue
}
keyAndValue := partsEffect[0]
effectString := partsEffect[1]
partsKeyValue := strings.SplitN(keyAndValue, "=", 2)
if len(partsKeyValue) != 2 || partsKeyValue[0] == "" || partsKeyValue[1] == "" {
errs = append(errs, fmt.Errorf("invalid taint format for rule '%s': '%s' (invalid key/value)", ruleSelector, taintString))
continue
}
taintKey := partsKeyValue[0]
taintValue := partsKeyValue[1]
if msgs := apivalidation.IsQualifiedName(taintKey); len(msgs) > 0 {
if simpleMsgs := apivalidation.IsDNS1123Label(taintKey); len(simpleMsgs) > 0 {
errs = append(errs, fmt.Errorf("invalid taint key for rule '%s': '%s' in '%s' (%v / %v)", ruleSelector, taintKey, taintString, msgs, simpleMsgs))
continue
}
}
var effect corev1.TaintEffect
switch effectString {
case string(corev1.TaintEffectNoSchedule):
effect = corev1.TaintEffectNoSchedule
case string(corev1.TaintEffectPreferNoSchedule):
effect = corev1.TaintEffectPreferNoSchedule
case string(corev1.TaintEffectNoExecute):
effect = corev1.TaintEffectNoExecute
default:
errs = append(errs, fmt.Errorf("invalid taint effect for rule '%s': '%s' in '%s'", ruleSelector, effectString, taintString))
continue
}
// Все ок
taint := corev1.Taint{Key: taintKey, Value: taintValue, Effect: effect}
parsed[ruleSelector] = taint // Ключ = "labelKey=labelValue"
}
return parsed, errs
}
// calculateDesiredTaints определяет тейнты на основе лейблов ноды и правил
func calculateDesiredTaints(nodeLabels map[string]string, parsedLabelRules map[string]corev1.Taint) []corev1.Taint {
desired := []corev1.Taint{}
foundTaints := make(map[string]bool) // Для уникальности Key:Effect
if nodeLabels == nil {
nodeLabels = make(map[string]string) // Безопасность
}
for ruleSelector, taint := range parsedLabelRules {
parts := strings.SplitN(ruleSelector, "=", 2)
if len(parts) != 2 {
continue
} // Уже должно быть отва лидировано
ruleKey := parts[0]
ruleValue := parts[1] // Может быть пустой
actualValue, exists := nodeLabels[ruleKey]
// Логика сравнения:
// 1. Ключ лейбла должен существовать на ноде.
// 2. Значение лейбла на ноде должно ТОЧНО совпадать со значением в правиле (включая пустую строку).
if exists && actualValue == ruleValue {
taintKeyEffect := fmt.Sprintf("%s:%s", taint.Key, taint.Effect)
if !foundTaints[taintKeyEffect] {
desired = append(desired, taint)
foundTaints[taintKeyEffect] = true
}
}
}
return desired
}
// TaintKeyEffect создает уникальную строку для тейнта (Key:Effect)
func TaintKeyEffect(taint *corev1.Taint) string {
return fmt.Sprintf("%s:%s", taint.Key, taint.Effect)
}
// mergeAndCheckTaints сравнивает текущие и желаемые тейнты, управляемые оператором.
// parsedLabelRules: map["labelKey=labelValue"]corev1.Taint - содержит ВСЕ валидные правила из конфига.
func mergeAndCheckTaints(currentTaints []corev1.Taint, desiredTaints []corev1.Taint, parsedLabelRules map[string]corev1.Taint) (bool, []corev1.Taint) {
// 1. Определяем, какие типы тейнтов (Key:Effect) управляются нами по всем правилам
managedTaintTypes := sets.NewString()
for _, ruleTaint := range parsedLabelRules { // Итерируем по значениям (Taint объектам)
managedTaintTypes.Insert(TaintKeyEffect(&ruleTaint))
}
// 2. Разделяем текущие тейнты на управляемые и неуправляемые
currentManagedTaints := make(map[string]corev1.Taint) // key:Effect -> Taint
unmanagedTaints := []corev1.Taint{}
for _, taint := range currentTaints {
ke := TaintKeyEffect(&taint)
if managedTaintTypes.Has(ke) {
currentManagedTaints[ke] = taint
} else {
unmanagedTaints = append(unmanagedTaints, taint)
}
}
// 3. Создаем map желаемых тейнтов для быстрого поиска
desiredTaintsMap := make(map[string]corev1.Taint) // key:Effect -> Taint
for _, taint := range desiredTaints {
// Проверка, что желаемый тейнт действительно определен в правилах (на всякий случай)
ke := TaintKeyEffect(&taint)
if managedTaintTypes.Has(ke) {
desiredTaintsMap[ke] = taint
}
}
// 4. Сравниваем управляемые текущие и желаемые
needsUpdate := false
if len(currentManagedTaints) != len(desiredTaintsMap) {
needsUpdate = true
} else {
for ke, desiredTaint := range desiredTaintsMap {
currentTaint, exists := currentManagedTaints[ke]
if !exists || currentTaint.Value != desiredTaint.Value { // Сравниваем и значения
needsUpdate = true
break
}
}
}
// 5. Собираем новый список тейнтов, если нужно обновление
if needsUpdate {
newTaints := make([]corev1.Taint, 0, len(unmanagedTaints)+len(desiredTaintsMap))
newTaints = append(newTaints, unmanagedTaints...)
desiredKeys := make([]string, 0, len(desiredTaintsMap))
for ke := range desiredTaintsMap {
desiredKeys = append(desiredKeys, ke)
}
sort.Strings(desiredKeys) // Сортируем для консистентности
for _, ke := range desiredKeys {
newTaints = append(newTaints, desiredTaintsMap[ke])
}
return true, newTaints
}
return false, currentTaints
}
// updateCRDStatus обновляет статус ресурса NodeTainterConfig
// TODO: Вызывать эту функцию при изменении CRD или при старте/ошибках контроллера.
func (r *NodeTainterConfigReconciler) updateCRDStatus(ctx context.Context, config *configv1alpha1.NodeTainterConfig, status metav1.ConditionStatus, reason, message string) error {
log := log.FromContext(ctx).WithValues("config", config.Name)
configCopy := config.DeepCopy()
// Устанавливаем observedGeneration
configCopy.Status.ObservedGeneration = config.Generation
// Обновляем Condition
newCondition := metav1.Condition{
Type: ConditionTypeReady,
Status: status,
Reason: reason,
Message: message,
LastTransitionTime: metav1.Now(),
}
// TODO: Использовать 'meta.SetStatusCondition' из 'k8s.io/apimachinery/pkg/api/meta' для правильного обновления conditions
// Примерно так:
// meta.SetStatusCondition(&configCopy.Status.Conditions, newCondition)
// Пока просто заменяем для простоты
configCopy.Status.Conditions = []metav1.Condition{newCondition}
// TODO: Обновить NodeTaintStatus на основе данных со всех нод (может быть сложно и затратно)
// configCopy.Status.NodeTaintStatus = ...
// Используем Patch для обновления статуса
if err := r.Status().Patch(ctx, configCopy, client.MergeFrom(config)); err != nil {
log.Error(err, "Failed to patch NodeTainterConfig status")
return err
}
log.Info("NodeTainterConfig status updated", "reason", reason, "message", message)
return nil
}
// updateNodeTaintStatus обновляет информацию о тейнтах для конкретной ноды в статусе CRD
// TODO: Эта функция в текущем виде будет вызывать конфликты, т.к. каждый Reconcile ноды
// будет пытаться перезаписать весь Status.NodeTaintStatus.
// Правильный подход: читать текущий статус CRD, обновлять только запись для текущей ноды, патчить.
// Это усложняет код, пока оставим так для демонстрации, но ЭТО НУЖНО ИСПРАВИТЬ для production.
func (r *NodeTainterConfigReconciler) updateNodeTaintStatus(ctx context.Context, node *corev1.Node, appliedTaints []corev1.Taint, errorMsg string) error {
log := log.FromContext(ctx).WithValues("node", node.Name)
var config configv1alpha1.NodeTainterConfig
configKey := types.NamespacedName{Name: GlobalTaintConfigName}
// Получаем CRD еще раз, чтобы обновить его статус
if err := r.Get(ctx, configKey, &config); err != nil {
log.Error(err, "Failed to get NodeTainterConfig for status update", "configName", GlobalTaintConfigName)
// Не можем обновить статус, если не получили CRD
return fmt.Errorf("failed to get config %s for status update: %w", GlobalTaintConfigName, err)
}
configCopy := config.DeepCopy()
// Ищем статус для текущей ноды
found := false
nodeStatus := configv1alpha1.NodeTaintInfo{
NodeName: node.Name,
AppliedTaints: TaintsToStrings(appliedTaints),
Error: errorMsg,
}
for i := range configCopy.Status.NodeTaintStatus {
if configCopy.Status.NodeTaintStatus[i].NodeName == node.Name {
configCopy.Status.NodeTaintStatus[i] = nodeStatus
found = true
break
}
}
if !found {
configCopy.Status.NodeTaintStatus = append(configCopy.Status.NodeTaintStatus, nodeStatus)
}
// Сортируем для консистентности
sort.Slice(configCopy.Status.NodeTaintStatus, func(i, j int) bool {
return configCopy.Status.NodeTaintStatus[i].NodeName < configCopy.Status.NodeTaintStatus[j].NodeName
})
// Патчим статус
if err := r.Status().Patch(ctx, configCopy, client.MergeFrom(&config)); err != nil {
log.Error(err, "Failed to patch NodeTainterConfig status with node info", "node", node.Name)
return err
}
log.V(1).Info("Updated node status in CRD", "applied", nodeStatus.AppliedTaints, "error", nodeStatus.Error)
return nil
}