222 lines
		
	
	
		
			7.8 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			222 lines
		
	
	
		
			7.8 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| // internal/controller/imageupdate_controller.go
 | |
| package controller
 | |
| 
 | |
| import (
 | |
| 	"context"
 | |
| 	"fmt"
 | |
| 	"strings"
 | |
| 	"time"
 | |
| 
 | |
| 	appsv1 "k8s.io/api/apps/v1"
 | |
| 	corev1 "k8s.io/api/core/v1"
 | |
| 	"k8s.io/apimachinery/pkg/api/errors"
 | |
| 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 | |
| 	"k8s.io/apimachinery/pkg/runtime"
 | |
| 	"k8s.io/apimachinery/pkg/types"
 | |
| 	"k8s.io/client-go/tools/record"
 | |
| 	ctrl "sigs.k8s.io/controller-runtime"
 | |
| 	"sigs.k8s.io/controller-runtime/pkg/client"
 | |
| 	"sigs.k8s.io/controller-runtime/pkg/log"
 | |
| 
 | |
| 	configv1alpha1 "git.vendetti.ru/andy/operator/api/v1alpha1"
 | |
| 
 | |
| 	"github.com/google/go-containerregistry/pkg/crane"
 | |
| 	"github.com/google/go-containerregistry/pkg/name"
 | |
| )
 | |
| 
 | |
// DefaultRestartAnnotation is the pod-template annotation key that is stamped
// to trigger a Deployment restart when the ImageUpdatePolicy does not
// configure its own RestartAnnotation.
const DefaultRestartAnnotation = "andy.vendetti.ru/restartedAt"
 | |
| 
 | |
// ImageUpdateReconciler reconciles Deployment objects to check for image updates.
//
// For containers whose image tag matches the monitored tags configured in the
// global NodeTainterConfig, it compares the digest reported by the
// Deployment's pods with the digest currently published in the registry and,
// when they differ, restarts the Deployment by writing an annotation onto its
// pod template.
type ImageUpdateReconciler struct {
	// Client is used to read the NodeTainterConfig, Deployments and Pods and
	// to patch Deployments.
	client.Client
	// Scheme is the runtime scheme; presumably populated by the caller from
	// the manager (NOTE(review): not referenced in this file — confirm).
	Scheme   *runtime.Scheme
	// Recorder emits Kubernetes events on the reconciled Deployment; it is
	// assigned in SetupWithManager.
	Recorder record.EventRecorder
}
 | |
| 
 | |
| // +kubebuilder:rbac:groups=apps,resources=deployments,verbs=get;list;watch;update;patch
 | |
| // +kubebuilder:rbac:groups="",resources=pods,verbs=get;list;watch
 | |
| // +kubebuilder:rbac:groups=operator.andy.vendetti.ru,resources=nodetainterconfigs,verbs=get;list;watch
 | |
| // +kubebuilder:rbac:groups="",resources=events,verbs=create;patch
 | |
| 
 | |
