This repository has been archived on 2025-08-14. You can view files and clone it, but cannot push or open issues or pull requests.
Files
operator/internal/controller/nodetainterconfig_controller.go
Andy Kolibri Vendetti fc5f580243
All checks were successful
Lint / Run on Ubuntu (push) Successful in 26s
Tests / Run on Ubuntu (push) Successful in 27s
comments fixes
2025-04-30 01:04:13 +05:00

481 lines
17 KiB
Go

/*
Copyright 2025.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package controller
import (
"context"
"fmt"
"sort"
"strings"
"time"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
apivalidation "k8s.io/apimachinery/pkg/util/validation"
"k8s.io/client-go/tools/record"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
configv1alpha1 "git.vendetti.ru/andy/operator/api/v1alpha1"
)
// GlobalTaintConfigName is the name of the single NodeTainterConfig object
// the controller reads its rules from. The object is looked up by name only,
// without a namespace.
const GlobalTaintConfigName = "global-taint-rules"

// Condition type and reasons written to the NodeTainterConfig status.
const (
	// ConditionTypeReady is the type of the condition maintained in the
	// NodeTainterConfig status.
	ConditionTypeReady = "Ready"

	// ConditionReasonConfigParsingError is the reason used when the rules in
	// the config could not be parsed.
	ConditionReasonConfigParsingError = "ConfigParsingError"
	// ConditionReasonConfigNotFound is the reason intended for a missing
	// global config.
	ConditionReasonConfigNotFound = "ConfigNotFound"
	// ConditionReasonReady is the reason intended for a healthy config.
	ConditionReasonReady = "Ready"
)
// NodeTainterConfigReconciler reconciles Node objects against the rules of
// the global NodeTainterConfig: it is triggered per Node, computes the taints
// the rules require for that Node and records the outcome in the config's
// status.
type NodeTainterConfigReconciler struct {
	// Client is the embedded API client used for every Get, List, Patch and
	// status Patch issued by the reconciler.
	client.Client
	// Scheme is not referenced by the code in this file.
	// NOTE(review): presumably the manager's scheme, kept from the
	// kubebuilder scaffold — confirm against the wiring in main.
	Scheme *runtime.Scheme
	// Recorder emits Kubernetes events for Nodes and the config. It is
	// assigned in SetupWithManager.
	Recorder record.EventRecorder
}
// kubebuilder:rbac:groups=operator.andy.vendetti.ru,resources=nodetainterconfigs,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=operator.andy.vendetti.ru,resources=nodetainterconfigs,verbs=get;list;watch
// +kubebuilder:rbac:groups=operator.andy.vendetti.ru,resources=nodetainterconfigs/status,verbs=get;update;patch
// kubebuilder:rbac:groups=operator.andy.vendetti.ru,resources=nodetainterconfigs/finalizers,verbs=update
// For nodes control
// +kubebuilder:rbac:groups=core,resources=nodes,verbs=get;list;watch;update;patch
// +kubebuilder:rbac:groups="",resources=events,verbs=create;patch
// Reconcile brings the taints of one Node in line with the label rules of the
// global NodeTainterConfig.
//
// The request names a Node. The reconciler loads that Node and the global
// config, parses the config's label rules, computes the taints the rules
// require for the Node's labels, patches the Node when the managed taints
// differ, and records the result for the Node in the config's status.
//
// Outcomes:
//   - Node not found: nothing to do, no requeue.
//   - Config not found: a warning event is emitted on the Node and the request
//     is retried after one minute.
//   - Config rules invalid: the error is reported through an event and the
//     config/node status; the request is only retried if writing that status
//     failed.
//   - Any API error: returned, so controller-runtime requeues with backoff.
func (r *NodeTainterConfigReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
	logger := log.FromContext(ctx)

	// 1. Fetch the Node named by the request.
	var node corev1.Node
	if err := r.Get(ctx, req.NamespacedName, &node); err != nil {
		if apierrors.IsNotFound(err) {
			// Node deleted, nothing to do.
			logger.Info("Node not found. Ignoring.", "node", req.NamespacedName)
			return ctrl.Result{}, nil
		}
		logger.Error(err, "Failed to get Node", "node", req.NamespacedName)
		return ctrl.Result{}, err
	}
	logger = logger.WithValues("node", node.Name)

	// 2. Fetch the global NodeTainterConfig (looked up by name only).
	var config configv1alpha1.NodeTainterConfig
	configKey := types.NamespacedName{Name: GlobalTaintConfigName}
	if err := r.Get(ctx, configKey, &config); err != nil {
		// No status update here: per-node status is stored on the config
		// object itself, which is exactly what could not be read.
		if apierrors.IsNotFound(err) {
			logger.Error(err, "Global NodeTainterConfig not found", "configName", GlobalTaintConfigName)
			r.Recorder.Eventf(&node, corev1.EventTypeWarning, "ConfigMissing", "Required NodeTainterConfig '%s' not found", GlobalTaintConfigName)
			return ctrl.Result{RequeueAfter: 1 * time.Minute}, nil
		}
		logger.Error(err, "Failed to get NodeTainterConfig", "configName", GlobalTaintConfigName)
		return ctrl.Result{}, err
	}
	logger = logger.WithValues("config", config.Name, "configGeneration", config.Generation)

	// 3. Parse the label rules from the config spec. A nil map is ranged over
	// like an empty one, so it simply yields no rules.
	specLabelRules := config.Spec.LabelRules
	if specLabelRules == nil {
		logger.Info("Taint rules map is nil in config, assuming no rules", "configName", config.Name)
	}
	parsedRules, parseErrs := parseLabelRulesFromSpec(specLabelRules)
	if len(parseErrs) > 0 {
		parsingError := fmt.Errorf("invalid rules found in NodeTainterConfig %s: %v", config.Name, parseErrs)
		logger.Error(parsingError, "Rule parsing failed", "configName", config.Name, "parsingErrors", parseErrs)
		// Event, not Eventf: the message is data and must not be interpreted
		// as a format string.
		r.Recorder.Event(&config, corev1.EventTypeWarning, "InvalidConfig", parsingError.Error())

		// Both helpers log their own failures. Returning the first failure
		// only requeues the request so the status write is retried; an
		// invalid config on its own does not cause a requeue.
		statusErr := r.updateCRDStatus(ctx, &config, metav1.ConditionFalse, ConditionReasonConfigParsingError, parsingError.Error())
		if err := r.updateNodeTaintStatus(ctx, &node, nil, parsingError.Error()); err != nil && statusErr == nil {
			statusErr = err
		}
		return ctrl.Result{}, statusErr
	}

	// 4. Compute the taints the rules require for this Node's labels.
	desiredTaints := calculateDesiredTaints(node.Labels, parsedRules)
	logger.V(1).Info("Calculated desired taints", "taints", desiredTaints)

	// 5. Compare with the Node's current taints. parsedRules tells the merge
	// which taints are managed by this operator.
	originalTaints := node.Spec.Taints
	needsUpdate, newTaints := mergeAndCheckTaints(originalTaints, desiredTaints, parsedRules)
	if !needsUpdate {
		logger.Info("Node taints are already up-to-date")
		return ctrl.Result{}, r.updateNodeTaintStatus(ctx, &node, originalTaints, "")
	}

	logger.Info("Taints require update", "old", originalTaints, "new", newTaints)
	nodeCopy := node.DeepCopy()
	nodeCopy.Spec.Taints = newTaints
	// A JSON merge patch replaces the whole taint list. The optimistic lock
	// makes the API server reject the patch if the Node changed since it was
	// read, instead of silently dropping a taint added by someone else; the
	// returned conflict error requeues the request.
	patch := client.MergeFromWithOptions(&node, client.MergeFromWithOptimisticLock{})
	if err := r.Patch(ctx, nodeCopy, patch); err != nil {
		logger.Error(err, "Failed to patch Node taints")
		r.Recorder.Eventf(&node, corev1.EventTypeWarning, "UpdateFailed", "Failed to patch taints: %v", err)
		return ctrl.Result{}, err
	}

	logger.Info("Successfully patched Node taints")
	r.Recorder.Eventf(&node, corev1.EventTypeNormal, "TaintsUpdated", "Taints updated based on rules from %s", config.Name)
	return ctrl.Result{}, r.updateNodeTaintStatus(ctx, &node, newTaints, "")
}
// mapConfigToNodes is the watch map function for NodeTainterConfig objects.
// When the global config (GlobalTaintConfigName) changes it returns one
// reconcile request per Node in the cluster; for any other object, or when
// listing Nodes fails, it returns nil.
func (r *NodeTainterConfigReconciler) mapConfigToNodes(ctx context.Context, obj client.Object) []reconcile.Request {
	// Only the global config is of interest.
	config, isConfig := obj.(*configv1alpha1.NodeTainterConfig)
	if !isConfig || config.Name != GlobalTaintConfigName {
		return nil
	}

	logger := log.FromContext(ctx)
	logger.Info("Global NodeTainterConfig changed, queuing reconciliation for all nodes", "configName", config.Name)

	var nodes corev1.NodeList
	if err := r.List(ctx, &nodes); err != nil {
		logger.Error(err, "Failed to list nodes for config change")
		return nil
	}

	// Nodes are looked up by name only, so the request carries just the name.
	requests := make([]reconcile.Request, 0, len(nodes.Items))
	for i := range nodes.Items {
		key := types.NamespacedName{Name: nodes.Items[i].Name}
		requests = append(requests, reconcile.Request{NamespacedName: key})
	}
	logger.Info("Queued node reconcile requests", "count", len(requests))
	return requests
}
// SetupWithManager registers the reconciler with the manager and assigns the
// event recorder. Node is the primary watched resource; changes to
// NodeTainterConfig objects are mapped to Node requests via mapConfigToNodes.
func (r *NodeTainterConfigReconciler) SetupWithManager(mgr ctrl.Manager) error {
	r.Recorder = mgr.GetEventRecorderFor("nodetainter-controller")

	// A change of the global config enqueues every Node.
	configHandler := handler.EnqueueRequestsFromMapFunc(r.mapConfigToNodes)

	bldr := ctrl.NewControllerManagedBy(mgr)
	bldr = bldr.For(&corev1.Node{})
	bldr = bldr.Watches(&configv1alpha1.NodeTainterConfig{}, configHandler)
	return bldr.Complete(r)
}
// --- UTILS FUNCTIONS ---
// TaintToString renders a taint as "key=value:effect" for status and logs.
// The "=" is always present, even when the value is empty.
func TaintToString(taint *corev1.Taint) string {
	return taint.Key + "=" + taint.Value + ":" + string(taint.Effect)
}
// TaintsToStrings renders every taint with TaintToString and returns the
// results sorted lexicographically. The result is never nil.
func TaintsToStrings(taints []corev1.Taint) []string {
	rendered := make([]string, 0, len(taints))
	for i := range taints {
		rendered = append(rendered, TaintToString(&taints[i]))
	}
	sort.Strings(rendered)
	return rendered
}
// parseLabelRulesFromSpec validates the label rules of a NodeTainterConfig
// spec and converts them into taints.
//
// Each entry maps a selector "labelKey=labelValue" (the value may be empty)
// to a taint string "taintKey=taintValue:Effect", where Effect is one of
// NoSchedule, PreferNoSchedule or NoExecute. Surrounding whitespace of the
// selector, of its key and value, and of the taint string is ignored.
//
// It returns the valid rules keyed by the normalised selector
// "labelKey=labelValue" (whitespace removed, so the key can be split and
// compared against node labels directly) and one error per invalid rule.
// Rules are processed in sorted selector order, so the error list is
// identical for identical input; when two selectors normalise to the same
// key, the one sorting last wins.
func parseLabelRulesFromSpec(specLabelRules map[string]string) (map[string]corev1.Taint, []error) {
	parsed := make(map[string]corev1.Taint, len(specLabelRules))
	var errs []error

	// Map iteration order is random; sort so that errors (which end up in
	// status and events) come out in a stable order.
	rawSelectors := make([]string, 0, len(specLabelRules))
	for rawSelector := range specLabelRules {
		rawSelectors = append(rawSelectors, rawSelector)
	}
	sort.Strings(rawSelectors)

	for _, rawSelector := range rawSelectors {
		ruleSelector := strings.TrimSpace(rawSelector)
		taintString := strings.TrimSpace(specLabelRules[rawSelector])
		if ruleSelector == "" || taintString == "" {
			errs = append(errs, fmt.Errorf("rule has empty selector ('%s') or taint string ('%s')", ruleSelector, taintString))
			continue
		}

		// --- selector: labelKey=labelValue ---
		labelKey, labelValue, hasEq := strings.Cut(ruleSelector, "=")
		if !hasEq {
			errs = append(errs, fmt.Errorf("invalid rule selector format '%s': missing '='", ruleSelector))
			continue
		}
		labelKey = strings.TrimSpace(labelKey)
		labelValue = strings.TrimSpace(labelValue)
		if labelKey == "" {
			errs = append(errs, fmt.Errorf("invalid rule selector format '%s': empty label key", ruleSelector))
			continue
		}
		if msgs := apivalidation.IsQualifiedName(labelKey); len(msgs) > 0 {
			errs = append(errs, fmt.Errorf("invalid label key in selector '%s': %v", ruleSelector, msgs))
			continue
		}
		if labelValue != "" {
			if msgs := apivalidation.IsValidLabelValue(labelValue); len(msgs) > 0 {
				errs = append(errs, fmt.Errorf("invalid label value in selector '%s': %v", ruleSelector, msgs))
				continue
			}
		}

		// --- taint: taintKey=taintValue:Effect ---
		keyAndValue, effectString, hasColon := strings.Cut(taintString, ":")
		if !hasColon || effectString == "" {
			errs = append(errs, fmt.Errorf("invalid taint format for rule '%s': '%s' (missing effect)", ruleSelector, taintString))
			continue
		}
		taintKey, taintValue, hasEq := strings.Cut(keyAndValue, "=")
		if !hasEq || taintKey == "" || taintValue == "" {
			errs = append(errs, fmt.Errorf("invalid taint format for rule '%s': '%s' (invalid key/value)", ruleSelector, taintString))
			continue
		}
		// The key is accepted when it is a qualified name or a DNS-1123 label.
		if msgs := apivalidation.IsQualifiedName(taintKey); len(msgs) > 0 {
			if simpleMsgs := apivalidation.IsDNS1123Label(taintKey); len(simpleMsgs) > 0 {
				errs = append(errs, fmt.Errorf("invalid taint key for rule '%s': '%s' in '%s' (%v / %v)", ruleSelector, taintKey, taintString, msgs, simpleMsgs))
				continue
			}
		}
		// Reject a malformed taint value here rather than letting every node
		// patch fail at the API server.
		if msgs := apivalidation.IsValidLabelValue(taintValue); len(msgs) > 0 {
			errs = append(errs, fmt.Errorf("invalid taint value for rule '%s': '%s' in '%s' (%v)", ruleSelector, taintValue, taintString, msgs))
			continue
		}

		effect := corev1.TaintEffect(effectString)
		switch effect {
		case corev1.TaintEffectNoSchedule, corev1.TaintEffectPreferNoSchedule, corev1.TaintEffectNoExecute:
			// supported effect
		default:
			errs = append(errs, fmt.Errorf("invalid taint effect for rule '%s': '%s' in '%s'", ruleSelector, effectString, taintString))
			continue
		}

		// Store under the normalised selector: the parts were validated after
		// trimming, so the key must be built from the trimmed parts too,
		// otherwise "key = value" would validate but never match a label.
		parsed[labelKey+"="+labelValue] = corev1.Taint{Key: taintKey, Value: taintValue, Effect: effect}
	}
	return parsed, errs
}
// calculateDesiredTaints returns the taints required for a node with the
// given labels.
//
// parsedLabelRules maps a selector "labelKey=labelValue" to a taint. A rule
// applies when the node carries labelKey with exactly labelValue. At most one
// taint per key+effect pair is returned. Rules are evaluated in sorted
// selector order, so when several applicable rules share a key+effect pair
// the same one wins on every call. The result is never nil.
func calculateDesiredTaints(nodeLabels map[string]string, parsedLabelRules map[string]corev1.Taint) []corev1.Taint {
	desired := []corev1.Taint{}

	// Map iteration order is random; sort the selectors so that the choice
	// between conflicting rules does not change from one call to the next.
	selectors := make([]string, 0, len(parsedLabelRules))
	for selector := range parsedLabelRules {
		selectors = append(selectors, selector)
	}
	sort.Strings(selectors)

	seen := make(map[string]bool, len(selectors))
	for _, selector := range selectors {
		ruleKey, ruleValue, ok := strings.Cut(selector, "=")
		if !ok {
			continue
		}
		// Reading from a nil label map is safe and reports "not present".
		actualValue, exists := nodeLabels[ruleKey]
		if !exists || actualValue != ruleValue {
			continue
		}
		taint := parsedLabelRules[selector]
		taintKeyEffect := fmt.Sprintf("%s:%s", taint.Key, taint.Effect)
		if seen[taintKeyEffect] {
			continue
		}
		seen[taintKeyEffect] = true
		desired = append(desired, taint)
	}
	return desired
}
// TaintKeyEffect returns the "key:effect" string of a taint. The taint value
// is not part of it.
func TaintKeyEffect(taint *corev1.Taint) string {
	return taint.Key + ":" + string(taint.Effect)
}
// mergeAndCheckTaints decides whether a node's taints must change and, if so,
// builds the new taint list.
//
// A taint is "managed" when its key:effect pair appears among the taints of
// parsedLabelRules; all other taints are left untouched. An update is needed
// when the managed taints currently on the node differ from the desired ones
// in their key:effect set or in a value. In that case the function returns
// true and a list holding the unmanaged taints in their original order
// followed by the desired managed taints sorted by key:effect. Otherwise it
// returns false and currentTaints unchanged.
func mergeAndCheckTaints(currentTaints []corev1.Taint, desiredTaints []corev1.Taint, parsedLabelRules map[string]corev1.Taint) (bool, []corev1.Taint) {
	// Key:effect pairs this operator is responsible for.
	managed := sets.NewString()
	for _, ruleTaint := range parsedLabelRules {
		managed.Insert(TaintKeyEffect(&ruleTaint))
	}

	// Split the node's taints into managed (by key:effect) and foreign ones.
	have := make(map[string]corev1.Taint)
	foreign := []corev1.Taint{}
	for i := range currentTaints {
		t := currentTaints[i]
		if id := TaintKeyEffect(&t); managed.Has(id) {
			have[id] = t
		} else {
			foreign = append(foreign, t)
		}
	}

	// Index the desired taints the same way, ignoring any that are not managed.
	want := make(map[string]corev1.Taint)
	for i := range desiredTaints {
		t := desiredTaints[i]
		if id := TaintKeyEffect(&t); managed.Has(id) {
			want[id] = t
		}
	}

	// Same size plus every wanted entry present with the same value means the
	// two sets are equal.
	changed := len(have) != len(want)
	if !changed {
		for id, wanted := range want {
			if existing, ok := have[id]; !ok || existing.Value != wanted.Value {
				changed = true
				break
			}
		}
	}
	if !changed {
		return false, currentTaints
	}

	// Foreign taints first, then the wanted ones in a stable order.
	ids := make([]string, 0, len(want))
	for id := range want {
		ids = append(ids, id)
	}
	sort.Strings(ids)

	merged := make([]corev1.Taint, 0, len(foreign)+len(want))
	merged = append(merged, foreign...)
	for _, id := range ids {
		merged = append(merged, want[id])
	}
	return true, merged
}
// updateCRDStatus records the Ready condition and the observed generation in
// the status of the given NodeTainterConfig.
//
// The Ready condition is inserted or updated in place; conditions of other
// types are kept. LastTransitionTime is only advanced when the condition's
// status value actually changes. Refreshing it on every call would modify the
// object each time, and every modification of the config re-triggers
// reconciliation of all nodes through the config watch.
//
// It returns the error of the status patch, if any.
func (r *NodeTainterConfigReconciler) updateCRDStatus(ctx context.Context, config *configv1alpha1.NodeTainterConfig, status metav1.ConditionStatus, reason, message string) error {
	logger := log.FromContext(ctx).WithValues("config", config.Name)

	configCopy := config.DeepCopy()
	configCopy.Status.ObservedGeneration = config.Generation

	newCondition := metav1.Condition{
		Type:               ConditionTypeReady,
		Status:             status,
		ObservedGeneration: config.Generation,
		Reason:             reason,
		Message:            message,
		LastTransitionTime: metav1.Now(),
	}

	// Upsert the Ready condition.
	replaced := false
	for i := range configCopy.Status.Conditions {
		existing := &configCopy.Status.Conditions[i]
		if existing.Type != ConditionTypeReady {
			continue
		}
		if existing.Status == status {
			// No transition happened: keep the original timestamp.
			newCondition.LastTransitionTime = existing.LastTransitionTime
		}
		*existing = newCondition
		replaced = true
		break
	}
	if !replaced {
		configCopy.Status.Conditions = append(configCopy.Status.Conditions, newCondition)
	}

	// TODO: Update NodeTaintStatus based on data from all nodes
	if err := r.Status().Patch(ctx, configCopy, client.MergeFrom(config)); err != nil {
		logger.Error(err, "Failed to patch NodeTainterConfig status")
		return err
	}
	logger.Info("NodeTainterConfig status updated", "reason", reason, "message", message)
	return nil
}
// updateNodeTaintStatus records the outcome for one node in the status of the
// global NodeTainterConfig.
//
// It loads the config, inserts or replaces the entry for node.Name with the
// given taints (rendered with TaintsToStrings) and error message, keeps the
// list sorted by node name, and patches the status.
//
// The patch carries an optimistic lock: the whole list is rewritten from the
// copy that was just read, so if that copy is out of date the write is
// rejected with a conflict instead of silently dropping entries recorded for
// other nodes in the meantime. The conflict is returned like any other error
// so the caller can retry.
func (r *NodeTainterConfigReconciler) updateNodeTaintStatus(ctx context.Context, node *corev1.Node, appliedTaints []corev1.Taint, errorMsg string) error {
	logger := log.FromContext(ctx).WithValues("node", node.Name)

	var config configv1alpha1.NodeTainterConfig
	configKey := types.NamespacedName{Name: GlobalTaintConfigName}
	if err := r.Get(ctx, configKey, &config); err != nil {
		logger.Error(err, "Failed to get NodeTainterConfig for status update", "configName", GlobalTaintConfigName)
		return fmt.Errorf("failed to get config %s for status update: %w", GlobalTaintConfigName, err)
	}

	configCopy := config.DeepCopy()
	nodeStatus := configv1alpha1.NodeTaintInfo{
		NodeName:      node.Name,
		AppliedTaints: TaintsToStrings(appliedTaints),
		Error:         errorMsg,
	}

	// Replace the existing entry for this node, or append a new one.
	found := false
	for i := range configCopy.Status.NodeTaintStatus {
		if configCopy.Status.NodeTaintStatus[i].NodeName == node.Name {
			configCopy.Status.NodeTaintStatus[i] = nodeStatus
			found = true
			break
		}
	}
	if !found {
		configCopy.Status.NodeTaintStatus = append(configCopy.Status.NodeTaintStatus, nodeStatus)
	}
	sort.Slice(configCopy.Status.NodeTaintStatus, func(i, j int) bool {
		return configCopy.Status.NodeTaintStatus[i].NodeName < configCopy.Status.NodeTaintStatus[j].NodeName
	})

	patch := client.MergeFromWithOptions(&config, client.MergeFromWithOptimisticLock{})
	if err := r.Status().Patch(ctx, configCopy, patch); err != nil {
		logger.Error(err, "Failed to patch NodeTainterConfig status with node info")
		return err
	}
	logger.V(1).Info("Updated node status in CRD", "applied", nodeStatus.AppliedTaints, "error", nodeStatus.Error)
	return nil
}