From 0c0d4cc18787806b30dfeee06a0c9bbc1974a421 Mon Sep 17 00:00:00 2001 From: Philipp Matthes Date: Wed, 29 Apr 2026 11:59:32 +0200 Subject: [PATCH 1/4] feat: Feature-gated /resource_classes API with ConfigMap storage Implement the /resource_classes endpoints following the same pattern established by the traits refactor: a ResourceClassSyncer that owns a single ConfigMap, three feature modes (passthrough, hybrid, crd), and E2E tests that exercise all modes via header override. --- .../bundles/cortex-placement-shim/values.yaml | 2 + .../shim/placement/handle_resource_classes.go | 508 ++++++++++++++++-- .../placement/handle_resource_classes_e2e.go | 271 ++++++---- .../placement/handle_resource_classes_test.go | 269 ++++++---- internal/shim/placement/shim.go | 38 +- internal/shim/placement/shim_test.go | 36 ++ .../shim/placement/syncer_resource_classes.go | 184 +++++++ .../placement/syncer_resource_classes_test.go | 149 +++++ 8 files changed, 1226 insertions(+), 231 deletions(-) create mode 100644 internal/shim/placement/syncer_resource_classes.go create mode 100644 internal/shim/placement/syncer_resource_classes_test.go diff --git a/helm/bundles/cortex-placement-shim/values.yaml b/helm/bundles/cortex-placement-shim/values.yaml index 54805cee3..eaf725d07 100644 --- a/helm/bundles/cortex-placement-shim/values.yaml +++ b/helm/bundles/cortex-placement-shim/values.yaml @@ -62,6 +62,8 @@ cortex-shim: status: "CURRENT" traits: configMapName: "cortex-placement-shim-traits" + resourceClasses: + configMapName: "cortex-placement-shim-resource-classes" auth: tokenCacheTTL: "5m" policies: diff --git a/internal/shim/placement/handle_resource_classes.go b/internal/shim/placement/handle_resource_classes.go index 9067079fd..2ae5ae049 100644 --- a/internal/shim/placement/handle_resource_classes.go +++ b/internal/shim/placement/handle_resource_classes.go @@ -4,68 +4,512 @@ package placement import ( + "context" + "encoding/json" + "fmt" + "io" "net/http" + "os" + "sort" + "strings" + "time" + + corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "sigs.k8s.io/controller-runtime/pkg/client" + logf "sigs.k8s.io/controller-runtime/pkg/log" ) // HandleListResourceClasses handles GET /resource_classes requests. // -// Returns the complete list of all resource classes, including both standard -// classes (e.g. VCPU, MEMORY_MB, DISK_GB, PCI_DEVICE, SRIOV_NET_VF) and -// deployer-defined custom classes prefixed with CUSTOM_. Resource classes -// categorize the types of resources that resource providers can offer as -// inventory. Available since microversion 1.2. +// Feature modes: +// - passthrough: forwards to upstream placement. +// - hybrid: forwards to upstream placement. +// - crd: serves the resource class list from the local ConfigMap. +// +// See: https://docs.openstack.org/api-ref/placement/#list-resource-classes func (s *Shim) HandleListResourceClasses(w http.ResponseWriter, r *http.Request) { - s.dispatchPassthroughOnly(w, r, s.config.Features.ResourceClasses) + ctx := r.Context() + log := logf.FromContext(ctx) + + switch s.featureModeFromConfOrHeader(r, s.config.Features.ResourceClasses) { + case FeatureModePassthrough, FeatureModeHybrid: + s.forward(w, r) + return + case FeatureModeCRD: + // Serve from local ConfigMap. + default: + http.Error(w, "unknown feature mode", http.StatusInternalServerError) + return + } + + rcSet, err := s.getResourceClasses(ctx) + if err != nil { + log.Error(err, "failed to list resource classes from configmap") + http.Error(w, "failed to list resource classes", http.StatusInternalServerError) + return + } + + entries := make([]resourceClassEntry, 0, len(rcSet)) + for name := range rcSet { + entries = append(entries, resourceClassEntry{Name: name}) + } + sort.Slice(entries, func(i, j int) bool { return entries[i].Name < entries[j].Name }) + + log.Info("listing all resource classes", "count", len(entries)) + s.writeJSON(w, http.StatusOK, resourceClassesListResponse{ResourceClasses: entries}) } // HandleCreateResourceClass handles POST /resource_classes requests. // -// Creates a new custom resource class. The name must be prefixed with CUSTOM_ -// to distinguish it from standard resource classes. Returns 201 Created with -// a Location header on success. Returns 400 Bad Request if the CUSTOM_ prefix -// is missing, and 409 Conflict if a class with the same name already exists. -// Available since microversion 1.2. +// Feature modes: +// - passthrough: forwards to upstream placement. +// - hybrid: forwards to upstream; on success, adds the class to the local ConfigMap. +// - crd: writes the class to the local ConfigMap (CUSTOM_ prefix required). +// +// See: https://docs.openstack.org/api-ref/placement/#create-resource-class func (s *Shim) HandleCreateResourceClass(w http.ResponseWriter, r *http.Request) { - s.dispatchPassthroughOnly(w, r, s.config.Features.ResourceClasses) + ctx := r.Context() + log := logf.FromContext(ctx) + + mode := s.featureModeFromConfOrHeader(r, s.config.Features.ResourceClasses) + switch mode { + case FeatureModePassthrough: + s.forward(w, r) + return + case FeatureModeHybrid: + s.handleCreateResourceClassHybrid(w, r) + return + case FeatureModeCRD: + // Handle locally. + default: + http.Error(w, "unknown feature mode", http.StatusInternalServerError) + return + } + + var body struct { + Name string `json:"name"` + } + if err := json.NewDecoder(r.Body).Decode(&body); err != nil || body.Name == "" { + http.Error(w, "request body must contain a valid 'name' field", http.StatusBadRequest) + return + } + if !strings.HasPrefix(body.Name, "CUSTOM_") { + log.Info("rejected resource class without CUSTOM_ prefix", "class", body.Name) + http.Error(w, "resource class name must start with CUSTOM_", http.StatusBadRequest) + return + } + + exists, err := s.hasResourceClass(ctx, body.Name) + if err != nil { + log.Error(err, "failed to check resource class", "class", body.Name) + http.Error(w, "failed to check resource class", http.StatusInternalServerError) + return + } + if exists { + http.Error(w, "resource class already exists", http.StatusConflict) + return + } + + if _, err := s.addResourceClassToConfigMap(ctx, body.Name); err != nil { + log.Error(err, "failed to create resource class", "class", body.Name) + http.Error(w, "failed to create resource class", http.StatusInternalServerError) + return + } + w.WriteHeader(http.StatusCreated) +} + +// handleCreateResourceClassHybrid forwards POST /resource_classes to upstream, +// then updates the local ConfigMap on success. +func (s *Shim) handleCreateResourceClassHybrid(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + log := logf.FromContext(ctx) + + s.forwardWithHook(w, r, func(w http.ResponseWriter, resp *http.Response) { + body, _ := io.ReadAll(resp.Body) //nolint:errcheck + for k, vs := range resp.Header { + for _, v := range vs { + w.Header().Add(k, v) + } + } + w.WriteHeader(resp.StatusCode) + w.Write(body) //nolint:errcheck + + if resp.StatusCode == http.StatusCreated { + var created struct { + Name string `json:"name"` + } + if err := json.Unmarshal(body, &created); err == nil && created.Name != "" { + if _, err := s.addResourceClassToConfigMap(ctx, created.Name); err != nil { + log.Error(err, "hybrid: failed to add resource class to local configmap", "class", created.Name) + } + } + } + }) } // HandleShowResourceClass handles GET /resource_classes/{name} requests. // -// Returns a representation of a single resource class identified by name. -// This can be used to verify the existence of a resource class. Returns 404 -// if the class does not exist. Available since microversion 1.2. +// Feature modes: +// - passthrough: forwards to upstream placement. +// - hybrid: forwards to upstream placement. +// - crd: checks the local ConfigMap for the resource class. +// +// See: https://docs.openstack.org/api-ref/placement/#show-resource-class func (s *Shim) HandleShowResourceClass(w http.ResponseWriter, r *http.Request) { - if _, ok := requiredPathParam(w, r, "name"); !ok { + ctx := r.Context() + log := logf.FromContext(ctx) + + switch s.featureModeFromConfOrHeader(r, s.config.Features.ResourceClasses) { + case FeatureModePassthrough, FeatureModeHybrid: + s.forward(w, r) + return + case FeatureModeCRD: + // Serve from local ConfigMap. + default: + http.Error(w, "unknown feature mode", http.StatusInternalServerError) + return + } + + name, ok := requiredPathParam(w, r, "name") + if !ok { + return + } + found, err := s.hasResourceClass(ctx, name) + if err != nil { + log.Error(err, "failed to check resource class", "class", name) + http.Error(w, "failed to check resource class", http.StatusInternalServerError) return } - s.dispatchPassthroughOnly(w, r, s.config.Features.ResourceClasses) + if !found { + log.Info("resource class not found", "class", name) + http.Error(w, "resource class not found", http.StatusNotFound) + return + } + log.Info("resource class found", "class", name) + s.writeJSON(w, http.StatusOK, resourceClassEntry{Name: name}) } // HandleUpdateResourceClass handles PUT /resource_classes/{name} requests. // -// Behavior differs by microversion. Since microversion 1.7, this endpoint -// creates or validates the existence of a single resource class: it returns -// 201 Created for a new class or 204 No Content if the class already exists. -// The name must carry the CUSTOM_ prefix. In earlier versions (1.2-1.6), the -// endpoint allowed renaming a class via a request body, but this usage is -// discouraged. Returns 400 Bad Request if the CUSTOM_ prefix is missing. +// Feature modes: +// - passthrough: forwards to upstream placement. +// - hybrid: forwards to upstream; on success, adds the class to the local ConfigMap. +// - crd: writes the class to the local ConfigMap (CUSTOM_ prefix required). +// +// See: https://docs.openstack.org/api-ref/placement/#update-resource-class func (s *Shim) HandleUpdateResourceClass(w http.ResponseWriter, r *http.Request) { - if _, ok := requiredPathParam(w, r, "name"); !ok { + ctx := r.Context() + log := logf.FromContext(ctx) + + mode := s.featureModeFromConfOrHeader(r, s.config.Features.ResourceClasses) + switch mode { + case FeatureModePassthrough: + s.forward(w, r) + return + case FeatureModeHybrid: + s.handleUpdateResourceClassHybrid(w, r) + return + case FeatureModeCRD: + // Handle locally. + default: + http.Error(w, "unknown feature mode", http.StatusInternalServerError) + return + } + + name, ok := requiredPathParam(w, r, "name") + if !ok { + return + } + if !strings.HasPrefix(name, "CUSTOM_") { + log.Info("rejected resource class without CUSTOM_ prefix", "class", name) + http.Error(w, "resource class name must start with CUSTOM_", http.StatusBadRequest) + return + } + + created, err := s.addResourceClassToConfigMap(ctx, name) + if err != nil { + log.Error(err, "failed to create resource class", "class", name) + http.Error(w, "failed to create resource class", http.StatusInternalServerError) + return + } + if created { + w.WriteHeader(http.StatusCreated) + } else { + w.WriteHeader(http.StatusNoContent) + } +} + +// handleUpdateResourceClassHybrid forwards PUT /resource_classes/{name} to +// upstream, then updates the local ConfigMap on success. +func (s *Shim) handleUpdateResourceClassHybrid(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + log := logf.FromContext(ctx) + + name, ok := requiredPathParam(w, r, "name") + if !ok { return } - s.dispatchPassthroughOnly(w, r, s.config.Features.ResourceClasses) + + s.forwardWithHook(w, r, func(w http.ResponseWriter, resp *http.Response) { + for k, vs := range resp.Header { + for _, v := range vs { + w.Header().Add(k, v) + } + } + w.WriteHeader(resp.StatusCode) + if resp.Body != nil { + io.Copy(w, resp.Body) //nolint:errcheck + } + + if resp.StatusCode == http.StatusCreated || resp.StatusCode == http.StatusNoContent { + if _, err := s.addResourceClassToConfigMap(ctx, name); err != nil { + log.Error(err, "hybrid: failed to add resource class to local configmap", "class", name) + } + } + }) } // HandleDeleteResourceClass handles DELETE /resource_classes/{name} requests. // -// Deletes a custom resource class. Only custom classes (prefixed with CUSTOM_) -// may be deleted; attempting to delete a standard class returns 400 Bad -// Request. Returns 409 Conflict if any resource provider has inventory of this -// class, and 404 if the class does not exist. Returns 204 No Content on -// success. Available since microversion 1.2. +// Feature modes: +// - passthrough: forwards to upstream placement. +// - hybrid: forwards to upstream; on success, removes the class from the local ConfigMap. +// - crd: removes the class from the local ConfigMap (CUSTOM_ prefix required). +// +// See: https://docs.openstack.org/api-ref/placement/#delete-resource-class func (s *Shim) HandleDeleteResourceClass(w http.ResponseWriter, r *http.Request) { - if _, ok := requiredPathParam(w, r, "name"); !ok { + ctx := r.Context() + log := logf.FromContext(ctx) + + mode := s.featureModeFromConfOrHeader(r, s.config.Features.ResourceClasses) + switch mode { + case FeatureModePassthrough: + s.forward(w, r) + return + case FeatureModeHybrid: + s.handleDeleteResourceClassHybrid(w, r) + return + case FeatureModeCRD: + // Handle locally. + default: + http.Error(w, "unknown feature mode", http.StatusInternalServerError) + return + } + + name, ok := requiredPathParam(w, r, "name") + if !ok { + return + } + if !strings.HasPrefix(name, "CUSTOM_") { + log.Info("rejected deletion of standard resource class", "class", name) + http.Error(w, "cannot delete standard resource classes", http.StatusBadRequest) + return + } + + removed, err := s.removeResourceClassFromConfigMap(ctx, name) + if err != nil { + log.Error(err, "failed to delete resource class", "class", name) + http.Error(w, "failed to delete resource class", http.StatusInternalServerError) + return + } + if !removed { + log.Info("resource class not found in configmap", "class", name) + http.Error(w, "resource class not found", http.StatusNotFound) return } - s.dispatchPassthroughOnly(w, r, s.config.Features.ResourceClasses) + log.Info("deleted resource class from configmap", "class", name) + w.WriteHeader(http.StatusNoContent) +} + +// handleDeleteResourceClassHybrid forwards DELETE /resource_classes/{name} to +// upstream, then updates the local ConfigMap on success. +func (s *Shim) handleDeleteResourceClassHybrid(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + log := logf.FromContext(ctx) + + name, ok := requiredPathParam(w, r, "name") + if !ok { + return + } + + s.forwardWithHook(w, r, func(w http.ResponseWriter, resp *http.Response) { + for k, vs := range resp.Header { + for _, v := range vs { + w.Header().Add(k, v) + } + } + w.WriteHeader(resp.StatusCode) + if resp.Body != nil { + io.Copy(w, resp.Body) //nolint:errcheck + } + + if resp.StatusCode == http.StatusNoContent { + if _, err := s.removeResourceClassFromConfigMap(ctx, name); err != nil { + log.Error(err, "hybrid: failed to remove resource class from local configmap", "class", name) + } + } + }) +} + +// getResourceClasses reads resource classes from the single ConfigMap. +func (s *Shim) getResourceClasses(ctx context.Context) (map[string]struct{}, error) { + cm := &corev1.ConfigMap{} + if err := s.Get(ctx, client.ObjectKey{Namespace: os.Getenv("POD_NAMESPACE"), Name: s.config.ResourceClasses.ConfigMapName}, cm); err != nil { + return nil, fmt.Errorf("get resource classes configmap %s: %w", s.config.ResourceClasses.ConfigMapName, err) + } + return parseResourceClasses(cm) +} + +// parseResourceClasses extracts the resource class set from a ConfigMap. +func parseResourceClasses(cm *corev1.ConfigMap) (map[string]struct{}, error) { + raw, ok := cm.Data[configMapKeyResourceClasses] + if !ok || raw == "" { + return make(map[string]struct{}), nil + } + var classes []string + if err := json.Unmarshal([]byte(raw), &classes); err != nil { + return nil, fmt.Errorf("unmarshal resource classes from configmap: %w", err) + } + m := make(map[string]struct{}, len(classes)) + for _, c := range classes { + m[c] = struct{}{} + } + return m, nil +} + +func (s *Shim) hasResourceClass(ctx context.Context, name string) (bool, error) { + classes, err := s.getResourceClasses(ctx) + if err != nil { + return false, err + } + _, ok := classes[name] + return ok, nil +} + +// writeResourceClassesToConfigMap serializes the resource class set into the ConfigMap. +func writeResourceClassesToConfigMap(cm *corev1.ConfigMap, rcSet map[string]struct{}) error { + classes := make([]string, 0, len(rcSet)) + for c := range rcSet { + classes = append(classes, c) + } + sort.Strings(classes) + + data, err := json.Marshal(classes) + if err != nil { + return fmt.Errorf("marshal resource classes: %w", err) + } + if cm.Data == nil { + cm.Data = make(map[string]string) + } + cm.Data[configMapKeyResourceClasses] = string(data) + return nil +} + +// addResourceClassToConfigMap adds a resource class to the ConfigMap under the +// resource lock. Returns true if the class was newly created, false if it +// already existed. +func (s *Shim) addResourceClassToConfigMap(ctx context.Context, name string) (bool, error) { + classes, err := s.getResourceClasses(ctx) + if err != nil { + return false, err + } + if _, exists := classes[name]; exists { + return false, nil + } + + host, err := os.Hostname() + if err != nil { + return false, fmt.Errorf("get hostname: %w", err) + } + lockerID := fmt.Sprintf("shim-%s-%d", host, time.Now().UnixNano()) + if err := s.resourceLocker.AcquireLock(ctx, s.config.ResourceClasses.ConfigMapName+"-lock", lockerID); err != nil { + return false, fmt.Errorf("acquire resource classes lock: %w", err) + } + defer func() { + releaseCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + _ = s.resourceLocker.ReleaseLock(releaseCtx, s.config.ResourceClasses.ConfigMapName+"-lock", lockerID) //nolint:errcheck + }() + + cm := &corev1.ConfigMap{} + key := client.ObjectKey{Namespace: os.Getenv("POD_NAMESPACE"), Name: s.config.ResourceClasses.ConfigMapName} + if err := s.Get(ctx, key, cm); err != nil { + if apierrors.IsNotFound(err) { + cm = &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: key.Name, + Namespace: key.Namespace, + }, + Data: map[string]string{configMapKeyResourceClasses: "[]"}, + } + current := map[string]struct{}{name: {}} + if err := writeResourceClassesToConfigMap(cm, current); err != nil { + return false, err + } + if err := s.Create(ctx, cm); err != nil { + return false, fmt.Errorf("create resource classes configmap: %w", err) + } + return true, nil + } + return false, fmt.Errorf("get resource classes configmap: %w", err) + } + + current, err := parseResourceClasses(cm) + if err != nil { + return false, err + } + if _, exists := current[name]; exists { + return false, nil + } + current[name] = struct{}{} + if err := writeResourceClassesToConfigMap(cm, current); err != nil { + return false, err + } + if err := s.Update(ctx, cm); err != nil { + return false, fmt.Errorf("update resource classes configmap: %w", err) + } + return true, nil +} + +// removeResourceClassFromConfigMap removes a resource class from the ConfigMap +// under the resource lock. Returns true if the class was found and removed. +func (s *Shim) removeResourceClassFromConfigMap(ctx context.Context, name string) (bool, error) { + host, err := os.Hostname() + if err != nil { + return false, fmt.Errorf("get hostname: %w", err) + } + lockerID := fmt.Sprintf("shim-%s-%d", host, time.Now().UnixNano()) + if err := s.resourceLocker.AcquireLock(ctx, s.config.ResourceClasses.ConfigMapName+"-lock", lockerID); err != nil { + return false, fmt.Errorf("acquire resource classes lock: %w", err) + } + defer func() { + releaseCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + _ = s.resourceLocker.ReleaseLock(releaseCtx, s.config.ResourceClasses.ConfigMapName+"-lock", lockerID) //nolint:errcheck + }() + + cm := &corev1.ConfigMap{} + if err := s.Get(ctx, client.ObjectKey{Namespace: os.Getenv("POD_NAMESPACE"), Name: s.config.ResourceClasses.ConfigMapName}, cm); err != nil { + if apierrors.IsNotFound(err) { + return false, nil + } + return false, fmt.Errorf("get resource classes configmap: %w", err) + } + current, err := parseResourceClasses(cm) + if err != nil { + return false, err + } + if _, exists := current[name]; !exists { + return false, nil + } + delete(current, name) + if err := writeResourceClassesToConfigMap(cm, current); err != nil { + return false, err + } + if err := s.Update(ctx, cm); err != nil { + return false, fmt.Errorf("update resource classes configmap: %w", err) + } + return true, nil } diff --git a/internal/shim/placement/handle_resource_classes_e2e.go b/internal/shim/placement/handle_resource_classes_e2e.go index e848ee034..6bc302cfe 100644 --- a/internal/shim/placement/handle_resource_classes_e2e.go +++ b/internal/shim/placement/handle_resource_classes_e2e.go @@ -6,6 +6,7 @@ package placement import ( "context" "encoding/json" + "errors" "fmt" "net/http" @@ -17,13 +18,24 @@ import ( // e2eTestResourceClasses tests the /resource_classes and // /resource_classes/{name} endpoints. // -// 1. Pre-cleanup: DELETE any leftover custom resource class (ignore 404). -// 2. GET /resource_classes — list all classes and verify the response. -// 3. GET /resource_classes/VCPU — confirm a standard class is retrievable. -// 4. PUT /resource_classes/{name} — create a custom test class. -// 5. GET /resource_classes/{name} — verify the custom class now exists. -// 6. DELETE /resource_classes/{name} — remove the custom class. -// 7. GET /resource_classes/{name} — confirm deletion returns 404. +// Phase 1 — read-only (always runs): +// +// 1. GET /resource_classes — list all resource classes; when mode is +// passthrough (forwarding to upstream) verify at least one class exists. +// 2. GET /resource_classes/VCPU — verify a standard class is retrievable +// (skipped when the list is empty). +// 3. GET /resource_classes/{name} — show a nonexistent class and verify 404. +// +// Phase 2 — CRUD (only when mode is non-passthrough): +// +// 1. Pre-cleanup: DELETE any leftover test class (ignore 404). +// 2. PUT /resource_classes/{name} — create a custom test class → 201. +// 3. PUT /resource_classes/{name} — idempotent create → 204. +// 4. GET /resource_classes/{name} — verify the custom class exists → 200. +// 5. DELETE /resource_classes/{name} — remove the custom class → 204. +// 6. GET /resource_classes/{name} — confirm deletion → 404. +// 7. PUT /resource_classes/{name} — bad prefix → 400. +// 8. DELETE /resource_classes/{name} — bad prefix → 400. func e2eTestResourceClasses(ctx context.Context, _ client.Client) error { log := logf.FromContext(ctx) log.Info("Running resource classes endpoint e2e test") @@ -40,197 +52,250 @@ func e2eTestResourceClasses(ctx context.Context, _ client.Client) error { } log.Info("Successfully created openstack client for resource classes e2e test") - const testRC = "CUSTOM_CORTEX_E2E_RC" + // ==================== Phase 1: read-only tests ==================== - // Probe: for non-passthrough modes, verify endpoint returns 501. - unimplemented, err := e2eProbeUnimplemented(ctx, sc, sc.Endpoint+"/resource_classes") - if err != nil { - return fmt.Errorf("probe: %w", err) - } - if unimplemented { - return nil - } + log.Info("=== Phase 1: read-only resource class tests ===") - // Pre-cleanup: delete any leftover test resource class from a prior run. - log.Info("Pre-cleanup: deleting leftover test resource class", "class", testRC) + rcMode := e2eCurrentMode(ctx) + + // Test GET /resource_classes + log.Info("Testing GET /resource_classes endpoint") req, err := http.NewRequestWithContext(ctx, - http.MethodDelete, sc.Endpoint+"/resource_classes/"+testRC, http.NoBody) + http.MethodGet, sc.Endpoint+"/resource_classes", http.NoBody) if err != nil { - log.Error(err, "failed to create pre-cleanup request") - return err + return fmt.Errorf("failed to create GET /resource_classes request: %w", err) } req.Header.Set("X-Auth-Token", sc.TokenID) req.Header.Set("OpenStack-API-Version", "placement 1.7") + req.Header.Set("Accept", "application/json") resp, err := sc.HTTPClient.Do(req) if err != nil { - log.Error(err, "failed to send pre-cleanup request") - return err + return fmt.Errorf("failed to send GET /resource_classes request: %w", err) } defer resp.Body.Close() - // Ignore 404 (not found) — that's expected if no leftover exists. - if resp.StatusCode != http.StatusNotFound && - (resp.StatusCode < 200 || resp.StatusCode >= 300) { - err := fmt.Errorf("unexpected status code during pre-cleanup: %d", resp.StatusCode) - log.Error(err, "pre-cleanup failed") - return err + if resp.StatusCode != http.StatusOK { + return fmt.Errorf("GET /resource_classes: expected 200, got %d", resp.StatusCode) } - log.Info("Pre-cleanup completed", "status", resp.StatusCode) + var listResp resourceClassesListResponse + if err := json.NewDecoder(resp.Body).Decode(&listResp); err != nil { + return fmt.Errorf("failed to decode GET /resource_classes response: %w", err) + } + if rcMode == FeatureModePassthrough && len(listResp.ResourceClasses) == 0 { + return errors.New("GET /resource_classes: expected at least one class when forwarding to upstream, got 0") + } + log.Info("Successfully retrieved resource classes", "count", len(listResp.ResourceClasses)) - // Test GET /resource_classes - log.Info("Testing GET /resource_classes endpoint of placement shim") + // Test GET /resource_classes/{name} for a known class (skip when list is empty). + if len(listResp.ResourceClasses) > 0 { + knownClass := listResp.ResourceClasses[0].Name + log.Info("Testing GET /resource_classes/{name} for known class", "class", knownClass) + req, err = http.NewRequestWithContext(ctx, + http.MethodGet, sc.Endpoint+"/resource_classes/"+knownClass, http.NoBody) + if err != nil { + return fmt.Errorf("failed to create GET request for class %s: %w", knownClass, err) + } + req.Header.Set("X-Auth-Token", sc.TokenID) + req.Header.Set("OpenStack-API-Version", "placement 1.7") + req.Header.Set("Accept", "application/json") + resp, err = sc.HTTPClient.Do(req) + if err != nil { + return fmt.Errorf("failed to send GET request for class %s: %w", knownClass, err) + } + defer resp.Body.Close() + if resp.StatusCode != http.StatusOK { + return fmt.Errorf("GET /resource_classes/%s: expected 200, got %d", knownClass, resp.StatusCode) + } + log.Info("Successfully verified known class exists", "class", knownClass) + } else { + log.Info("Skipping GET /resource_classes/{name} for known class, list is empty") + } + + // Test GET /resource_classes/{name} for a nonexistent class. + log.Info("Testing GET /resource_classes/{name} for nonexistent class") req, err = http.NewRequestWithContext(ctx, - http.MethodGet, sc.Endpoint+"/resource_classes", http.NoBody) + http.MethodGet, sc.Endpoint+"/resource_classes/CUSTOM_CORTEX_E2E_NONEXISTENT", http.NoBody) if err != nil { - log.Error(err, "failed to create request for resource_classes endpoint") - return err + return fmt.Errorf("failed to create GET request for nonexistent class: %w", err) } req.Header.Set("X-Auth-Token", sc.TokenID) req.Header.Set("OpenStack-API-Version", "placement 1.7") req.Header.Set("Accept", "application/json") resp, err = sc.HTTPClient.Do(req) if err != nil { - log.Error(err, "failed to send request to /resource_classes endpoint") - return err + return fmt.Errorf("failed to send GET request for nonexistent class: %w", err) } defer resp.Body.Close() - if resp.StatusCode < 200 || resp.StatusCode >= 300 { - err := fmt.Errorf("unexpected status code: %d", resp.StatusCode) - log.Error(err, "/resource_classes endpoint returned an error") - return err + if resp.StatusCode != http.StatusNotFound { + return fmt.Errorf("GET /resource_classes/CUSTOM_CORTEX_E2E_NONEXISTENT: expected 404, got %d", resp.StatusCode) } - var list struct { - ResourceClasses []struct { - Name string `json:"name"` - } `json:"resource_classes"` + log.Info("Correctly received 404 for nonexistent resource class") + + // ==================== Phase 2: CRUD tests ==================== + + log.Info("=== Phase 2: CRUD resource class tests ===") + + const testRC = "CUSTOM_CORTEX_E2E_RC" + + // Pre-cleanup: delete any leftover test class from a prior run. + log.Info("Pre-cleanup: deleting leftover test resource class", "class", testRC) + req, err = http.NewRequestWithContext(ctx, + http.MethodDelete, sc.Endpoint+"/resource_classes/"+testRC, http.NoBody) + if err != nil { + return fmt.Errorf("failed to create pre-cleanup request: %w", err) } - err = json.NewDecoder(resp.Body).Decode(&list) + req.Header.Set("X-Auth-Token", sc.TokenID) + req.Header.Set("OpenStack-API-Version", "placement 1.7") + resp, err = sc.HTTPClient.Do(req) if err != nil { - log.Error(err, "failed to decode response from /resource_classes endpoint") - return err + return fmt.Errorf("failed to send pre-cleanup request: %w", err) + } + defer resp.Body.Close() + if resp.StatusCode != http.StatusNotFound && resp.StatusCode != http.StatusNoContent { + return fmt.Errorf("pre-cleanup DELETE /resource_classes/%s: unexpected status %d", testRC, resp.StatusCode) } - log.Info("Successfully retrieved resource classes from placement shim", - "count", len(list.ResourceClasses)) + log.Info("Pre-cleanup completed", "status", resp.StatusCode) - // Test GET /resource_classes/{name} for a standard class - log.Info("Testing GET /resource_classes/VCPU endpoint of placement shim") + // Test PUT /resource_classes/{name} — create → 201. + log.Info("Testing PUT /resource_classes/{name} to create custom class", "class", testRC) req, err = http.NewRequestWithContext(ctx, - http.MethodGet, sc.Endpoint+"/resource_classes/VCPU", http.NoBody) + http.MethodPut, sc.Endpoint+"/resource_classes/"+testRC, http.NoBody) if err != nil { - log.Error(err, "failed to create request for resource_classes/VCPU endpoint") - return err + return fmt.Errorf("failed to create PUT request for class %s: %w", testRC, err) } req.Header.Set("X-Auth-Token", sc.TokenID) req.Header.Set("OpenStack-API-Version", "placement 1.7") - req.Header.Set("Accept", "application/json") resp, err = sc.HTTPClient.Do(req) if err != nil { - log.Error(err, "failed to send request to /resource_classes/VCPU endpoint") - return err + return fmt.Errorf("failed to send PUT request for class %s: %w", testRC, err) } defer resp.Body.Close() - if resp.StatusCode < 200 || resp.StatusCode >= 300 { - err := fmt.Errorf("unexpected status code: %d", resp.StatusCode) - log.Error(err, "/resource_classes/VCPU endpoint returned an error") - return err + if resp.StatusCode != http.StatusCreated { + return fmt.Errorf("PUT /resource_classes/%s (create): expected 201, got %d", testRC, resp.StatusCode) } - log.Info("Successfully retrieved standard resource class VCPU from placement shim") + log.Info("Successfully created custom resource class", "class", testRC) - // Test PUT /resource_classes/{name} (create custom class) - log.Info("Testing PUT /resource_classes/{name} to create custom class", "class", testRC) + // Test PUT /resource_classes/{name} — idempotent → 204. + log.Info("Testing PUT /resource_classes/{name} idempotent create", "class", testRC) req, err = http.NewRequestWithContext(ctx, http.MethodPut, sc.Endpoint+"/resource_classes/"+testRC, http.NoBody) if err != nil { - log.Error(err, "failed to create PUT request for resource_classes", "class", testRC) - return err + return fmt.Errorf("failed to create idempotent PUT request: %w", err) } req.Header.Set("X-Auth-Token", sc.TokenID) req.Header.Set("OpenStack-API-Version", "placement 1.7") - req.Header.Set("Accept", "application/json") resp, err = sc.HTTPClient.Do(req) if err != nil { - log.Error(err, "failed to send PUT request to /resource_classes", "class", testRC) - return err + return fmt.Errorf("failed to send idempotent PUT request: %w", err) } defer resp.Body.Close() - if resp.StatusCode < 200 || resp.StatusCode >= 300 { - err := fmt.Errorf("unexpected status code: %d", resp.StatusCode) - log.Error(err, "PUT /resource_classes returned an error", "class", testRC) - return err + if resp.StatusCode != http.StatusNoContent { + return fmt.Errorf("PUT /resource_classes/%s (idempotent): expected 204, got %d", testRC, resp.StatusCode) } - log.Info("Successfully created custom resource class", "class", testRC, - "status", resp.StatusCode) + log.Info("Successfully verified idempotent PUT", "class", testRC) - // Test GET /resource_classes/{name} for the custom class + // Test GET /resource_classes/{name} — verify exists → 200. log.Info("Testing GET /resource_classes/{name} for custom class", "class", testRC) req, err = http.NewRequestWithContext(ctx, http.MethodGet, sc.Endpoint+"/resource_classes/"+testRC, http.NoBody) if err != nil { - log.Error(err, "failed to create GET request for custom resource class", "class", testRC) - return err + return fmt.Errorf("failed to create GET request for class %s: %w", testRC, err) } req.Header.Set("X-Auth-Token", sc.TokenID) req.Header.Set("OpenStack-API-Version", "placement 1.7") req.Header.Set("Accept", "application/json") resp, err = sc.HTTPClient.Do(req) if err != nil { - log.Error(err, "failed to send GET request for custom resource class", "class", testRC) - return err + return fmt.Errorf("failed to send GET request for class %s: %w", testRC, err) } defer resp.Body.Close() - if resp.StatusCode < 200 || resp.StatusCode >= 300 { - err := fmt.Errorf("unexpected status code: %d", resp.StatusCode) - log.Error(err, "GET custom resource class returned an error", "class", testRC) - return err + if resp.StatusCode != http.StatusOK { + return fmt.Errorf("GET /resource_classes/%s: expected 200, got %d", testRC, resp.StatusCode) } log.Info("Successfully verified custom resource class exists", "class", testRC) - // Cleanup: Test DELETE /resource_classes/{name} - log.Info("Cleaning up test resource class from placement shim", "class", testRC) + // Cleanup: DELETE /resource_classes/{name} → 204. + log.Info("Cleaning up test resource class", "class", testRC) req, err = http.NewRequestWithContext(ctx, http.MethodDelete, sc.Endpoint+"/resource_classes/"+testRC, http.NoBody) if err != nil { - log.Error(err, "failed to create DELETE request for resource class", "class", testRC) - return err + return fmt.Errorf("failed to create DELETE request for class %s: %w", testRC, err) } req.Header.Set("X-Auth-Token", sc.TokenID) req.Header.Set("OpenStack-API-Version", "placement 1.7") resp, err = sc.HTTPClient.Do(req) if err != nil { - log.Error(err, "failed to send DELETE request for resource class", "class", testRC) - return err + return fmt.Errorf("failed to send DELETE request for class %s: %w", testRC, err) } defer resp.Body.Close() - if resp.StatusCode < 200 || resp.StatusCode >= 300 { - err := fmt.Errorf("unexpected status code: %d", resp.StatusCode) - log.Error(err, "DELETE resource class returned an error", "class", testRC) - return err + if resp.StatusCode != http.StatusNoContent { + return fmt.Errorf("DELETE /resource_classes/%s: expected 204, got %d", testRC, resp.StatusCode) } log.Info("Successfully deleted test resource class", "class", testRC) - // Verify deletion: GET should return 404 + // Verify deletion: GET → 404. log.Info("Verifying test resource class was deleted", "class", testRC) req, err = http.NewRequestWithContext(ctx, http.MethodGet, sc.Endpoint+"/resource_classes/"+testRC, http.NoBody) if err != nil { - log.Error(err, "failed to create verification GET request", "class", testRC) - return err + return fmt.Errorf("failed to create verification GET request: %w", err) } req.Header.Set("X-Auth-Token", sc.TokenID) req.Header.Set("OpenStack-API-Version", "placement 1.7") req.Header.Set("Accept", "application/json") resp, err = sc.HTTPClient.Do(req) if err != nil { - log.Error(err, "failed to send verification GET request", "class", testRC) - return err + return fmt.Errorf("failed to send verification GET request: %w", err) } defer resp.Body.Close() if resp.StatusCode != http.StatusNotFound { - err := fmt.Errorf("expected 404 after deletion, got: %d", resp.StatusCode) - log.Error(err, "resource class still exists after deletion", "class", testRC) - return err + return fmt.Errorf("GET /resource_classes/%s after deletion: expected 404, got %d", + testRC, resp.StatusCode) } log.Info("Verified test resource class was deleted", "class", testRC) + // Bad-prefix validation is only enforced by the shim in crd mode. + if rcMode == FeatureModeCRD { + // Test PUT /resource_classes/{name} with bad prefix → 400. + log.Info("Testing PUT /resource_classes/{name} with non-CUSTOM_ prefix") + req, err = http.NewRequestWithContext(ctx, + http.MethodPut, sc.Endpoint+"/resource_classes/VCPU_CORTEX_E2E_BAD", http.NoBody) + if err != nil { + return fmt.Errorf("failed to create bad-prefix PUT request: %w", err) + } + req.Header.Set("X-Auth-Token", sc.TokenID) + req.Header.Set("OpenStack-API-Version", "placement 1.7") + resp, err = sc.HTTPClient.Do(req) + if err != nil { + return fmt.Errorf("failed to send bad-prefix PUT request: %w", err) + } + defer resp.Body.Close() + if resp.StatusCode != http.StatusBadRequest { + return fmt.Errorf("PUT /resource_classes/VCPU_CORTEX_E2E_BAD: expected 400, got %d", resp.StatusCode) + } + log.Info("Correctly received 400 for PUT with non-CUSTOM_ prefix") + + // Test DELETE /resource_classes/{name} with bad prefix → 400. + log.Info("Testing DELETE /resource_classes/{name} with non-CUSTOM_ prefix") + req, err = http.NewRequestWithContext(ctx, + http.MethodDelete, sc.Endpoint+"/resource_classes/VCPU_CORTEX_E2E_BAD", http.NoBody) + if err != nil { + return fmt.Errorf("failed to create bad-prefix DELETE request: %w", err) + } + req.Header.Set("X-Auth-Token", sc.TokenID) + req.Header.Set("OpenStack-API-Version", "placement 1.7") + resp, err = sc.HTTPClient.Do(req) + if err != nil { + return fmt.Errorf("failed to send bad-prefix DELETE request: %w", err) + } + defer resp.Body.Close() + if resp.StatusCode != http.StatusBadRequest { + return fmt.Errorf("DELETE /resource_classes/VCPU_CORTEX_E2E_BAD: expected 400, got %d", resp.StatusCode) + } + log.Info("Correctly received 400 for DELETE with non-CUSTOM_ prefix") + } else { + log.Info("Skipping bad-prefix validation tests (only enforced in crd mode)") + } + return nil } diff --git a/internal/shim/placement/handle_resource_classes_test.go b/internal/shim/placement/handle_resource_classes_test.go index 330cae9be..37e46cac1 100644 --- a/internal/shim/placement/handle_resource_classes_test.go +++ b/internal/shim/placement/handle_resource_classes_test.go @@ -4,11 +4,52 @@ package placement import ( + "bytes" + "context" + "encoding/json" "net/http" "testing" + + "github.com/cobaltcore-dev/cortex/pkg/resourcelock" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "sigs.k8s.io/controller-runtime/pkg/client" ) -func TestHandleListResourceClasses(t *testing.T) { +func newTestResourceClassConfigMap(namespace, name string, classes []string) *corev1.ConfigMap { + b, err := json.Marshal(classes) + if err != nil { + panic("marshal resource classes: " + err.Error()) + } + return &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{Name: name, Namespace: namespace}, + Data: map[string]string{configMapKeyResourceClasses: string(b)}, + } +} + +func newResourceClassShim(t *testing.T, classes []string) *Shim { + t.Helper() + t.Setenv("POD_NAMESPACE", "default") + objs := []client.Object{newTestResourceClassConfigMap("default", "test-rc-cm", classes)} + cl := newFakeClientWithScheme(t, objs...) + down, up := newTestTimers() + return &Shim{ + Client: cl, + config: config{ + PlacementURL: "http://should-not-be-called:1234", + Features: featuresConfig{ResourceClasses: FeatureModeCRD}, + ResourceClasses: &resourceClassesConfig{ConfigMapName: "test-rc-cm"}, + }, + maxBodyLogSize: 4096, + downstreamRequestTimer: down, + upstreamRequestTimer: up, + resourceLocker: resourcelock.NewResourceLocker(cl, "default"), + } +} + +// --- Passthrough mode tests --- + +func TestHandleListResourceClassesPassthrough(t *testing.T) { var gotPath string s := newTestShim(t, http.StatusOK, `{"resource_classes":[]}`, &gotPath) w := serveHandler(t, "GET", "/resource_classes", s.HandleListResourceClasses, "/resource_classes") @@ -20,7 +61,7 @@ func TestHandleListResourceClasses(t *testing.T) { } } -func TestHandleCreateResourceClass(t *testing.T) { +func TestHandleCreateResourceClassPassthrough(t *testing.T) { s := newTestShim(t, http.StatusCreated, "{}", nil) w := serveHandler(t, "POST", "/resource_classes", s.HandleCreateResourceClass, "/resource_classes") if w.Code != http.StatusCreated { @@ -28,7 +69,7 @@ func TestHandleCreateResourceClass(t *testing.T) { } } -func TestHandleShowResourceClass(t *testing.T) { +func TestHandleShowResourceClassPassthrough(t *testing.T) { var gotPath string s := newTestShim(t, http.StatusOK, "{}", &gotPath) w := serveHandler(t, "GET", "/resource_classes/{name}", s.HandleShowResourceClass, "/resource_classes/VCPU") @@ -40,7 +81,7 @@ func TestHandleShowResourceClass(t *testing.T) { } } -func TestHandleUpdateResourceClass(t *testing.T) { +func TestHandleUpdateResourceClassPassthrough(t *testing.T) { s := newTestShim(t, http.StatusNoContent, "", nil) w := serveHandler(t, "PUT", "/resource_classes/{name}", s.HandleUpdateResourceClass, "/resource_classes/CUSTOM_FOO") if w.Code != http.StatusNoContent { @@ -48,7 +89,7 @@ func TestHandleUpdateResourceClass(t *testing.T) { } } -func TestHandleDeleteResourceClass(t *testing.T) { +func TestHandleDeleteResourceClassPassthrough(t *testing.T) { s := newTestShim(t, http.StatusNoContent, "", nil) w := serveHandler(t, "DELETE", "/resource_classes/{name}", s.HandleDeleteResourceClass, "/resource_classes/CUSTOM_BAR") if w.Code != http.StatusNoContent { @@ -56,98 +97,138 @@ func TestHandleDeleteResourceClass(t *testing.T) { } } -func TestHandleResourceClasses_HybridMode(t *testing.T) { - down, up := newTestTimers() - s := &Shim{ - config: config{ - PlacementURL: "http://should-not-be-called:1234", - Features: featuresConfig{ResourceClasses: FeatureModeHybrid}, - }, - maxBodyLogSize: 4096, - downstreamRequestTimer: down, - upstreamRequestTimer: up, +// --- CRD mode handler tests --- + +func TestHandleListResourceClassesLocal(t *testing.T) { + s := newResourceClassShim(t, []string{"CUSTOM_FOO", "MEMORY_MB", "VCPU"}) + + w := serveHandler(t, "GET", "/resource_classes", s.HandleListResourceClasses, "/resource_classes") + if w.Code != http.StatusOK { + t.Fatalf("status = %d, want %d", w.Code, http.StatusOK) } - t.Run("GET list returns 501", func(t *testing.T) { - w := serveHandler(t, "GET", "/resource_classes", - s.HandleListResourceClasses, "/resource_classes") - if w.Code != http.StatusNotImplemented { - t.Fatalf("status = %d, want %d", w.Code, http.StatusNotImplemented) - } - }) - t.Run("POST returns 501", func(t *testing.T) { - w := serveHandler(t, "POST", "/resource_classes", - s.HandleCreateResourceClass, "/resource_classes") - if w.Code != http.StatusNotImplemented { - t.Fatalf("status = %d, want %d", w.Code, http.StatusNotImplemented) - } - }) - t.Run("GET show returns 501", func(t *testing.T) { - w := serveHandler(t, "GET", "/resource_classes/{name}", - s.HandleShowResourceClass, "/resource_classes/VCPU") - if w.Code != http.StatusNotImplemented { - t.Fatalf("status = %d, want %d", w.Code, http.StatusNotImplemented) - } - }) - t.Run("PUT returns 501", func(t *testing.T) { - w := serveHandler(t, "PUT", "/resource_classes/{name}", - s.HandleUpdateResourceClass, "/resource_classes/CUSTOM_FOO") - if w.Code != http.StatusNotImplemented { - t.Fatalf("status = %d, want %d", w.Code, http.StatusNotImplemented) - } - }) - t.Run("DELETE returns 501", func(t *testing.T) { - w := serveHandler(t, "DELETE", "/resource_classes/{name}", - s.HandleDeleteResourceClass, "/resource_classes/CUSTOM_BAR") - if w.Code != http.StatusNotImplemented { - t.Fatalf("status = %d, want %d", w.Code, http.StatusNotImplemented) + var resp resourceClassesListResponse + if err := json.NewDecoder(w.Body).Decode(&resp); err != nil { + t.Fatalf("decode: %v", err) + } + if len(resp.ResourceClasses) != 3 { + t.Fatalf("got %d classes, want 3: %v", len(resp.ResourceClasses), resp.ResourceClasses) + } + want := []string{"CUSTOM_FOO", "MEMORY_MB", "VCPU"} + for i, rc := range resp.ResourceClasses { + if rc.Name != want[i] { + t.Errorf("class[%d] = %q, want %q", i, rc.Name, want[i]) } - }) + } } -func TestHandleResourceClasses_CRDMode(t *testing.T) { - down, up := newTestTimers() - s := &Shim{ - config: config{ - PlacementURL: "http://should-not-be-called:1234", - Features: featuresConfig{ResourceClasses: FeatureModeCRD}, - }, - maxBodyLogSize: 4096, - downstreamRequestTimer: down, - upstreamRequestTimer: up, +func TestHandleShowResourceClassLocalFound(t *testing.T) { + s := newResourceClassShim(t, []string{"VCPU", "MEMORY_MB"}) + w := serveHandler(t, "GET", "/resource_classes/{name}", s.HandleShowResourceClass, "/resource_classes/VCPU") + if w.Code != http.StatusOK { + t.Fatalf("status = %d, want %d", w.Code, http.StatusOK) + } +} + +func TestHandleShowResourceClassLocalNotFound(t *testing.T) { + s := newResourceClassShim(t, []string{"VCPU"}) + w := serveHandler(t, "GET", "/resource_classes/{name}", s.HandleShowResourceClass, "/resource_classes/NONEXISTENT") + if w.Code != http.StatusNotFound { + t.Fatalf("status = %d, want %d", w.Code, http.StatusNotFound) + } +} + +func TestHandleUpdateResourceClassLocalCreated(t *testing.T) { + s := newResourceClassShim(t, nil) + w := serveHandler(t, "PUT", "/resource_classes/{name}", s.HandleUpdateResourceClass, "/resource_classes/CUSTOM_NEW") + if w.Code != http.StatusCreated { + t.Fatalf("status = %d, want %d", w.Code, http.StatusCreated) + } + found, err := s.hasResourceClass(context.Background(), "CUSTOM_NEW") + if err != nil { + t.Fatalf("hasResourceClass: %v", err) + } + if !found { + t.Error("expected resource class to be in store") + } +} + +func TestHandleUpdateResourceClassLocalAlreadyExists(t *testing.T) { + s := newResourceClassShim(t, []string{"CUSTOM_EXISTING"}) + w := serveHandler(t, "PUT", "/resource_classes/{name}", s.HandleUpdateResourceClass, "/resource_classes/CUSTOM_EXISTING") + if w.Code != http.StatusNoContent { + t.Fatalf("status = %d, want %d", w.Code, http.StatusNoContent) + } +} + +func TestHandleUpdateResourceClassLocalBadPrefix(t *testing.T) { + s := newResourceClassShim(t, nil) + w := serveHandler(t, "PUT", "/resource_classes/{name}", s.HandleUpdateResourceClass, "/resource_classes/VCPU") + if w.Code != http.StatusBadRequest { + t.Fatalf("status = %d, want %d", w.Code, http.StatusBadRequest) + } +} + +func TestHandleCreateResourceClassLocalCreated(t *testing.T) { + s := newResourceClassShim(t, nil) + body := bytes.NewBufferString(`{"name":"CUSTOM_NEW"}`) + w := serveHandlerWithBody(t, "POST", "/resource_classes", s.HandleCreateResourceClass, "/resource_classes", body) + if w.Code != http.StatusCreated { + t.Fatalf("status = %d, want %d", w.Code, http.StatusCreated) + } + found, err := s.hasResourceClass(context.Background(), "CUSTOM_NEW") + if err != nil { + t.Fatalf("hasResourceClass: %v", err) + } + if !found { + t.Error("expected resource class to be in store") + } +} + +func TestHandleCreateResourceClassLocalConflict(t *testing.T) { + s := newResourceClassShim(t, []string{"CUSTOM_EXISTING"}) + body := bytes.NewBufferString(`{"name":"CUSTOM_EXISTING"}`) + w := serveHandlerWithBody(t, "POST", "/resource_classes", s.HandleCreateResourceClass, "/resource_classes", body) + if w.Code != http.StatusConflict { + t.Fatalf("status = %d, want %d", w.Code, http.StatusConflict) + } +} + +func TestHandleCreateResourceClassLocalBadPrefix(t *testing.T) { + s := newResourceClassShim(t, nil) + body := bytes.NewBufferString(`{"name":"VCPU"}`) + w := serveHandlerWithBody(t, "POST", "/resource_classes", s.HandleCreateResourceClass, "/resource_classes", body) + if w.Code != http.StatusBadRequest { + t.Fatalf("status = %d, want %d", w.Code, http.StatusBadRequest) + } +} + +func TestHandleDeleteResourceClassLocal(t *testing.T) { + s := newResourceClassShim(t, []string{"CUSTOM_DEL"}) + w := serveHandler(t, "DELETE", "/resource_classes/{name}", s.HandleDeleteResourceClass, "/resource_classes/CUSTOM_DEL") + if w.Code != http.StatusNoContent { + t.Fatalf("status = %d, want %d", w.Code, http.StatusNoContent) + } + found, err := s.hasResourceClass(context.Background(), "CUSTOM_DEL") + if err != nil { + t.Fatalf("hasResourceClass: %v", err) + } + if found { + t.Error("expected resource class to be deleted") + } +} + +func TestHandleDeleteResourceClassLocalNotFound(t *testing.T) { + s := newResourceClassShim(t, nil) + w := serveHandler(t, "DELETE", "/resource_classes/{name}", s.HandleDeleteResourceClass, "/resource_classes/CUSTOM_GONE") + if w.Code != http.StatusNotFound { + t.Fatalf("status = %d, want %d", w.Code, http.StatusNotFound) + } +} + +func TestHandleDeleteResourceClassLocalBadPrefix(t *testing.T) { + s := newResourceClassShim(t, []string{"VCPU"}) + w := serveHandler(t, "DELETE", "/resource_classes/{name}", s.HandleDeleteResourceClass, "/resource_classes/VCPU") + if w.Code != http.StatusBadRequest { + t.Fatalf("status = %d, want %d", w.Code, http.StatusBadRequest) } - t.Run("GET list returns 501", func(t *testing.T) { - w := serveHandler(t, "GET", "/resource_classes", - s.HandleListResourceClasses, "/resource_classes") - if w.Code != http.StatusNotImplemented { - t.Fatalf("status = %d, want %d", w.Code, http.StatusNotImplemented) - } - }) - t.Run("POST returns 501", func(t *testing.T) { - w := serveHandler(t, "POST", "/resource_classes", - s.HandleCreateResourceClass, "/resource_classes") - if w.Code != http.StatusNotImplemented { - t.Fatalf("status = %d, want %d", w.Code, http.StatusNotImplemented) - } - }) - t.Run("GET show returns 501", func(t *testing.T) { - w := serveHandler(t, "GET", "/resource_classes/{name}", - s.HandleShowResourceClass, "/resource_classes/VCPU") - if w.Code != http.StatusNotImplemented { - t.Fatalf("status = %d, want %d", w.Code, http.StatusNotImplemented) - } - }) - t.Run("PUT returns 501", func(t *testing.T) { - w := serveHandler(t, "PUT", "/resource_classes/{name}", - s.HandleUpdateResourceClass, "/resource_classes/CUSTOM_FOO") - if w.Code != http.StatusNotImplemented { - t.Fatalf("status = %d, want %d", w.Code, http.StatusNotImplemented) - } - }) - t.Run("DELETE returns 501", func(t *testing.T) { - w := serveHandler(t, "DELETE", "/resource_classes/{name}", - s.HandleDeleteResourceClass, "/resource_classes/CUSTOM_BAR") - if w.Code != http.StatusNotImplemented { - t.Fatalf("status = %d, want %d", w.Code, http.StatusNotImplemented) - } - }) } diff --git a/internal/shim/placement/shim.go b/internal/shim/placement/shim.go index f601788a4..09777defe 100644 --- a/internal/shim/placement/shim.go +++ b/internal/shim/placement/shim.go @@ -127,7 +127,7 @@ func (s *Shim) featureModeFromConfOrHeader(r *http.Request, configured FeatureMo } resolved := override.orDefault() if resolved == FeatureModeHybrid || resolved == FeatureModeCRD { - if s.config.Versioning == nil && s.config.Traits == nil { + if s.config.Versioning == nil && s.config.Traits == nil && s.config.ResourceClasses == nil { return configured.orDefault() } } @@ -167,6 +167,14 @@ type traitsConfig struct { ConfigMapName string `json:"configMapName"` } +// resourceClassesConfig configures the local resource class store used when +// features.resourceClasses is hybrid or crd. +type resourceClassesConfig struct { + // ConfigMapName is the name of the ConfigMap used to persist resource classes. + // Must exist in the same namespace as the shim pod. + ConfigMapName string `json:"configMapName"` +} + // config holds configuration for the placement shim. type config struct { // SSO is an optional configuration for the certificates the http client @@ -212,6 +220,9 @@ type config struct { // Traits configures the local trait store used when // features.traits is hybrid or crd. Traits *traitsConfig `json:"traits,omitempty"` + // ResourceClasses configures the local resource class store used when + // features.resourceClasses is hybrid or crd. + ResourceClasses *resourceClassesConfig `json:"resourceClasses,omitempty"` } // validate checks the config for required fields and returns an error if the @@ -258,6 +269,18 @@ func (c *config) validate() error { return errors.New("pod namespace (POD_NAMESPACE) is required when traits config is present") } } + rcMode := c.Features.ResourceClasses.orDefault() + if rcMode != FeatureModePassthrough && c.ResourceClasses == nil { + return fmt.Errorf("resourceClasses config is required when features.resourceClasses is %s", rcMode) + } + if c.ResourceClasses != nil { + if c.ResourceClasses.ConfigMapName == "" { + return errors.New("resourceClasses.configMapName is required when resourceClasses config is present") + } + if os.Getenv("POD_NAMESPACE") == "" { + return errors.New("pod namespace (POD_NAMESPACE) is required when resourceClasses config is present") + } + } if c.Auth != nil && c.KeystoneURL == "" { return errors.New("keystoneURL is required when auth is configured") } @@ -445,14 +468,25 @@ func (s *Shim) Start(ctx context.Context) error { s.resourceLocker, )) } + if s.config.ResourceClasses != nil { + s.syncers = append(s.syncers, NewResourceClassSyncer( + s.Client, + s.config.ResourceClasses.ConfigMapName, + os.Getenv("POD_NAMESPACE"), + s.placementServiceClient, + s.resourceLocker, + )) + } for _, syncer := range s.syncers { if err := syncer.Init(ctx); err != nil { return err } } traitsMode := s.config.Features.Traits.orDefault() + rcMode := s.config.Features.ResourceClasses.orDefault() for _, syncer := range s.syncers { - if traitsMode == FeatureModeHybrid || traitsMode == FeatureModePassthrough { + if traitsMode == FeatureModeHybrid || traitsMode == FeatureModePassthrough || + rcMode == FeatureModeHybrid || rcMode == FeatureModePassthrough { go syncer.Run(ctx) } } diff --git a/internal/shim/placement/shim_test.go b/internal/shim/placement/shim_test.go index ffc31e954..46805676a 100644 --- a/internal/shim/placement/shim_test.go +++ b/internal/shim/placement/shim_test.go @@ -83,6 +83,18 @@ func serveHandler(t *testing.T, method, pattern string, handler http.HandlerFunc return w } +// serveHandlerWithBody is like serveHandler but allows providing a request body. +func serveHandlerWithBody(t *testing.T, method, pattern string, handler http.HandlerFunc, reqPath string, body io.Reader) *httptest.ResponseRecorder { + t.Helper() + mux := http.NewServeMux() + mux.HandleFunc(method+" "+pattern, handler) + req := httptest.NewRequest(method, reqPath, body) + req.Header.Set("Content-Type", "application/json") + w := httptest.NewRecorder() + mux.ServeHTTP(w, req) + return w +} + func TestForward(t *testing.T) { tests := []struct { name string @@ -482,6 +494,30 @@ func TestConfigValidateTraitsCRDRequiresConfig(t *testing.T) { } } +func TestConfigValidateResourceClassesCRDRequiresConfig(t *testing.T) { + t.Setenv("POD_NAMESPACE", "") + + c := config{ + PlacementURL: "http://placement:8778", + Features: featuresConfig{ResourceClasses: FeatureModeCRD}, + } + if err := c.validate(); err == nil { + t.Fatal("expected error when resourceClasses mode is crd without config") + } + c.ResourceClasses = &resourceClassesConfig{} + if err := c.validate(); err == nil { + t.Fatal("expected error when resourceClasses.configMapName is empty") + } + c.ResourceClasses.ConfigMapName = "cortex-placement-shim-resource-classes" + if err := c.validate(); err == nil { + t.Fatal("expected error when POD_NAMESPACE is not set") + } + t.Setenv("POD_NAMESPACE", "default") + if err := c.validate(); err != nil { + t.Fatalf("unexpected error: %v", err) + } +} + func TestWrapHandlerWithAuth(t *testing.T) { upstream := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { w.WriteHeader(http.StatusOK) diff --git a/internal/shim/placement/syncer_resource_classes.go b/internal/shim/placement/syncer_resource_classes.go new file mode 100644 index 000000000..526d2a085 --- /dev/null +++ b/internal/shim/placement/syncer_resource_classes.go @@ -0,0 +1,184 @@ +// Copyright SAP SE +// SPDX-License-Identifier: Apache-2.0 + +package placement + +import ( + "context" + "encoding/json" + "fmt" + "math/rand" + "net/http" + "net/url" + "os" + "time" + + "github.com/cobaltcore-dev/cortex/pkg/resourcelock" + "github.com/gophercloud/gophercloud/v2" + corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +const configMapKeyResourceClasses = "resource_classes" + +// resourceClassesListResponse matches the OpenStack Placement GET /resource_classes response. +type resourceClassesListResponse struct { + ResourceClasses []resourceClassEntry `json:"resource_classes"` +} + +type resourceClassEntry struct { + Name string `json:"name"` +} + +// ResourceClassSyncer manages the lifecycle of the resource classes ConfigMap. +// It creates the ConfigMap on startup and periodically syncs from upstream. +type ResourceClassSyncer struct { + client client.Client + configMapName string + namespace string + placementClient *gophercloud.ServiceClient + resourceLocker *resourcelock.ResourceLocker +} + +func NewResourceClassSyncer( + cl client.Client, + configMapName string, + namespace string, + placementClient *gophercloud.ServiceClient, + resourceLocker *resourcelock.ResourceLocker, +) *ResourceClassSyncer { + + return &ResourceClassSyncer{ + client: cl, + configMapName: configMapName, + namespace: namespace, + placementClient: placementClient, + resourceLocker: resourceLocker, + } +} + +// Init creates the resource classes ConfigMap if it does not already exist. +func (rs *ResourceClassSyncer) Init(ctx context.Context) error { + log := ctrl.Log.WithName("placement-shim").WithName("resource-class-syncer") + cm := &corev1.ConfigMap{} + key := client.ObjectKey{Namespace: rs.namespace, Name: rs.configMapName} + err := rs.client.Get(ctx, key, cm) + if err == nil { + log.Info("Resource classes ConfigMap already exists", "name", rs.configMapName) + return nil + } + if !apierrors.IsNotFound(err) { + return fmt.Errorf("checking resource classes configmap: %w", err) + } + cm = &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: rs.configMapName, + Namespace: rs.namespace, + }, + Data: map[string]string{configMapKeyResourceClasses: "[]"}, + } + if err := rs.client.Create(ctx, cm); err != nil { + if apierrors.IsAlreadyExists(err) { + log.Info("Resource classes ConfigMap was created concurrently", "name", rs.configMapName) + return nil + } + return fmt.Errorf("creating resource classes configmap: %w", err) + } + log.Info("Created resource classes ConfigMap", "name", rs.configMapName) + return nil +} + +// Run starts the periodic background sync from upstream placement. +// Blocks until ctx is cancelled. +func (rs *ResourceClassSyncer) Run(ctx context.Context) { + log := ctrl.Log.WithName("placement-shim").WithName("resource-class-syncer") + if rs.placementClient == nil { + log.Info("No placement service client configured, resource class sync loop will not run") + return + } + + jitter := time.Duration(rand.Int63n(int64(30 * time.Second))) //nolint:gosec + log.Info("Starting resource class sync loop", "jitter", jitter) + + select { + case <-ctx.Done(): + return + case <-time.After(jitter): + } + + rs.sync(ctx) + + ticker := time.NewTicker(60 * time.Second) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + rs.sync(ctx) + } + } +} + +// sync fetches GET /resource_classes from upstream placement and writes the +// result into the ConfigMap under the resource lock. +func (rs *ResourceClassSyncer) sync(ctx context.Context) { + log := ctrl.Log.WithName("placement-shim").WithName("resource-class-syncer") + u, err := url.JoinPath(rs.placementClient.Endpoint, "/resource_classes") + if err != nil { + log.Error(err, "Failed to build upstream resource classes URL") + return + } + resp, err := rs.placementClient.Request(ctx, http.MethodGet, u, &gophercloud.RequestOpts{ + OkCodes: []int{http.StatusOK}, + MoreHeaders: map[string]string{ + "OpenStack-API-Version": "placement 1.7", + }, + KeepResponseBody: true, + }) + if err != nil { + log.Info("Upstream resource class sync failed", "error", err.Error()) + return + } + defer resp.Body.Close() + var body resourceClassesListResponse + if err := json.NewDecoder(resp.Body).Decode(&body); err != nil { + log.Error(err, "Failed to decode upstream resource class list") + return + } + + host, _ := os.Hostname() //nolint:errcheck + lockerID := fmt.Sprintf("syncer-%s-%d", host, time.Now().UnixNano()) + lockName := rs.configMapName + "-lock" + if err := rs.resourceLocker.AcquireLock(ctx, lockName, lockerID); err != nil { + log.Error(err, "Failed to acquire lock for resource class sync") + return + } + defer func() { + releaseCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + _ = rs.resourceLocker.ReleaseLock(releaseCtx, lockName, lockerID) //nolint:errcheck + }() + + cm := &corev1.ConfigMap{} + if err := rs.client.Get(ctx, client.ObjectKey{Namespace: rs.namespace, Name: rs.configMapName}, cm); err != nil { + log.Error(err, "Failed to get resource classes ConfigMap for sync") + return + } + rcSet := make(map[string]struct{}, len(body.ResourceClasses)) + for _, rc := range body.ResourceClasses { + rcSet[rc.Name] = struct{}{} + } + if err := writeResourceClassesToConfigMap(cm, rcSet); err != nil { + log.Error(err, "Failed to serialize synced resource classes") + return + } + if err := rs.client.Update(ctx, cm); err != nil { + log.Error(err, "Failed to update resource classes ConfigMap with upstream data") + return + } + log.Info("Synced resource classes from upstream placement", "count", len(body.ResourceClasses)) +} diff --git a/internal/shim/placement/syncer_resource_classes_test.go b/internal/shim/placement/syncer_resource_classes_test.go new file mode 100644 index 000000000..83d4d3ad0 --- /dev/null +++ b/internal/shim/placement/syncer_resource_classes_test.go @@ -0,0 +1,149 @@ +// Copyright SAP SE +// SPDX-License-Identifier: Apache-2.0 + +package placement + +import ( + "context" + "encoding/json" + "net/http" + "net/http/httptest" + "testing" + "time" + + "github.com/cobaltcore-dev/cortex/pkg/resourcelock" + "github.com/gophercloud/gophercloud/v2" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +func TestResourceClassSyncerInitCreatesConfigMap(t *testing.T) { + cl := newFakeClientWithScheme(t) + rs := NewResourceClassSyncer(cl, "test-rc", "default", nil, resourcelock.NewResourceLocker(cl, "default")) + + if err := rs.Init(context.Background()); err != nil { + t.Fatalf("Init: %v", err) + } + + cm := &corev1.ConfigMap{} + if err := cl.Get(context.Background(), client.ObjectKey{Namespace: "default", Name: "test-rc"}, cm); err != nil { + t.Fatalf("get ConfigMap: %v", err) + } + if cm.Data[configMapKeyResourceClasses] != "[]" { + t.Fatalf("expected empty resource classes array, got %q", cm.Data[configMapKeyResourceClasses]) + } +} + +func TestResourceClassSyncerInitIdempotent(t *testing.T) { + existing := &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{Name: "test-rc", Namespace: "default"}, + Data: map[string]string{configMapKeyResourceClasses: `["CUSTOM_EXISTING"]`}, + } + cl := newFakeClientWithScheme(t, existing) + rs := NewResourceClassSyncer(cl, "test-rc", "default", nil, resourcelock.NewResourceLocker(cl, "default")) + + if err := rs.Init(context.Background()); err != nil { + t.Fatalf("Init: %v", err) + } + + cm := &corev1.ConfigMap{} + if err := cl.Get(context.Background(), client.ObjectKey{Namespace: "default", Name: "test-rc"}, cm); err != nil { + t.Fatalf("get ConfigMap: %v", err) + } + if cm.Data[configMapKeyResourceClasses] != `["CUSTOM_EXISTING"]` { + t.Fatalf("Init overwrote existing data: got %q", cm.Data[configMapKeyResourceClasses]) + } +} + +func TestResourceClassSyncerRunNoClient(t *testing.T) { + cl := newFakeClientWithScheme(t) + rs := NewResourceClassSyncer(cl, "test-rc", "default", nil, resourcelock.NewResourceLocker(cl, "default")) + + ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond) + defer cancel() + rs.Run(ctx) +} + +func TestResourceClassSyncerSyncWritesUpstreamClasses(t *testing.T) { + upstream := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.URL.Path != "/resource_classes" { + t.Errorf("unexpected path: %s", r.URL.Path) + } + w.Header().Set("Content-Type", "application/json") + if err := json.NewEncoder(w).Encode(resourceClassesListResponse{ + ResourceClasses: []resourceClassEntry{ + {Name: "VCPU"}, + {Name: "MEMORY_MB"}, + {Name: "CUSTOM_SYNCED"}, + }, + }); err != nil { + t.Errorf("encode response: %v", err) + } + })) + t.Cleanup(upstream.Close) + + existing := &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{Name: "test-rc", Namespace: "default"}, + Data: map[string]string{configMapKeyResourceClasses: "[]"}, + } + cl := newFakeClientWithScheme(t, existing) + + sc := &gophercloud.ServiceClient{ + ProviderClient: &gophercloud.ProviderClient{}, + Endpoint: upstream.URL, + } + sc.HTTPClient = *upstream.Client() + + rs := NewResourceClassSyncer(cl, "test-rc", "default", sc, resourcelock.NewResourceLocker(cl, "default")) + rs.sync(context.Background()) + + cm := &corev1.ConfigMap{} + if err := cl.Get(context.Background(), client.ObjectKey{Namespace: "default", Name: "test-rc"}, cm); err != nil { + t.Fatalf("get ConfigMap: %v", err) + } + + var classes []string + if err := json.Unmarshal([]byte(cm.Data[configMapKeyResourceClasses]), &classes); err != nil { + t.Fatalf("unmarshal: %v", err) + } + if len(classes) != 3 { + t.Fatalf("expected 3 classes, got %d: %v", len(classes), classes) + } + want := map[string]bool{"VCPU": true, "MEMORY_MB": true, "CUSTOM_SYNCED": true} + for _, c := range classes { + if !want[c] { + t.Errorf("unexpected class: %s", c) + } + } +} + +func TestResourceClassSyncerSyncUpstreamError(t *testing.T) { + upstream := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + w.WriteHeader(http.StatusServiceUnavailable) + })) + t.Cleanup(upstream.Close) + + existing := &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{Name: "test-rc", Namespace: "default"}, + Data: map[string]string{configMapKeyResourceClasses: `["CUSTOM_ORIGINAL"]`}, + } + cl := newFakeClientWithScheme(t, existing) + + sc := &gophercloud.ServiceClient{ + ProviderClient: &gophercloud.ProviderClient{}, + Endpoint: upstream.URL, + } + sc.HTTPClient = *upstream.Client() + + rs := NewResourceClassSyncer(cl, "test-rc", "default", sc, resourcelock.NewResourceLocker(cl, "default")) + rs.sync(context.Background()) + + cm := &corev1.ConfigMap{} + if err := cl.Get(context.Background(), client.ObjectKey{Namespace: "default", Name: "test-rc"}, cm); err != nil { + t.Fatalf("get ConfigMap: %v", err) + } + if cm.Data[configMapKeyResourceClasses] != `["CUSTOM_ORIGINAL"]` { + t.Fatalf("sync should not have modified ConfigMap on error, got %q", cm.Data[configMapKeyResourceClasses]) + } +} From d3d8928b5717aa8dbd0bb2b7946d60c43b116f11 Mon Sep 17 00:00:00 2001 From: Philipp Matthes Date: Wed, 29 Apr 2026 12:16:00 +0200 Subject: [PATCH 2/4] Replace //nolint:errcheck with proper error handling Check and log errors from os.Hostname, ReleaseLock, io.Copy, io.ReadAll, and w.Write in both traits and resource classes handlers/syncers. --- .../shim/placement/handle_resource_classes.go | 26 ++++++++++++++----- internal/shim/placement/handle_traits.go | 17 +++++++++--- .../shim/placement/syncer_resource_classes.go | 10 +++++-- internal/shim/placement/syncer_traits.go | 10 +++++-- 4 files changed, 49 insertions(+), 14 deletions(-) diff --git a/internal/shim/placement/handle_resource_classes.go b/internal/shim/placement/handle_resource_classes.go index 2ae5ae049..d3fb84b95 100644 --- a/internal/shim/placement/handle_resource_classes.go +++ b/internal/shim/placement/handle_resource_classes.go @@ -17,6 +17,7 @@ import ( corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" logf "sigs.k8s.io/controller-runtime/pkg/log" ) @@ -127,14 +128,19 @@ func (s *Shim) handleCreateResourceClassHybrid(w http.ResponseWriter, r *http.Re log := logf.FromContext(ctx) s.forwardWithHook(w, r, func(w http.ResponseWriter, resp *http.Response) { - body, _ := io.ReadAll(resp.Body) //nolint:errcheck + body, err := io.ReadAll(resp.Body) + if err != nil { + log.Error(err, "hybrid: failed to read upstream response body") + } for k, vs := range resp.Header { for _, v := range vs { w.Header().Add(k, v) } } w.WriteHeader(resp.StatusCode) - w.Write(body) //nolint:errcheck + if _, err := w.Write(body); err != nil { + log.Error(err, "hybrid: failed to write response body") + } if resp.StatusCode == http.StatusCreated { var created struct { @@ -260,7 +266,9 @@ func (s *Shim) handleUpdateResourceClassHybrid(w http.ResponseWriter, r *http.Re } w.WriteHeader(resp.StatusCode) if resp.Body != nil { - io.Copy(w, resp.Body) //nolint:errcheck + if _, err := io.Copy(w, resp.Body); err != nil { + log.Error(err, "hybrid: failed to copy upstream response body") + } } if resp.StatusCode == http.StatusCreated || resp.StatusCode == http.StatusNoContent { @@ -342,7 +350,9 @@ func (s *Shim) handleDeleteResourceClassHybrid(w http.ResponseWriter, r *http.Re } w.WriteHeader(resp.StatusCode) if resp.Body != nil { - io.Copy(w, resp.Body) //nolint:errcheck + if _, err := io.Copy(w, resp.Body); err != nil { + log.Error(err, "hybrid: failed to copy upstream response body") + } } if resp.StatusCode == http.StatusNoContent { @@ -430,7 +440,9 @@ func (s *Shim) addResourceClassToConfigMap(ctx context.Context, name string) (bo defer func() { releaseCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() - _ = s.resourceLocker.ReleaseLock(releaseCtx, s.config.ResourceClasses.ConfigMapName+"-lock", lockerID) //nolint:errcheck + if err := s.resourceLocker.ReleaseLock(releaseCtx, s.config.ResourceClasses.ConfigMapName+"-lock", lockerID); err != nil { + ctrl.Log.WithName("placement-shim").Error(err, "failed to release resource classes lock") + } }() cm := &corev1.ConfigMap{} @@ -487,7 +499,9 @@ func (s *Shim) removeResourceClassFromConfigMap(ctx context.Context, name string defer func() { releaseCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() - _ = s.resourceLocker.ReleaseLock(releaseCtx, s.config.ResourceClasses.ConfigMapName+"-lock", lockerID) //nolint:errcheck + if err := s.resourceLocker.ReleaseLock(releaseCtx, s.config.ResourceClasses.ConfigMapName+"-lock", lockerID); err != nil { + ctrl.Log.WithName("placement-shim").Error(err, "failed to release resource classes lock") + } }() cm := &corev1.ConfigMap{} diff --git a/internal/shim/placement/handle_traits.go b/internal/shim/placement/handle_traits.go index d815b9d55..0cd4e659e 100644 --- a/internal/shim/placement/handle_traits.go +++ b/internal/shim/placement/handle_traits.go @@ -17,6 +17,7 @@ import ( corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" logf "sigs.k8s.io/controller-runtime/pkg/log" ) @@ -211,7 +212,9 @@ func (s *Shim) handleUpdateTraitHybrid(w http.ResponseWriter, r *http.Request) { } w.WriteHeader(resp.StatusCode) if resp.Body != nil { - io.Copy(w, resp.Body) //nolint:errcheck + if _, err := io.Copy(w, resp.Body); err != nil { + log.Error(err, "hybrid: failed to copy upstream response body") + } } if resp.StatusCode == http.StatusCreated || resp.StatusCode == http.StatusNoContent { @@ -293,7 +296,9 @@ func (s *Shim) handleDeleteTraitHybrid(w http.ResponseWriter, r *http.Request) { } w.WriteHeader(resp.StatusCode) if resp.Body != nil { - io.Copy(w, resp.Body) //nolint:errcheck + if _, err := io.Copy(w, resp.Body); err != nil { + log.Error(err, "hybrid: failed to copy upstream response body") + } } if resp.StatusCode == http.StatusNoContent { @@ -382,7 +387,9 @@ func (s *Shim) addTraitToConfigMap(ctx context.Context, name string) (bool, erro defer func() { releaseCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() - _ = s.resourceLocker.ReleaseLock(releaseCtx, s.config.Traits.ConfigMapName+"-lock", lockerID) //nolint:errcheck + if err := s.resourceLocker.ReleaseLock(releaseCtx, s.config.Traits.ConfigMapName+"-lock", lockerID); err != nil { + ctrl.Log.WithName("placement-shim").Error(err, "failed to release traits lock") + } }() cm := &corev1.ConfigMap{} @@ -439,7 +446,9 @@ func (s *Shim) removeTraitFromConfigMap(ctx context.Context, name string) (bool, defer func() { releaseCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() - _ = s.resourceLocker.ReleaseLock(releaseCtx, s.config.Traits.ConfigMapName+"-lock", lockerID) //nolint:errcheck + if err := s.resourceLocker.ReleaseLock(releaseCtx, s.config.Traits.ConfigMapName+"-lock", lockerID); err != nil { + ctrl.Log.WithName("placement-shim").Error(err, "failed to release traits lock") + } }() cm := &corev1.ConfigMap{} diff --git a/internal/shim/placement/syncer_resource_classes.go b/internal/shim/placement/syncer_resource_classes.go index 526d2a085..3a27608a9 100644 --- a/internal/shim/placement/syncer_resource_classes.go +++ b/internal/shim/placement/syncer_resource_classes.go @@ -150,7 +150,11 @@ func (rs *ResourceClassSyncer) sync(ctx context.Context) { return } - host, _ := os.Hostname() //nolint:errcheck + host, err := os.Hostname() + if err != nil { + log.Error(err, "Failed to get hostname for resource class sync lock") + return + } lockerID := fmt.Sprintf("syncer-%s-%d", host, time.Now().UnixNano()) lockName := rs.configMapName + "-lock" if err := rs.resourceLocker.AcquireLock(ctx, lockName, lockerID); err != nil { @@ -160,7 +164,9 @@ func (rs *ResourceClassSyncer) sync(ctx context.Context) { defer func() { releaseCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() - _ = rs.resourceLocker.ReleaseLock(releaseCtx, lockName, lockerID) //nolint:errcheck + if err := rs.resourceLocker.ReleaseLock(releaseCtx, lockName, lockerID); err != nil { + log.Error(err, "Failed to release lock after resource class sync") + } }() cm := &corev1.ConfigMap{} diff --git a/internal/shim/placement/syncer_traits.go b/internal/shim/placement/syncer_traits.go index d8067e1f3..f9748d66d 100644 --- a/internal/shim/placement/syncer_traits.go +++ b/internal/shim/placement/syncer_traits.go @@ -139,7 +139,11 @@ func (ts *TraitSyncer) sync(ctx context.Context) { return } - host, _ := os.Hostname() //nolint:errcheck + host, err := os.Hostname() + if err != nil { + log.Error(err, "Failed to get hostname for trait sync lock") + return + } lockerID := fmt.Sprintf("syncer-%s-%d", host, time.Now().UnixNano()) lockName := ts.configMapName + "-lock" if err := ts.resourceLocker.AcquireLock(ctx, lockName, lockerID); err != nil { @@ -149,7 +153,9 @@ func (ts *TraitSyncer) sync(ctx context.Context) { defer func() { releaseCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() - _ = ts.resourceLocker.ReleaseLock(releaseCtx, lockName, lockerID) //nolint:errcheck + if err := ts.resourceLocker.ReleaseLock(releaseCtx, lockName, lockerID); err != nil { + log.Error(err, "Failed to release lock after trait sync") + } }() cm := &corev1.ConfigMap{} From 3f95c01dd45d5d80a6d6ef04e0b5779a84982fc9 Mon Sep 17 00:00:00 2001 From: Philipp Matthes Date: Wed, 29 Apr 2026 12:28:47 +0200 Subject: [PATCH 3/4] Add response links, hybrid tests, and always-start sync Add OpenStack-compliant links array to resource class list/show responses. Add hybrid mode unit tests verifying local ConfigMap updates on upstream success and no-op on upstream failure. Unconditionally start all syncer goroutines regardless of configured mode so the ConfigMap stays warm for header overrides. --- .../shim/placement/handle_resource_classes.go | 10 ++- .../placement/handle_resource_classes_test.go | 89 +++++++++++++++++++ internal/shim/placement/shim.go | 7 +- .../shim/placement/syncer_resource_classes.go | 8 +- 4 files changed, 105 insertions(+), 9 deletions(-) diff --git a/internal/shim/placement/handle_resource_classes.go b/internal/shim/placement/handle_resource_classes.go index d3fb84b95..9a3a9d4c2 100644 --- a/internal/shim/placement/handle_resource_classes.go +++ b/internal/shim/placement/handle_resource_classes.go @@ -54,7 +54,10 @@ func (s *Shim) HandleListResourceClasses(w http.ResponseWriter, r *http.Request) entries := make([]resourceClassEntry, 0, len(rcSet)) for name := range rcSet { - entries = append(entries, resourceClassEntry{Name: name}) + entries = append(entries, resourceClassEntry{ + Name: name, + Links: []resourceClassLink{{Rel: "self", Href: "/resource_classes/" + name}}, + }) } sort.Slice(entries, func(i, j int) bool { return entries[i].Name < entries[j].Name }) @@ -194,7 +197,10 @@ func (s *Shim) HandleShowResourceClass(w http.ResponseWriter, r *http.Request) { return } log.Info("resource class found", "class", name) - s.writeJSON(w, http.StatusOK, resourceClassEntry{Name: name}) + s.writeJSON(w, http.StatusOK, resourceClassEntry{ + Name: name, + Links: []resourceClassLink{{Rel: "self", Href: "/resource_classes/" + name}}, + }) } // HandleUpdateResourceClass handles PUT /resource_classes/{name} requests. diff --git a/internal/shim/placement/handle_resource_classes_test.go b/internal/shim/placement/handle_resource_classes_test.go index 37e46cac1..d3f1c4ee1 100644 --- a/internal/shim/placement/handle_resource_classes_test.go +++ b/internal/shim/placement/handle_resource_classes_test.go @@ -8,6 +8,7 @@ import ( "context" "encoding/json" "net/http" + "net/http/httptest" "testing" "github.com/cobaltcore-dev/cortex/pkg/resourcelock" @@ -118,6 +119,9 @@ func TestHandleListResourceClassesLocal(t *testing.T) { if rc.Name != want[i] { t.Errorf("class[%d] = %q, want %q", i, rc.Name, want[i]) } + if len(rc.Links) != 1 || rc.Links[0].Rel != "self" || rc.Links[0].Href != "/resource_classes/"+rc.Name { + t.Errorf("class[%d] links = %v, want self link", i, rc.Links) + } } } @@ -232,3 +236,88 @@ func TestHandleDeleteResourceClassLocalBadPrefix(t *testing.T) { t.Fatalf("status = %d, want %d", w.Code, http.StatusBadRequest) } } + +// --- Hybrid mode tests --- + +func newHybridResourceClassShim(t *testing.T, upstreamStatus int, upstreamBody string, classes []string) *Shim { + t.Helper() + t.Setenv("POD_NAMESPACE", "default") + upstream := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + w.WriteHeader(upstreamStatus) + if upstreamBody != "" { + if _, err := w.Write([]byte(upstreamBody)); err != nil { + t.Errorf("failed to write upstream body: %v", err) + } + } + })) + t.Cleanup(upstream.Close) + objs := []client.Object{newTestResourceClassConfigMap("default", "test-rc-cm", classes)} + cl := newFakeClientWithScheme(t, objs...) + down, up := newTestTimers() + return &Shim{ + Client: cl, + config: config{ + PlacementURL: upstream.URL, + Features: featuresConfig{ResourceClasses: FeatureModeHybrid}, + ResourceClasses: &resourceClassesConfig{ConfigMapName: "test-rc-cm"}, + }, + httpClient: upstream.Client(), + maxBodyLogSize: 4096, + downstreamRequestTimer: down, + upstreamRequestTimer: up, + resourceLocker: resourcelock.NewResourceLocker(cl, "default"), + } +} + +func TestHandleListResourceClassesHybridForwards(t *testing.T) { + s := newHybridResourceClassShim(t, http.StatusOK, `{"resource_classes":[{"name":"VCPU"}]}`, nil) + w := serveHandler(t, "GET", "/resource_classes", s.HandleListResourceClasses, "/resource_classes") + if w.Code != http.StatusOK { + t.Fatalf("status = %d, want %d", w.Code, http.StatusOK) + } +} + +func TestHandleUpdateResourceClassHybridUpdatesLocal(t *testing.T) { + s := newHybridResourceClassShim(t, http.StatusCreated, "", nil) + w := serveHandler(t, "PUT", "/resource_classes/{name}", s.HandleUpdateResourceClass, "/resource_classes/CUSTOM_HYB") + if w.Code != http.StatusCreated { + t.Fatalf("status = %d, want %d", w.Code, http.StatusCreated) + } + found, err := s.hasResourceClass(context.Background(), "CUSTOM_HYB") + if err != nil { + t.Fatalf("hasResourceClass: %v", err) + } + if !found { + t.Error("expected resource class to be added to local configmap in hybrid mode") + } +} + +func TestHandleDeleteResourceClassHybridUpdatesLocal(t *testing.T) { + s := newHybridResourceClassShim(t, http.StatusNoContent, "", []string{"CUSTOM_DEL"}) + w := serveHandler(t, "DELETE", "/resource_classes/{name}", s.HandleDeleteResourceClass, "/resource_classes/CUSTOM_DEL") + if w.Code != http.StatusNoContent { + t.Fatalf("status = %d, want %d", w.Code, http.StatusNoContent) + } + found, err := s.hasResourceClass(context.Background(), "CUSTOM_DEL") + if err != nil { + t.Fatalf("hasResourceClass: %v", err) + } + if found { + t.Error("expected resource class to be removed from local configmap in hybrid mode") + } +} + +func TestHandleUpdateResourceClassHybridUpstreamFailure(t *testing.T) { + s := newHybridResourceClassShim(t, http.StatusInternalServerError, "upstream error", nil) + w := serveHandler(t, "PUT", "/resource_classes/{name}", s.HandleUpdateResourceClass, "/resource_classes/CUSTOM_FAIL") + if w.Code != http.StatusInternalServerError { + t.Fatalf("status = %d, want %d", w.Code, http.StatusInternalServerError) + } + found, err := s.hasResourceClass(context.Background(), "CUSTOM_FAIL") + if err != nil { + t.Fatalf("hasResourceClass: %v", err) + } + if found { + t.Error("expected resource class NOT to be added when upstream fails") + } +} diff --git a/internal/shim/placement/shim.go b/internal/shim/placement/shim.go index 09777defe..c06bc9c60 100644 --- a/internal/shim/placement/shim.go +++ b/internal/shim/placement/shim.go @@ -482,13 +482,8 @@ func (s *Shim) Start(ctx context.Context) error { return err } } - traitsMode := s.config.Features.Traits.orDefault() - rcMode := s.config.Features.ResourceClasses.orDefault() for _, syncer := range s.syncers { - if traitsMode == FeatureModeHybrid || traitsMode == FeatureModePassthrough || - rcMode == FeatureModeHybrid || rcMode == FeatureModePassthrough { - go syncer.Run(ctx) - } + go syncer.Run(ctx) } return nil } diff --git a/internal/shim/placement/syncer_resource_classes.go b/internal/shim/placement/syncer_resource_classes.go index 3a27608a9..a6aa854b5 100644 --- a/internal/shim/placement/syncer_resource_classes.go +++ b/internal/shim/placement/syncer_resource_classes.go @@ -30,7 +30,13 @@ type resourceClassesListResponse struct { } type resourceClassEntry struct { - Name string `json:"name"` + Name string `json:"name"` + Links []resourceClassLink `json:"links,omitempty"` +} + +type resourceClassLink struct { + Rel string `json:"rel"` + Href string `json:"href"` } // ResourceClassSyncer manages the lifecycle of the resource classes ConfigMap. From 4a497137c3f19beedd356ac18c70aaa7ef373220 Mon Sep 17 00:00:00 2001 From: Philipp Matthes Date: Wed, 29 Apr 2026 12:32:58 +0200 Subject: [PATCH 4/4] Skip syncer Run() in CRD mode (no upstream to sync from) In CRD-only mode, there is no upstream placement service to sync from, so starting the background sync goroutine is unnecessary. Gate syncer.Run() on mode != CRD and remove the now-unused syncers slice field from the Shim struct. --- internal/shim/placement/shim.go | 27 ++++++++++++++------------- 1 file changed, 14 insertions(+), 13 deletions(-) diff --git a/internal/shim/placement/shim.go b/internal/shim/placement/shim.go index c06bc9c60..53de85056 100644 --- a/internal/shim/placement/shim.go +++ b/internal/shim/placement/shim.go @@ -334,9 +334,6 @@ type Shim struct { // placement with automatic token management (including reauth on 401). // Nil when Keystone credentials are not configured. placementServiceClient *gophercloud.ServiceClient - // syncers are background workers that manage ConfigMap-backed local - // stores (e.g. traits, resource classes). Started uniformly in Start. - syncers []Syncer } // Describe implements prometheus.Collector. @@ -460,30 +457,34 @@ func (s *Shim) Start(ctx context.Context) error { return err } if s.config.Traits != nil { - s.syncers = append(s.syncers, NewTraitSyncer( + ts := NewTraitSyncer( s.Client, s.config.Traits.ConfigMapName, os.Getenv("POD_NAMESPACE"), s.placementServiceClient, s.resourceLocker, - )) + ) + if err := ts.Init(ctx); err != nil { + return err + } + if s.config.Features.Traits.orDefault() != FeatureModeCRD { + go ts.Run(ctx) + } } if s.config.ResourceClasses != nil { - s.syncers = append(s.syncers, NewResourceClassSyncer( + rs := NewResourceClassSyncer( s.Client, s.config.ResourceClasses.ConfigMapName, os.Getenv("POD_NAMESPACE"), s.placementServiceClient, s.resourceLocker, - )) - } - for _, syncer := range s.syncers { - if err := syncer.Init(ctx); err != nil { + ) + if err := rs.Init(ctx); err != nil { return err } - } - for _, syncer := range s.syncers { - go syncer.Run(ctx) + if s.config.Features.ResourceClasses.orDefault() != FeatureModeCRD { + go rs.Run(ctx) + } } return nil }