From 227361901d6602d2769fa8613455f3dc86fc135d Mon Sep 17 00:00:00 2001 From: Philipp Matthes Date: Wed, 29 Apr 2026 11:27:10 +0200 Subject: [PATCH 1/2] Refactor /traits API to single-ConfigMap model with Syncer interface Resolves #395 --- .../templates/configmap-traits.yaml | 11 - .../bundles/cortex-placement-shim/values.yaml | 3 - internal/shim/placement/handle_traits.go | 478 +++++++----------- internal/shim/placement/handle_traits_e2e.go | 21 +- internal/shim/placement/handle_traits_test.go | 42 +- internal/shim/placement/shim.go | 38 +- internal/shim/placement/syncer.go | 19 + internal/shim/placement/syncer_traits.go | 155 ++++++ internal/shim/placement/syncer_traits_test.go | 144 ++++++ 9 files changed, 536 insertions(+), 375 deletions(-) delete mode 100644 helm/bundles/cortex-placement-shim/templates/configmap-traits.yaml create mode 100644 internal/shim/placement/syncer.go create mode 100644 internal/shim/placement/syncer_traits.go create mode 100644 internal/shim/placement/syncer_traits_test.go diff --git a/helm/bundles/cortex-placement-shim/templates/configmap-traits.yaml b/helm/bundles/cortex-placement-shim/templates/configmap-traits.yaml deleted file mode 100644 index b6969aaa7..000000000 --- a/helm/bundles/cortex-placement-shim/templates/configmap-traits.yaml +++ /dev/null @@ -1,11 +0,0 @@ -{{- if ne ((index .Values "cortex-shim").conf.features.traits | default "passthrough") "passthrough" }} -{{- $cmName := (index .Values "cortex-shim").conf.traits.configMapName }} -apiVersion: v1 -kind: ConfigMap -metadata: - name: {{ $cmName }} - labels: - {{- include "chart.labels" . | nindent 4 }} -data: - traits: {{ (index .Values "cortex-shim").conf.traits.static | toJson | quote }} -{{- end }} diff --git a/helm/bundles/cortex-placement-shim/values.yaml b/helm/bundles/cortex-placement-shim/values.yaml index 7e1818e9e..54805cee3 100644 --- a/helm/bundles/cortex-placement-shim/values.yaml +++ b/helm/bundles/cortex-placement-shim/values.yaml @@ -62,9 +62,6 @@ cortex-shim: status: "CURRENT" traits: configMapName: "cortex-placement-shim-traits" - # Static traits included in every Helm install/upgrade. The shim - # merges them with dynamic CUSTOM_* traits at request time. - static: [] auth: tokenCacheTTL: "5m" policies: diff --git a/internal/shim/placement/handle_traits.go b/internal/shim/placement/handle_traits.go index 429b87cfc..3e10ab6fc 100644 --- a/internal/shim/placement/handle_traits.go +++ b/internal/shim/placement/handle_traits.go @@ -7,44 +7,21 @@ import ( "context" "encoding/json" "fmt" - "math/rand" "net/http" - "net/url" "os" "sort" "strings" "time" - "github.com/go-logr/logr" - "github.com/gophercloud/gophercloud/v2" corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" logf "sigs.k8s.io/controller-runtime/pkg/log" ) const configMapKeyTraits = "traits" -func (s *Shim) staticTraitsConfigMapKey() client.ObjectKey { - return client.ObjectKey{ - Namespace: os.Getenv("POD_NAMESPACE"), - Name: s.config.Traits.ConfigMapName, - } -} - -func (s *Shim) customTraitsConfigMapKey() client.ObjectKey { - return client.ObjectKey{ - Namespace: os.Getenv("POD_NAMESPACE"), - Name: s.config.Traits.ConfigMapName + "-custom", - } -} - -func (s *Shim) traitsLockName() string { - return s.config.Traits.ConfigMapName + "-custom-lock" -} - // traitsListResponse matches the OpenStack Placement GET /traits response. type traitsListResponse struct { Traits []string `json:"traits"` @@ -52,10 +29,10 @@ type traitsListResponse struct { // HandleListTraits handles GET /traits requests. // -// Returns a sorted list of trait strings merged from the static (Helm-managed) -// and dynamic (CUSTOM_*) ConfigMaps. Supports optional query parameter "name" -// for filtering: "in:TRAIT_A,TRAIT_B" returns only named traits, -// "startswith:CUSTOM_" returns prefix matches. +// Feature modes: +// - passthrough: forwards to upstream placement. +// - hybrid: forwards to upstream placement. +// - crd: serves the trait list from the local ConfigMap. // // See: https://docs.openstack.org/api-ref/placement/#list-traits func (s *Shim) HandleListTraits(w http.ResponseWriter, r *http.Request) { @@ -67,15 +44,15 @@ func (s *Shim) HandleListTraits(w http.ResponseWriter, r *http.Request) { s.forward(w, r) return case FeatureModeCRD: - // Serve from local ConfigMaps. + // Serve from local ConfigMap. default: http.Error(w, "unknown feature mode", http.StatusInternalServerError) return } - traitSet, err := s.getAllTraits(ctx) + traitSet, err := s.getTraits(ctx) if err != nil { - log.Error(err, "failed to list traits from configmaps") + log.Error(err, "failed to list traits from configmap") http.Error(w, "failed to list traits", http.StatusInternalServerError) return } @@ -124,8 +101,10 @@ func (s *Shim) HandleListTraits(w http.ResponseWriter, r *http.Request) { // HandleShowTrait handles GET /traits/{name} requests. // -// Checks whether a trait with the given name exists in either the static -// or dynamic ConfigMap. Returns 204 No Content if found, 404 Not Found otherwise. +// Feature modes: +// - passthrough: forwards to upstream placement. +// - hybrid: forwards to upstream placement. +// - crd: checks the local ConfigMap for the trait. // // See: https://docs.openstack.org/api-ref/placement/#show-traits func (s *Shim) HandleShowTrait(w http.ResponseWriter, r *http.Request) { @@ -137,7 +116,7 @@ func (s *Shim) HandleShowTrait(w http.ResponseWriter, r *http.Request) { s.forward(w, r) return case FeatureModeCRD: - // Serve from local ConfigMaps. + // Serve from local ConfigMap. default: http.Error(w, "unknown feature mode", http.StatusInternalServerError) return @@ -164,22 +143,26 @@ func (s *Shim) HandleShowTrait(w http.ResponseWriter, r *http.Request) { // HandleUpdateTrait handles PUT /traits/{name} requests. // -// Creates a new custom trait in the dynamic ConfigMap. Only traits prefixed -// with CUSTOM_ may be created. Returns 201 Created if the trait is newly -// inserted, or 204 No Content if it already exists (in either ConfigMap). -// Returns 400 Bad Request if the name does not carry the CUSTOM_ prefix. +// Feature modes: +// - passthrough: forwards to upstream placement. +// - hybrid: forwards to upstream; on success, adds the trait to the local ConfigMap. +// - crd: writes the trait to the local ConfigMap (CUSTOM_ prefix required). // // See: https://docs.openstack.org/api-ref/placement/#update-trait func (s *Shim) HandleUpdateTrait(w http.ResponseWriter, r *http.Request) { ctx := r.Context() log := logf.FromContext(ctx) - switch s.featureModeFromConfOrHeader(r, s.config.Features.Traits) { - case FeatureModePassthrough, FeatureModeHybrid: + mode := s.featureModeFromConfOrHeader(r, s.config.Features.Traits) + switch mode { + case FeatureModePassthrough: s.forward(w, r) return + case FeatureModeHybrid: + s.handleUpdateTraitHybrid(w, r) + return case FeatureModeCRD: - // Serve from local ConfigMaps. + // Handle locally. default: http.Error(w, "unknown feature mode", http.StatusInternalServerError) return @@ -195,115 +178,69 @@ func (s *Shim) HandleUpdateTrait(w http.ResponseWriter, r *http.Request) { return } - // Fast path: trait already exists in either ConfigMap (no lock needed). - allTraits, err := s.getAllTraits(ctx) + created, err := s.addTraitToConfigMap(ctx, name) if err != nil { - log.Error(err, "failed to read traits for existence check", "trait", name) + log.Error(err, "failed to create trait", "trait", name) http.Error(w, "failed to create trait", http.StatusInternalServerError) return } - if _, exists := allTraits[name]; exists { - log.Info("trait already exists, nothing to do", "trait", name) + if created { + w.WriteHeader(http.StatusCreated) + } else { w.WriteHeader(http.StatusNoContent) - return } +} - // Slow path: acquire lock, read/create dynamic ConfigMap, add trait. - host, err := os.Hostname() - if err != nil { - host = "unknown" - } - lockerID := fmt.Sprintf("shim-%s-%d", host, time.Now().UnixNano()) - if err := s.resourceLocker.AcquireLock(ctx, s.traitsLockName(), lockerID); err != nil { - log.Error(err, "failed to acquire traits lock", "trait", name) - http.Error(w, "failed to create trait", http.StatusInternalServerError) +// handleUpdateTraitHybrid forwards PUT /traits/{name} to upstream, then +// updates the local ConfigMap on success. +func (s *Shim) handleUpdateTraitHybrid(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + log := logf.FromContext(ctx) + + name, ok := requiredPathParam(w, r, "name") + if !ok { return } - defer func() { - releaseCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() - if err := s.resourceLocker.ReleaseLock(releaseCtx, s.traitsLockName(), lockerID); err != nil { - log.Error(err, "failed to release traits lock") - } - }() - cm := &corev1.ConfigMap{} - err = s.Get(ctx, s.customTraitsConfigMapKey(), cm) - if apierrors.IsNotFound(err) { - // Dynamic ConfigMap does not exist yet — create it with the new trait. - cm = &corev1.ConfigMap{ - ObjectMeta: metav1.ObjectMeta{ - Name: s.customTraitsConfigMapKey().Name, - Namespace: s.customTraitsConfigMapKey().Namespace, - }, - Data: map[string]string{configMapKeyTraits: "[]"}, - } - current := map[string]struct{}{name: {}} - if err := s.writeTraits(cm, current); err != nil { - log.Error(err, "failed to serialize traits", "trait", name) - http.Error(w, "failed to create trait", http.StatusInternalServerError) - return - } - if err := s.Create(ctx, cm); err != nil { - log.Error(err, "failed to create custom traits configmap", "trait", name) - http.Error(w, "failed to create trait", http.StatusInternalServerError) - return + s.forwardWithHook(w, r, func(w http.ResponseWriter, resp *http.Response) { + // Copy the upstream response to the caller. + for k, vs := range resp.Header { + for _, v := range vs { + w.Header().Add(k, v) + } } - log.Info("created custom traits configmap with new trait", "trait", name) - s.syncTraitToUpstream(ctx, name, r.Header) - w.WriteHeader(http.StatusCreated) - return - } - if err != nil { - log.Error(err, "failed to get custom traits configmap", "trait", name) - http.Error(w, "failed to create trait", http.StatusInternalServerError) - return - } + w.WriteHeader(resp.StatusCode) - current, err := parseTraits(cm) - if err != nil { - log.Error(err, "failed to parse custom traits configmap", "trait", name) - http.Error(w, "failed to create trait", http.StatusInternalServerError) - return - } - if _, exists := current[name]; exists { - log.Info("trait already exists in custom configmap after lock acquisition", "trait", name) - w.WriteHeader(http.StatusNoContent) - return - } - current[name] = struct{}{} - if err := s.writeTraits(cm, current); err != nil { - log.Error(err, "failed to serialize traits", "trait", name) - http.Error(w, "failed to create trait", http.StatusInternalServerError) - return - } - if err := s.Update(ctx, cm); err != nil { - log.Error(err, "failed to update custom traits configmap", "trait", name) - http.Error(w, "failed to create trait", http.StatusInternalServerError) - return - } - log.Info("added custom trait to configmap", "trait", name) - s.syncTraitToUpstream(ctx, name, r.Header) - w.WriteHeader(http.StatusCreated) + if resp.StatusCode == http.StatusCreated || resp.StatusCode == http.StatusNoContent { + if _, err := s.addTraitToConfigMap(ctx, name); err != nil { + log.Error(err, "hybrid: failed to add trait to local configmap", "trait", name) + } + } + }) } // HandleDeleteTrait handles DELETE /traits/{name} requests. // -// Deletes a custom trait from the dynamic ConfigMap. Standard traits (those -// without the CUSTOM_ prefix) cannot be deleted and return 400 Bad Request. -// Returns 404 if the trait does not exist. Returns 204 No Content on success. +// Feature modes: +// - passthrough: forwards to upstream placement. +// - hybrid: forwards to upstream; on success, removes the trait from the local ConfigMap. +// - crd: removes the trait from the local ConfigMap (CUSTOM_ prefix required). // // See: https://docs.openstack.org/api-ref/placement/#delete-traits func (s *Shim) HandleDeleteTrait(w http.ResponseWriter, r *http.Request) { ctx := r.Context() log := logf.FromContext(ctx) - switch s.featureModeFromConfOrHeader(r, s.config.Features.Traits) { - case FeatureModePassthrough, FeatureModeHybrid: + mode := s.featureModeFromConfOrHeader(r, s.config.Features.Traits) + switch mode { + case FeatureModePassthrough: s.forward(w, r) return + case FeatureModeHybrid: + s.handleDeleteTraitHybrid(w, r) + return case FeatureModeCRD: - // Serve from local ConfigMaps. + // Handle locally. default: http.Error(w, "unknown feature mode", http.StatusInternalServerError) return @@ -319,101 +256,57 @@ func (s *Shim) HandleDeleteTrait(w http.ResponseWriter, r *http.Request) { return } - host, err := os.Hostname() + removed, err := s.removeTraitFromConfigMap(ctx, name) if err != nil { - host = "unknown" - } - lockerID := fmt.Sprintf("shim-%s-%d", host, time.Now().UnixNano()) - if err := s.resourceLocker.AcquireLock(ctx, s.traitsLockName(), lockerID); err != nil { - log.Error(err, "failed to acquire traits lock", "trait", name) + log.Error(err, "failed to delete trait", "trait", name) http.Error(w, "failed to delete trait", http.StatusInternalServerError) return } - defer func() { - releaseCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() - if err := s.resourceLocker.ReleaseLock(releaseCtx, s.traitsLockName(), lockerID); err != nil { - log.Error(err, "failed to release traits lock") - } - }() - - cm := &corev1.ConfigMap{} - err = s.Get(ctx, s.customTraitsConfigMapKey(), cm) - if apierrors.IsNotFound(err) { - log.Info("custom traits configmap not found, trait does not exist", "trait", name) - http.Error(w, "trait not found", http.StatusNotFound) - return - } - if err != nil { - log.Error(err, "failed to get custom traits configmap", "trait", name) - http.Error(w, "failed to delete trait", http.StatusInternalServerError) - return - } - current, err := parseTraits(cm) - if err != nil { - log.Error(err, "failed to parse custom traits configmap", "trait", name) - http.Error(w, "failed to delete trait", http.StatusInternalServerError) - return - } - if _, exists := current[name]; !exists { - log.Info("trait not found in custom configmap", "trait", name) + if !removed { + log.Info("trait not found in configmap", "trait", name) http.Error(w, "trait not found", http.StatusNotFound) return } - delete(current, name) - if err := s.writeTraits(cm, current); err != nil { - log.Error(err, "failed to serialize traits", "trait", name) - http.Error(w, "failed to delete trait", http.StatusInternalServerError) - return - } - if err := s.Update(ctx, cm); err != nil { - log.Error(err, "failed to update custom traits configmap", "trait", name) - http.Error(w, "failed to delete trait", http.StatusInternalServerError) - return - } - log.Info("deleted custom trait from configmap", "trait", name) + log.Info("deleted trait from configmap", "trait", name) w.WriteHeader(http.StatusNoContent) } -// getStaticTraits reads traits from the Helm-managed static ConfigMap. -func (s *Shim) getStaticTraits(ctx context.Context) (map[string]struct{}, error) { - cm := &corev1.ConfigMap{} - if err := s.Get(ctx, s.staticTraitsConfigMapKey(), cm); err != nil { - return nil, fmt.Errorf("get static configmap %s: %w", s.config.Traits.ConfigMapName, err) +// handleDeleteTraitHybrid forwards DELETE /traits/{name} to upstream, then +// updates the local ConfigMap on success. +func (s *Shim) handleDeleteTraitHybrid(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + log := logf.FromContext(ctx) + + name, ok := requiredPathParam(w, r, "name") + if !ok { + return } - return parseTraits(cm) + + s.forwardWithHook(w, r, func(w http.ResponseWriter, resp *http.Response) { + for k, vs := range resp.Header { + for _, v := range vs { + w.Header().Add(k, v) + } + } + w.WriteHeader(resp.StatusCode) + + if resp.StatusCode == http.StatusNoContent { + if _, err := s.removeTraitFromConfigMap(ctx, name); err != nil { + log.Error(err, "hybrid: failed to remove trait from local configmap", "trait", name) + } + } + }) } -// getCustomTraits reads traits from the dynamic ConfigMap created by the shim. -// Returns an empty set if the ConfigMap does not exist yet. -func (s *Shim) getCustomTraits(ctx context.Context) (map[string]struct{}, error) { +// getTraits reads traits from the single ConfigMap. +func (s *Shim) getTraits(ctx context.Context) (map[string]struct{}, error) { cm := &corev1.ConfigMap{} - err := s.Get(ctx, s.customTraitsConfigMapKey(), cm) - if apierrors.IsNotFound(err) { - return make(map[string]struct{}), nil - } - if err != nil { - return nil, fmt.Errorf("get custom configmap %s-custom: %w", s.config.Traits.ConfigMapName, err) + if err := s.Get(ctx, client.ObjectKey{Namespace: os.Getenv("POD_NAMESPACE"), Name: s.config.Traits.ConfigMapName}, cm); err != nil { + return nil, fmt.Errorf("get traits configmap %s: %w", s.config.Traits.ConfigMapName, err) } return parseTraits(cm) } -// getAllTraits merges static and custom traits into a single set. -func (s *Shim) getAllTraits(ctx context.Context) (map[string]struct{}, error) { - static, err := s.getStaticTraits(ctx) - if err != nil { - return nil, err - } - custom, err := s.getCustomTraits(ctx) - if err != nil { - return nil, err - } - for t := range custom { - static[t] = struct{}{} - } - return static, nil -} - // parseTraits extracts the trait set from a ConfigMap. func parseTraits(cm *corev1.ConfigMap) (map[string]struct{}, error) { raw, ok := cm.Data[configMapKeyTraits] @@ -432,7 +325,7 @@ func parseTraits(cm *corev1.ConfigMap) (map[string]struct{}, error) { } func (s *Shim) hasTrait(ctx context.Context, name string) (bool, error) { - traits, err := s.getAllTraits(ctx) + traits, err := s.getTraits(ctx) if err != nil { return false, err } @@ -440,8 +333,8 @@ func (s *Shim) hasTrait(ctx context.Context, name string) (bool, error) { return ok, nil } -// writeTraits serializes the trait set into the ConfigMap's data field. -func (s *Shim) writeTraits(cm *corev1.ConfigMap, traitSet map[string]struct{}) error { +// writeTraitsToConfigMap serializes the trait set into the ConfigMap's data field. +func writeTraitsToConfigMap(cm *corev1.ConfigMap, traitSet map[string]struct{}) error { traits := make([]string, 0, len(traitSet)) for t := range traitSet { traits = append(traits, t) @@ -459,121 +352,110 @@ func (s *Shim) writeTraits(cm *corev1.ConfigMap, traitSet map[string]struct{}) e return nil } -// syncTraitToUpstream best-effort creates the trait in upstream placement so -// that endpoints forwarded to upstream (e.g. PUT /resource_providers/{uuid}/traits) -// can reference locally-created custom traits. Errors are logged but never -// propagated — upstream may be unreachable and that is acceptable. -func (s *Shim) syncTraitToUpstream(ctx context.Context, name string, incomingHeader http.Header) { - log := logf.FromContext(ctx) - if s.httpClient == nil { - log.V(1).Info("skipping upstream trait sync, no http client configured", "trait", name) - return - } - u, err := url.Parse(s.config.PlacementURL) +// addTraitToConfigMap adds a trait to the ConfigMap under the resource lock. +// Returns true if the trait was newly created, false if it already existed. +func (s *Shim) addTraitToConfigMap(ctx context.Context, name string) (bool, error) { + // Fast path: trait already exists (no lock needed). + traits, err := s.getTraits(ctx) if err != nil { - log.Error(err, "failed to parse placement URL for trait sync", "trait", name) - return + return false, err } - u.Path, err = url.JoinPath(u.Path, "/traits/"+name) - if err != nil { - log.Error(err, "failed to build upstream trait URL", "trait", name) - return + if _, exists := traits[name]; exists { + return false, nil } - req, err := http.NewRequestWithContext(ctx, http.MethodPut, u.String(), http.NoBody) + + // Slow path: acquire lock, re-read, add trait. + host, err := os.Hostname() if err != nil { - log.Error(err, "failed to create upstream trait request", "trait", name) - return + return false, fmt.Errorf("get hostname: %w", err) } - // Forward authentication headers so upstream placement accepts the request. - req.Header = incomingHeader.Clone() - resp, err := s.httpClient.Do(req) - if err != nil { - log.Info("best-effort upstream trait sync failed, upstream may be down", "trait", name, "error", err.Error()) - return + lockerID := fmt.Sprintf("shim-%s-%d", host, time.Now().UnixNano()) + if err := s.resourceLocker.AcquireLock(ctx, s.config.Traits.ConfigMapName+"-lock", lockerID); err != nil { + return false, fmt.Errorf("acquire traits lock: %w", err) } - defer resp.Body.Close() - log.Info("synced custom trait to upstream placement", "trait", name, "status", resp.StatusCode) -} + defer func() { + releaseCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + _ = s.resourceLocker.ReleaseLock(releaseCtx, s.config.Traits.ConfigMapName+"-lock", lockerID) //nolint:errcheck + }() -// startTraitSyncLoop runs a periodic goroutine that fetches traits from -// upstream placement and writes them into the static ConfigMap. Only active -// when features.traits is hybrid. The loop exits when ctx is cancelled. -func (s *Shim) startTraitSyncLoop(ctx context.Context) { - if s.config.Features.Traits.orDefault() != FeatureModeHybrid { - return + cm := &corev1.ConfigMap{} + key := client.ObjectKey{Namespace: os.Getenv("POD_NAMESPACE"), Name: s.config.Traits.ConfigMapName} + if err := s.Get(ctx, key, cm); err != nil { + if apierrors.IsNotFound(err) { + cm = &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: key.Name, + Namespace: key.Namespace, + }, + Data: map[string]string{configMapKeyTraits: "[]"}, + } + current := map[string]struct{}{name: {}} + if err := writeTraitsToConfigMap(cm, current); err != nil { + return false, err + } + if err := s.Create(ctx, cm); err != nil { + return false, fmt.Errorf("create traits configmap: %w", err) + } + return true, nil + } + return false, fmt.Errorf("get traits configmap: %w", err) } - log := ctrl.Log.WithName("placement-shim").WithName("trait-sync") - jitter := time.Duration(rand.Int63n(int64(30 * time.Second))) //nolint:gosec - log.Info("starting trait sync loop", "jitter", jitter) - select { - case <-ctx.Done(): - return - case <-time.After(jitter): + current, err := parseTraits(cm) + if err != nil { + return false, err } - - s.syncTraitsFromUpstream(ctx, log) - - ticker := time.NewTicker(60 * time.Second) - defer ticker.Stop() - for { - select { - case <-ctx.Done(): - return - case <-ticker.C: - s.syncTraitsFromUpstream(ctx, log) - } + if _, exists := current[name]; exists { + return false, nil } -} - -// syncTraitsFromUpstream fetches GET /traits from upstream placement and -// writes the result into the static ConfigMap so that the shim's local -// view stays in sync with upstream. Uses the gophercloud ServiceClient -// for automatic token management (including reauth on 401). -func (s *Shim) syncTraitsFromUpstream(ctx context.Context, log logr.Logger) { - if s.placementServiceClient == nil { - log.V(1).Info("skipping upstream trait sync, no placement service client configured") - return + current[name] = struct{}{} + if err := writeTraitsToConfigMap(cm, current); err != nil { + return false, err } - u, err := url.JoinPath(s.placementServiceClient.Endpoint, "/traits") - if err != nil { - log.Error(err, "failed to build upstream traits URL") - return + if err := s.Update(ctx, cm); err != nil { + return false, fmt.Errorf("update traits configmap: %w", err) } - resp, err := s.placementServiceClient.Request(ctx, http.MethodGet, u, &gophercloud.RequestOpts{ - OkCodes: []int{http.StatusOK}, - MoreHeaders: map[string]string{ - "OpenStack-API-Version": "placement 1.6", - }, - KeepResponseBody: true, - }) + return true, nil +} + +// removeTraitFromConfigMap removes a trait from the ConfigMap under the +// resource lock. Returns true if the trait was found and removed. +func (s *Shim) removeTraitFromConfigMap(ctx context.Context, name string) (bool, error) { + host, err := os.Hostname() if err != nil { - log.Info("upstream trait sync failed", "error", err.Error()) - return + return false, fmt.Errorf("get hostname: %w", err) } - defer resp.Body.Close() - var body traitsListResponse - if err := json.NewDecoder(resp.Body).Decode(&body); err != nil { - log.Error(err, "failed to decode upstream trait list") - return + lockerID := fmt.Sprintf("shim-%s-%d", host, time.Now().UnixNano()) + if err := s.resourceLocker.AcquireLock(ctx, s.config.Traits.ConfigMapName+"-lock", lockerID); err != nil { + return false, fmt.Errorf("acquire traits lock: %w", err) } + defer func() { + releaseCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + _ = s.resourceLocker.ReleaseLock(releaseCtx, s.config.Traits.ConfigMapName+"-lock", lockerID) //nolint:errcheck + }() cm := &corev1.ConfigMap{} - if err := s.Get(ctx, s.staticTraitsConfigMapKey(), cm); err != nil { - log.Error(err, "failed to get static traits configmap for sync") - return + if err := s.Get(ctx, client.ObjectKey{Namespace: os.Getenv("POD_NAMESPACE"), Name: s.config.Traits.ConfigMapName}, cm); err != nil { + if apierrors.IsNotFound(err) { + return false, nil + } + return false, fmt.Errorf("get traits configmap: %w", err) } - traitSet := make(map[string]struct{}, len(body.Traits)) - for _, t := range body.Traits { - traitSet[t] = struct{}{} + current, err := parseTraits(cm) + if err != nil { + return false, err } - if err := s.writeTraits(cm, traitSet); err != nil { - log.Error(err, "failed to serialize synced traits") - return + if _, exists := current[name]; !exists { + return false, nil + } + delete(current, name) + if err := writeTraitsToConfigMap(cm, current); err != nil { + return false, err } if err := s.Update(ctx, cm); err != nil { - log.Error(err, "failed to update static traits configmap with upstream data") - return + return false, fmt.Errorf("update traits configmap: %w", err) } - log.Info("synced traits from upstream placement", "count", len(body.Traits)) + return true, nil } diff --git a/internal/shim/placement/handle_traits_e2e.go b/internal/shim/placement/handle_traits_e2e.go index 4a5831f72..9c204e0f8 100644 --- a/internal/shim/placement/handle_traits_e2e.go +++ b/internal/shim/placement/handle_traits_e2e.go @@ -80,13 +80,10 @@ func e2eTestTraits(ctx context.Context, _ client.Client) error { if err := json.NewDecoder(resp.Body).Decode(&listResp); err != nil { return fmt.Errorf("failed to decode GET /traits response: %w", err) } - // When traits are served locally (hybrid or crd mode) the static list may - // be empty. Only require at least one trait when forwarding to upstream + // When traits are served locally (hybrid or crd mode) the list may be + // empty. Only require at least one trait when forwarding to upstream // placement, which always has standard traits. traitsMode := e2eCurrentMode(ctx) - if traitsMode == "" { - traitsMode = config.Features.Traits.orDefault() - } if traitsMode == FeatureModePassthrough && len(listResp.Traits) == 0 { return errors.New("GET /traits: expected at least one trait, got 0") } @@ -135,19 +132,9 @@ func e2eTestTraits(ctx context.Context, _ client.Client) error { } log.Info("Correctly received 404 for nonexistent trait") - // ==================== Phase 2: CRUD tests (feature-gated) ==================== - - // CRUD tests require traits ConfigMaps which are only created when the - // configured traits mode is hybrid or crd. The override header changes - // handler routing but cannot create ConfigMaps that don't exist. - configuredTraitsMode := config.Features.Traits.orDefault() - if traitsMode == FeatureModePassthrough || configuredTraitsMode == FeatureModePassthrough { - log.Info("Skipping trait CRUD e2e tests", - "overrideMode", traitsMode, "configuredMode", configuredTraitsMode) - return nil - } + // ==================== Phase 2: CRUD tests ==================== - log.Info("=== Phase 2: CRUD trait tests (traits mode non-passthrough) ===") + log.Info("=== Phase 2: CRUD trait tests ===") const testTrait = "CUSTOM_CORTEX_E2E_TRAIT" diff --git a/internal/shim/placement/handle_traits_test.go b/internal/shim/placement/handle_traits_test.go index bf692fd41..c7531796a 100644 --- a/internal/shim/placement/handle_traits_test.go +++ b/internal/shim/placement/handle_traits_test.go @@ -7,7 +7,6 @@ import ( "context" "encoding/json" "net/http" - "net/http/httptest" "testing" "github.com/cobaltcore-dev/cortex/pkg/resourcelock" @@ -42,13 +41,12 @@ func newTestConfigMap(namespace, name string, traits []string) *corev1.ConfigMap } } -func newTraitShim(t *testing.T, staticTraits []string, customTraits ...string) *Shim { +func newTraitShim(t *testing.T, traits []string, extraTraits ...string) *Shim { t.Helper() t.Setenv("POD_NAMESPACE", "default") - objs := []client.Object{newTestConfigMap("default", "test-cm", staticTraits)} - if len(customTraits) > 0 { - objs = append(objs, newTestConfigMap("default", "test-cm-custom", customTraits)) - } + all := append([]string{}, traits...) + all = append(all, extraTraits...) + objs := []client.Object{newTestConfigMap("default", "test-cm", all)} cl := newFakeClientWithScheme(t, objs...) down, up := newTestTimers() return &Shim{ @@ -264,39 +262,11 @@ func TestHandleUpdateTraitLocalBadPrefix(t *testing.T) { } } -func TestHandleUpdateTraitLocalSyncsToUpstream(t *testing.T) { - var gotMethod, gotPath string - upstream := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - gotMethod = r.Method - gotPath = r.URL.Path - w.WriteHeader(http.StatusNoContent) - })) - t.Cleanup(upstream.Close) +func TestHandleUpdateTraitLocalNoUpstreamContact(t *testing.T) { s := newTraitShim(t, nil) - s.config.PlacementURL = upstream.URL - s.httpClient = upstream.Client() - - w := serveHandler(t, "PUT", "/traits/{name}", s.HandleUpdateTrait, "/traits/CUSTOM_NEW") - if w.Code != http.StatusCreated { - t.Fatalf("status = %d, want %d", w.Code, http.StatusCreated) - } - if gotMethod != "PUT" || gotPath != "/traits/CUSTOM_NEW" { - t.Fatalf("upstream got %s %s, want PUT /traits/CUSTOM_NEW", gotMethod, gotPath) - } -} - -func TestHandleUpdateTraitLocalUpstreamDown(t *testing.T) { - upstream := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { - w.WriteHeader(http.StatusInternalServerError) - })) - t.Cleanup(upstream.Close) - s := newTraitShim(t, nil) - s.config.PlacementURL = upstream.URL - s.httpClient = upstream.Client() - w := serveHandler(t, "PUT", "/traits/{name}", s.HandleUpdateTrait, "/traits/CUSTOM_NEW") if w.Code != http.StatusCreated { - t.Fatalf("status = %d, want %d; upstream failure should not block local creation", w.Code, http.StatusCreated) + t.Fatalf("status = %d, want %d; CRD mode should not contact upstream", w.Code, http.StatusCreated) } } diff --git a/internal/shim/placement/shim.go b/internal/shim/placement/shim.go index b46546b63..35cbd2303 100644 --- a/internal/shim/placement/shim.go +++ b/internal/shim/placement/shim.go @@ -247,15 +247,15 @@ func (c *config) validate() error { } } traitsMode := c.Features.Traits.orDefault() - if traitsMode == FeatureModeHybrid || traitsMode == FeatureModeCRD { - if c.Traits == nil { - return fmt.Errorf("traits config is required when features.traits is %s", traitsMode) - } + if traitsMode != FeatureModePassthrough && c.Traits == nil { + return fmt.Errorf("traits config is required when features.traits is %s", traitsMode) + } + if c.Traits != nil { if c.Traits.ConfigMapName == "" { - return fmt.Errorf("traits.configMapName is required when features.traits is %s", traitsMode) + return errors.New("traits.configMapName is required when traits config is present") } - if traitsMode == FeatureModeCRD && os.Getenv("POD_NAMESPACE") == "" { - return errors.New("pod namespace (POD_NAMESPACE) is required when features.traits is crd") + if os.Getenv("POD_NAMESPACE") == "" { + return errors.New("pod namespace (POD_NAMESPACE) is required when traits config is present") } } if c.Auth != nil && c.KeystoneURL == "" { @@ -303,14 +303,17 @@ type Shim struct { tokenCache *tokenCache // tokenIntrospector validates tokens against Keystone. tokenIntrospector tokenIntrospector - // resourceLocker serializes writes to the custom traits ConfigMap - // across replicas using a Kubernetes Lease. + // resourceLocker serializes writes to ConfigMaps across replicas + // using a Kubernetes Lease. resourceLocker *resourcelock.ResourceLocker // placementServiceClient is an authenticated gophercloud service client // used by background tasks (trait sync) to make requests to upstream // placement with automatic token management (including reauth on 401). // Nil when Keystone credentials are not configured. placementServiceClient *gophercloud.ServiceClient + // syncers are background workers that manage ConfigMap-backed local + // stores (e.g. traits, resource classes). Started uniformly in Start. + syncers []Syncer } // Describe implements prometheus.Collector. @@ -433,7 +436,22 @@ func (s *Shim) Start(ctx context.Context) error { if err := s.initPlacementServiceClient(ctx); err != nil { return err } - go s.startTraitSyncLoop(ctx) + if s.config.Traits != nil { + s.syncers = append(s.syncers, NewTraitSyncer( + s.Client, + s.config.Traits.ConfigMapName, + os.Getenv("POD_NAMESPACE"), + s.placementServiceClient, + )) + } + for _, syncer := range s.syncers { + if err := syncer.Init(ctx); err != nil { + return err + } + } + for _, syncer := range s.syncers { + go syncer.Run(ctx) + } return nil } diff --git a/internal/shim/placement/syncer.go b/internal/shim/placement/syncer.go new file mode 100644 index 000000000..f00487fe9 --- /dev/null +++ b/internal/shim/placement/syncer.go @@ -0,0 +1,19 @@ +// Copyright SAP SE +// SPDX-License-Identifier: Apache-2.0 + +package placement + +import "context" + +// Syncer manages the lifecycle of a ConfigMap-backed local store: +// creating the ConfigMap on startup, and running a periodic background +// sync from upstream placement. +type Syncer interface { + // Init creates the ConfigMap if it does not exist. Called once during + // Shim.Start before any requests are served. + Init(ctx context.Context) error + + // Run starts the periodic background sync from upstream. Blocks until + // ctx is cancelled. Called as a goroutine from Shim.Start. + Run(ctx context.Context) +} diff --git a/internal/shim/placement/syncer_traits.go b/internal/shim/placement/syncer_traits.go new file mode 100644 index 000000000..735bcb825 --- /dev/null +++ b/internal/shim/placement/syncer_traits.go @@ -0,0 +1,155 @@ +// Copyright SAP SE +// SPDX-License-Identifier: Apache-2.0 + +package placement + +import ( + "context" + "encoding/json" + "fmt" + "math/rand" + "net/http" + "net/url" + "time" + + "github.com/gophercloud/gophercloud/v2" + corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +// TraitSyncer manages the lifecycle of the single traits ConfigMap. +// It creates the ConfigMap on startup and periodically syncs from upstream. +type TraitSyncer struct { + client client.Client + configMapName string + namespace string + placementClient *gophercloud.ServiceClient +} + +func NewTraitSyncer( + cl client.Client, + configMapName string, + namespace string, + placementClient *gophercloud.ServiceClient, +) *TraitSyncer { + + return &TraitSyncer{ + client: cl, + configMapName: configMapName, + namespace: namespace, + placementClient: placementClient, + } +} + +// Init creates the traits ConfigMap if it does not already exist. +func (ts *TraitSyncer) Init(ctx context.Context) error { + log := ctrl.Log.WithName("placement-shim").WithName("trait-syncer") + cm := &corev1.ConfigMap{} + key := client.ObjectKey{Namespace: ts.namespace, Name: ts.configMapName} + err := ts.client.Get(ctx, key, cm) + if err == nil { + log.Info("Traits ConfigMap already exists", "name", ts.configMapName) + return nil + } + if !apierrors.IsNotFound(err) { + return fmt.Errorf("checking traits configmap: %w", err) + } + cm = &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: ts.configMapName, + Namespace: ts.namespace, + }, + Data: map[string]string{configMapKeyTraits: "[]"}, + } + if err := ts.client.Create(ctx, cm); err != nil { + if apierrors.IsAlreadyExists(err) { + log.Info("Traits ConfigMap was created concurrently", "name", ts.configMapName) + return nil + } + return fmt.Errorf("creating traits configmap: %w", err) + } + log.Info("Created traits ConfigMap", "name", ts.configMapName) + return nil +} + +// Run starts the periodic background sync from upstream placement. +// Blocks until ctx is cancelled. +func (ts *TraitSyncer) Run(ctx context.Context) { + log := ctrl.Log.WithName("placement-shim").WithName("trait-syncer") + if ts.placementClient == nil { + log.Info("No placement service client configured, trait sync loop will not run") + return + } + + jitter := time.Duration(rand.Int63n(int64(30 * time.Second))) //nolint:gosec + log.Info("Starting trait sync loop", "jitter", jitter) + + select { + case <-ctx.Done(): + return + case <-time.After(jitter): + } + + ts.sync(ctx) + + ticker := time.NewTicker(60 * time.Second) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + ts.sync(ctx) + } + } +} + +// sync fetches GET /traits from upstream placement and writes the result +// into the ConfigMap. +func (ts *TraitSyncer) sync(ctx context.Context) { + log := ctrl.Log.WithName("placement-shim").WithName("trait-syncer") + u, err := url.JoinPath(ts.placementClient.Endpoint, "/traits") + if err != nil { + log.Error(err, "Failed to build upstream traits URL") + return + } + resp, err := ts.placementClient.Request(ctx, http.MethodGet, u, &gophercloud.RequestOpts{ + OkCodes: []int{http.StatusOK}, + MoreHeaders: map[string]string{ + "OpenStack-API-Version": "placement 1.6", + }, + KeepResponseBody: true, + }) + if err != nil { + log.Info("Upstream trait sync failed", "error", err.Error()) + return + } + defer resp.Body.Close() + var body traitsListResponse + if err := json.NewDecoder(resp.Body).Decode(&body); err != nil { + log.Error(err, "Failed to decode upstream trait list") + return + } + + cm := &corev1.ConfigMap{} + if err := ts.client.Get(ctx, client.ObjectKey{Namespace: ts.namespace, Name: ts.configMapName}, cm); err != nil { + log.Error(err, "Failed to get traits ConfigMap for sync") + return + } + traitSet := make(map[string]struct{}, len(body.Traits)) + for _, t := range body.Traits { + traitSet[t] = struct{}{} + } + if err := writeTraitsToConfigMap(cm, traitSet); err != nil { + log.Error(err, "Failed to serialize synced traits") + return + } + if err := ts.client.Update(ctx, cm); err != nil { + log.Error(err, "Failed to update traits ConfigMap with upstream data") + return + } + log.Info("Synced traits from upstream placement", "count", len(body.Traits)) +} diff --git a/internal/shim/placement/syncer_traits_test.go b/internal/shim/placement/syncer_traits_test.go new file mode 100644 index 000000000..c3038c668 --- /dev/null +++ b/internal/shim/placement/syncer_traits_test.go @@ -0,0 +1,144 @@ +// Copyright SAP SE +// SPDX-License-Identifier: Apache-2.0 + +package placement + +import ( + "context" + "encoding/json" + "net/http" + "net/http/httptest" + "testing" + "time" + + "github.com/gophercloud/gophercloud/v2" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +func TestTraitSyncerInitCreatesConfigMap(t *testing.T) { + cl := newFakeClientWithScheme(t) + ts := NewTraitSyncer(cl, "test-traits", "default", nil) + + if err := ts.Init(context.Background()); err != nil { + t.Fatalf("Init: %v", err) + } + + cm := &corev1.ConfigMap{} + if err := cl.Get(context.Background(), client.ObjectKey{Namespace: "default", Name: "test-traits"}, cm); err != nil { + t.Fatalf("get ConfigMap: %v", err) + } + if cm.Data[configMapKeyTraits] != "[]" { + t.Fatalf("expected empty traits array, got %q", cm.Data[configMapKeyTraits]) + } +} + +func TestTraitSyncerInitIdempotent(t *testing.T) { + existing := &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{Name: "test-traits", Namespace: "default"}, + Data: map[string]string{configMapKeyTraits: `["CUSTOM_EXISTING"]`}, + } + cl := newFakeClientWithScheme(t, existing) + ts := NewTraitSyncer(cl, "test-traits", "default", nil) + + if err := ts.Init(context.Background()); err != nil { + t.Fatalf("Init: %v", err) + } + + cm := &corev1.ConfigMap{} + if err := cl.Get(context.Background(), client.ObjectKey{Namespace: "default", Name: "test-traits"}, cm); err != nil { + t.Fatalf("get ConfigMap: %v", err) + } + if cm.Data[configMapKeyTraits] != `["CUSTOM_EXISTING"]` { + t.Fatalf("Init overwrote existing data: got %q", cm.Data[configMapKeyTraits]) + } +} + +func TestTraitSyncerRunNoClient(t *testing.T) { + cl := newFakeClientWithScheme(t) + ts := NewTraitSyncer(cl, "test-traits", "default", nil) + + ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond) + defer cancel() + ts.Run(ctx) +} + +func TestTraitSyncerSyncWritesUpstreamTraits(t *testing.T) { + upstream := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.URL.Path != "/traits" { + t.Errorf("unexpected path: %s", r.URL.Path) + } + w.Header().Set("Content-Type", "application/json") + if err := json.NewEncoder(w).Encode(traitsListResponse{ + Traits: []string{"HW_CPU_X86_AVX2", "CUSTOM_SYNCED"}, + }); err != nil { + t.Errorf("encode response: %v", err) + } + })) + t.Cleanup(upstream.Close) + + existing := &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{Name: "test-traits", Namespace: "default"}, + Data: map[string]string{configMapKeyTraits: "[]"}, + } + cl := newFakeClientWithScheme(t, existing) + + sc := &gophercloud.ServiceClient{ + ProviderClient: &gophercloud.ProviderClient{}, + Endpoint: upstream.URL, + } + sc.HTTPClient = *upstream.Client() + + ts := NewTraitSyncer(cl, "test-traits", "default", sc) + ts.sync(context.Background()) + + cm := &corev1.ConfigMap{} + if err := cl.Get(context.Background(), client.ObjectKey{Namespace: "default", Name: "test-traits"}, cm); err != nil { + t.Fatalf("get ConfigMap: %v", err) + } + + var traits []string + if err := json.Unmarshal([]byte(cm.Data[configMapKeyTraits]), &traits); err != nil { + t.Fatalf("unmarshal: %v", err) + } + if len(traits) != 2 { + t.Fatalf("expected 2 traits, got %d: %v", len(traits), traits) + } + want := map[string]bool{"CUSTOM_SYNCED": true, "HW_CPU_X86_AVX2": true} + for _, tr := range traits { + if !want[tr] { + t.Errorf("unexpected trait: %s", tr) + } + } +} + +func TestTraitSyncerSyncUpstreamError(t *testing.T) { + upstream := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + w.WriteHeader(http.StatusServiceUnavailable) + })) + t.Cleanup(upstream.Close) + + existing := &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{Name: "test-traits", Namespace: "default"}, + Data: map[string]string{configMapKeyTraits: `["CUSTOM_ORIGINAL"]`}, + } + cl := newFakeClientWithScheme(t, existing) + + sc := &gophercloud.ServiceClient{ + ProviderClient: &gophercloud.ProviderClient{}, + Endpoint: upstream.URL, + } + sc.HTTPClient = *upstream.Client() + + ts := NewTraitSyncer(cl, "test-traits", "default", sc) + ts.sync(context.Background()) + + cm := &corev1.ConfigMap{} + if err := cl.Get(context.Background(), client.ObjectKey{Namespace: "default", Name: "test-traits"}, cm); err != nil { + t.Fatalf("get ConfigMap: %v", err) + } + if cm.Data[configMapKeyTraits] != `["CUSTOM_ORIGINAL"]` { + t.Fatalf("sync should not have modified ConfigMap on error, got %q", cm.Data[configMapKeyTraits]) + } +} From 891e77d715bcfcdf23eb79e3ca21c97dc5325922 Mon Sep 17 00:00:00 2001 From: Philipp Matthes Date: Wed, 29 Apr 2026 11:40:15 +0200 Subject: [PATCH 2/2] Address review feedback: body copy, sync gating, lock serialization - Copy upstream response body in hybrid mode hooks so error payloads are preserved for the caller. - Only start TraitSyncer.Run in hybrid/passthrough modes to prevent the background sync from overwriting local CRD-mode state. - Acquire the same resource lock in TraitSyncer.sync to serialize with concurrent CRUD operations on the ConfigMap. --- internal/shim/placement/handle_traits.go | 8 +++++++- internal/shim/placement/shim.go | 6 +++++- internal/shim/placement/syncer_traits.go | 20 ++++++++++++++++++- internal/shim/placement/syncer_traits_test.go | 11 +++++----- 4 files changed, 37 insertions(+), 8 deletions(-) diff --git a/internal/shim/placement/handle_traits.go b/internal/shim/placement/handle_traits.go index 3e10ab6fc..d815b9d55 100644 --- a/internal/shim/placement/handle_traits.go +++ b/internal/shim/placement/handle_traits.go @@ -7,6 +7,7 @@ import ( "context" "encoding/json" "fmt" + "io" "net/http" "os" "sort" @@ -203,13 +204,15 @@ func (s *Shim) handleUpdateTraitHybrid(w http.ResponseWriter, r *http.Request) { } s.forwardWithHook(w, r, func(w http.ResponseWriter, resp *http.Response) { - // Copy the upstream response to the caller. for k, vs := range resp.Header { for _, v := range vs { w.Header().Add(k, v) } } w.WriteHeader(resp.StatusCode) + if resp.Body != nil { + io.Copy(w, resp.Body) //nolint:errcheck + } if resp.StatusCode == http.StatusCreated || resp.StatusCode == http.StatusNoContent { if _, err := s.addTraitToConfigMap(ctx, name); err != nil { @@ -289,6 +292,9 @@ func (s *Shim) handleDeleteTraitHybrid(w http.ResponseWriter, r *http.Request) { } } w.WriteHeader(resp.StatusCode) + if resp.Body != nil { + io.Copy(w, resp.Body) //nolint:errcheck + } if resp.StatusCode == http.StatusNoContent { if _, err := s.removeTraitFromConfigMap(ctx, name); err != nil { diff --git a/internal/shim/placement/shim.go b/internal/shim/placement/shim.go index 35cbd2303..f601788a4 100644 --- a/internal/shim/placement/shim.go +++ b/internal/shim/placement/shim.go @@ -442,6 +442,7 @@ func (s *Shim) Start(ctx context.Context) error { s.config.Traits.ConfigMapName, os.Getenv("POD_NAMESPACE"), s.placementServiceClient, + s.resourceLocker, )) } for _, syncer := range s.syncers { @@ -449,8 +450,11 @@ func (s *Shim) Start(ctx context.Context) error { return err } } + traitsMode := s.config.Features.Traits.orDefault() for _, syncer := range s.syncers { - go syncer.Run(ctx) + if traitsMode == FeatureModeHybrid || traitsMode == FeatureModePassthrough { + go syncer.Run(ctx) + } } return nil } diff --git a/internal/shim/placement/syncer_traits.go b/internal/shim/placement/syncer_traits.go index 735bcb825..d8067e1f3 100644 --- a/internal/shim/placement/syncer_traits.go +++ b/internal/shim/placement/syncer_traits.go @@ -10,8 +10,10 @@ import ( "math/rand" "net/http" "net/url" + "os" "time" + "github.com/cobaltcore-dev/cortex/pkg/resourcelock" "github.com/gophercloud/gophercloud/v2" corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" @@ -27,6 +29,7 @@ type TraitSyncer struct { configMapName string namespace string placementClient *gophercloud.ServiceClient + resourceLocker *resourcelock.ResourceLocker } func NewTraitSyncer( @@ -34,6 +37,7 @@ func NewTraitSyncer( configMapName string, namespace string, placementClient *gophercloud.ServiceClient, + resourceLocker *resourcelock.ResourceLocker, ) *TraitSyncer { return &TraitSyncer{ @@ -41,6 +45,7 @@ func NewTraitSyncer( configMapName: configMapName, namespace: namespace, placementClient: placementClient, + resourceLocker: resourceLocker, } } @@ -108,7 +113,7 @@ func (ts *TraitSyncer) Run(ctx context.Context) { } // sync fetches GET /traits from upstream placement and writes the result -// into the ConfigMap. +// into the ConfigMap under the resource lock. func (ts *TraitSyncer) sync(ctx context.Context) { log := ctrl.Log.WithName("placement-shim").WithName("trait-syncer") u, err := url.JoinPath(ts.placementClient.Endpoint, "/traits") @@ -134,6 +139,19 @@ func (ts *TraitSyncer) sync(ctx context.Context) { return } + host, _ := os.Hostname() //nolint:errcheck + lockerID := fmt.Sprintf("syncer-%s-%d", host, time.Now().UnixNano()) + lockName := ts.configMapName + "-lock" + if err := ts.resourceLocker.AcquireLock(ctx, lockName, lockerID); err != nil { + log.Error(err, "Failed to acquire lock for trait sync") + return + } + defer func() { + releaseCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + _ = ts.resourceLocker.ReleaseLock(releaseCtx, lockName, lockerID) //nolint:errcheck + }() + cm := &corev1.ConfigMap{} if err := ts.client.Get(ctx, client.ObjectKey{Namespace: ts.namespace, Name: ts.configMapName}, cm); err != nil { log.Error(err, "Failed to get traits ConfigMap for sync") diff --git a/internal/shim/placement/syncer_traits_test.go b/internal/shim/placement/syncer_traits_test.go index c3038c668..23cada155 100644 --- a/internal/shim/placement/syncer_traits_test.go +++ b/internal/shim/placement/syncer_traits_test.go @@ -11,6 +11,7 @@ import ( "testing" "time" + "github.com/cobaltcore-dev/cortex/pkg/resourcelock" "github.com/gophercloud/gophercloud/v2" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -19,7 +20,7 @@ import ( func TestTraitSyncerInitCreatesConfigMap(t *testing.T) { cl := newFakeClientWithScheme(t) - ts := NewTraitSyncer(cl, "test-traits", "default", nil) + ts := NewTraitSyncer(cl, "test-traits", "default", nil, resourcelock.NewResourceLocker(cl, "default")) if err := ts.Init(context.Background()); err != nil { t.Fatalf("Init: %v", err) @@ -40,7 +41,7 @@ func TestTraitSyncerInitIdempotent(t *testing.T) { Data: map[string]string{configMapKeyTraits: `["CUSTOM_EXISTING"]`}, } cl := newFakeClientWithScheme(t, existing) - ts := NewTraitSyncer(cl, "test-traits", "default", nil) + ts := NewTraitSyncer(cl, "test-traits", "default", nil, resourcelock.NewResourceLocker(cl, "default")) if err := ts.Init(context.Background()); err != nil { t.Fatalf("Init: %v", err) @@ -57,7 +58,7 @@ func TestTraitSyncerInitIdempotent(t *testing.T) { func TestTraitSyncerRunNoClient(t *testing.T) { cl := newFakeClientWithScheme(t) - ts := NewTraitSyncer(cl, "test-traits", "default", nil) + ts := NewTraitSyncer(cl, "test-traits", "default", nil, resourcelock.NewResourceLocker(cl, "default")) ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond) defer cancel() @@ -90,7 +91,7 @@ func TestTraitSyncerSyncWritesUpstreamTraits(t *testing.T) { } sc.HTTPClient = *upstream.Client() - ts := NewTraitSyncer(cl, "test-traits", "default", sc) + ts := NewTraitSyncer(cl, "test-traits", "default", sc, resourcelock.NewResourceLocker(cl, "default")) ts.sync(context.Background()) cm := &corev1.ConfigMap{} @@ -131,7 +132,7 @@ func TestTraitSyncerSyncUpstreamError(t *testing.T) { } sc.HTTPClient = *upstream.Client() - ts := NewTraitSyncer(cl, "test-traits", "default", sc) + ts := NewTraitSyncer(cl, "test-traits", "default", sc, resourcelock.NewResourceLocker(cl, "default")) ts.sync(context.Background()) cm := &corev1.ConfigMap{}