From 227361901d6602d2769fa8613455f3dc86fc135d Mon Sep 17 00:00:00 2001
From: Philipp Matthes
Date: Wed, 29 Apr 2026 11:27:10 +0200
Subject: [PATCH 1/2] Refactor /traits API to single-ConfigMap model with
Syncer interface
Resolves #395
---
.../templates/configmap-traits.yaml | 11 -
.../bundles/cortex-placement-shim/values.yaml | 3 -
internal/shim/placement/handle_traits.go | 478 +++++++-----------
internal/shim/placement/handle_traits_e2e.go | 21 +-
internal/shim/placement/handle_traits_test.go | 42 +-
internal/shim/placement/shim.go | 38 +-
internal/shim/placement/syncer.go | 19 +
internal/shim/placement/syncer_traits.go | 155 ++++++
internal/shim/placement/syncer_traits_test.go | 144 ++++++
9 files changed, 536 insertions(+), 375 deletions(-)
delete mode 100644 helm/bundles/cortex-placement-shim/templates/configmap-traits.yaml
create mode 100644 internal/shim/placement/syncer.go
create mode 100644 internal/shim/placement/syncer_traits.go
create mode 100644 internal/shim/placement/syncer_traits_test.go
diff --git a/helm/bundles/cortex-placement-shim/templates/configmap-traits.yaml b/helm/bundles/cortex-placement-shim/templates/configmap-traits.yaml
deleted file mode 100644
index b6969aaa7..000000000
--- a/helm/bundles/cortex-placement-shim/templates/configmap-traits.yaml
+++ /dev/null
@@ -1,11 +0,0 @@
-{{- if ne ((index .Values "cortex-shim").conf.features.traits | default "passthrough") "passthrough" }}
-{{- $cmName := (index .Values "cortex-shim").conf.traits.configMapName }}
-apiVersion: v1
-kind: ConfigMap
-metadata:
- name: {{ $cmName }}
- labels:
- {{- include "chart.labels" . | nindent 4 }}
-data:
- traits: {{ (index .Values "cortex-shim").conf.traits.static | toJson | quote }}
-{{- end }}
diff --git a/helm/bundles/cortex-placement-shim/values.yaml b/helm/bundles/cortex-placement-shim/values.yaml
index 7e1818e9e..54805cee3 100644
--- a/helm/bundles/cortex-placement-shim/values.yaml
+++ b/helm/bundles/cortex-placement-shim/values.yaml
@@ -62,9 +62,6 @@ cortex-shim:
status: "CURRENT"
traits:
configMapName: "cortex-placement-shim-traits"
- # Static traits included in every Helm install/upgrade. The shim
- # merges them with dynamic CUSTOM_* traits at request time.
- static: []
auth:
tokenCacheTTL: "5m"
policies:
diff --git a/internal/shim/placement/handle_traits.go b/internal/shim/placement/handle_traits.go
index 429b87cfc..3e10ab6fc 100644
--- a/internal/shim/placement/handle_traits.go
+++ b/internal/shim/placement/handle_traits.go
@@ -7,44 +7,21 @@ import (
"context"
"encoding/json"
"fmt"
- "math/rand"
"net/http"
- "net/url"
"os"
"sort"
"strings"
"time"
- "github.com/go-logr/logr"
- "github.com/gophercloud/gophercloud/v2"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
- ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
logf "sigs.k8s.io/controller-runtime/pkg/log"
)
const configMapKeyTraits = "traits"
-func (s *Shim) staticTraitsConfigMapKey() client.ObjectKey {
- return client.ObjectKey{
- Namespace: os.Getenv("POD_NAMESPACE"),
- Name: s.config.Traits.ConfigMapName,
- }
-}
-
-func (s *Shim) customTraitsConfigMapKey() client.ObjectKey {
- return client.ObjectKey{
- Namespace: os.Getenv("POD_NAMESPACE"),
- Name: s.config.Traits.ConfigMapName + "-custom",
- }
-}
-
-func (s *Shim) traitsLockName() string {
- return s.config.Traits.ConfigMapName + "-custom-lock"
-}
-
// traitsListResponse matches the OpenStack Placement GET /traits response.
type traitsListResponse struct {
Traits []string `json:"traits"`
@@ -52,10 +29,10 @@ type traitsListResponse struct {
// HandleListTraits handles GET /traits requests.
//
-// Returns a sorted list of trait strings merged from the static (Helm-managed)
-// and dynamic (CUSTOM_*) ConfigMaps. Supports optional query parameter "name"
-// for filtering: "in:TRAIT_A,TRAIT_B" returns only named traits,
-// "startswith:CUSTOM_" returns prefix matches.
+// Feature modes:
+// - passthrough: forwards to upstream placement.
+// - hybrid: forwards to upstream placement.
+// - crd: serves the trait list from the local ConfigMap.
//
// See: https://docs.openstack.org/api-ref/placement/#list-traits
func (s *Shim) HandleListTraits(w http.ResponseWriter, r *http.Request) {
@@ -67,15 +44,15 @@ func (s *Shim) HandleListTraits(w http.ResponseWriter, r *http.Request) {
s.forward(w, r)
return
case FeatureModeCRD:
- // Serve from local ConfigMaps.
+ // Serve from local ConfigMap.
default:
http.Error(w, "unknown feature mode", http.StatusInternalServerError)
return
}
- traitSet, err := s.getAllTraits(ctx)
+ traitSet, err := s.getTraits(ctx)
if err != nil {
- log.Error(err, "failed to list traits from configmaps")
+ log.Error(err, "failed to list traits from configmap")
http.Error(w, "failed to list traits", http.StatusInternalServerError)
return
}
@@ -124,8 +101,10 @@ func (s *Shim) HandleListTraits(w http.ResponseWriter, r *http.Request) {
// HandleShowTrait handles GET /traits/{name} requests.
//
-// Checks whether a trait with the given name exists in either the static
-// or dynamic ConfigMap. Returns 204 No Content if found, 404 Not Found otherwise.
+// Feature modes:
+// - passthrough: forwards to upstream placement.
+// - hybrid: forwards to upstream placement.
+// - crd: checks the local ConfigMap for the trait.
//
// See: https://docs.openstack.org/api-ref/placement/#show-traits
func (s *Shim) HandleShowTrait(w http.ResponseWriter, r *http.Request) {
@@ -137,7 +116,7 @@ func (s *Shim) HandleShowTrait(w http.ResponseWriter, r *http.Request) {
s.forward(w, r)
return
case FeatureModeCRD:
- // Serve from local ConfigMaps.
+ // Serve from local ConfigMap.
default:
http.Error(w, "unknown feature mode", http.StatusInternalServerError)
return
@@ -164,22 +143,26 @@ func (s *Shim) HandleShowTrait(w http.ResponseWriter, r *http.Request) {
// HandleUpdateTrait handles PUT /traits/{name} requests.
//
-// Creates a new custom trait in the dynamic ConfigMap. Only traits prefixed
-// with CUSTOM_ may be created. Returns 201 Created if the trait is newly
-// inserted, or 204 No Content if it already exists (in either ConfigMap).
-// Returns 400 Bad Request if the name does not carry the CUSTOM_ prefix.
+// Feature modes:
+// - passthrough: forwards to upstream placement.
+// - hybrid: forwards to upstream; on success, adds the trait to the local ConfigMap.
+// - crd: writes the trait to the local ConfigMap (CUSTOM_ prefix required).
//
// See: https://docs.openstack.org/api-ref/placement/#update-trait
func (s *Shim) HandleUpdateTrait(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
log := logf.FromContext(ctx)
- switch s.featureModeFromConfOrHeader(r, s.config.Features.Traits) {
- case FeatureModePassthrough, FeatureModeHybrid:
+ mode := s.featureModeFromConfOrHeader(r, s.config.Features.Traits)
+ switch mode {
+ case FeatureModePassthrough:
s.forward(w, r)
return
+ case FeatureModeHybrid:
+ s.handleUpdateTraitHybrid(w, r)
+ return
case FeatureModeCRD:
- // Serve from local ConfigMaps.
+ // Handle locally.
default:
http.Error(w, "unknown feature mode", http.StatusInternalServerError)
return
@@ -195,115 +178,69 @@ func (s *Shim) HandleUpdateTrait(w http.ResponseWriter, r *http.Request) {
return
}
- // Fast path: trait already exists in either ConfigMap (no lock needed).
- allTraits, err := s.getAllTraits(ctx)
+ created, err := s.addTraitToConfigMap(ctx, name)
if err != nil {
- log.Error(err, "failed to read traits for existence check", "trait", name)
+ log.Error(err, "failed to create trait", "trait", name)
http.Error(w, "failed to create trait", http.StatusInternalServerError)
return
}
- if _, exists := allTraits[name]; exists {
- log.Info("trait already exists, nothing to do", "trait", name)
+ if created {
+ w.WriteHeader(http.StatusCreated)
+ } else {
w.WriteHeader(http.StatusNoContent)
- return
}
+}
- // Slow path: acquire lock, read/create dynamic ConfigMap, add trait.
- host, err := os.Hostname()
- if err != nil {
- host = "unknown"
- }
- lockerID := fmt.Sprintf("shim-%s-%d", host, time.Now().UnixNano())
- if err := s.resourceLocker.AcquireLock(ctx, s.traitsLockName(), lockerID); err != nil {
- log.Error(err, "failed to acquire traits lock", "trait", name)
- http.Error(w, "failed to create trait", http.StatusInternalServerError)
+// handleUpdateTraitHybrid forwards PUT /traits/{name} to upstream, then
+// updates the local ConfigMap on success.
+func (s *Shim) handleUpdateTraitHybrid(w http.ResponseWriter, r *http.Request) {
+ ctx := r.Context()
+ log := logf.FromContext(ctx)
+
+ name, ok := requiredPathParam(w, r, "name")
+ if !ok {
return
}
- defer func() {
- releaseCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
- defer cancel()
- if err := s.resourceLocker.ReleaseLock(releaseCtx, s.traitsLockName(), lockerID); err != nil {
- log.Error(err, "failed to release traits lock")
- }
- }()
- cm := &corev1.ConfigMap{}
- err = s.Get(ctx, s.customTraitsConfigMapKey(), cm)
- if apierrors.IsNotFound(err) {
- // Dynamic ConfigMap does not exist yet — create it with the new trait.
- cm = &corev1.ConfigMap{
- ObjectMeta: metav1.ObjectMeta{
- Name: s.customTraitsConfigMapKey().Name,
- Namespace: s.customTraitsConfigMapKey().Namespace,
- },
- Data: map[string]string{configMapKeyTraits: "[]"},
- }
- current := map[string]struct{}{name: {}}
- if err := s.writeTraits(cm, current); err != nil {
- log.Error(err, "failed to serialize traits", "trait", name)
- http.Error(w, "failed to create trait", http.StatusInternalServerError)
- return
- }
- if err := s.Create(ctx, cm); err != nil {
- log.Error(err, "failed to create custom traits configmap", "trait", name)
- http.Error(w, "failed to create trait", http.StatusInternalServerError)
- return
+ s.forwardWithHook(w, r, func(w http.ResponseWriter, resp *http.Response) {
+ // Copy the upstream response to the caller.
+ for k, vs := range resp.Header {
+ for _, v := range vs {
+ w.Header().Add(k, v)
+ }
}
- log.Info("created custom traits configmap with new trait", "trait", name)
- s.syncTraitToUpstream(ctx, name, r.Header)
- w.WriteHeader(http.StatusCreated)
- return
- }
- if err != nil {
- log.Error(err, "failed to get custom traits configmap", "trait", name)
- http.Error(w, "failed to create trait", http.StatusInternalServerError)
- return
- }
+ w.WriteHeader(resp.StatusCode)
- current, err := parseTraits(cm)
- if err != nil {
- log.Error(err, "failed to parse custom traits configmap", "trait", name)
- http.Error(w, "failed to create trait", http.StatusInternalServerError)
- return
- }
- if _, exists := current[name]; exists {
- log.Info("trait already exists in custom configmap after lock acquisition", "trait", name)
- w.WriteHeader(http.StatusNoContent)
- return
- }
- current[name] = struct{}{}
- if err := s.writeTraits(cm, current); err != nil {
- log.Error(err, "failed to serialize traits", "trait", name)
- http.Error(w, "failed to create trait", http.StatusInternalServerError)
- return
- }
- if err := s.Update(ctx, cm); err != nil {
- log.Error(err, "failed to update custom traits configmap", "trait", name)
- http.Error(w, "failed to create trait", http.StatusInternalServerError)
- return
- }
- log.Info("added custom trait to configmap", "trait", name)
- s.syncTraitToUpstream(ctx, name, r.Header)
- w.WriteHeader(http.StatusCreated)
+ if resp.StatusCode == http.StatusCreated || resp.StatusCode == http.StatusNoContent {
+ if _, err := s.addTraitToConfigMap(ctx, name); err != nil {
+ log.Error(err, "hybrid: failed to add trait to local configmap", "trait", name)
+ }
+ }
+ })
}
// HandleDeleteTrait handles DELETE /traits/{name} requests.
//
-// Deletes a custom trait from the dynamic ConfigMap. Standard traits (those
-// without the CUSTOM_ prefix) cannot be deleted and return 400 Bad Request.
-// Returns 404 if the trait does not exist. Returns 204 No Content on success.
+// Feature modes:
+// - passthrough: forwards to upstream placement.
+// - hybrid: forwards to upstream; on success, removes the trait from the local ConfigMap.
+// - crd: removes the trait from the local ConfigMap (CUSTOM_ prefix required).
//
// See: https://docs.openstack.org/api-ref/placement/#delete-traits
func (s *Shim) HandleDeleteTrait(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
log := logf.FromContext(ctx)
- switch s.featureModeFromConfOrHeader(r, s.config.Features.Traits) {
- case FeatureModePassthrough, FeatureModeHybrid:
+ mode := s.featureModeFromConfOrHeader(r, s.config.Features.Traits)
+ switch mode {
+ case FeatureModePassthrough:
s.forward(w, r)
return
+ case FeatureModeHybrid:
+ s.handleDeleteTraitHybrid(w, r)
+ return
case FeatureModeCRD:
- // Serve from local ConfigMaps.
+ // Handle locally.
default:
http.Error(w, "unknown feature mode", http.StatusInternalServerError)
return
@@ -319,101 +256,57 @@ func (s *Shim) HandleDeleteTrait(w http.ResponseWriter, r *http.Request) {
return
}
- host, err := os.Hostname()
+ removed, err := s.removeTraitFromConfigMap(ctx, name)
if err != nil {
- host = "unknown"
- }
- lockerID := fmt.Sprintf("shim-%s-%d", host, time.Now().UnixNano())
- if err := s.resourceLocker.AcquireLock(ctx, s.traitsLockName(), lockerID); err != nil {
- log.Error(err, "failed to acquire traits lock", "trait", name)
+ log.Error(err, "failed to delete trait", "trait", name)
http.Error(w, "failed to delete trait", http.StatusInternalServerError)
return
}
- defer func() {
- releaseCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
- defer cancel()
- if err := s.resourceLocker.ReleaseLock(releaseCtx, s.traitsLockName(), lockerID); err != nil {
- log.Error(err, "failed to release traits lock")
- }
- }()
-
- cm := &corev1.ConfigMap{}
- err = s.Get(ctx, s.customTraitsConfigMapKey(), cm)
- if apierrors.IsNotFound(err) {
- log.Info("custom traits configmap not found, trait does not exist", "trait", name)
- http.Error(w, "trait not found", http.StatusNotFound)
- return
- }
- if err != nil {
- log.Error(err, "failed to get custom traits configmap", "trait", name)
- http.Error(w, "failed to delete trait", http.StatusInternalServerError)
- return
- }
- current, err := parseTraits(cm)
- if err != nil {
- log.Error(err, "failed to parse custom traits configmap", "trait", name)
- http.Error(w, "failed to delete trait", http.StatusInternalServerError)
- return
- }
- if _, exists := current[name]; !exists {
- log.Info("trait not found in custom configmap", "trait", name)
+ if !removed {
+ log.Info("trait not found in configmap", "trait", name)
http.Error(w, "trait not found", http.StatusNotFound)
return
}
- delete(current, name)
- if err := s.writeTraits(cm, current); err != nil {
- log.Error(err, "failed to serialize traits", "trait", name)
- http.Error(w, "failed to delete trait", http.StatusInternalServerError)
- return
- }
- if err := s.Update(ctx, cm); err != nil {
- log.Error(err, "failed to update custom traits configmap", "trait", name)
- http.Error(w, "failed to delete trait", http.StatusInternalServerError)
- return
- }
- log.Info("deleted custom trait from configmap", "trait", name)
+ log.Info("deleted trait from configmap", "trait", name)
w.WriteHeader(http.StatusNoContent)
}
-// getStaticTraits reads traits from the Helm-managed static ConfigMap.
-func (s *Shim) getStaticTraits(ctx context.Context) (map[string]struct{}, error) {
- cm := &corev1.ConfigMap{}
- if err := s.Get(ctx, s.staticTraitsConfigMapKey(), cm); err != nil {
- return nil, fmt.Errorf("get static configmap %s: %w", s.config.Traits.ConfigMapName, err)
+// handleDeleteTraitHybrid forwards DELETE /traits/{name} to upstream, then
+// updates the local ConfigMap on success.
+func (s *Shim) handleDeleteTraitHybrid(w http.ResponseWriter, r *http.Request) {
+ ctx := r.Context()
+ log := logf.FromContext(ctx)
+
+ name, ok := requiredPathParam(w, r, "name")
+ if !ok {
+ return
}
- return parseTraits(cm)
+
+ s.forwardWithHook(w, r, func(w http.ResponseWriter, resp *http.Response) {
+ for k, vs := range resp.Header {
+ for _, v := range vs {
+ w.Header().Add(k, v)
+ }
+ }
+ w.WriteHeader(resp.StatusCode)
+
+ if resp.StatusCode == http.StatusNoContent {
+ if _, err := s.removeTraitFromConfigMap(ctx, name); err != nil {
+ log.Error(err, "hybrid: failed to remove trait from local configmap", "trait", name)
+ }
+ }
+ })
}
-// getCustomTraits reads traits from the dynamic ConfigMap created by the shim.
-// Returns an empty set if the ConfigMap does not exist yet.
-func (s *Shim) getCustomTraits(ctx context.Context) (map[string]struct{}, error) {
+// getTraits reads traits from the single ConfigMap.
+func (s *Shim) getTraits(ctx context.Context) (map[string]struct{}, error) {
cm := &corev1.ConfigMap{}
- err := s.Get(ctx, s.customTraitsConfigMapKey(), cm)
- if apierrors.IsNotFound(err) {
- return make(map[string]struct{}), nil
- }
- if err != nil {
- return nil, fmt.Errorf("get custom configmap %s-custom: %w", s.config.Traits.ConfigMapName, err)
+ if err := s.Get(ctx, client.ObjectKey{Namespace: os.Getenv("POD_NAMESPACE"), Name: s.config.Traits.ConfigMapName}, cm); err != nil {
+ return nil, fmt.Errorf("get traits configmap %s: %w", s.config.Traits.ConfigMapName, err)
}
return parseTraits(cm)
}
-// getAllTraits merges static and custom traits into a single set.
-func (s *Shim) getAllTraits(ctx context.Context) (map[string]struct{}, error) {
- static, err := s.getStaticTraits(ctx)
- if err != nil {
- return nil, err
- }
- custom, err := s.getCustomTraits(ctx)
- if err != nil {
- return nil, err
- }
- for t := range custom {
- static[t] = struct{}{}
- }
- return static, nil
-}
-
// parseTraits extracts the trait set from a ConfigMap.
func parseTraits(cm *corev1.ConfigMap) (map[string]struct{}, error) {
raw, ok := cm.Data[configMapKeyTraits]
@@ -432,7 +325,7 @@ func parseTraits(cm *corev1.ConfigMap) (map[string]struct{}, error) {
}
func (s *Shim) hasTrait(ctx context.Context, name string) (bool, error) {
- traits, err := s.getAllTraits(ctx)
+ traits, err := s.getTraits(ctx)
if err != nil {
return false, err
}
@@ -440,8 +333,8 @@ func (s *Shim) hasTrait(ctx context.Context, name string) (bool, error) {
return ok, nil
}
-// writeTraits serializes the trait set into the ConfigMap's data field.
-func (s *Shim) writeTraits(cm *corev1.ConfigMap, traitSet map[string]struct{}) error {
+// writeTraitsToConfigMap serializes the trait set into the ConfigMap's data field.
+func writeTraitsToConfigMap(cm *corev1.ConfigMap, traitSet map[string]struct{}) error {
traits := make([]string, 0, len(traitSet))
for t := range traitSet {
traits = append(traits, t)
@@ -459,121 +352,110 @@ func (s *Shim) writeTraits(cm *corev1.ConfigMap, traitSet map[string]struct{}) e
return nil
}
-// syncTraitToUpstream best-effort creates the trait in upstream placement so
-// that endpoints forwarded to upstream (e.g. PUT /resource_providers/{uuid}/traits)
-// can reference locally-created custom traits. Errors are logged but never
-// propagated — upstream may be unreachable and that is acceptable.
-func (s *Shim) syncTraitToUpstream(ctx context.Context, name string, incomingHeader http.Header) {
- log := logf.FromContext(ctx)
- if s.httpClient == nil {
- log.V(1).Info("skipping upstream trait sync, no http client configured", "trait", name)
- return
- }
- u, err := url.Parse(s.config.PlacementURL)
+// addTraitToConfigMap adds a trait to the ConfigMap under the resource lock.
+// Returns true if the trait was newly created, false if it already existed.
+func (s *Shim) addTraitToConfigMap(ctx context.Context, name string) (bool, error) {
+ // Fast path: trait already exists (no lock needed).
+ traits, err := s.getTraits(ctx)
if err != nil {
- log.Error(err, "failed to parse placement URL for trait sync", "trait", name)
- return
+ return false, err
}
- u.Path, err = url.JoinPath(u.Path, "/traits/"+name)
- if err != nil {
- log.Error(err, "failed to build upstream trait URL", "trait", name)
- return
+ if _, exists := traits[name]; exists {
+ return false, nil
}
- req, err := http.NewRequestWithContext(ctx, http.MethodPut, u.String(), http.NoBody)
+
+ // Slow path: acquire lock, re-read, add trait.
+ host, err := os.Hostname()
if err != nil {
- log.Error(err, "failed to create upstream trait request", "trait", name)
- return
+ return false, fmt.Errorf("get hostname: %w", err)
}
- // Forward authentication headers so upstream placement accepts the request.
- req.Header = incomingHeader.Clone()
- resp, err := s.httpClient.Do(req)
- if err != nil {
- log.Info("best-effort upstream trait sync failed, upstream may be down", "trait", name, "error", err.Error())
- return
+ lockerID := fmt.Sprintf("shim-%s-%d", host, time.Now().UnixNano())
+ if err := s.resourceLocker.AcquireLock(ctx, s.config.Traits.ConfigMapName+"-lock", lockerID); err != nil {
+ return false, fmt.Errorf("acquire traits lock: %w", err)
}
- defer resp.Body.Close()
- log.Info("synced custom trait to upstream placement", "trait", name, "status", resp.StatusCode)
-}
+ defer func() {
+ releaseCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+ defer cancel()
+ _ = s.resourceLocker.ReleaseLock(releaseCtx, s.config.Traits.ConfigMapName+"-lock", lockerID) //nolint:errcheck
+ }()
-// startTraitSyncLoop runs a periodic goroutine that fetches traits from
-// upstream placement and writes them into the static ConfigMap. Only active
-// when features.traits is hybrid. The loop exits when ctx is cancelled.
-func (s *Shim) startTraitSyncLoop(ctx context.Context) {
- if s.config.Features.Traits.orDefault() != FeatureModeHybrid {
- return
+ cm := &corev1.ConfigMap{}
+ key := client.ObjectKey{Namespace: os.Getenv("POD_NAMESPACE"), Name: s.config.Traits.ConfigMapName}
+ if err := s.Get(ctx, key, cm); err != nil {
+ if apierrors.IsNotFound(err) {
+ cm = &corev1.ConfigMap{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: key.Name,
+ Namespace: key.Namespace,
+ },
+ Data: map[string]string{configMapKeyTraits: "[]"},
+ }
+ current := map[string]struct{}{name: {}}
+ if err := writeTraitsToConfigMap(cm, current); err != nil {
+ return false, err
+ }
+ if err := s.Create(ctx, cm); err != nil {
+ return false, fmt.Errorf("create traits configmap: %w", err)
+ }
+ return true, nil
+ }
+ return false, fmt.Errorf("get traits configmap: %w", err)
}
- log := ctrl.Log.WithName("placement-shim").WithName("trait-sync")
- jitter := time.Duration(rand.Int63n(int64(30 * time.Second))) //nolint:gosec
- log.Info("starting trait sync loop", "jitter", jitter)
- select {
- case <-ctx.Done():
- return
- case <-time.After(jitter):
+ current, err := parseTraits(cm)
+ if err != nil {
+ return false, err
}
-
- s.syncTraitsFromUpstream(ctx, log)
-
- ticker := time.NewTicker(60 * time.Second)
- defer ticker.Stop()
- for {
- select {
- case <-ctx.Done():
- return
- case <-ticker.C:
- s.syncTraitsFromUpstream(ctx, log)
- }
+ if _, exists := current[name]; exists {
+ return false, nil
}
-}
-
-// syncTraitsFromUpstream fetches GET /traits from upstream placement and
-// writes the result into the static ConfigMap so that the shim's local
-// view stays in sync with upstream. Uses the gophercloud ServiceClient
-// for automatic token management (including reauth on 401).
-func (s *Shim) syncTraitsFromUpstream(ctx context.Context, log logr.Logger) {
- if s.placementServiceClient == nil {
- log.V(1).Info("skipping upstream trait sync, no placement service client configured")
- return
+ current[name] = struct{}{}
+ if err := writeTraitsToConfigMap(cm, current); err != nil {
+ return false, err
}
- u, err := url.JoinPath(s.placementServiceClient.Endpoint, "/traits")
- if err != nil {
- log.Error(err, "failed to build upstream traits URL")
- return
+ if err := s.Update(ctx, cm); err != nil {
+ return false, fmt.Errorf("update traits configmap: %w", err)
}
- resp, err := s.placementServiceClient.Request(ctx, http.MethodGet, u, &gophercloud.RequestOpts{
- OkCodes: []int{http.StatusOK},
- MoreHeaders: map[string]string{
- "OpenStack-API-Version": "placement 1.6",
- },
- KeepResponseBody: true,
- })
+ return true, nil
+}
+
+// removeTraitFromConfigMap removes a trait from the ConfigMap under the
+// resource lock. Returns true if the trait was found and removed.
+func (s *Shim) removeTraitFromConfigMap(ctx context.Context, name string) (bool, error) {
+ host, err := os.Hostname()
if err != nil {
- log.Info("upstream trait sync failed", "error", err.Error())
- return
+ return false, fmt.Errorf("get hostname: %w", err)
}
- defer resp.Body.Close()
- var body traitsListResponse
- if err := json.NewDecoder(resp.Body).Decode(&body); err != nil {
- log.Error(err, "failed to decode upstream trait list")
- return
+ lockerID := fmt.Sprintf("shim-%s-%d", host, time.Now().UnixNano())
+ if err := s.resourceLocker.AcquireLock(ctx, s.config.Traits.ConfigMapName+"-lock", lockerID); err != nil {
+ return false, fmt.Errorf("acquire traits lock: %w", err)
}
+ defer func() {
+ releaseCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+ defer cancel()
+ _ = s.resourceLocker.ReleaseLock(releaseCtx, s.config.Traits.ConfigMapName+"-lock", lockerID) //nolint:errcheck
+ }()
cm := &corev1.ConfigMap{}
- if err := s.Get(ctx, s.staticTraitsConfigMapKey(), cm); err != nil {
- log.Error(err, "failed to get static traits configmap for sync")
- return
+ if err := s.Get(ctx, client.ObjectKey{Namespace: os.Getenv("POD_NAMESPACE"), Name: s.config.Traits.ConfigMapName}, cm); err != nil {
+ if apierrors.IsNotFound(err) {
+ return false, nil
+ }
+ return false, fmt.Errorf("get traits configmap: %w", err)
}
- traitSet := make(map[string]struct{}, len(body.Traits))
- for _, t := range body.Traits {
- traitSet[t] = struct{}{}
+ current, err := parseTraits(cm)
+ if err != nil {
+ return false, err
}
- if err := s.writeTraits(cm, traitSet); err != nil {
- log.Error(err, "failed to serialize synced traits")
- return
+ if _, exists := current[name]; !exists {
+ return false, nil
+ }
+ delete(current, name)
+ if err := writeTraitsToConfigMap(cm, current); err != nil {
+ return false, err
}
if err := s.Update(ctx, cm); err != nil {
- log.Error(err, "failed to update static traits configmap with upstream data")
- return
+ return false, fmt.Errorf("update traits configmap: %w", err)
}
- log.Info("synced traits from upstream placement", "count", len(body.Traits))
+ return true, nil
}
diff --git a/internal/shim/placement/handle_traits_e2e.go b/internal/shim/placement/handle_traits_e2e.go
index 4a5831f72..9c204e0f8 100644
--- a/internal/shim/placement/handle_traits_e2e.go
+++ b/internal/shim/placement/handle_traits_e2e.go
@@ -80,13 +80,10 @@ func e2eTestTraits(ctx context.Context, _ client.Client) error {
if err := json.NewDecoder(resp.Body).Decode(&listResp); err != nil {
return fmt.Errorf("failed to decode GET /traits response: %w", err)
}
- // When traits are served locally (hybrid or crd mode) the static list may
- // be empty. Only require at least one trait when forwarding to upstream
+ // When traits are served locally (hybrid or crd mode) the list may be
+ // empty. Only require at least one trait when forwarding to upstream
// placement, which always has standard traits.
traitsMode := e2eCurrentMode(ctx)
- if traitsMode == "" {
- traitsMode = config.Features.Traits.orDefault()
- }
if traitsMode == FeatureModePassthrough && len(listResp.Traits) == 0 {
return errors.New("GET /traits: expected at least one trait, got 0")
}
@@ -135,19 +132,9 @@ func e2eTestTraits(ctx context.Context, _ client.Client) error {
}
log.Info("Correctly received 404 for nonexistent trait")
- // ==================== Phase 2: CRUD tests (feature-gated) ====================
-
- // CRUD tests require traits ConfigMaps which are only created when the
- // configured traits mode is hybrid or crd. The override header changes
- // handler routing but cannot create ConfigMaps that don't exist.
- configuredTraitsMode := config.Features.Traits.orDefault()
- if traitsMode == FeatureModePassthrough || configuredTraitsMode == FeatureModePassthrough {
- log.Info("Skipping trait CRUD e2e tests",
- "overrideMode", traitsMode, "configuredMode", configuredTraitsMode)
- return nil
- }
+ // ==================== Phase 2: CRUD tests ====================
- log.Info("=== Phase 2: CRUD trait tests (traits mode non-passthrough) ===")
+ log.Info("=== Phase 2: CRUD trait tests ===")
const testTrait = "CUSTOM_CORTEX_E2E_TRAIT"
diff --git a/internal/shim/placement/handle_traits_test.go b/internal/shim/placement/handle_traits_test.go
index bf692fd41..c7531796a 100644
--- a/internal/shim/placement/handle_traits_test.go
+++ b/internal/shim/placement/handle_traits_test.go
@@ -7,7 +7,6 @@ import (
"context"
"encoding/json"
"net/http"
- "net/http/httptest"
"testing"
"github.com/cobaltcore-dev/cortex/pkg/resourcelock"
@@ -42,13 +41,12 @@ func newTestConfigMap(namespace, name string, traits []string) *corev1.ConfigMap
}
}
-func newTraitShim(t *testing.T, staticTraits []string, customTraits ...string) *Shim {
+func newTraitShim(t *testing.T, traits []string, extraTraits ...string) *Shim {
t.Helper()
t.Setenv("POD_NAMESPACE", "default")
- objs := []client.Object{newTestConfigMap("default", "test-cm", staticTraits)}
- if len(customTraits) > 0 {
- objs = append(objs, newTestConfigMap("default", "test-cm-custom", customTraits))
- }
+ all := append([]string{}, traits...)
+ all = append(all, extraTraits...)
+ objs := []client.Object{newTestConfigMap("default", "test-cm", all)}
cl := newFakeClientWithScheme(t, objs...)
down, up := newTestTimers()
return &Shim{
@@ -264,39 +262,11 @@ func TestHandleUpdateTraitLocalBadPrefix(t *testing.T) {
}
}
-func TestHandleUpdateTraitLocalSyncsToUpstream(t *testing.T) {
- var gotMethod, gotPath string
- upstream := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
- gotMethod = r.Method
- gotPath = r.URL.Path
- w.WriteHeader(http.StatusNoContent)
- }))
- t.Cleanup(upstream.Close)
+func TestHandleUpdateTraitLocalNoUpstreamContact(t *testing.T) {
s := newTraitShim(t, nil)
- s.config.PlacementURL = upstream.URL
- s.httpClient = upstream.Client()
-
- w := serveHandler(t, "PUT", "/traits/{name}", s.HandleUpdateTrait, "/traits/CUSTOM_NEW")
- if w.Code != http.StatusCreated {
- t.Fatalf("status = %d, want %d", w.Code, http.StatusCreated)
- }
- if gotMethod != "PUT" || gotPath != "/traits/CUSTOM_NEW" {
- t.Fatalf("upstream got %s %s, want PUT /traits/CUSTOM_NEW", gotMethod, gotPath)
- }
-}
-
-func TestHandleUpdateTraitLocalUpstreamDown(t *testing.T) {
- upstream := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
- w.WriteHeader(http.StatusInternalServerError)
- }))
- t.Cleanup(upstream.Close)
- s := newTraitShim(t, nil)
- s.config.PlacementURL = upstream.URL
- s.httpClient = upstream.Client()
-
w := serveHandler(t, "PUT", "/traits/{name}", s.HandleUpdateTrait, "/traits/CUSTOM_NEW")
if w.Code != http.StatusCreated {
- t.Fatalf("status = %d, want %d; upstream failure should not block local creation", w.Code, http.StatusCreated)
+ t.Fatalf("status = %d, want %d; CRD mode should not contact upstream", w.Code, http.StatusCreated)
}
}
diff --git a/internal/shim/placement/shim.go b/internal/shim/placement/shim.go
index b46546b63..35cbd2303 100644
--- a/internal/shim/placement/shim.go
+++ b/internal/shim/placement/shim.go
@@ -247,15 +247,15 @@ func (c *config) validate() error {
}
}
traitsMode := c.Features.Traits.orDefault()
- if traitsMode == FeatureModeHybrid || traitsMode == FeatureModeCRD {
- if c.Traits == nil {
- return fmt.Errorf("traits config is required when features.traits is %s", traitsMode)
- }
+ if traitsMode != FeatureModePassthrough && c.Traits == nil {
+ return fmt.Errorf("traits config is required when features.traits is %s", traitsMode)
+ }
+ if c.Traits != nil {
if c.Traits.ConfigMapName == "" {
- return fmt.Errorf("traits.configMapName is required when features.traits is %s", traitsMode)
+ return errors.New("traits.configMapName is required when traits config is present")
}
- if traitsMode == FeatureModeCRD && os.Getenv("POD_NAMESPACE") == "" {
- return errors.New("pod namespace (POD_NAMESPACE) is required when features.traits is crd")
+ if os.Getenv("POD_NAMESPACE") == "" {
+ return errors.New("pod namespace (POD_NAMESPACE) is required when traits config is present")
}
}
if c.Auth != nil && c.KeystoneURL == "" {
@@ -303,14 +303,17 @@ type Shim struct {
tokenCache *tokenCache
// tokenIntrospector validates tokens against Keystone.
tokenIntrospector tokenIntrospector
- // resourceLocker serializes writes to the custom traits ConfigMap
- // across replicas using a Kubernetes Lease.
+ // resourceLocker serializes writes to ConfigMaps across replicas
+ // using a Kubernetes Lease.
resourceLocker *resourcelock.ResourceLocker
// placementServiceClient is an authenticated gophercloud service client
// used by background tasks (trait sync) to make requests to upstream
// placement with automatic token management (including reauth on 401).
// Nil when Keystone credentials are not configured.
placementServiceClient *gophercloud.ServiceClient
+ // syncers are background workers that manage ConfigMap-backed local
+ // stores (e.g. traits, resource classes). Started uniformly in Start.
+ syncers []Syncer
}
// Describe implements prometheus.Collector.
@@ -433,7 +436,22 @@ func (s *Shim) Start(ctx context.Context) error {
if err := s.initPlacementServiceClient(ctx); err != nil {
return err
}
- go s.startTraitSyncLoop(ctx)
+ if s.config.Traits != nil {
+ s.syncers = append(s.syncers, NewTraitSyncer(
+ s.Client,
+ s.config.Traits.ConfigMapName,
+ os.Getenv("POD_NAMESPACE"),
+ s.placementServiceClient,
+ ))
+ }
+ for _, syncer := range s.syncers {
+ if err := syncer.Init(ctx); err != nil {
+ return err
+ }
+ }
+ for _, syncer := range s.syncers {
+ go syncer.Run(ctx)
+ }
return nil
}
diff --git a/internal/shim/placement/syncer.go b/internal/shim/placement/syncer.go
new file mode 100644
index 000000000..f00487fe9
--- /dev/null
+++ b/internal/shim/placement/syncer.go
@@ -0,0 +1,19 @@
+// Copyright SAP SE
+// SPDX-License-Identifier: Apache-2.0
+
+package placement
+
+import "context"
+
+// Syncer manages the lifecycle of a ConfigMap-backed local store:
+// creating the ConfigMap on startup, and running a periodic background
+// sync from upstream placement.
+type Syncer interface {
+ // Init creates the ConfigMap if it does not exist. Called once during
+ // Shim.Start before any requests are served.
+ Init(ctx context.Context) error
+
+ // Run starts the periodic background sync from upstream. Blocks until
+ // ctx is cancelled. Called as a goroutine from Shim.Start.
+ Run(ctx context.Context)
+}
diff --git a/internal/shim/placement/syncer_traits.go b/internal/shim/placement/syncer_traits.go
new file mode 100644
index 000000000..735bcb825
--- /dev/null
+++ b/internal/shim/placement/syncer_traits.go
@@ -0,0 +1,155 @@
+// Copyright SAP SE
+// SPDX-License-Identifier: Apache-2.0
+
+package placement
+
+import (
+ "context"
+ "encoding/json"
+ "fmt"
+ "math/rand"
+ "net/http"
+ "net/url"
+ "time"
+
+ "github.com/gophercloud/gophercloud/v2"
+ corev1 "k8s.io/api/core/v1"
+ apierrors "k8s.io/apimachinery/pkg/api/errors"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ ctrl "sigs.k8s.io/controller-runtime"
+ "sigs.k8s.io/controller-runtime/pkg/client"
+)
+
+// TraitSyncer manages the lifecycle of the single traits ConfigMap.
+// It creates the ConfigMap on startup and periodically syncs from upstream.
+type TraitSyncer struct {
+ client client.Client
+ configMapName string
+ namespace string
+ placementClient *gophercloud.ServiceClient
+}
+
+func NewTraitSyncer(
+ cl client.Client,
+ configMapName string,
+ namespace string,
+ placementClient *gophercloud.ServiceClient,
+) *TraitSyncer {
+
+ return &TraitSyncer{
+ client: cl,
+ configMapName: configMapName,
+ namespace: namespace,
+ placementClient: placementClient,
+ }
+}
+
+// Init creates the traits ConfigMap if it does not already exist.
+func (ts *TraitSyncer) Init(ctx context.Context) error {
+ log := ctrl.Log.WithName("placement-shim").WithName("trait-syncer")
+ cm := &corev1.ConfigMap{}
+ key := client.ObjectKey{Namespace: ts.namespace, Name: ts.configMapName}
+ err := ts.client.Get(ctx, key, cm)
+ if err == nil {
+ log.Info("Traits ConfigMap already exists", "name", ts.configMapName)
+ return nil
+ }
+ if !apierrors.IsNotFound(err) {
+ return fmt.Errorf("checking traits configmap: %w", err)
+ }
+ cm = &corev1.ConfigMap{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: ts.configMapName,
+ Namespace: ts.namespace,
+ },
+ Data: map[string]string{configMapKeyTraits: "[]"},
+ }
+ if err := ts.client.Create(ctx, cm); err != nil {
+ if apierrors.IsAlreadyExists(err) {
+ log.Info("Traits ConfigMap was created concurrently", "name", ts.configMapName)
+ return nil
+ }
+ return fmt.Errorf("creating traits configmap: %w", err)
+ }
+ log.Info("Created traits ConfigMap", "name", ts.configMapName)
+ return nil
+}
+
+// Run starts the periodic background sync from upstream placement.
+// Blocks until ctx is cancelled.
+func (ts *TraitSyncer) Run(ctx context.Context) {
+ log := ctrl.Log.WithName("placement-shim").WithName("trait-syncer")
+ if ts.placementClient == nil {
+ log.Info("No placement service client configured, trait sync loop will not run")
+ return
+ }
+
+ jitter := time.Duration(rand.Int63n(int64(30 * time.Second))) //nolint:gosec
+ log.Info("Starting trait sync loop", "jitter", jitter)
+
+ select {
+ case <-ctx.Done():
+ return
+ case <-time.After(jitter):
+ }
+
+ ts.sync(ctx)
+
+ ticker := time.NewTicker(60 * time.Second)
+ defer ticker.Stop()
+ for {
+ select {
+ case <-ctx.Done():
+ return
+ case <-ticker.C:
+ ts.sync(ctx)
+ }
+ }
+}
+
+// sync fetches GET /traits from upstream placement and writes the result
+// into the ConfigMap.
+func (ts *TraitSyncer) sync(ctx context.Context) {
+ log := ctrl.Log.WithName("placement-shim").WithName("trait-syncer")
+ u, err := url.JoinPath(ts.placementClient.Endpoint, "/traits")
+ if err != nil {
+ log.Error(err, "Failed to build upstream traits URL")
+ return
+ }
+ resp, err := ts.placementClient.Request(ctx, http.MethodGet, u, &gophercloud.RequestOpts{
+ OkCodes: []int{http.StatusOK},
+ MoreHeaders: map[string]string{
+ "OpenStack-API-Version": "placement 1.6",
+ },
+ KeepResponseBody: true,
+ })
+ if err != nil {
+ log.Info("Upstream trait sync failed", "error", err.Error())
+ return
+ }
+ defer resp.Body.Close()
+ var body traitsListResponse
+ if err := json.NewDecoder(resp.Body).Decode(&body); err != nil {
+ log.Error(err, "Failed to decode upstream trait list")
+ return
+ }
+
+ cm := &corev1.ConfigMap{}
+ if err := ts.client.Get(ctx, client.ObjectKey{Namespace: ts.namespace, Name: ts.configMapName}, cm); err != nil {
+ log.Error(err, "Failed to get traits ConfigMap for sync")
+ return
+ }
+ traitSet := make(map[string]struct{}, len(body.Traits))
+ for _, t := range body.Traits {
+ traitSet[t] = struct{}{}
+ }
+ if err := writeTraitsToConfigMap(cm, traitSet); err != nil {
+ log.Error(err, "Failed to serialize synced traits")
+ return
+ }
+ if err := ts.client.Update(ctx, cm); err != nil {
+ log.Error(err, "Failed to update traits ConfigMap with upstream data")
+ return
+ }
+ log.Info("Synced traits from upstream placement", "count", len(body.Traits))
+}
diff --git a/internal/shim/placement/syncer_traits_test.go b/internal/shim/placement/syncer_traits_test.go
new file mode 100644
index 000000000..c3038c668
--- /dev/null
+++ b/internal/shim/placement/syncer_traits_test.go
@@ -0,0 +1,144 @@
+// Copyright SAP SE
+// SPDX-License-Identifier: Apache-2.0
+
+package placement
+
+import (
+ "context"
+ "encoding/json"
+ "net/http"
+ "net/http/httptest"
+ "testing"
+ "time"
+
+ "github.com/gophercloud/gophercloud/v2"
+ corev1 "k8s.io/api/core/v1"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ "sigs.k8s.io/controller-runtime/pkg/client"
+)
+
+func TestTraitSyncerInitCreatesConfigMap(t *testing.T) {
+ cl := newFakeClientWithScheme(t)
+ ts := NewTraitSyncer(cl, "test-traits", "default", nil)
+
+ if err := ts.Init(context.Background()); err != nil {
+ t.Fatalf("Init: %v", err)
+ }
+
+ cm := &corev1.ConfigMap{}
+ if err := cl.Get(context.Background(), client.ObjectKey{Namespace: "default", Name: "test-traits"}, cm); err != nil {
+ t.Fatalf("get ConfigMap: %v", err)
+ }
+ if cm.Data[configMapKeyTraits] != "[]" {
+ t.Fatalf("expected empty traits array, got %q", cm.Data[configMapKeyTraits])
+ }
+}
+
+func TestTraitSyncerInitIdempotent(t *testing.T) {
+ existing := &corev1.ConfigMap{
+ ObjectMeta: metav1.ObjectMeta{Name: "test-traits", Namespace: "default"},
+ Data: map[string]string{configMapKeyTraits: `["CUSTOM_EXISTING"]`},
+ }
+ cl := newFakeClientWithScheme(t, existing)
+ ts := NewTraitSyncer(cl, "test-traits", "default", nil)
+
+ if err := ts.Init(context.Background()); err != nil {
+ t.Fatalf("Init: %v", err)
+ }
+
+ cm := &corev1.ConfigMap{}
+ if err := cl.Get(context.Background(), client.ObjectKey{Namespace: "default", Name: "test-traits"}, cm); err != nil {
+ t.Fatalf("get ConfigMap: %v", err)
+ }
+ if cm.Data[configMapKeyTraits] != `["CUSTOM_EXISTING"]` {
+ t.Fatalf("Init overwrote existing data: got %q", cm.Data[configMapKeyTraits])
+ }
+}
+
+func TestTraitSyncerRunNoClient(t *testing.T) {
+ cl := newFakeClientWithScheme(t)
+ ts := NewTraitSyncer(cl, "test-traits", "default", nil)
+
+ ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
+ defer cancel()
+ ts.Run(ctx)
+}
+
+func TestTraitSyncerSyncWritesUpstreamTraits(t *testing.T) {
+ upstream := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+ if r.URL.Path != "/traits" {
+ t.Errorf("unexpected path: %s", r.URL.Path)
+ }
+ w.Header().Set("Content-Type", "application/json")
+ if err := json.NewEncoder(w).Encode(traitsListResponse{
+ Traits: []string{"HW_CPU_X86_AVX2", "CUSTOM_SYNCED"},
+ }); err != nil {
+ t.Errorf("encode response: %v", err)
+ }
+ }))
+ t.Cleanup(upstream.Close)
+
+ existing := &corev1.ConfigMap{
+ ObjectMeta: metav1.ObjectMeta{Name: "test-traits", Namespace: "default"},
+ Data: map[string]string{configMapKeyTraits: "[]"},
+ }
+ cl := newFakeClientWithScheme(t, existing)
+
+ sc := &gophercloud.ServiceClient{
+ ProviderClient: &gophercloud.ProviderClient{},
+ Endpoint: upstream.URL,
+ }
+ sc.HTTPClient = *upstream.Client()
+
+ ts := NewTraitSyncer(cl, "test-traits", "default", sc)
+ ts.sync(context.Background())
+
+ cm := &corev1.ConfigMap{}
+ if err := cl.Get(context.Background(), client.ObjectKey{Namespace: "default", Name: "test-traits"}, cm); err != nil {
+ t.Fatalf("get ConfigMap: %v", err)
+ }
+
+ var traits []string
+ if err := json.Unmarshal([]byte(cm.Data[configMapKeyTraits]), &traits); err != nil {
+ t.Fatalf("unmarshal: %v", err)
+ }
+ if len(traits) != 2 {
+ t.Fatalf("expected 2 traits, got %d: %v", len(traits), traits)
+ }
+ want := map[string]bool{"CUSTOM_SYNCED": true, "HW_CPU_X86_AVX2": true}
+ for _, tr := range traits {
+ if !want[tr] {
+ t.Errorf("unexpected trait: %s", tr)
+ }
+ }
+}
+
+func TestTraitSyncerSyncUpstreamError(t *testing.T) {
+ upstream := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
+ w.WriteHeader(http.StatusServiceUnavailable)
+ }))
+ t.Cleanup(upstream.Close)
+
+ existing := &corev1.ConfigMap{
+ ObjectMeta: metav1.ObjectMeta{Name: "test-traits", Namespace: "default"},
+ Data: map[string]string{configMapKeyTraits: `["CUSTOM_ORIGINAL"]`},
+ }
+ cl := newFakeClientWithScheme(t, existing)
+
+ sc := &gophercloud.ServiceClient{
+ ProviderClient: &gophercloud.ProviderClient{},
+ Endpoint: upstream.URL,
+ }
+ sc.HTTPClient = *upstream.Client()
+
+ ts := NewTraitSyncer(cl, "test-traits", "default", sc)
+ ts.sync(context.Background())
+
+ cm := &corev1.ConfigMap{}
+ if err := cl.Get(context.Background(), client.ObjectKey{Namespace: "default", Name: "test-traits"}, cm); err != nil {
+ t.Fatalf("get ConfigMap: %v", err)
+ }
+ if cm.Data[configMapKeyTraits] != `["CUSTOM_ORIGINAL"]` {
+ t.Fatalf("sync should not have modified ConfigMap on error, got %q", cm.Data[configMapKeyTraits])
+ }
+}
From 891e77d715bcfcdf23eb79e3ca21c97dc5325922 Mon Sep 17 00:00:00 2001
From: Philipp Matthes
Date: Wed, 29 Apr 2026 11:40:15 +0200
Subject: [PATCH 2/2] Address review feedback: body copy, sync gating, lock
serialization
- Copy upstream response body in hybrid mode hooks so error payloads
are preserved for the caller.
- Only start TraitSyncer.Run in hybrid/passthrough modes to prevent
the background sync from overwriting local CRD-mode state.
- Acquire the same resource lock in TraitSyncer.sync to serialize
with concurrent CRUD operations on the ConfigMap.
---
internal/shim/placement/handle_traits.go | 8 +++++++-
internal/shim/placement/shim.go | 6 +++++-
internal/shim/placement/syncer_traits.go | 20 ++++++++++++++++++-
internal/shim/placement/syncer_traits_test.go | 11 +++++-----
4 files changed, 37 insertions(+), 8 deletions(-)
diff --git a/internal/shim/placement/handle_traits.go b/internal/shim/placement/handle_traits.go
index 3e10ab6fc..d815b9d55 100644
--- a/internal/shim/placement/handle_traits.go
+++ b/internal/shim/placement/handle_traits.go
@@ -7,6 +7,7 @@ import (
"context"
"encoding/json"
"fmt"
+ "io"
"net/http"
"os"
"sort"
@@ -203,13 +204,15 @@ func (s *Shim) handleUpdateTraitHybrid(w http.ResponseWriter, r *http.Request) {
}
s.forwardWithHook(w, r, func(w http.ResponseWriter, resp *http.Response) {
- // Copy the upstream response to the caller.
for k, vs := range resp.Header {
for _, v := range vs {
w.Header().Add(k, v)
}
}
w.WriteHeader(resp.StatusCode)
+ if resp.Body != nil {
+ io.Copy(w, resp.Body) //nolint:errcheck
+ }
if resp.StatusCode == http.StatusCreated || resp.StatusCode == http.StatusNoContent {
if _, err := s.addTraitToConfigMap(ctx, name); err != nil {
@@ -289,6 +292,9 @@ func (s *Shim) handleDeleteTraitHybrid(w http.ResponseWriter, r *http.Request) {
}
}
w.WriteHeader(resp.StatusCode)
+ if resp.Body != nil {
+ io.Copy(w, resp.Body) //nolint:errcheck
+ }
if resp.StatusCode == http.StatusNoContent {
if _, err := s.removeTraitFromConfigMap(ctx, name); err != nil {
diff --git a/internal/shim/placement/shim.go b/internal/shim/placement/shim.go
index 35cbd2303..f601788a4 100644
--- a/internal/shim/placement/shim.go
+++ b/internal/shim/placement/shim.go
@@ -442,6 +442,7 @@ func (s *Shim) Start(ctx context.Context) error {
s.config.Traits.ConfigMapName,
os.Getenv("POD_NAMESPACE"),
s.placementServiceClient,
+ s.resourceLocker,
))
}
for _, syncer := range s.syncers {
@@ -449,8 +450,11 @@ func (s *Shim) Start(ctx context.Context) error {
return err
}
}
+ traitsMode := s.config.Features.Traits.orDefault()
for _, syncer := range s.syncers {
- go syncer.Run(ctx)
+ if traitsMode == FeatureModeHybrid || traitsMode == FeatureModePassthrough {
+ go syncer.Run(ctx)
+ }
}
return nil
}
diff --git a/internal/shim/placement/syncer_traits.go b/internal/shim/placement/syncer_traits.go
index 735bcb825..d8067e1f3 100644
--- a/internal/shim/placement/syncer_traits.go
+++ b/internal/shim/placement/syncer_traits.go
@@ -10,8 +10,10 @@ import (
"math/rand"
"net/http"
"net/url"
+ "os"
"time"
+ "github.com/cobaltcore-dev/cortex/pkg/resourcelock"
"github.com/gophercloud/gophercloud/v2"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
@@ -27,6 +29,7 @@ type TraitSyncer struct {
configMapName string
namespace string
placementClient *gophercloud.ServiceClient
+ resourceLocker *resourcelock.ResourceLocker
}
func NewTraitSyncer(
@@ -34,6 +37,7 @@ func NewTraitSyncer(
configMapName string,
namespace string,
placementClient *gophercloud.ServiceClient,
+ resourceLocker *resourcelock.ResourceLocker,
) *TraitSyncer {
return &TraitSyncer{
@@ -41,6 +45,7 @@ func NewTraitSyncer(
configMapName: configMapName,
namespace: namespace,
placementClient: placementClient,
+ resourceLocker: resourceLocker,
}
}
@@ -108,7 +113,7 @@ func (ts *TraitSyncer) Run(ctx context.Context) {
}
// sync fetches GET /traits from upstream placement and writes the result
-// into the ConfigMap.
+// into the ConfigMap under the resource lock.
func (ts *TraitSyncer) sync(ctx context.Context) {
log := ctrl.Log.WithName("placement-shim").WithName("trait-syncer")
u, err := url.JoinPath(ts.placementClient.Endpoint, "/traits")
@@ -134,6 +139,19 @@ func (ts *TraitSyncer) sync(ctx context.Context) {
return
}
+ host, _ := os.Hostname() //nolint:errcheck
+ lockerID := fmt.Sprintf("syncer-%s-%d", host, time.Now().UnixNano())
+ lockName := ts.configMapName + "-lock"
+ if err := ts.resourceLocker.AcquireLock(ctx, lockName, lockerID); err != nil {
+ log.Error(err, "Failed to acquire lock for trait sync")
+ return
+ }
+ defer func() {
+ releaseCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+ defer cancel()
+ _ = ts.resourceLocker.ReleaseLock(releaseCtx, lockName, lockerID) //nolint:errcheck
+ }()
+
cm := &corev1.ConfigMap{}
if err := ts.client.Get(ctx, client.ObjectKey{Namespace: ts.namespace, Name: ts.configMapName}, cm); err != nil {
log.Error(err, "Failed to get traits ConfigMap for sync")
diff --git a/internal/shim/placement/syncer_traits_test.go b/internal/shim/placement/syncer_traits_test.go
index c3038c668..23cada155 100644
--- a/internal/shim/placement/syncer_traits_test.go
+++ b/internal/shim/placement/syncer_traits_test.go
@@ -11,6 +11,7 @@ import (
"testing"
"time"
+ "github.com/cobaltcore-dev/cortex/pkg/resourcelock"
"github.com/gophercloud/gophercloud/v2"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -19,7 +20,7 @@ import (
func TestTraitSyncerInitCreatesConfigMap(t *testing.T) {
cl := newFakeClientWithScheme(t)
- ts := NewTraitSyncer(cl, "test-traits", "default", nil)
+ ts := NewTraitSyncer(cl, "test-traits", "default", nil, resourcelock.NewResourceLocker(cl, "default"))
if err := ts.Init(context.Background()); err != nil {
t.Fatalf("Init: %v", err)
@@ -40,7 +41,7 @@ func TestTraitSyncerInitIdempotent(t *testing.T) {
Data: map[string]string{configMapKeyTraits: `["CUSTOM_EXISTING"]`},
}
cl := newFakeClientWithScheme(t, existing)
- ts := NewTraitSyncer(cl, "test-traits", "default", nil)
+ ts := NewTraitSyncer(cl, "test-traits", "default", nil, resourcelock.NewResourceLocker(cl, "default"))
if err := ts.Init(context.Background()); err != nil {
t.Fatalf("Init: %v", err)
@@ -57,7 +58,7 @@ func TestTraitSyncerInitIdempotent(t *testing.T) {
func TestTraitSyncerRunNoClient(t *testing.T) {
cl := newFakeClientWithScheme(t)
- ts := NewTraitSyncer(cl, "test-traits", "default", nil)
+ ts := NewTraitSyncer(cl, "test-traits", "default", nil, resourcelock.NewResourceLocker(cl, "default"))
ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
defer cancel()
@@ -90,7 +91,7 @@ func TestTraitSyncerSyncWritesUpstreamTraits(t *testing.T) {
}
sc.HTTPClient = *upstream.Client()
- ts := NewTraitSyncer(cl, "test-traits", "default", sc)
+ ts := NewTraitSyncer(cl, "test-traits", "default", sc, resourcelock.NewResourceLocker(cl, "default"))
ts.sync(context.Background())
cm := &corev1.ConfigMap{}
@@ -131,7 +132,7 @@ func TestTraitSyncerSyncUpstreamError(t *testing.T) {
}
sc.HTTPClient = *upstream.Client()
- ts := NewTraitSyncer(cl, "test-traits", "default", sc)
+ ts := NewTraitSyncer(cl, "test-traits", "default", sc, resourcelock.NewResourceLocker(cl, "default"))
ts.sync(context.Background())
cm := &corev1.ConfigMap{}