| func (r *ImageUpdateReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
 | |
| 	log := log.FromContext(ctx).WithValues("deployment", req.NamespacedName)
 | |
| 
 | |
| 	var config configv1alpha1.NodeTainterConfig
 | |
| 	configKey := types.NamespacedName{Name: GlobalTaintConfigName}
 | |
| 	if err := r.Get(ctx, configKey, &config); err != nil {
 | |
| 		if !errors.IsNotFound(err) {
 | |
| 			log.Error(err, "Failed to get NodeTainterConfig for image updates", "configName", GlobalTaintConfigName)
 | |
| 			return ctrl.Result{}, err
 | |
| 		}
 | |
| 		log.V(1).Info("Global NodeTainterConfig not found, image update checks skipped.", "configName", GlobalTaintConfigName)
 | |
| 		return ctrl.Result{}, nil
 | |
| 	}
 | |
| 
 | |
| 	if config.Spec.ImageUpdatePolicy == nil || !config.Spec.ImageUpdatePolicy.Enabled {
 | |
| 		log.V(1).Info("Image update policy is disabled in NodeTainterConfig.")
 | |
| 		return ctrl.Result{}, nil
 | |
| 	}
 | |
| 
 | |
| 	policy := config.Spec.ImageUpdatePolicy
 | |
| 	if len(policy.MonitoredTags) == 0 {
 | |
| 		log.V(1).Info("No monitored tags configured in ImageUpdatePolicy.")
 | |
| 		return ctrl.Result{}, nil
 | |
| 	}
 | |
| 
 | |
| 	var deployment appsv1.Deployment
 | |
| 	if err := r.Get(ctx, req.NamespacedName, &deployment); err != nil {
 | |
| 		if errors.IsNotFound(err) {
 | |
| 			log.Info("Deployment not found. Ignoring.")
 | |
| 			return ctrl.Result{}, nil
 | |
| 		}
 | |
| 		log.Error(err, "Failed to get Deployment")
 | |
| 		return ctrl.Result{}, err
 | |
| 	}
 | |
| 
 | |
| 	log.V(1).Info("Checking deployment for image updates")
 | |
| 	needsRestart := false
 | |
| 	restartAnnotation := policy.RestartAnnotation
 | |
| 	if restartAnnotation == "" {
 | |
| 		restartAnnotation = DefaultRestartAnnotation
 | |
| 	}
 | |
| 
 | |
| 	podList, err := r.findPodsForDeployment(ctx, &deployment)
 | |
| 	if err != nil {
 | |
| 		log.Error(err, "Failed to list pods for deployment")
 | |
| 		return ctrl.Result{RequeueAfter: 1 * time.Minute}, nil
 | |
| 	}
 | |
| 
 | |
| 	for _, container := range deployment.Spec.Template.Spec.Containers {
 | |
| 		containerLog := log.WithValues("container", container.Name, "image", container.Image)
 | |
| 
 | |
| 		imageName, imageTag, isMonitored := r.parseImageAndCheckTag(container.Image, policy.MonitoredTags)
 | |
| 		if !isMonitored {
 | |
| 			containerLog.V(1).Info("Image tag is not monitored, skipping.")
 | |
| 			continue
 | |
| 		}
 | |
| 
 | |
| 		currentDigest, err := r.findCurrentImageDigest(container.Name, podList)
 | |
| 		if err != nil {
 | |
| 			containerLog.Error(err, "Could not determine current image digest from running pods")
 | |
| 			continue
 | |
| 		}
 | |
| 		if currentDigest == "" {
 | |
| 			containerLog.V(1).Info("No running pods found with imageID for this container, skipping check.")
 | |
| 			continue
 | |
| 		}
 | |
| 		containerLog = containerLog.WithValues("currentDigest", currentDigest)
 | |
| 
 | |
| 		latestDigest, err := crane.Digest(imageName + ":" + imageTag)
 | |
| 		if err != nil {
 | |
| 			containerLog.Error(err, "Failed to get latest digest from registry", "image", imageName+":"+imageTag)
 | |
| 			r.Recorder.Eventf(&deployment, corev1.EventTypeWarning, "RegistryError", "Failed to fetch digest for %s:%s: %v", imageName, imageTag, err)
 | |
| 			continue
 | |
| 		}
 | |
| 		containerLog = containerLog.WithValues("latestDigest", latestDigest)
 | |
| 
 | |
| 		if currentDigest != latestDigest {
 | |
| 			containerLog.Info("Image update detected!", "image", imageName+":"+imageTag)
 | |
| 			r.Recorder.Eventf(&deployment, corev1.EventTypeNormal, "UpdateAvailable", "New digest %s detected for image %s:%s (current: %s)", latestDigest, imageName, imageTag, currentDigest)
 | |
| 			needsRestart = true
 | |
| 			break
 | |
| 		} else {
 | |
| 			containerLog.V(1).Info("Image is up-to-date.")
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	if needsRestart {
 | |
| 		deploymentCopy := deployment.DeepCopy()
 | |
| 		if deploymentCopy.Spec.Template.Annotations == nil {
 | |
| 			deploymentCopy.Spec.Template.Annotations = make(map[string]string)
 | |
| 		}
 | |
| 		restartValue := time.Now().Format(time.RFC3339)
 | |
| 		deploymentCopy.Spec.Template.Annotations[restartAnnotation] = restartValue
 | |
| 
 | |
| 		log.Info("Triggering deployment restart due to image update", "annotationKey", restartAnnotation, "annotationValue", restartValue)
 | |
| 		if err := r.Patch(ctx, deploymentCopy, client.MergeFrom(&deployment)); err != nil {
 | |
| 			log.Error(err, "Failed to patch Deployment to trigger restart")
 | |
| 			r.Recorder.Eventf(&deployment, corev1.EventTypeWarning, "UpdateFailed", "Failed to trigger restart: %v", err)
 | |
| 			return ctrl.Result{}, err
 | |
| 		}
 | |
| 		log.Info("Deployment patched successfully to trigger restart.")
 | |
| 		r.Recorder.Eventf(&deployment, corev1.EventTypeNormal, "RestartTriggered", "Triggered restart due to updated image")
 | |
| 	}
 | |
| 
 | |
| 	checkInterval, err := time.ParseDuration(policy.CheckInterval)
 | |
| 	if err != nil {
 | |
| 		log.Error(err, "Failed to parse CheckInterval from config, using default 1h", "configuredInterval", policy.CheckInterval)
 | |
| 		checkInterval = time.Hour // Fallback
 | |
| 	}
 | |
| 
 | |
| 	log.V(1).Info("Requeuing deployment for next check", "after", checkInterval.String())
 | |
| 	return ctrl.Result{RequeueAfter: checkInterval}, nil
 | |
| }
 | |
| 
 | |
| func (r *ImageUpdateReconciler) parseImageAndCheckTag(image string, monitoredTags []string) (imgName, imgTag string, monitored bool) {
 | |
| 	ref, err := name.ParseReference(image, name.WeakValidation)
 | |
| 	if err != nil {
 | |
| 		return "", "", false
 | |
| 	}
 | |
| 
 | |
| 	imgName = ref.Context().Name()
 | |
| 	imgTag = ref.Identifier()
 | |
| 
 | |
| 	if strings.HasPrefix(imgTag, "sha256:") {
 | |
| 		return imgName, imgTag, false
 | |
| 	}
 | |
| 
 | |
| 	for _, monitoredKeyword := range monitoredTags {
 | |
| 		if strings.Contains(imgTag, monitoredKeyword) {
 | |
| 			return imgName, imgTag, true
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	return imgName, imgTag, false
 | |
| }
 | |
| 
 | |
| func (r *ImageUpdateReconciler) findPodsForDeployment(ctx context.Context, deployment *appsv1.Deployment) (*corev1.PodList, error) {
 | |
| 	selector, err := metav1.LabelSelectorAsSelector(deployment.Spec.Selector)
 | |
| 	if err != nil {
 | |
| 		return nil, fmt.Errorf("failed to convert deployment selector: %w", err)
 | |
| 	}
 | |
| 
 | |
| 	podList := &corev1.PodList{}
 | |
| 	err = r.List(ctx, podList, client.InNamespace(deployment.Namespace), client.MatchingLabelsSelector{Selector: selector})
 | |
| 	if err != nil {
 | |
| 		return nil, fmt.Errorf("failed to list pods: %w", err)
 | |
| 	}
 | |
| 	return podList, nil
 | |
| }
 | |
| 
 | |
| func (r *ImageUpdateReconciler) findCurrentImageDigest(containerName string, podList *corev1.PodList) (string, error) {
 | |
| 	for _, pod := range podList.Items {
 | |
| 		if pod.Status.Phase != corev1.PodRunning && pod.Status.Phase != corev1.PodPending {
 | |
| 			continue
 | |
| 		}
 | |
| 		for _, cs := range pod.Status.ContainerStatuses {
 | |
| 			if cs.Name == containerName && cs.ImageID != "" {
 | |
| 				parts := strings.SplitN(cs.ImageID, "@", 2)
 | |
| 				if len(parts) == 2 && strings.HasPrefix(parts[1], "sha256:") {
 | |
| 					return parts[1], nil
 | |
| 				}
 | |
| 				if strings.HasPrefix(cs.ImageID, "sha256:") {
 | |
| 					return cs.ImageID, nil
 | |
| 				}
 | |
| 				return "", fmt.Errorf("unrecognized imageID format in pod %s: %s", pod.Name, cs.ImageID)
 | |
| 			}
 | |
| 		}
 | |
| 	}
 | |
| 	return "", nil
 | |
| }
 | |
| 
 | |
| func (r *ImageUpdateReconciler) SetupWithManager(mgr ctrl.Manager) error {
 | |
| 	r.Recorder = mgr.GetEventRecorderFor("imageupdate-controller")
 | |
| 
 | |
| 	return ctrl.NewControllerManagedBy(mgr).
 | |
| 		Named("imageupdate").
 | |
| 		For(&appsv1.Deployment{}).
 | |
| 		Complete(r)
 | |
| }
 |