This repository has been archived on 2025-08-14. You can view files and clone it, but cannot push or open issues or pull requests.
Files
operator/internal/controller/imageupdate_controller.go
Andy Kolibri Vendetti 6e32cf842a
All checks were successful
Lint / Run on Ubuntu (push) Successful in 22s
Tests / Run on Ubuntu (push) Successful in 49s
Lint / Run on Ubuntu (pull_request) Successful in 20s
Tests / Run on Ubuntu (pull_request) Successful in 26s
fix
2025-05-04 20:06:54 +05:00

222 lines
7.8 KiB
Go

// internal/controller/imageupdate_controller.go
package controller
import (
"context"
"fmt"
"strings"
"time"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/record"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log"
configv1alpha1 "git.vendetti.ru/andy/operator/api/v1alpha1"
"github.com/google/go-containerregistry/pkg/crane"
"github.com/google/go-containerregistry/pkg/name"
)
const (
DefaultRestartAnnotation = "andy.vendetti.ru/restartedAt"
)
// ImageUpdateReconciler reconciles Deployment objects to check for image updates.
type ImageUpdateReconciler struct {
client.Client
Scheme *runtime.Scheme
Recorder record.EventRecorder
}
// +kubebuilder:rbac:groups=apps,resources=deployments,verbs=get;list;watch;update;patch
// +kubebuilder:rbac:groups="",resources=pods,verbs=get;list;watch
// +kubebuilder:rbac:groups=operator.andy.vendetti.ru,resources=nodetainterconfigs,verbs=get;list;watch
// +kubebuilder:rbac:groups="",resources=events,verbs=create;patch
func (r *ImageUpdateReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
log := log.FromContext(ctx).WithValues("deployment", req.NamespacedName)
var config configv1alpha1.NodeTainterConfig
configKey := types.NamespacedName{Name: GlobalTaintConfigName}
if err := r.Get(ctx, configKey, &config); err != nil {
if !errors.IsNotFound(err) {
log.Error(err, "Failed to get NodeTainterConfig for image updates", "configName", GlobalTaintConfigName)
return ctrl.Result{}, err
}
log.V(1).Info("Global NodeTainterConfig not found, image update checks skipped.", "configName", GlobalTaintConfigName)
return ctrl.Result{}, nil
}
if config.Spec.ImageUpdatePolicy == nil || !config.Spec.ImageUpdatePolicy.Enabled {
log.V(1).Info("Image update policy is disabled in NodeTainterConfig.")
return ctrl.Result{}, nil
}
policy := config.Spec.ImageUpdatePolicy
if len(policy.MonitoredTags) == 0 {
log.V(1).Info("No monitored tags configured in ImageUpdatePolicy.")
return ctrl.Result{}, nil
}
var deployment appsv1.Deployment
if err := r.Get(ctx, req.NamespacedName, &deployment); err != nil {
if errors.IsNotFound(err) {
log.Info("Deployment not found. Ignoring.")
return ctrl.Result{}, nil
}
log.Error(err, "Failed to get Deployment")
return ctrl.Result{}, err
}
log.V(1).Info("Checking deployment for image updates")
needsRestart := false
restartAnnotation := policy.RestartAnnotation
if restartAnnotation == "" {
restartAnnotation = DefaultRestartAnnotation
}
podList, err := r.findPodsForDeployment(ctx, &deployment)
if err != nil {
log.Error(err, "Failed to list pods for deployment")
return ctrl.Result{RequeueAfter: 1 * time.Minute}, nil
}
for _, container := range deployment.Spec.Template.Spec.Containers {
containerLog := log.WithValues("container", container.Name, "image", container.Image)
imageName, imageTag, isMonitored := r.parseImageAndCheckTag(container.Image, policy.MonitoredTags)
if !isMonitored {
containerLog.V(1).Info("Image tag is not monitored, skipping.")
continue
}
currentDigest, err := r.findCurrentImageDigest(container.Name, podList)
if err != nil {
containerLog.Error(err, "Could not determine current image digest from running pods")
continue
}
if currentDigest == "" {
containerLog.V(1).Info("No running pods found with imageID for this container, skipping check.")
continue
}
containerLog = containerLog.WithValues("currentDigest", currentDigest)
latestDigest, err := crane.Digest(imageName + ":" + imageTag)
if err != nil {
containerLog.Error(err, "Failed to get latest digest from registry", "image", imageName+":"+imageTag)
r.Recorder.Eventf(&deployment, corev1.EventTypeWarning, "RegistryError", "Failed to fetch digest for %s:%s: %v", imageName, imageTag, err)
continue
}
containerLog = containerLog.WithValues("latestDigest", latestDigest)
if currentDigest != latestDigest {
containerLog.Info("Image update detected!", "image", imageName+":"+imageTag)
r.Recorder.Eventf(&deployment, corev1.EventTypeNormal, "UpdateAvailable", "New digest %s detected for image %s:%s (current: %s)", latestDigest, imageName, imageTag, currentDigest)
needsRestart = true
break
} else {
containerLog.V(1).Info("Image is up-to-date.")
}
}
if needsRestart {
deploymentCopy := deployment.DeepCopy()
if deploymentCopy.Spec.Template.Annotations == nil {
deploymentCopy.Spec.Template.Annotations = make(map[string]string)
}
restartValue := time.Now().Format(time.RFC3339)
deploymentCopy.Spec.Template.Annotations[restartAnnotation] = restartValue
log.Info("Triggering deployment restart due to image update", "annotationKey", restartAnnotation, "annotationValue", restartValue)
if err := r.Patch(ctx, deploymentCopy, client.MergeFrom(&deployment)); err != nil {
log.Error(err, "Failed to patch Deployment to trigger restart")
r.Recorder.Eventf(&deployment, corev1.EventTypeWarning, "UpdateFailed", "Failed to trigger restart: %v", err)
return ctrl.Result{}, err
}
log.Info("Deployment patched successfully to trigger restart.")
r.Recorder.Eventf(&deployment, corev1.EventTypeNormal, "RestartTriggered", "Triggered restart due to updated image")
}
checkInterval, err := time.ParseDuration(policy.CheckInterval)
if err != nil {
log.Error(err, "Failed to parse CheckInterval from config, using default 1h", "configuredInterval", policy.CheckInterval)
checkInterval = time.Hour // Fallback
}
log.V(1).Info("Requeuing deployment for next check", "after", checkInterval.String())
return ctrl.Result{RequeueAfter: checkInterval}, nil
}
func (r *ImageUpdateReconciler) parseImageAndCheckTag(image string, monitoredTags []string) (imgName, imgTag string, monitored bool) {
ref, err := name.ParseReference(image, name.WeakValidation)
if err != nil {
return "", "", false
}
imgName = ref.Context().Name()
imgTag = ref.Identifier()
if strings.HasPrefix(imgTag, "sha256:") {
return imgName, imgTag, false
}
for _, monitoredKeyword := range monitoredTags {
if strings.Contains(imgTag, monitoredKeyword) {
return imgName, imgTag, true
}
}
return imgName, imgTag, false
}
func (r *ImageUpdateReconciler) findPodsForDeployment(ctx context.Context, deployment *appsv1.Deployment) (*corev1.PodList, error) {
selector, err := metav1.LabelSelectorAsSelector(deployment.Spec.Selector)
if err != nil {
return nil, fmt.Errorf("failed to convert deployment selector: %w", err)
}
podList := &corev1.PodList{}
err = r.List(ctx, podList, client.InNamespace(deployment.Namespace), client.MatchingLabelsSelector{Selector: selector})
if err != nil {
return nil, fmt.Errorf("failed to list pods: %w", err)
}
return podList, nil
}
func (r *ImageUpdateReconciler) findCurrentImageDigest(containerName string, podList *corev1.PodList) (string, error) {
for _, pod := range podList.Items {
if pod.Status.Phase != corev1.PodRunning && pod.Status.Phase != corev1.PodPending {
continue
}
for _, cs := range pod.Status.ContainerStatuses {
if cs.Name == containerName && cs.ImageID != "" {
parts := strings.SplitN(cs.ImageID, "@", 2)
if len(parts) == 2 && strings.HasPrefix(parts[1], "sha256:") {
return parts[1], nil
}
if strings.HasPrefix(cs.ImageID, "sha256:") {
return cs.ImageID, nil
}
return "", fmt.Errorf("unrecognized imageID format in pod %s: %s", pod.Name, cs.ImageID)
}
}
}
return "", nil
}
func (r *ImageUpdateReconciler) SetupWithManager(mgr ctrl.Manager) error {
r.Recorder = mgr.GetEventRecorderFor("imageupdate-controller")
return ctrl.NewControllerManagedBy(mgr).
Named("imageupdate").
For(&appsv1.Deployment{}).
Complete(r)
}