From 3937144bfca29f2fe3c827e453e429ce8e0b3b3d Mon Sep 17 00:00:00 2001 From: Philipp Matthes Date: Mon, 20 Apr 2026 16:43:08 +0200 Subject: [PATCH 1/5] Rough implementation to check if this works --- .../placement/handle_resource_providers.go | 40 ++++++++++++++++++- 1 file changed, 38 insertions(+), 2 deletions(-) diff --git a/internal/shim/placement/handle_resource_providers.go b/internal/shim/placement/handle_resource_providers.go index 51a4966dc..e715b86ad 100644 --- a/internal/shim/placement/handle_resource_providers.go +++ b/internal/shim/placement/handle_resource_providers.go @@ -4,7 +4,13 @@ package placement import ( + "encoding/json" "net/http" + + hv1 "github.com/cobaltcore-dev/openstack-hypervisor-operator/api/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + "sigs.k8s.io/controller-runtime/pkg/client" + logf "sigs.k8s.io/controller-runtime/pkg/log" ) // HandleListResourceProviders handles GET /resource_providers requests. @@ -49,10 +55,40 @@ func (s *Shim) HandleCreateResourceProvider(w http.ResponseWriter, r *http.Reque // provider's position in a hierarchical tree. Returns 404 if the provider // does not exist. func (s *Shim) HandleShowResourceProvider(w http.ResponseWriter, r *http.Request) { - if _, ok := requiredUUIDPathParam(w, r, "uuid"); !ok { + ctx := r.Context() + log := logf.FromContext(ctx) + uuid, ok := requiredUUIDPathParam(w, r, "uuid") + if !ok { + return + } + // Try to find the hypervisor in kubernetes. + var hv hv1.HypervisorList + err := s.List(ctx, &hv, client.MatchingFields{idxHypervisorOpenStackId: uuid}) + if apierrors.IsNotFound(err) || len(hv.Items) == 0 { + // Forward the request to placement if the hypervisor doesn't exist. + s.forward(w, r) + return + } + if err != nil { + // Something else is wrong. 
+ log.Error(err, "failed to list hypervisors with OpenStack ID index") + http.Error(w, "Internal Server Error", http.StatusInternalServerError) + return + } + if len(hv.Items) > 1 { + log.Error(nil, "multiple hypervisors found with the same OpenStack ID", "uuid", uuid) + http.Error(w, "Internal Server Error", http.StatusInternalServerError) + return + } + // TODO: For now, just dump the hypervisor's status as JSON, but eventually we may want to transform it. + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + jsonEncoder := json.NewEncoder(w) + if err := jsonEncoder.Encode(hv.Items[0].Status); err != nil { + log.Error(err, "failed to encode hypervisor status as JSON") + http.Error(w, "Internal Server Error", http.StatusInternalServerError) return } - s.forward(w, r) } // HandleUpdateResourceProvider handles PUT /resource_providers/{uuid} requests. From 8157cfcbd855932e0956d83f5cd0d9dd54e782a5 Mon Sep 17 00:00:00 2001 From: Philipp Matthes Date: Wed, 22 Apr 2026 11:23:26 +0200 Subject: [PATCH 2/5] Implement all endpoints --- internal/shim/placement/field_index.go | 20 - internal/shim/placement/field_index_test.go | 58 -- .../placement/handle_resource_providers.go | 614 ++++++++++++- .../handle_resource_providers_test.go | 866 +++++++++++++++++- internal/shim/placement/shim.go | 27 +- internal/shim/placement/shim_io.go | 11 + internal/shim/placement/shim_io_test.go | 36 + internal/shim/placement/shim_test.go | 6 +- 8 files changed, 1484 insertions(+), 154 deletions(-) diff --git a/internal/shim/placement/field_index.go b/internal/shim/placement/field_index.go index 5a7ed04b4..6dd678ec1 100644 --- a/internal/shim/placement/field_index.go +++ b/internal/shim/placement/field_index.go @@ -19,9 +19,6 @@ const ( // of the hypervisor in OpenStack. This also corresponds to the uuid of the // resource provider in OpenStack Placement. 
idxHypervisorOpenStackId = "status.hypervisorId" - // idxStatusAggUUIDs is the name of the index for looking up hypervisors - // by the UUIDs of the aggregates they belong to. - idxStatusAggUUIDs = "status.aggregates[*].uuid" // idxHypervisorKubernetesId is the name of the index for looking up // hypervisors by their uid in Kubernetes. idxHypervisorKubernetesId = "metadata.uid" @@ -80,22 +77,5 @@ func indexFields(ctx context.Context, mcl *multicluster.Client) error { } log.Info("Successfully set up index for hypervisor name") - if err := mcl.IndexField(ctx, h, hl, idxStatusAggUUIDs, func(obj client.Object) []string { - hv, ok := obj.(*hv1.Hypervisor) - if !ok { - log.Error(errors.New("unexpected type"), "object", obj) - return nil - } - var uuids []string - for _, agg := range hv.Status.Aggregates { - uuids = append(uuids, agg.UUID) - } - return uuids - }); err != nil { - log.Error(err, "failed to set up index for hypervisor aggregates") - return err - } - log.Info("Successfully set up index for hypervisor aggregates") - return nil } diff --git a/internal/shim/placement/field_index_test.go b/internal/shim/placement/field_index_test.go index ad0618c1b..f9721601e 100644 --- a/internal/shim/placement/field_index_test.go +++ b/internal/shim/placement/field_index_test.go @@ -118,7 +118,6 @@ func TestIndexFields_RegistersAllIndexes(t *testing.T) { idxHypervisorOpenStackId, idxHypervisorKubernetesId, idxHypervisorName, - idxStatusAggUUIDs, } if len(cc.calls) != len(wantFields) { t.Fatalf("got %d IndexField calls, want %d", len(cc.calls), len(wantFields)) @@ -271,63 +270,6 @@ func TestExtractor_HypervisorName(t *testing.T) { } } -func TestExtractor_StatusAggUUIDs(t *testing.T) { - cc := &captureCache{} - mcl := buildClient(t, cc) - if err := indexFields(context.Background(), mcl); err != nil { - t.Fatalf("indexFields: %v", err) - } - fn := extractorByField(t, cc.calls, idxStatusAggUUIDs) - - tests := []struct { - name string - obj client.Object - want []string - }{ - { - 
name: "multiple aggregates", - obj: &hv1.Hypervisor{ - Status: hv1.HypervisorStatus{ - Aggregates: []hv1.Aggregate{ - {Name: "agg-a", UUID: "uuid-a"}, - {Name: "agg-b", UUID: "uuid-b"}, - }, - }, - }, - want: []string{"uuid-a", "uuid-b"}, - }, - { - name: "single aggregate", - obj: &hv1.Hypervisor{ - Status: hv1.HypervisorStatus{ - Aggregates: []hv1.Aggregate{ - {Name: "agg-x", UUID: "uuid-x"}, - }, - }, - }, - want: []string{"uuid-x"}, - }, - { - name: "no aggregates", - obj: &hv1.Hypervisor{}, - want: nil, - }, - { - name: "wrong type", - obj: &corev1.ConfigMap{}, - want: nil, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - got := fn(tt.obj) - if !strSliceEqual(got, tt.want) { - t.Errorf("got %v, want %v", got, tt.want) - } - }) - } -} - func strSliceEqual(a, b []string) bool { if len(a) == 0 && len(b) == 0 { return true diff --git a/internal/shim/placement/handle_resource_providers.go b/internal/shim/placement/handle_resource_providers.go index e715b86ad..44a0a316a 100644 --- a/internal/shim/placement/handle_resource_providers.go +++ b/internal/shim/placement/handle_resource_providers.go @@ -4,30 +4,97 @@ package placement import ( + "bytes" + "context" "encoding/json" + "fmt" + "io" "net/http" + "strconv" + "strings" hv1 "github.com/cobaltcore-dev/openstack-hypervisor-operator/api/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/resource" "sigs.k8s.io/controller-runtime/pkg/client" logf "sigs.k8s.io/controller-runtime/pkg/log" ) -// HandleListResourceProviders handles GET /resource_providers requests. -// -// Returns a filtered list of resource providers. Resource providers are -// entities that provide consumable inventory of one or more classes of -// resources (e.g. a compute node providing VCPU, MEMORY_MB, DISK_GB). 
-// -// Supports numerous filter parameters including name, uuid, member_of -// (aggregate membership), resources (capacity filtering), in_tree (provider -// tree membership), and required (trait filtering). Multiple filters are -// combined with boolean AND logic. Many of these filters were added in later -// microversions: resources filtering at 1.3, tree queries at 1.14, trait -// requirements at 1.18, forbidden traits at 1.22, forbidden aggregates at -// 1.32, and the in: syntax for required at 1.39. -func (s *Shim) HandleListResourceProviders(w http.ResponseWriter, r *http.Request) { - s.forward(w, r) +// resourceProvider appears in the body of a successful +// /resource_providers/{uuid} response. +type resourceProvider struct { + // A consistent view marker that assists with the management of concurrent + // resource provider updates. + Generation int64 `json:"generation"` + // The uuid of a resource provider. + UUID string `json:"uuid"` + // A list of links associated with one resource provider. + Links []resourceProviderLink `json:"links"` + // The name of a resource provider. + Name string `json:"name"` + // The uuid of the parent resource provider, if any. + ParentProviderUUID *string `json:"parent_provider_uuid,omitempty"` + // The uuid of the root resource provider in the tree, if any. + RootProviderUUID *string `json:"root_provider_uuid,omitempty"` +} + +// resourceProviderLink describes a link to a related object in the +// response to /resource_providers/{uuid}. +type resourceProviderLink struct { + // The relation of the linked object to the resource provider. + Rel string `json:"rel"` + // The URL of the linked object. + Href string `json:"href"` +} + +// translateToResourceProvider constructs a resourceProvider from a Hypervisor. +// For the purposes of this shim, we treat each hypervisor as a root +// provider with no parent, so both ParentProviderUUID and RootProviderUUID +// are set to the hypervisor's UUID. 
+func translateToResourceProvider(hv hv1.Hypervisor) resourceProvider { + return resourceProvider{ + Generation: hv.Generation, + UUID: hv.Status.HypervisorID, + Name: hv.Name, + ParentProviderUUID: &hv.Status.HypervisorID, + RootProviderUUID: &hv.Status.HypervisorID, + Links: []resourceProviderLink{ + { + Rel: "self", + Href: "/resource_providers/" + hv.Status.HypervisorID, + }, + { + Rel: "aggregates", + Href: "/resource_providers/" + hv.Status.HypervisorID + "/aggregates", + }, + { + Rel: "inventories", + Href: "/resource_providers/" + hv.Status.HypervisorID + "/inventories", + }, + { + Rel: "allocations", + Href: "/resource_providers/" + hv.Status.HypervisorID + "/allocations", + }, + { + Rel: "traits", + Href: "/resource_providers/" + hv.Status.HypervisorID + "/traits", + }, + { + Rel: "usages", + Href: "/resource_providers/" + hv.Status.HypervisorID + "/usages", + }, + }, + } +} + +// createResourceProviderRequest is the expected JSON body for +// POST /resource_providers. +type createResourceProviderRequest struct { + Name string `json:"name"` + + // Other fields are not relevant for the shim since we don't actually + // create resource providers in Kubernetes. We just check for name + // collisions with openstack placement. } // HandleCreateResourceProvider handles POST /resource_providers requests. @@ -42,7 +109,58 @@ func (s *Shim) HandleListResourceProviders(w http.ResponseWriter, r *http.Reques // an HTTP 201 with a Location header, while 1.20+ returns the full resource // provider object in the body. Returns 409 Conflict if a provider with the // same name or UUID already exists. +// +// If the name matches a KVM hypervisor already managed by Kubernetes, the +// shim returns 409 Conflict to prevent shadow resource providers from being +// created in upstream placement. 
+// +// See: https://docs.openstack.org/api-ref/placement/#create-resource-provider func (s *Shim) HandleCreateResourceProvider(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + log := logf.FromContext(ctx) + + // Buffer the body so we can decode it and still forward the original + // bytes to upstream placement. + bodyBytes, err := io.ReadAll(r.Body) + if err != nil { + log.Error(err, "failed to read request body") + http.Error(w, "failed to read request body", http.StatusBadRequest) + return + } + + var req createResourceProviderRequest + if err := json.Unmarshal(bodyBytes, &req); err != nil { + http.Error(w, "invalid JSON body: "+err.Error(), http.StatusBadRequest) + return + } + if req.Name == "" { + http.Error(w, "missing required field: name", http.StatusBadRequest) + return + } + + // Look up by name — the fast path using the name index. + // Names are unique, so we expect at most one result. + var hvs hv1.HypervisorList + err = s.List(ctx, &hvs, client.MatchingFields{idxHypervisorName: req.Name}) + if err != nil && !apierrors.IsNotFound(err) { + log.Error(err, "failed to list hypervisors with name index") + http.Error(w, "Internal Server Error", http.StatusInternalServerError) + return + } + if len(hvs.Items) > 1 { + log.Error(nil, "multiple hypervisors found with the same name", "name", req.Name) + http.Error(w, "Internal Server Error", http.StatusInternalServerError) + return + } + if len(hvs.Items) == 1 { + log.Error(nil, "attempt to create a resource provider that conflicts with a kvm hypervisor", + "name", req.Name, "hypervisorID", hvs.Items[0].Status.HypervisorID) + http.Error(w, "conflict with an existing kvm hypervisor resource provider", http.StatusConflict) + return + } + + // No conflict — restore the body and forward to upstream placement. 
+ r.Body = io.NopCloser(bytes.NewReader(bodyBytes)) s.forward(w, r) } @@ -54,6 +172,8 @@ func (s *Shim) HandleCreateResourceProvider(w http.ResponseWriter, r *http.Reque // also includes parent_provider_uuid and root_provider_uuid to describe the // provider's position in a hierarchical tree. Returns 404 if the provider // does not exist. +// +// See: https://docs.openstack.org/api-ref/placement/#show-resource-provider func (s *Shim) HandleShowResourceProvider(w http.ResponseWriter, r *http.Request) { ctx := r.Context() log := logf.FromContext(ctx) @@ -61,10 +181,11 @@ func (s *Shim) HandleShowResourceProvider(w http.ResponseWriter, r *http.Request if !ok { return } + // Try to find the hypervisor in kubernetes. - var hv hv1.HypervisorList - err := s.List(ctx, &hv, client.MatchingFields{idxHypervisorOpenStackId: uuid}) - if apierrors.IsNotFound(err) || len(hv.Items) == 0 { + var hvs hv1.HypervisorList + err := s.List(ctx, &hvs, client.MatchingFields{idxHypervisorOpenStackId: uuid}) + if apierrors.IsNotFound(err) || len(hvs.Items) == 0 { // Forward the request to placement if the hypervisor doesn't exist. s.forward(w, r) return @@ -75,20 +196,21 @@ func (s *Shim) HandleShowResourceProvider(w http.ResponseWriter, r *http.Request http.Error(w, "Internal Server Error", http.StatusInternalServerError) return } - if len(hv.Items) > 1 { + if len(hvs.Items) > 1 { log.Error(nil, "multiple hypervisors found with the same OpenStack ID", "uuid", uuid) http.Error(w, "Internal Server Error", http.StatusInternalServerError) return } - // TODO: For now, just dump the hypervisor's status as JSON, but eventually we may want to transform it. 
- w.Header().Set("Content-Type", "application/json") - w.WriteHeader(http.StatusOK) - jsonEncoder := json.NewEncoder(w) - if err := jsonEncoder.Encode(hv.Items[0].Status); err != nil { - log.Error(err, "failed to encode hypervisor status as JSON") - http.Error(w, "Internal Server Error", http.StatusInternalServerError) - return - } + + // Translate the hypervisor to a resource provider response. + s.writeJSON(w, http.StatusOK, translateToResourceProvider(hvs.Items[0])) +} + +// updateResourceProviderRequest is the expected JSON body for +// PUT /resource_providers/{uuid}. +type updateResourceProviderRequest struct { + Name string `json:"name"` + ParentProviderUUID *string `json:"parent_provider_uuid,omitempty"` } // HandleUpdateResourceProvider handles PUT /resource_providers/{uuid} requests. @@ -98,11 +220,72 @@ func (s *Shim) HandleShowResourceProvider(w http.ResponseWriter, r *http.Request // any existing provider UUID that would not create a loop in the tree, or set // to null to make the provider a root. Returns 409 Conflict if another // provider already has the requested name. +// +// See: https://docs.openstack.org/api-ref/placement/#update-resource-provider func (s *Shim) HandleUpdateResourceProvider(w http.ResponseWriter, r *http.Request) { - if _, ok := requiredUUIDPathParam(w, r, "uuid"); !ok { + ctx := r.Context() + log := logf.FromContext(ctx) + uuid, ok := requiredUUIDPathParam(w, r, "uuid") + if !ok { return } - s.forward(w, r) + + bodyBytes, err := io.ReadAll(r.Body) + if err != nil { + log.Error(err, "failed to read request body") + http.Error(w, "failed to read request body", http.StatusBadRequest) + return + } + + var req updateResourceProviderRequest + if err := json.Unmarshal(bodyBytes, &req); err != nil { + http.Error(w, "invalid JSON body: "+err.Error(), http.StatusBadRequest) + return + } + if req.Name == "" { + http.Error(w, "missing required field: name", http.StatusBadRequest) + return + } + + // Try to find the hypervisor in kubernetes. 
+ var hvs hv1.HypervisorList + err = s.List(ctx, &hvs, client.MatchingFields{idxHypervisorOpenStackId: uuid}) + if apierrors.IsNotFound(err) || len(hvs.Items) == 0 { + // Forward the request to placement if the hypervisor doesn't exist. + r.Body = io.NopCloser(bytes.NewReader(bodyBytes)) + s.forward(w, r) + return + } + if err != nil { + log.Error(err, "failed to list hypervisors with OpenStack ID index") + http.Error(w, "Internal Server Error", http.StatusInternalServerError) + return + } + if len(hvs.Items) > 1 { + log.Error(nil, "multiple hypervisors found with the same OpenStack ID", "uuid", uuid) + http.Error(w, "Internal Server Error", http.StatusInternalServerError) + return + } + hv := hvs.Items[0] + + // The hypervisor's name is immutable, so if the request tries to change it, + // we return a 409 Conflict to match the behavior of placement. + if hv.Name != req.Name { + log.Error(nil, "attempt to change the name of a kvm hypervisor resource provider", "uuid", uuid, "currentName", hv.Name, "requestedName", req.Name) + http.Error(w, "cannot change the name of a kvm hypervisor resource provider", http.StatusConflict) + return + } + // The hypervisor's parent is also immutable, so if the request tries to + // change it, we return a 409 Conflict. + if req.ParentProviderUUID != nil && *req.ParentProviderUUID != hv.Status.HypervisorID { + log.Error(nil, "attempt to change the parent of a kvm hypervisor resource provider", "uuid", uuid, "currentParent", hv.Status.HypervisorID, "requestedParent", *req.ParentProviderUUID) + http.Error(w, "cannot change the parent of a kvm hypervisor resource provider", http.StatusConflict) + return + } + + // If we get here, the request is valid but doesn't actually change anything, + // so we can just return the current state of the resource provider. + s.writeJSON(w, http.StatusOK, translateToResourceProvider(hv)) } // HandleDeleteResourceProvider handles DELETE /resource_providers/{uuid} requests. 
@@ -111,9 +294,376 @@ func (s *Shim) HandleUpdateResourceProvider(w http.ResponseWriter, r *http.Reque // inventories. The operation fails with 409 Conflict if there are any // allocations against the provider's inventories or if the provider has // child providers in a tree hierarchy. Returns 204 No Content on success. +// +// See: https://docs.openstack.org/api-ref/placement/#delete-resource-provider func (s *Shim) HandleDeleteResourceProvider(w http.ResponseWriter, r *http.Request) { - if _, ok := requiredUUIDPathParam(w, r, "uuid"); !ok { + ctx := r.Context() + log := logf.FromContext(ctx) + uuid, ok := requiredUUIDPathParam(w, r, "uuid") + if !ok { return } - s.forward(w, r) + + // Try to find the hypervisor in kubernetes. + var hvs hv1.HypervisorList + err := s.List(ctx, &hvs, client.MatchingFields{idxHypervisorOpenStackId: uuid}) + if apierrors.IsNotFound(err) || len(hvs.Items) == 0 { + // Forward the request to placement if the hypervisor doesn't exist. + s.forward(w, r) + return + } + if err != nil { + // Something else is wrong. + log.Error(err, "failed to list hypervisors with OpenStack ID index") + http.Error(w, "Internal Server Error", http.StatusInternalServerError) + return + } + if len(hvs.Items) > 1 { + log.Error(nil, "multiple hypervisors found with the same OpenStack ID", "uuid", uuid) + http.Error(w, "Internal Server Error", http.StatusInternalServerError) + return + } + + // KVM hypervisor resources are immutable to the extent that they cannot be + // deleted, so we return a 409 Conflict to match the behavior of placement. + http.Error(w, "cannot delete a kvm hypervisor resource provider", http.StatusConflict) +} + +// listResourceProvidersResponse is the JSON envelope returned by +// GET /resource_providers. +type listResourceProvidersResponse struct { + ResourceProviders []resourceProvider `json:"resource_providers"` +} + +// HandleListResourceProviders handles GET /resource_providers requests. 
+// +// Returns a filtered list of resource providers. Resource providers are +// entities that provide consumable inventory of one or more classes of +// resources (e.g. a compute node providing VCPU, MEMORY_MB, DISK_GB). +// +// Supports numerous filter parameters including name, uuid, member_of +// (aggregate membership), resources (capacity filtering), in_tree (provider +// tree membership), and required (trait filtering). Multiple filters are +// combined with boolean AND logic. Many of these filters were added in later +// microversions: resources filtering at 1.3, tree queries at 1.14, trait +// requirements at 1.18, forbidden traits at 1.22, forbidden aggregates at +// 1.32, and the in: syntax for required at 1.39. +// +// The shim fetches resource providers from upstream placement, then merges +// in KVM hypervisors managed by Kubernetes. On uuid or name collisions the +// Kubernetes version wins and a warning is logged. +// +// See: https://docs.openstack.org/api-ref/placement/#list-resource-providers +func (s *Shim) HandleListResourceProviders(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + log := logf.FromContext(ctx) + + s.forwardWithHook(w, r, func(w http.ResponseWriter, resp *http.Response) { + if resp.StatusCode != http.StatusOK { + for k, vs := range resp.Header { + for _, v := range vs { + w.Header().Add(k, v) + } + } + w.WriteHeader(resp.StatusCode) + io.Copy(w, resp.Body) //nolint:errcheck + return + } + + var upstreamList listResourceProvidersResponse + if err := json.NewDecoder(resp.Body).Decode(&upstreamList); err != nil { + log.Error(err, "failed to decode upstream resource providers response") + http.Error(w, "Internal Server Error", http.StatusInternalServerError) + return + } + + // Fetch all KVM hypervisors from Kubernetes. 
+ query := r.URL.Query() + var hvs hv1.HypervisorList + if err := s.List(ctx, &hvs); err != nil && !apierrors.IsNotFound(err) { + log.Error(err, "failed to list hypervisors") + http.Error(w, "Internal Server Error", http.StatusInternalServerError) + return + } + + // Post-filter by query parameters. + filtered := hvs.Items + if v := query.Get("uuid"); v != "" { + filtered = filterHypervisorsByUUID(ctx, filtered, v) + } + if v := query.Get("name"); v != "" { + filtered = filterHypervisorsByName(ctx, filtered, v) + } + if vals := query["member_of"]; len(vals) > 0 { + filtered = filterHypervisorsByMemberOf(ctx, filtered, vals) + } + if v := query.Get("in_tree"); v != "" { + filtered = filterHypervisorsByInTree(ctx, filtered, v) + } + if vals := query["required"]; len(vals) > 0 { + filtered = filterHypervisorsByRequired(ctx, filtered, vals) + } + if v := query.Get("resources"); v != "" { + var err error + filtered, err = filterHypervisorsByResources(ctx, filtered, v) + if err != nil { + log.Error(err, "failed to filter hypervisors by resources") + http.Error(w, "Internal Server Error", http.StatusInternalServerError) + return + } + } + + // Build collision sets from filtered k8s hypervisors. + k8sByUUID := make(map[string]resourceProvider, len(filtered)) + k8sByName := make(map[string]resourceProvider, len(filtered)) + for _, hv := range filtered { + rp := translateToResourceProvider(hv) + k8sByUUID[rp.UUID] = rp + k8sByName[rp.Name] = rp + } + + // Merge: keep upstream entries that don't collide; k8s wins. 
+ merged := make([]resourceProvider, 0, len(upstreamList.ResourceProviders)+len(k8sByUUID)) + for _, rp := range upstreamList.ResourceProviders { + if _, ok := k8sByUUID[rp.UUID]; ok { + log.Info("upstream resource provider uuid collides with kvm hypervisor, using kubernetes version", + "uuid", rp.UUID, "name", rp.Name) + continue + } + if _, ok := k8sByName[rp.Name]; ok { + log.Info("upstream resource provider name collides with kvm hypervisor, using kubernetes version", + "name", rp.Name, "uuid", rp.UUID) + continue + } + merged = append(merged, rp) + } + for _, rp := range k8sByUUID { + merged = append(merged, rp) + } + + s.writeJSON(w, http.StatusOK, listResourceProvidersResponse{ + ResourceProviders: merged, + }) + }) +} + +func filterHypervisorsByUUID(ctx context.Context, hvs []hv1.Hypervisor, uuid string) []hv1.Hypervisor { + log := logf.FromContext(ctx) + out := make([]hv1.Hypervisor, 0, 1) + for _, hv := range hvs { + if hv.Status.HypervisorID == uuid { + out = append(out, hv) + } else { + log.V(1).Info("hypervisor filtered out by uuid", + "hypervisor", hv.Name, "hypervisorID", hv.Status.HypervisorID, "wantUUID", uuid) + } + } + return out +} + +func filterHypervisorsByName(ctx context.Context, hvs []hv1.Hypervisor, name string) []hv1.Hypervisor { + log := logf.FromContext(ctx) + out := make([]hv1.Hypervisor, 0, 1) + for _, hv := range hvs { + if hv.Name == name { + out = append(out, hv) + } else { + log.V(1).Info("hypervisor filtered out by name", + "hypervisor", hv.Name, "wantName", name) + } + } + return out +} + +// filterHypervisorsByMemberOf applies AND logic across repeated member_of +// params. 
Each value can be: +// - bare UUID +// - in:uuid1,uuid2 (any-of) +// - !uuid or !in:uuid1,uuid2 (forbidden) +func filterHypervisorsByMemberOf(ctx context.Context, hvs []hv1.Hypervisor, memberOf []string) []hv1.Hypervisor { + log := logf.FromContext(ctx) + for _, expr := range memberOf { + forbidden := strings.HasPrefix(expr, "!") + if forbidden { + expr = expr[1:] + } + var uuids []string + if strings.HasPrefix(expr, "in:") { + uuids = strings.Split(expr[3:], ",") + } else { + uuids = []string{expr} + } + uuidSet := make(map[string]struct{}, len(uuids)) + for _, u := range uuids { + uuidSet[u] = struct{}{} + } + + out := make([]hv1.Hypervisor, 0, len(hvs)) + for _, hv := range hvs { + member := false + for _, agg := range hv.Status.Aggregates { + if _, ok := uuidSet[agg.UUID]; ok { + member = true + break + } + } + switch { + case forbidden && !member: + out = append(out, hv) + case !forbidden && member: + out = append(out, hv) + default: + log.V(1).Info("hypervisor filtered out by member_of", + "hypervisor", hv.Name, "forbidden", forbidden, "member", member) + } + } + hvs = out + } + return hvs +} + +// filterHypervisorsByInTree keeps hypervisors whose UUID matches in_tree. +// KVM hypervisors are flat 1-element trees (root == self). +func filterHypervisorsByInTree(ctx context.Context, hvs []hv1.Hypervisor, inTree string) []hv1.Hypervisor { + log := logf.FromContext(ctx) + out := make([]hv1.Hypervisor, 0, 1) + for _, hv := range hvs { + if hv.Status.HypervisorID == inTree { + out = append(out, hv) + } else { + log.V(1).Info("hypervisor filtered out by in_tree", + "hypervisor", hv.Name, "hypervisorID", hv.Status.HypervisorID, "wantInTree", inTree) + } + } + return out +} + +// filterHypervisorsByRequired applies AND logic across repeated required +// params. 
Each value is a comma-separated list of traits: +// - TRAIT_A,TRAIT_B — all must be present +// - !TRAIT_C — must NOT be present +// - in:TRAIT_X,TRAIT_Y — at least one must be present +func filterHypervisorsByRequired(ctx context.Context, hvs []hv1.Hypervisor, required []string) []hv1.Hypervisor { + log := logf.FromContext(ctx) + for _, expr := range required { + parts := strings.Split(expr, ",") + out := make([]hv1.Hypervisor, 0, len(hvs)) + for _, hv := range hvs { + traitSet := make(map[string]struct{}, len(hv.Status.Traits)) + for _, t := range hv.Status.Traits { + traitSet[t] = struct{}{} + } + if matchesTraitExpr(traitSet, parts) { + out = append(out, hv) + } else { + log.V(1).Info("hypervisor filtered out by required", + "hypervisor", hv.Name, "expr", expr) + } + } + hvs = out + } + return hvs +} + +// matchesTraitExpr checks whether a single repeated required parameter +// (already split on comma) is satisfied by the given trait set. +func matchesTraitExpr(traitSet map[string]struct{}, parts []string) bool { + i := 0 + for i < len(parts) { + p := parts[i] + switch { + case strings.HasPrefix(p, "!"): + // Forbidden trait. + if _, ok := traitSet[p[1:]]; ok { + return false + } + i++ + case strings.HasPrefix(p, "in:"): + // Any-of group: collect all tokens until the next non-plain token. + anyOf := []string{p[3:]} + for i+1 < len(parts) && !strings.HasPrefix(parts[i+1], "!") && !strings.HasPrefix(parts[i+1], "in:") { + i++ + anyOf = append(anyOf, parts[i]) + } + found := false + for _, t := range anyOf { + if _, ok := traitSet[t]; ok { + found = true + break + } + } + if !found { + return false + } + i++ + default: + // Required trait (must be present). + if _, ok := traitSet[p]; !ok { + return false + } + i++ + } + } + return true +} + +// filterHypervisorsByResources keeps only hypervisors whose effective +// capacity meets or exceeds every requested resource amount. 
+func filterHypervisorsByResources(ctx context.Context, hvs []hv1.Hypervisor, raw string) ([]hv1.Hypervisor, error) { + log := logf.FromContext(ctx) + requested := make(map[string]resource.Quantity) + for part := range strings.SplitSeq(raw, ",") { + kv := strings.SplitN(part, ":", 2) + if len(kv) != 2 { + return nil, fmt.Errorf("invalid resources token %q", part) + } + amount, err := strconv.ParseInt(kv[1], 10, 64) + if err != nil { + return nil, fmt.Errorf("invalid amount in resources token %q: %w", part, err) + } + mappedResourceName := "" + var mappedResourceQuantity *resource.Quantity + switch kv[0] { + case "VCPU": + mappedResourceName = "cpu" + mappedResourceQuantity = resource.NewQuantity(amount, resource.DecimalSI) + case "MEMORY_MB": + mappedResourceName = "memory" + mappedResourceQuantity = resource.NewQuantity(amount*1024*1024, resource.DecimalSI) + case "DISK_GB": + mappedResourceName = "disk" + mappedResourceQuantity = resource.NewQuantity(amount*1024*1024*1024, resource.DecimalSI) + default: + return nil, fmt.Errorf("invalid resource class in resources token %q", part) + } + requested[mappedResourceName] = *mappedResourceQuantity + } + + out := make([]hv1.Hypervisor, 0, len(hvs)) + for _, hv := range hvs { + // Check that all requested resources are satisfied by the + // hypervisor's capacity. + satisfied := true + for r, amount := range requested { + provided, ok := hv.Status.EffectiveCapacity[hv1.ResourceName(r)] + if !ok { + // Fallback to the physical capacity. 
+ provided, ok = hv.Status.Capacity[hv1.ResourceName(r)] + if !ok { + provided = *resource.NewQuantity(0, resource.DecimalSI) + } + } + if provided.Cmp(amount) < 0 { + satisfied = false + break + } + } + if satisfied { + out = append(out, hv) + } else { + log.V(1).Info("hypervisor filtered out by resources", + "hypervisor", hv.Name, "resources", raw) + } + } + return out, nil } diff --git a/internal/shim/placement/handle_resource_providers_test.go b/internal/shim/placement/handle_resource_providers_test.go index 520a32c0b..a4d6dbf76 100644 --- a/internal/shim/placement/handle_resource_providers_test.go +++ b/internal/shim/placement/handle_resource_providers_test.go @@ -4,43 +4,738 @@ package placement import ( + "context" + "encoding/json" "net/http" + "net/http/httptest" + "strings" "testing" + + hv1 "github.com/cobaltcore-dev/openstack-hypervisor-operator/api/v1" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/client/fake" ) +// --------------------------------------------------------------------------- +// Test helper factories +// --------------------------------------------------------------------------- + +func testScheme(t *testing.T) *runtime.Scheme { + t.Helper() + s := runtime.NewScheme() + if err := hv1.AddToScheme(s); err != nil { + t.Fatalf("hv1 scheme: %v", err) + } + return s +} + +func testHypervisor(name, openstackID string) hv1.Hypervisor { + return hv1.Hypervisor{ + ObjectMeta: metav1.ObjectMeta{Name: name}, + Status: hv1.HypervisorStatus{HypervisorID: openstackID}, + } +} + +func testHypervisorFull( + name, openstackID string, + aggregates []hv1.Aggregate, + traits []string, + capacity map[hv1.ResourceName]resource.Quantity, +) hv1.Hypervisor { + + return hv1.Hypervisor{ + ObjectMeta: metav1.ObjectMeta{Name: name}, + Status: hv1.HypervisorStatus{ + HypervisorID: openstackID, + 
Aggregates: aggregates, + Traits: traits, + EffectiveCapacity: capacity, + }, + } +} + +func newFakeClient(t *testing.T, objs ...client.Object) client.Client { + t.Helper() + s := testScheme(t) + builder := fake.NewClientBuilder().WithScheme(s) + if len(objs) > 0 { + builder = builder.WithObjects(objs...) + } + builder = builder.WithIndex(&hv1.Hypervisor{}, idxHypervisorOpenStackId, func(obj client.Object) []string { + hv, ok := obj.(*hv1.Hypervisor) + if !ok { + return nil + } + if hv.Status.HypervisorID == "" { + return nil + } + return []string{hv.Status.HypervisorID} + }) + builder = builder.WithIndex(&hv1.Hypervisor{}, idxHypervisorName, func(obj client.Object) []string { + hv, ok := obj.(*hv1.Hypervisor) + if !ok { + return nil + } + return []string{hv.Name} + }) + return builder.Build() +} + +func newTestShimWithHypervisors(t *testing.T, upstreamStatus int, upstreamBody string, hvs ...client.Object) *Shim { + t.Helper() + upstream := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(upstreamStatus) + if _, err := w.Write([]byte(upstreamBody)); err != nil { + t.Errorf("failed to write upstream body: %v", err) + } + })) + t.Cleanup(upstream.Close) + down, up := newTestTimers() + return &Shim{ + Client: newFakeClient(t, hvs...), + config: config{PlacementURL: upstream.URL}, + httpClient: upstream.Client(), + maxBodyLogSize: 4096, + downstreamRequestTimer: down, + upstreamRequestTimer: up, + } +} + +// --------------------------------------------------------------------------- +// Filter unit tests +// --------------------------------------------------------------------------- + +func TestFilterHypervisorsByUUID(t *testing.T) { + ctx := context.Background() + hvs := []hv1.Hypervisor{ + testHypervisor("hv1", "uuid-1"), + testHypervisor("hv2", "uuid-2"), + testHypervisor("hv3", "uuid-3"), + } + t.Run("match one", func(t *testing.T) { + got := 
filterHypervisorsByUUID(ctx, hvs, "uuid-2") + if len(got) != 1 || got[0].Name != "hv2" { + t.Errorf("got %v, want [hv2]", names(got)) + } + }) + t.Run("no match", func(t *testing.T) { + got := filterHypervisorsByUUID(ctx, hvs, "uuid-999") + if len(got) != 0 { + t.Errorf("got %v, want empty", names(got)) + } + }) + t.Run("empty input", func(t *testing.T) { + got := filterHypervisorsByUUID(ctx, nil, "uuid-1") + if len(got) != 0 { + t.Errorf("got %v, want empty", names(got)) + } + }) +} + +func TestFilterHypervisorsByName(t *testing.T) { + ctx := context.Background() + hvs := []hv1.Hypervisor{ + testHypervisor("node-01", "id-1"), + testHypervisor("node-02", "id-2"), + } + t.Run("match", func(t *testing.T) { + got := filterHypervisorsByName(ctx, hvs, "node-01") + if len(got) != 1 || got[0].Name != "node-01" { + t.Errorf("got %v, want [node-01]", names(got)) + } + }) + t.Run("no match", func(t *testing.T) { + got := filterHypervisorsByName(ctx, hvs, "node-99") + if len(got) != 0 { + t.Errorf("got %v, want empty", names(got)) + } + }) +} + +func TestFilterHypervisorsByMemberOf(t *testing.T) { + ctx := context.Background() + agg1 := hv1.Aggregate{Name: "az1", UUID: "agg-uuid-1"} + agg2 := hv1.Aggregate{Name: "az2", UUID: "agg-uuid-2"} + hvs := []hv1.Hypervisor{ + testHypervisorFull("hv1", "id-1", []hv1.Aggregate{agg1}, nil, nil), + testHypervisorFull("hv2", "id-2", []hv1.Aggregate{agg1, agg2}, nil, nil), + testHypervisorFull("hv3", "id-3", []hv1.Aggregate{agg2}, nil, nil), + testHypervisorFull("hv4", "id-4", nil, nil, nil), + } + + t.Run("bare UUID match", func(t *testing.T) { + got := filterHypervisorsByMemberOf(ctx, hvs, []string{"agg-uuid-1"}) + if len(got) != 2 { + t.Fatalf("got %v, want [hv1, hv2]", names(got)) + } + }) + t.Run("in: any-of", func(t *testing.T) { + got := filterHypervisorsByMemberOf(ctx, hvs, []string{"in:agg-uuid-1,agg-uuid-2"}) + if len(got) != 3 { + t.Fatalf("got %v, want [hv1, hv2, hv3]", names(got)) + } + }) + t.Run("forbidden", func(t 
*testing.T) { + got := filterHypervisorsByMemberOf(ctx, hvs, []string{"!agg-uuid-1"}) + if len(got) != 2 { + t.Fatalf("got %v, want [hv3, hv4]", names(got)) + } + }) + t.Run("forbidden in:", func(t *testing.T) { + got := filterHypervisorsByMemberOf(ctx, hvs, []string{"!in:agg-uuid-1,agg-uuid-2"}) + if len(got) != 1 || got[0].Name != "hv4" { + t.Fatalf("got %v, want [hv4]", names(got)) + } + }) + t.Run("AND across repeated params", func(t *testing.T) { + got := filterHypervisorsByMemberOf(ctx, hvs, []string{"agg-uuid-1", "agg-uuid-2"}) + if len(got) != 1 || got[0].Name != "hv2" { + t.Fatalf("got %v, want [hv2]", names(got)) + } + }) + t.Run("no match", func(t *testing.T) { + got := filterHypervisorsByMemberOf(ctx, hvs, []string{"nonexistent-uuid"}) + if len(got) != 0 { + t.Fatalf("got %v, want empty", names(got)) + } + }) +} + +func TestFilterHypervisorsByInTree(t *testing.T) { + ctx := context.Background() + hvs := []hv1.Hypervisor{ + testHypervisor("hv1", "uuid-1"), + testHypervisor("hv2", "uuid-2"), + } + t.Run("match", func(t *testing.T) { + got := filterHypervisorsByInTree(ctx, hvs, "uuid-1") + if len(got) != 1 || got[0].Name != "hv1" { + t.Errorf("got %v, want [hv1]", names(got)) + } + }) + t.Run("no match", func(t *testing.T) { + got := filterHypervisorsByInTree(ctx, hvs, "uuid-999") + if len(got) != 0 { + t.Errorf("got %v, want empty", names(got)) + } + }) +} + +func TestFilterHypervisorsByRequired(t *testing.T) { + ctx := context.Background() + hvs := []hv1.Hypervisor{ + testHypervisorFull("hv1", "id-1", nil, []string{"CUSTOM_A", "CUSTOM_B"}, nil), + testHypervisorFull("hv2", "id-2", nil, []string{"CUSTOM_A"}, nil), + testHypervisorFull("hv3", "id-3", nil, []string{"CUSTOM_C"}, nil), + testHypervisorFull("hv4", "id-4", nil, nil, nil), + } + + t.Run("single required trait", func(t *testing.T) { + got := filterHypervisorsByRequired(ctx, hvs, []string{"CUSTOM_A"}) + if len(got) != 2 { + t.Fatalf("got %v, want [hv1, hv2]", names(got)) + } + }) + t.Run("multiple 
required traits (AND)", func(t *testing.T) { + got := filterHypervisorsByRequired(ctx, hvs, []string{"CUSTOM_A,CUSTOM_B"}) + if len(got) != 1 || got[0].Name != "hv1" { + t.Fatalf("got %v, want [hv1]", names(got)) + } + }) + t.Run("forbidden trait", func(t *testing.T) { + got := filterHypervisorsByRequired(ctx, hvs, []string{"!CUSTOM_A"}) + if len(got) != 2 { + t.Fatalf("got %v, want [hv3, hv4]", names(got)) + } + }) + t.Run("any-of (in:)", func(t *testing.T) { + got := filterHypervisorsByRequired(ctx, hvs, []string{"in:CUSTOM_B,CUSTOM_C"}) + if len(got) != 2 { + t.Fatalf("got %v, want [hv1, hv3]", names(got)) + } + }) + t.Run("AND across repeated required params", func(t *testing.T) { + got := filterHypervisorsByRequired(ctx, hvs, []string{"CUSTOM_A", "CUSTOM_B"}) + if len(got) != 1 || got[0].Name != "hv1" { + t.Fatalf("got %v, want [hv1]", names(got)) + } + }) + t.Run("no match", func(t *testing.T) { + got := filterHypervisorsByRequired(ctx, hvs, []string{"CUSTOM_Z"}) + if len(got) != 0 { + t.Fatalf("got %v, want empty", names(got)) + } + }) +} + +func TestMatchesTraitExpr(t *testing.T) { + traits := map[string]struct{}{ + "CUSTOM_A": {}, + "CUSTOM_B": {}, + } + tests := []struct { + name string + parts []string + want bool + }{ + {"required present", []string{"CUSTOM_A"}, true}, + {"required absent", []string{"CUSTOM_Z"}, false}, + {"forbidden present", []string{"!CUSTOM_A"}, false}, + {"forbidden absent", []string{"!CUSTOM_Z"}, true}, + {"any-of hit", []string{"in:CUSTOM_Z", "CUSTOM_A"}, true}, + {"any-of miss", []string{"in:CUSTOM_X", "CUSTOM_Z"}, false}, + {"mixed pass", []string{"CUSTOM_A", "!CUSTOM_Z"}, true}, + {"mixed fail on forbidden", []string{"CUSTOM_A", "!CUSTOM_B"}, false}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := matchesTraitExpr(traits, tt.parts); got != tt.want { + t.Errorf("matchesTraitExpr(%v) = %v, want %v", tt.parts, got, tt.want) + } + }) + } +} + +func TestFilterHypervisorsByResources(t *testing.T) { 
+ ctx := context.Background() + cpu16 := resource.MustParse("16") + mem64Gi := resource.MustParse("64Gi") + hvs := []hv1.Hypervisor{ + testHypervisorFull("big", "id-1", nil, nil, map[hv1.ResourceName]resource.Quantity{ + hv1.ResourceCPU: cpu16, + hv1.ResourceMemory: mem64Gi, + }), + testHypervisorFull("small", "id-2", nil, nil, map[hv1.ResourceName]resource.Quantity{ + hv1.ResourceCPU: resource.MustParse("2"), + hv1.ResourceMemory: resource.MustParse("4Gi"), + }), + testHypervisorFull("empty", "id-3", nil, nil, nil), + } + + t.Run("VCPU filter matches big", func(t *testing.T) { + got, err := filterHypervisorsByResources(ctx, hvs, "VCPU:4") + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if len(got) != 1 || got[0].Name != "big" { + t.Errorf("got %v, want [big]", names(got)) + } + }) + t.Run("MEMORY_MB filter", func(t *testing.T) { + // 64 GiB = 65536 MiB + got, err := filterHypervisorsByResources(ctx, hvs, "MEMORY_MB:65536") + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if len(got) != 1 || got[0].Name != "big" { + t.Errorf("got %v, want [big]", names(got)) + } + }) + t.Run("combined VCPU and MEMORY_MB", func(t *testing.T) { + got, err := filterHypervisorsByResources(ctx, hvs, "VCPU:2,MEMORY_MB:4096") + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if len(got) != 2 { + t.Errorf("got %v, want [big, small]", names(got)) + } + }) + t.Run("DISK_GB:0 matches all", func(t *testing.T) { + got, err := filterHypervisorsByResources(ctx, hvs, "DISK_GB:0") + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if len(got) != len(hvs) { + t.Errorf("got %d, want %d", len(got), len(hvs)) + } + }) + t.Run("DISK_GB:1 matches none", func(t *testing.T) { + got, err := filterHypervisorsByResources(ctx, hvs, "DISK_GB:1") + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if len(got) != 0 { + t.Errorf("got %v, want empty", names(got)) + } + }) + t.Run("invalid format", func(t *testing.T) { + _, err := 
filterHypervisorsByResources(ctx, hvs, "VCPU") + if err == nil { + t.Fatal("expected error for missing colon") + } + }) + t.Run("non-numeric amount", func(t *testing.T) { + _, err := filterHypervisorsByResources(ctx, hvs, "VCPU:abc") + if err == nil { + t.Fatal("expected error for non-numeric amount") + } + }) + t.Run("unknown resource class", func(t *testing.T) { + _, err := filterHypervisorsByResources(ctx, hvs, "CUSTOM_WIDGETS:5") + if err == nil { + t.Fatal("expected error for unknown resource class") + } + }) + t.Run("fallback to Capacity when EffectiveCapacity missing", func(t *testing.T) { + hv := hv1.Hypervisor{ + ObjectMeta: metav1.ObjectMeta{Name: "cap-only"}, + Status: hv1.HypervisorStatus{ + HypervisorID: "id-cap", + Capacity: map[hv1.ResourceName]resource.Quantity{ + hv1.ResourceCPU: resource.MustParse("8"), + }, + }, + } + got, err := filterHypervisorsByResources(ctx, []hv1.Hypervisor{hv}, "VCPU:4") + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if len(got) != 1 { + t.Errorf("got %v, want [cap-only]", names(got)) + } + }) +} + +// --------------------------------------------------------------------------- +// translateToResourceProvider +// --------------------------------------------------------------------------- + +func TestTranslateToResourceProvider(t *testing.T) { + hv := hv1.Hypervisor{ + ObjectMeta: metav1.ObjectMeta{Name: "hv-node-01"}, + Status: hv1.HypervisorStatus{ + HypervisorID: validUUID, + }, + } + hv.Generation = 3 + rp := translateToResourceProvider(hv) + + if rp.Name != "hv-node-01" { + t.Errorf("Name = %q, want %q", rp.Name, "hv-node-01") + } + if rp.UUID != validUUID { + t.Errorf("UUID = %q, want %q", rp.UUID, validUUID) + } + if rp.Generation != 3 { + t.Errorf("Generation = %d, want 3", rp.Generation) + } + if rp.ParentProviderUUID == nil || *rp.ParentProviderUUID != validUUID { + t.Errorf("ParentProviderUUID = %v, want %q", rp.ParentProviderUUID, validUUID) + } + if rp.RootProviderUUID == nil || 
*rp.RootProviderUUID != validUUID { + t.Errorf("RootProviderUUID = %v, want %q", rp.RootProviderUUID, validUUID) + } + wantRels := []string{"self", "aggregates", "inventories", "allocations", "traits", "usages"} + if len(rp.Links) != len(wantRels) { + t.Fatalf("Links count = %d, want %d", len(rp.Links), len(wantRels)) + } + for i, rel := range wantRels { + if rp.Links[i].Rel != rel { + t.Errorf("Links[%d].Rel = %q, want %q", i, rp.Links[i].Rel, rel) + } + if !strings.Contains(rp.Links[i].Href, validUUID) { + t.Errorf("Links[%d].Href = %q, missing UUID", i, rp.Links[i].Href) + } + } +} + +// --------------------------------------------------------------------------- +// Handler integration tests +// --------------------------------------------------------------------------- + func TestHandleListResourceProviders(t *testing.T) { - var gotPath string - s := newTestShim(t, http.StatusOK, `{"resource_providers":[]}`, &gotPath) - w := serveHandler(t, "GET", "/resource_providers", s.HandleListResourceProviders, "/resource_providers") - if w.Code != http.StatusOK { - t.Fatalf("status = %d, want %d", w.Code, http.StatusOK) + hv1Obj := &hv1.Hypervisor{ + ObjectMeta: metav1.ObjectMeta{Name: "hv-node-01"}, + Status: hv1.HypervisorStatus{HypervisorID: validUUID}, } - if gotPath != "/resource_providers" { - t.Fatalf("upstream path = %q, want /resource_providers", gotPath) + + t.Run("merges upstream and k8s providers", func(t *testing.T) { + upstreamBody := `{"resource_providers":[{"uuid":"upstream-uuid","name":"upstream-rp","generation":1,"links":[]}]}` + s := newTestShimWithHypervisors(t, http.StatusOK, upstreamBody, hv1Obj) + w := serveHandler(t, http.MethodGet, "/resource_providers", + s.HandleListResourceProviders, "/resource_providers") + if w.Code != http.StatusOK { + t.Fatalf("status = %d, want %d", w.Code, http.StatusOK) + } + var resp listResourceProvidersResponse + if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil { + t.Fatalf("failed to decode response: %v", 
err) + } + if len(resp.ResourceProviders) != 2 { + t.Fatalf("got %d providers, want 2", len(resp.ResourceProviders)) + } + }) + + t.Run("k8s wins on UUID collision", func(t *testing.T) { + upstreamBody := `{"resource_providers":[{"uuid":"` + validUUID + `","name":"upstream-name","generation":1,"links":[]}]}` + s := newTestShimWithHypervisors(t, http.StatusOK, upstreamBody, hv1Obj) + w := serveHandler(t, http.MethodGet, "/resource_providers", + s.HandleListResourceProviders, "/resource_providers") + var resp listResourceProvidersResponse + if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil { + t.Fatalf("failed to decode response: %v", err) + } + if len(resp.ResourceProviders) != 1 { + t.Fatalf("got %d providers, want 1", len(resp.ResourceProviders)) + } + if resp.ResourceProviders[0].Name != "hv-node-01" { + t.Errorf("name = %q, want %q", resp.ResourceProviders[0].Name, "hv-node-01") + } + }) + + t.Run("k8s wins on name collision", func(t *testing.T) { + upstreamBody := `{"resource_providers":[{"uuid":"other-uuid","name":"hv-node-01","generation":1,"links":[]}]}` + s := newTestShimWithHypervisors(t, http.StatusOK, upstreamBody, hv1Obj) + w := serveHandler(t, http.MethodGet, "/resource_providers", + s.HandleListResourceProviders, "/resource_providers") + var resp listResourceProvidersResponse + if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil { + t.Fatalf("failed to decode response: %v", err) + } + if len(resp.ResourceProviders) != 1 { + t.Fatalf("got %d providers, want 1", len(resp.ResourceProviders)) + } + if resp.ResourceProviders[0].UUID != validUUID { + t.Errorf("uuid = %q, want %q", resp.ResourceProviders[0].UUID, validUUID) + } + }) + + t.Run("upstream non-200 is forwarded", func(t *testing.T) { + s := newTestShimWithHypervisors(t, http.StatusServiceUnavailable, "service down", hv1Obj) + w := serveHandler(t, http.MethodGet, "/resource_providers", + s.HandleListResourceProviders, "/resource_providers") + if w.Code != 
http.StatusServiceUnavailable { + t.Fatalf("status = %d, want %d", w.Code, http.StatusServiceUnavailable) + } + }) + + t.Run("empty k8s list returns only upstream", func(t *testing.T) { + upstreamBody := `{"resource_providers":[{"uuid":"u1","name":"n1","generation":0,"links":[]}]}` + s := newTestShimWithHypervisors(t, http.StatusOK, upstreamBody) + w := serveHandler(t, http.MethodGet, "/resource_providers", + s.HandleListResourceProviders, "/resource_providers") + var resp listResourceProvidersResponse + if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil { + t.Fatalf("failed to decode response: %v", err) + } + if len(resp.ResourceProviders) != 1 { + t.Fatalf("got %d providers, want 1", len(resp.ResourceProviders)) + } + }) +} + +func TestHandleListResourceProviders_Filters(t *testing.T) { + agg := hv1.Aggregate{Name: "az1", UUID: "agg-uuid-1"} + cpu16 := resource.MustParse("16") + mem64Gi := resource.MustParse("64Gi") + + hv1Obj := &hv1.Hypervisor{ + ObjectMeta: metav1.ObjectMeta{Name: "hv-a"}, + Status: hv1.HypervisorStatus{ + HypervisorID: "aaaa-aaaa", + Aggregates: []hv1.Aggregate{agg}, + Traits: []string{"CUSTOM_TRAIT_A"}, + EffectiveCapacity: map[hv1.ResourceName]resource.Quantity{ + hv1.ResourceCPU: cpu16, + hv1.ResourceMemory: mem64Gi, + }, + }, + } + hv2Obj := &hv1.Hypervisor{ + ObjectMeta: metav1.ObjectMeta{Name: "hv-b"}, + Status: hv1.HypervisorStatus{ + HypervisorID: "bbbb-bbbb", + Traits: []string{"CUSTOM_TRAIT_B"}, + }, } + + emptyUpstream := `{"resource_providers":[]}` + + t.Run("filter by uuid", func(t *testing.T) { + s := newTestShimWithHypervisors(t, http.StatusOK, emptyUpstream, hv1Obj, hv2Obj) + w := serveHandler(t, http.MethodGet, "/resource_providers", + s.HandleListResourceProviders, "/resource_providers?uuid=aaaa-aaaa") + var resp listResourceProvidersResponse + if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil { + t.Fatalf("failed to decode: %v", err) + } + if len(resp.ResourceProviders) != 1 || 
resp.ResourceProviders[0].Name != "hv-a" { + t.Errorf("got %v, want [hv-a]", resp.ResourceProviders) + } + }) + + t.Run("filter by name", func(t *testing.T) { + s := newTestShimWithHypervisors(t, http.StatusOK, emptyUpstream, hv1Obj, hv2Obj) + w := serveHandler(t, http.MethodGet, "/resource_providers", + s.HandleListResourceProviders, "/resource_providers?name=hv-b") + var resp listResourceProvidersResponse + if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil { + t.Fatalf("failed to decode: %v", err) + } + if len(resp.ResourceProviders) != 1 || resp.ResourceProviders[0].Name != "hv-b" { + t.Errorf("got %v, want [hv-b]", resp.ResourceProviders) + } + }) + + t.Run("filter by member_of", func(t *testing.T) { + s := newTestShimWithHypervisors(t, http.StatusOK, emptyUpstream, hv1Obj, hv2Obj) + w := serveHandler(t, http.MethodGet, "/resource_providers", + s.HandleListResourceProviders, "/resource_providers?member_of=agg-uuid-1") + var resp listResourceProvidersResponse + if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil { + t.Fatalf("failed to decode: %v", err) + } + if len(resp.ResourceProviders) != 1 || resp.ResourceProviders[0].Name != "hv-a" { + t.Errorf("got %v, want [hv-a]", resp.ResourceProviders) + } + }) + + t.Run("filter by in_tree", func(t *testing.T) { + s := newTestShimWithHypervisors(t, http.StatusOK, emptyUpstream, hv1Obj, hv2Obj) + w := serveHandler(t, http.MethodGet, "/resource_providers", + s.HandleListResourceProviders, "/resource_providers?in_tree=bbbb-bbbb") + var resp listResourceProvidersResponse + if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil { + t.Fatalf("failed to decode: %v", err) + } + if len(resp.ResourceProviders) != 1 || resp.ResourceProviders[0].Name != "hv-b" { + t.Errorf("got %v, want [hv-b]", resp.ResourceProviders) + } + }) + + t.Run("filter by required", func(t *testing.T) { + s := newTestShimWithHypervisors(t, http.StatusOK, emptyUpstream, hv1Obj, hv2Obj) + w := serveHandler(t, http.MethodGet, 
"/resource_providers", + s.HandleListResourceProviders, "/resource_providers?required=CUSTOM_TRAIT_A") + var resp listResourceProvidersResponse + if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil { + t.Fatalf("failed to decode: %v", err) + } + if len(resp.ResourceProviders) != 1 || resp.ResourceProviders[0].Name != "hv-a" { + t.Errorf("got %v, want [hv-a]", resp.ResourceProviders) + } + }) + + t.Run("filter by resources", func(t *testing.T) { + s := newTestShimWithHypervisors(t, http.StatusOK, emptyUpstream, hv1Obj, hv2Obj) + w := serveHandler(t, http.MethodGet, "/resource_providers", + s.HandleListResourceProviders, "/resource_providers?resources=VCPU:8") + var resp listResourceProvidersResponse + if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil { + t.Fatalf("failed to decode: %v", err) + } + if len(resp.ResourceProviders) != 1 || resp.ResourceProviders[0].Name != "hv-a" { + t.Errorf("got %v, want [hv-a]", resp.ResourceProviders) + } + }) + + t.Run("invalid resources returns 500", func(t *testing.T) { + s := newTestShimWithHypervisors(t, http.StatusOK, emptyUpstream, hv1Obj) + w := serveHandler(t, http.MethodGet, "/resource_providers", + s.HandleListResourceProviders, "/resource_providers?resources=INVALID") + if w.Code != http.StatusInternalServerError { + t.Fatalf("status = %d, want %d", w.Code, http.StatusInternalServerError) + } + }) } func TestHandleCreateResourceProvider(t *testing.T) { - s := newTestShim(t, http.StatusCreated, "{}", nil) - w := serveHandler(t, "POST", "/resource_providers", s.HandleCreateResourceProvider, "/resource_providers") - if w.Code != http.StatusCreated { - t.Fatalf("status = %d, want %d", w.Code, http.StatusCreated) + hv1Obj := &hv1.Hypervisor{ + ObjectMeta: metav1.ObjectMeta{Name: "existing-hv"}, + Status: hv1.HypervisorStatus{HypervisorID: validUUID}, } + + t.Run("conflict with existing hypervisor", func(t *testing.T) { + s := newTestShimWithHypervisors(t, http.StatusCreated, `{}`, hv1Obj) + body := 
`{"name":"existing-hv"}` + req := httptest.NewRequest(http.MethodPost, "/resource_providers", strings.NewReader(body)) + w := httptest.NewRecorder() + s.HandleCreateResourceProvider(w, req) + if w.Code != http.StatusConflict { + t.Fatalf("status = %d, want %d", w.Code, http.StatusConflict) + } + }) + + t.Run("no conflict forwards to upstream", func(t *testing.T) { + s := newTestShimWithHypervisors(t, http.StatusCreated, `{"uuid":"new-uuid","name":"new-rp"}`, hv1Obj) + body := `{"name":"new-rp"}` + req := httptest.NewRequest(http.MethodPost, "/resource_providers", strings.NewReader(body)) + w := httptest.NewRecorder() + s.HandleCreateResourceProvider(w, req) + if w.Code != http.StatusCreated { + t.Fatalf("status = %d, want %d", w.Code, http.StatusCreated) + } + }) + + t.Run("missing name returns 400", func(t *testing.T) { + s := newTestShimWithHypervisors(t, http.StatusCreated, `{}`) + body := `{"name":""}` + req := httptest.NewRequest(http.MethodPost, "/resource_providers", strings.NewReader(body)) + w := httptest.NewRecorder() + s.HandleCreateResourceProvider(w, req) + if w.Code != http.StatusBadRequest { + t.Fatalf("status = %d, want %d", w.Code, http.StatusBadRequest) + } + }) + + t.Run("invalid JSON returns 400", func(t *testing.T) { + s := newTestShimWithHypervisors(t, http.StatusCreated, `{}`) + req := httptest.NewRequest(http.MethodPost, "/resource_providers", strings.NewReader("not json")) + w := httptest.NewRecorder() + s.HandleCreateResourceProvider(w, req) + if w.Code != http.StatusBadRequest { + t.Fatalf("status = %d, want %d", w.Code, http.StatusBadRequest) + } + }) } func TestHandleShowResourceProvider(t *testing.T) { - t.Run("valid uuid", func(t *testing.T) { - s := newTestShim(t, http.StatusOK, "{}", nil) - w := serveHandler(t, "GET", "/resource_providers/{uuid}", s.HandleShowResourceProvider, - "/resource_providers/"+validUUID) + hv1Obj := &hv1.Hypervisor{ + ObjectMeta: metav1.ObjectMeta{Name: "hv-show"}, + Status: 
hv1.HypervisorStatus{HypervisorID: validUUID}, + } + + t.Run("found in k8s", func(t *testing.T) { + s := newTestShimWithHypervisors(t, http.StatusOK, `{}`, hv1Obj) + w := serveHandler(t, http.MethodGet, "/resource_providers/{uuid}", + s.HandleShowResourceProvider, "/resource_providers/"+validUUID) if w.Code != http.StatusOK { t.Fatalf("status = %d, want %d", w.Code, http.StatusOK) } + var rp resourceProvider + if err := json.Unmarshal(w.Body.Bytes(), &rp); err != nil { + t.Fatalf("failed to decode: %v", err) + } + if rp.Name != "hv-show" { + t.Errorf("name = %q, want %q", rp.Name, "hv-show") + } + if rp.UUID != validUUID { + t.Errorf("uuid = %q, want %q", rp.UUID, validUUID) + } }) - t.Run("invalid uuid", func(t *testing.T) { - s := newTestShim(t, http.StatusOK, "{}", nil) - w := serveHandler(t, "GET", "/resource_providers/{uuid}", s.HandleShowResourceProvider, - "/resource_providers/not-a-uuid") + + t.Run("not in k8s forwards to upstream", func(t *testing.T) { + const otherUUID = "a1b2c3d4-e5f6-7890-abcd-ef1234567890" + upstreamBody := `{"uuid":"` + otherUUID + `","name":"upstream-rp","generation":0,"links":[]}` + s := newTestShimWithHypervisors(t, http.StatusOK, upstreamBody, hv1Obj) + w := serveHandler(t, http.MethodGet, "/resource_providers/{uuid}", + s.HandleShowResourceProvider, "/resource_providers/"+otherUUID) + if w.Code != http.StatusOK { + t.Fatalf("status = %d, want %d", w.Code, http.StatusOK) + } + if !strings.Contains(w.Body.String(), "upstream-rp") { + t.Errorf("expected upstream response body, got %q", w.Body.String()) + } + }) + + t.Run("invalid UUID returns 400", func(t *testing.T) { + s := newTestShimWithHypervisors(t, http.StatusOK, `{}`) + w := serveHandler(t, http.MethodGet, "/resource_providers/{uuid}", + s.HandleShowResourceProvider, "/resource_providers/not-a-uuid") if w.Code != http.StatusBadRequest { t.Fatalf("status = %d, want %d", w.Code, http.StatusBadRequest) } @@ -48,18 +743,91 @@ func TestHandleShowResourceProvider(t *testing.T) { } 
func TestHandleUpdateResourceProvider(t *testing.T) { - t.Run("valid uuid", func(t *testing.T) { - s := newTestShim(t, http.StatusOK, "{}", nil) - w := serveHandler(t, "PUT", "/resource_providers/{uuid}", s.HandleUpdateResourceProvider, - "/resource_providers/"+validUUID) + hv1Obj := &hv1.Hypervisor{ + ObjectMeta: metav1.ObjectMeta{Name: "hv-update"}, + Status: hv1.HypervisorStatus{HypervisorID: validUUID}, + } + + t.Run("no-op update returns current state", func(t *testing.T) { + s := newTestShimWithHypervisors(t, http.StatusOK, `{}`, hv1Obj) + body := `{"name":"hv-update"}` + req := httptest.NewRequest(http.MethodPut, "/resource_providers/"+validUUID, strings.NewReader(body)) + mux := http.NewServeMux() + mux.HandleFunc("PUT /resource_providers/{uuid}", s.HandleUpdateResourceProvider) + w := httptest.NewRecorder() + mux.ServeHTTP(w, req) if w.Code != http.StatusOK { t.Fatalf("status = %d, want %d", w.Code, http.StatusOK) } + var rp resourceProvider + if err := json.Unmarshal(w.Body.Bytes(), &rp); err != nil { + t.Fatalf("failed to decode: %v", err) + } + if rp.Name != "hv-update" { + t.Errorf("name = %q, want %q", rp.Name, "hv-update") + } + }) + + t.Run("name change returns 409", func(t *testing.T) { + s := newTestShimWithHypervisors(t, http.StatusOK, `{}`, hv1Obj) + body := `{"name":"different-name"}` + req := httptest.NewRequest(http.MethodPut, "/resource_providers/"+validUUID, strings.NewReader(body)) + mux := http.NewServeMux() + mux.HandleFunc("PUT /resource_providers/{uuid}", s.HandleUpdateResourceProvider) + w := httptest.NewRecorder() + mux.ServeHTTP(w, req) + if w.Code != http.StatusConflict { + t.Fatalf("status = %d, want %d", w.Code, http.StatusConflict) + } + }) + + t.Run("parent change returns 409", func(t *testing.T) { + s := newTestShimWithHypervisors(t, http.StatusOK, `{}`, hv1Obj) + body := `{"name":"hv-update","parent_provider_uuid":"other-parent-uuid"}` + req := httptest.NewRequest(http.MethodPut, "/resource_providers/"+validUUID, 
strings.NewReader(body)) + mux := http.NewServeMux() + mux.HandleFunc("PUT /resource_providers/{uuid}", s.HandleUpdateResourceProvider) + w := httptest.NewRecorder() + mux.ServeHTTP(w, req) + if w.Code != http.StatusConflict { + t.Fatalf("status = %d, want %d", w.Code, http.StatusConflict) + } + }) + + t.Run("unknown UUID forwards to upstream", func(t *testing.T) { + const otherUUID = "a1b2c3d4-e5f6-7890-abcd-ef1234567890" + s := newTestShimWithHypervisors(t, http.StatusOK, `{"uuid":"`+otherUUID+`","name":"upstream"}`, hv1Obj) + body := `{"name":"upstream"}` + req := httptest.NewRequest(http.MethodPut, "/resource_providers/"+otherUUID, strings.NewReader(body)) + mux := http.NewServeMux() + mux.HandleFunc("PUT /resource_providers/{uuid}", s.HandleUpdateResourceProvider) + w := httptest.NewRecorder() + mux.ServeHTTP(w, req) + if w.Code != http.StatusOK { + t.Fatalf("status = %d, want %d", w.Code, http.StatusOK) + } + }) + + t.Run("missing name returns 400", func(t *testing.T) { + s := newTestShimWithHypervisors(t, http.StatusOK, `{}`) + body := `{"name":""}` + req := httptest.NewRequest(http.MethodPut, "/resource_providers/"+validUUID, strings.NewReader(body)) + mux := http.NewServeMux() + mux.HandleFunc("PUT /resource_providers/{uuid}", s.HandleUpdateResourceProvider) + w := httptest.NewRecorder() + mux.ServeHTTP(w, req) + if w.Code != http.StatusBadRequest { + t.Fatalf("status = %d, want %d", w.Code, http.StatusBadRequest) + } }) - t.Run("invalid uuid", func(t *testing.T) { - s := newTestShim(t, http.StatusOK, "{}", nil) - w := serveHandler(t, "PUT", "/resource_providers/{uuid}", s.HandleUpdateResourceProvider, - "/resource_providers/not-a-uuid") + + t.Run("invalid JSON returns 400", func(t *testing.T) { + s := newTestShimWithHypervisors(t, http.StatusOK, `{}`) + req := httptest.NewRequest(http.MethodPut, "/resource_providers/"+validUUID, strings.NewReader("not json")) + mux := http.NewServeMux() + mux.HandleFunc("PUT /resource_providers/{uuid}", 
s.HandleUpdateResourceProvider) + w := httptest.NewRecorder() + mux.ServeHTTP(w, req) if w.Code != http.StatusBadRequest { t.Fatalf("status = %d, want %d", w.Code, http.StatusBadRequest) } @@ -67,20 +835,48 @@ func TestHandleUpdateResourceProvider(t *testing.T) { } func TestHandleDeleteResourceProvider(t *testing.T) { - t.Run("valid uuid", func(t *testing.T) { - s := newTestShim(t, http.StatusNoContent, "", nil) - w := serveHandler(t, "DELETE", "/resource_providers/{uuid}", s.HandleDeleteResourceProvider, - "/resource_providers/"+validUUID) + hv1Obj := &hv1.Hypervisor{ + ObjectMeta: metav1.ObjectMeta{Name: "hv-delete"}, + Status: hv1.HypervisorStatus{HypervisorID: validUUID}, + } + + t.Run("k8s hypervisor returns 409", func(t *testing.T) { + s := newTestShimWithHypervisors(t, http.StatusNoContent, "", hv1Obj) + w := serveHandler(t, http.MethodDelete, "/resource_providers/{uuid}", + s.HandleDeleteResourceProvider, "/resource_providers/"+validUUID) + if w.Code != http.StatusConflict { + t.Fatalf("status = %d, want %d", w.Code, http.StatusConflict) + } + }) + + t.Run("unknown UUID forwards to upstream", func(t *testing.T) { + const otherUUID = "a1b2c3d4-e5f6-7890-abcd-ef1234567890" + s := newTestShimWithHypervisors(t, http.StatusNoContent, "", hv1Obj) + w := serveHandler(t, http.MethodDelete, "/resource_providers/{uuid}", + s.HandleDeleteResourceProvider, "/resource_providers/"+otherUUID) if w.Code != http.StatusNoContent { t.Fatalf("status = %d, want %d", w.Code, http.StatusNoContent) } }) - t.Run("invalid uuid", func(t *testing.T) { - s := newTestShim(t, http.StatusOK, "{}", nil) - w := serveHandler(t, "DELETE", "/resource_providers/{uuid}", s.HandleDeleteResourceProvider, - "/resource_providers/not-a-uuid") + + t.Run("invalid UUID returns 400", func(t *testing.T) { + s := newTestShimWithHypervisors(t, http.StatusNoContent, "") + w := serveHandler(t, http.MethodDelete, "/resource_providers/{uuid}", + s.HandleDeleteResourceProvider, "/resource_providers/not-a-uuid") 
if w.Code != http.StatusBadRequest { t.Fatalf("status = %d, want %d", w.Code, http.StatusBadRequest) } }) } + +// --------------------------------------------------------------------------- +// Helpers +// --------------------------------------------------------------------------- + +func names(hvs []hv1.Hypervisor) []string { + out := make([]string, len(hvs)) + for i, hv := range hvs { + out[i] = hv.Name + } + return out +} diff --git a/internal/shim/placement/shim.go b/internal/shim/placement/shim.go index 043777bf3..f44979bbf 100644 --- a/internal/shim/placement/shim.go +++ b/internal/shim/placement/shim.go @@ -307,6 +307,15 @@ func (s *Shim) SetupWithManager(ctx context.Context, mgr ctrl.Manager) (err erro // The route pattern for metric labels is read from the request context // (set by the measurement middleware in RegisterRoutes). func (s *Shim) forward(w http.ResponseWriter, r *http.Request) { + s.forwardWithHook(w, r, nil) +} + +// forwardWithHook works like forward but accepts an optional intercept +// callback. When hook is non-nil and the upstream returns a successful +// response, the hook receives the *http.Response and is responsible for +// writing the final response to w. If hook is nil the response is copied +// through unchanged, identical to forward. +func (s *Shim) forwardWithHook(w http.ResponseWriter, r *http.Request, hook func(w http.ResponseWriter, resp *http.Response)) { ctx := r.Context() log := logf.FromContext(ctx) log.Info("Forwarding request to placement API", @@ -360,7 +369,18 @@ func (s *Shim) forward(w http.ResponseWriter, r *http.Request) { } defer resp.Body.Close() - // Copy response headers, status code, and body back to the caller. + // Observe after the response is received (the hook or copy below + // may consume the body, but the upstream latency is already known). + s.upstreamRequestTimer. + WithLabelValues(r.Method, pattern, strconv.Itoa(resp.StatusCode)). 
+ Observe(time.Since(start).Seconds()) + + if hook != nil { + hook(w, resp) + return + } + + // Default: copy response headers, status code, and body back to the caller. for k, vs := range resp.Header { for _, v := range vs { w.Header().Add(k, v) @@ -370,11 +390,6 @@ func (s *Shim) forward(w http.ResponseWriter, r *http.Request) { if _, err := io.Copy(w, resp.Body); err != nil { log.Error(err, "failed to copy upstream response body") } - // Observe after the body is fully consumed so the duration includes - // the time spent streaming the response from upstream. - s.upstreamRequestTimer. - WithLabelValues(r.Method, pattern, strconv.Itoa(resp.StatusCode)). - Observe(time.Since(start).Seconds()) } // RegisterRoutes binds all Placement API handlers to the given mux. The diff --git a/internal/shim/placement/shim_io.go b/internal/shim/placement/shim_io.go index 58321a793..604087cec 100644 --- a/internal/shim/placement/shim_io.go +++ b/internal/shim/placement/shim_io.go @@ -6,6 +6,7 @@ package placement import ( "bytes" "context" + "encoding/json" "io" "net/http" "strconv" @@ -80,6 +81,16 @@ func (w *shimResponseWriter) Write(b []byte) (int, error) { return n, err } +// writeJSON serializes v as JSON and writes it to w with the given HTTP status +// code. An encoding failure is only logged; the status is already sent by then. +func (s *Shim) writeJSON(w http.ResponseWriter, statusCode int, v any) { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(statusCode) + if err := json.NewEncoder(w).Encode(v); err != nil { + logf.Log.Error(err, "failed to encode JSON response") + } +} + // wrapHandler returns an http.HandlerFunc that wraps next with logging, // metrics collection, and request-ID propagation. It is used by // RegisterRoutes to apply uniform middleware to every placement API handler. 
diff --git a/internal/shim/placement/shim_io_test.go b/internal/shim/placement/shim_io_test.go index 1fca264f7..3eb75711f 100644 --- a/internal/shim/placement/shim_io_test.go +++ b/internal/shim/placement/shim_io_test.go @@ -154,6 +154,42 @@ func TestShimResponseWriterNilBodyBuf(t *testing.T) { } } +func TestWriteJSON(t *testing.T) { + s := &Shim{} + + t.Run("encodes struct as JSON with status", func(t *testing.T) { + rec := httptest.NewRecorder() + s.writeJSON(rec, http.StatusOK, map[string]string{"key": "value"}) + if rec.Code != http.StatusOK { + t.Fatalf("status = %d, want %d", rec.Code, http.StatusOK) + } + if ct := rec.Header().Get("Content-Type"); ct != "application/json" { + t.Errorf("Content-Type = %q, want %q", ct, "application/json") + } + want := `{"key":"value"}` + "\n" + if got := rec.Body.String(); got != want { + t.Errorf("body = %q, want %q", got, want) + } + }) + + t.Run("uses provided status code", func(t *testing.T) { + rec := httptest.NewRecorder() + s.writeJSON(rec, http.StatusCreated, map[string]int{"n": 42}) + if rec.Code != http.StatusCreated { + t.Fatalf("status = %d, want %d", rec.Code, http.StatusCreated) + } + }) + + t.Run("encodes nil as null", func(t *testing.T) { + rec := httptest.NewRecorder() + s.writeJSON(rec, http.StatusOK, nil) + want := "null\n" + if got := rec.Body.String(); got != want { + t.Errorf("body = %q, want %q", got, want) + } + }) +} + func TestWrapHandlerLogsAndMetrics(t *testing.T) { upstream := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { w.WriteHeader(http.StatusOK) diff --git a/internal/shim/placement/shim_test.go b/internal/shim/placement/shim_test.go index 09d5a4cf7..f8bacfd1f 100644 --- a/internal/shim/placement/shim_test.go +++ b/internal/shim/placement/shim_test.go @@ -272,7 +272,7 @@ func TestRegisterRoutesDownstreamMetrics(t *testing.T) { s.RegisterRoutes(mux) // Fire a request through the mux so the wrapper observes the downstream timer. 
- req := httptest.NewRequest(http.MethodGet, "/resource_providers", http.NoBody) + req := httptest.NewRequest(http.MethodGet, "/traits", http.NoBody) w := httptest.NewRecorder() mux.ServeHTTP(w, req) @@ -281,7 +281,7 @@ func TestRegisterRoutesDownstreamMetrics(t *testing.T) { } // The downstream timer should have exactly one observation for the // expected label combination (method, pattern, responsecode). - if n := histSampleCount(t, down, "GET", "/resource_providers", "200"); n != 1 { + if n := histSampleCount(t, down, "GET", "/traits", "200"); n != 1 { t.Errorf("downstream observation count = %d, want 1", n) } } @@ -351,7 +351,7 @@ func TestRequestIDPropagation(t *testing.T) { mux := http.NewServeMux() s.RegisterRoutes(mux) - req := httptest.NewRequest(http.MethodGet, "/resource_providers", http.NoBody) + req := httptest.NewRequest(http.MethodGet, "/traits", http.NoBody) req.Header.Set("X-OpenStack-Request-Id", wantID) w := httptest.NewRecorder() mux.ServeHTTP(w, req) From d6d1d8e02164a421bdbdf5ea67ba474b8c69dd6e Mon Sep 17 00:00:00 2001 From: Philipp Matthes Date: Wed, 22 Apr 2026 13:40:23 +0200 Subject: [PATCH 3/5] Progress --- cmd/shim/main.go | 78 ++++-- internal/shim/placement/field_index.go | 4 +- internal/shim/placement/field_index_test.go | 18 +- .../handle_allocation_candidates_e2e.go | 3 +- .../shim/placement/handle_allocations_e2e.go | 3 +- .../shim/placement/handle_reshaper_e2e.go | 3 +- .../placement/handle_resource_classes_e2e.go | 3 +- ...handle_resource_provider_aggregates_e2e.go | 3 +- ...andle_resource_provider_allocations_e2e.go | 3 +- ...andle_resource_provider_inventories_e2e.go | 3 +- .../handle_resource_provider_traits_e2e.go | 3 +- .../handle_resource_provider_usages_e2e.go | 3 +- .../placement/handle_resource_providers.go | 30 +++ .../handle_resource_providers_e2e.go | 249 +++++++++++++++++- internal/shim/placement/handle_root_e2e.go | 3 +- internal/shim/placement/handle_traits_e2e.go | 3 +- internal/shim/placement/handle_usages_e2e.go | 
3 +- internal/shim/placement/shim.go | 11 +- internal/shim/placement/shim_e2e.go | 7 +- 19 files changed, 384 insertions(+), 49 deletions(-) diff --git a/cmd/shim/main.go b/cmd/shim/main.go index c845b4a0e..45a6938de 100644 --- a/cmd/shim/main.go +++ b/cmd/shim/main.go @@ -22,6 +22,7 @@ import ( "k8s.io/apimachinery/pkg/runtime" utilruntime "k8s.io/apimachinery/pkg/util/runtime" clientgoscheme "k8s.io/client-go/kubernetes/scheme" + "k8s.io/client-go/rest" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/certwatcher" "sigs.k8s.io/controller-runtime/pkg/cluster" @@ -112,11 +113,41 @@ func main() { ctrl.SetLogger(zap.New(zap.UseFlagOptions(&opts))) // Custom entrypoint for placement shim e2e tests. + // Spins up a minimal manager with a multicluster client so that e2e + // tests can access the controller-runtime cache for hypervisor lookups. if runPlacementShimE2E { - if err := placement.RunE2E(ctx); err != nil { + mgrCtx, mgrCancel := context.WithCancel(ctx) + + mgr, err := ctrl.NewManager(restConfig, ctrl.Options{ + Scheme: scheme, + Metrics: metricsserver.Options{BindAddress: "0"}, + HealthProbeBindAddress: "", + }) + if err != nil { + setupLog.Error(err, "unable to start e2e manager") + os.Exit(1) + } + multiclusterClient := setupMulticlusterClient(mgrCtx, mgr, restConfig) + if err := placement.IndexFields(mgrCtx, multiclusterClient); err != nil { + setupLog.Error(err, "unable to set up e2e field indexes") + os.Exit(1) + } + go func() { + if err := mgr.Start(mgrCtx); err != nil { + setupLog.Error(err, "e2e manager exited with error") + } + }() + if !mgr.GetCache().WaitForCacheSync(ctx) { + setupLog.Error(nil, "e2e cache sync failed") + mgrCancel() + os.Exit(1) + } + if err := placement.RunE2E(ctx, multiclusterClient); err != nil { setupLog.Error(err, "E2E tests failed") + mgrCancel() os.Exit(1) } + mgrCancel() os.Exit(0) } @@ -223,26 +254,7 @@ func main() { os.Exit(1) } - homeCluster, err := cluster.New(restConfig, func(o *cluster.Options) 
{ o.Scheme = scheme }) - if err != nil { - setupLog.Error(err, "unable to create home cluster") - os.Exit(1) - } - if err := mgr.Add(homeCluster); err != nil { - setupLog.Error(err, "unable to add home cluster") - os.Exit(1) - } - multiclusterClient := &multicluster.Client{ - HomeCluster: homeCluster, - HomeRestConfig: restConfig, - HomeScheme: scheme, - ResourceRouters: multicluster.DefaultResourceRouters, - } - multiclusterClientConfig := conf.GetConfigOrDie[multicluster.ClientConfig]() - if err := multiclusterClient.InitFromConf(ctx, mgr, multiclusterClientConfig); err != nil { - setupLog.Error(err, "unable to initialize multicluster client") - os.Exit(1) - } + multiclusterClient := setupMulticlusterClient(ctx, mgr, restConfig) // Our custom monitoring registry can add prometheus labels to all metrics. // This is useful to distinguish metrics from different deployments. @@ -307,3 +319,27 @@ func main() { os.Exit(1) } } + +func setupMulticlusterClient(ctx context.Context, mgr manager.Manager, restConfig *rest.Config) *multicluster.Client { + homeCluster, err := cluster.New(restConfig, func(o *cluster.Options) { o.Scheme = scheme }) + if err != nil { + setupLog.Error(err, "unable to create home cluster") + os.Exit(1) + } + if err := mgr.Add(homeCluster); err != nil { + setupLog.Error(err, "unable to add home cluster") + os.Exit(1) + } + mcl := &multicluster.Client{ + HomeCluster: homeCluster, + HomeRestConfig: restConfig, + HomeScheme: scheme, + ResourceRouters: multicluster.DefaultResourceRouters, + } + mclConfig := conf.GetConfigOrDie[multicluster.ClientConfig]() + if err := mcl.InitFromConf(ctx, mgr, mclConfig); err != nil { + setupLog.Error(err, "unable to initialize multicluster client") + os.Exit(1) + } + return mcl +} diff --git a/internal/shim/placement/field_index.go b/internal/shim/placement/field_index.go index 6dd678ec1..23bf1470c 100644 --- a/internal/shim/placement/field_index.go +++ b/internal/shim/placement/field_index.go @@ -28,9 +28,9 @@ const ( 
idxHypervisorName = "metadata.name" ) -// indexFields indexes all fields that are needed by the shim to quickly +// IndexFields indexes all fields that are needed by the shim to quickly // look up objects from the controller-runtime cache. -func indexFields(ctx context.Context, mcl *multicluster.Client) error { +func IndexFields(ctx context.Context, mcl *multicluster.Client) error { log := logf.FromContext(ctx) log.Info("Setting up field indexes for the multicluster client") h, hl := &hv1.Hypervisor{}, &hv1.HypervisorList{} diff --git a/internal/shim/placement/field_index_test.go b/internal/shim/placement/field_index_test.go index f9721601e..8db1f2169 100644 --- a/internal/shim/placement/field_index_test.go +++ b/internal/shim/placement/field_index_test.go @@ -110,8 +110,8 @@ func TestIndexFields_RegistersAllIndexes(t *testing.T) { cc := &captureCache{} mcl := buildClient(t, cc) - if err := indexFields(context.Background(), mcl); err != nil { - t.Fatalf("indexFields: %v", err) + if err := IndexFields(context.Background(), mcl); err != nil { + t.Fatalf("IndexFields: %v", err) } wantFields := []string{ @@ -138,7 +138,7 @@ func TestIndexFields_PropagatesError(t *testing.T) { cc := &captureCache{err: wantErr} mcl := buildClient(t, cc) - err := indexFields(context.Background(), mcl) + err := IndexFields(context.Background(), mcl) if err == nil { t.Fatal("expected error, got nil") } @@ -150,8 +150,8 @@ func TestIndexFields_PropagatesError(t *testing.T) { func TestExtractor_HypervisorOpenStackId(t *testing.T) { cc := &captureCache{} mcl := buildClient(t, cc) - if err := indexFields(context.Background(), mcl); err != nil { - t.Fatalf("indexFields: %v", err) + if err := IndexFields(context.Background(), mcl); err != nil { + t.Fatalf("IndexFields: %v", err) } fn := extractorByField(t, cc.calls, idxHypervisorOpenStackId) @@ -191,8 +191,8 @@ func TestExtractor_HypervisorOpenStackId(t *testing.T) { func TestExtractor_HypervisorKubernetesId(t *testing.T) { cc := &captureCache{} 
mcl := buildClient(t, cc) - if err := indexFields(context.Background(), mcl); err != nil { - t.Fatalf("indexFields: %v", err) + if err := IndexFields(context.Background(), mcl); err != nil { + t.Fatalf("IndexFields: %v", err) } fn := extractorByField(t, cc.calls, idxHypervisorKubernetesId) @@ -232,8 +232,8 @@ func TestExtractor_HypervisorKubernetesId(t *testing.T) { func TestExtractor_HypervisorName(t *testing.T) { cc := &captureCache{} mcl := buildClient(t, cc) - if err := indexFields(context.Background(), mcl); err != nil { - t.Fatalf("indexFields: %v", err) + if err := IndexFields(context.Background(), mcl); err != nil { + t.Fatalf("IndexFields: %v", err) } fn := extractorByField(t, cc.calls, idxHypervisorName) diff --git a/internal/shim/placement/handle_allocation_candidates_e2e.go b/internal/shim/placement/handle_allocation_candidates_e2e.go index f6bffa924..e90193224 100644 --- a/internal/shim/placement/handle_allocation_candidates_e2e.go +++ b/internal/shim/placement/handle_allocation_candidates_e2e.go @@ -12,6 +12,7 @@ import ( "net/http" "github.com/cobaltcore-dev/cortex/pkg/conf" + "sigs.k8s.io/controller-runtime/pkg/client" logf "sigs.k8s.io/controller-runtime/pkg/log" ) @@ -25,7 +26,7 @@ import ( // - allocation_requests is non-empty (at least one candidate found). // - provider_summaries contains the test RP's UUID. // 4. Cleanup: DELETE the test RP and custom resource class. 
-func e2eTestAllocationCandidates(ctx context.Context) error { +func e2eTestAllocationCandidates(ctx context.Context, _ client.Client) error { log := logf.FromContext(ctx) log.Info("Running allocation candidates endpoint e2e test") config, err := conf.GetConfig[e2eRootConfig]() diff --git a/internal/shim/placement/handle_allocations_e2e.go b/internal/shim/placement/handle_allocations_e2e.go index c37bfdd85..7f09a507b 100644 --- a/internal/shim/placement/handle_allocations_e2e.go +++ b/internal/shim/placement/handle_allocations_e2e.go @@ -11,6 +11,7 @@ import ( "net/http" "github.com/cobaltcore-dev/cortex/pkg/conf" + "sigs.k8s.io/controller-runtime/pkg/client" logf "sigs.k8s.io/controller-runtime/pkg/log" ) @@ -30,7 +31,7 @@ import ( // 7. GET /allocations/{consumer2} — verify the second allocation exists. // 8. DELETE /allocations/{consumer} — remove allocations for both consumers. // 9. Cleanup: DELETE the test RP and custom resource class. -func e2eTestAllocations(ctx context.Context) error { +func e2eTestAllocations(ctx context.Context, _ client.Client) error { log := logf.FromContext(ctx) log.Info("Running allocations endpoint e2e test") config, err := conf.GetConfig[e2eRootConfig]() diff --git a/internal/shim/placement/handle_reshaper_e2e.go b/internal/shim/placement/handle_reshaper_e2e.go index 501951dc3..f43809de4 100644 --- a/internal/shim/placement/handle_reshaper_e2e.go +++ b/internal/shim/placement/handle_reshaper_e2e.go @@ -11,6 +11,7 @@ import ( "net/http" "github.com/cobaltcore-dev/cortex/pkg/conf" + "sigs.k8s.io/controller-runtime/pkg/client" logf "sigs.k8s.io/controller-runtime/pkg/log" ) @@ -30,7 +31,7 @@ import ( // 7. Verify allocation: GET /allocations/{consumer} — expect it to reference // RP-B (not RP-A). // 8. Cleanup: DELETE allocation, both RPs, and custom resource class. 
-func e2eTestReshaper(ctx context.Context) error { +func e2eTestReshaper(ctx context.Context, _ client.Client) error { log := logf.FromContext(ctx) log.Info("Running reshaper endpoint e2e test") config, err := conf.GetConfig[e2eRootConfig]() diff --git a/internal/shim/placement/handle_resource_classes_e2e.go b/internal/shim/placement/handle_resource_classes_e2e.go index 7e4f27434..360e1ef80 100644 --- a/internal/shim/placement/handle_resource_classes_e2e.go +++ b/internal/shim/placement/handle_resource_classes_e2e.go @@ -10,6 +10,7 @@ import ( "net/http" "github.com/cobaltcore-dev/cortex/pkg/conf" + "sigs.k8s.io/controller-runtime/pkg/client" logf "sigs.k8s.io/controller-runtime/pkg/log" ) @@ -23,7 +24,7 @@ import ( // 5. GET /resource_classes/{name} — verify the custom class now exists. // 6. DELETE /resource_classes/{name} — remove the custom class. // 7. GET /resource_classes/{name} — confirm deletion returns 404. -func e2eTestResourceClasses(ctx context.Context) error { +func e2eTestResourceClasses(ctx context.Context, _ client.Client) error { log := logf.FromContext(ctx) log.Info("Running resource classes endpoint e2e test") config, err := conf.GetConfig[e2eRootConfig]() diff --git a/internal/shim/placement/handle_resource_provider_aggregates_e2e.go b/internal/shim/placement/handle_resource_provider_aggregates_e2e.go index 72b2b78b4..b673c75f6 100644 --- a/internal/shim/placement/handle_resource_provider_aggregates_e2e.go +++ b/internal/shim/placement/handle_resource_provider_aggregates_e2e.go @@ -12,6 +12,7 @@ import ( "slices" "github.com/cobaltcore-dev/cortex/pkg/conf" + "sigs.k8s.io/controller-runtime/pkg/client" logf "sigs.k8s.io/controller-runtime/pkg/log" ) @@ -26,7 +27,7 @@ import ( // 6. PUT /{uuid}/aggregates — clear aggregates by sending an empty list. // 7. GET /{uuid}/aggregates — verify aggregates are empty after clear. // 8. Cleanup: DELETE the test RP (also runs via deferred cleanup on failure). 
-func e2eTestResourceProviderAggregates(ctx context.Context) error { +func e2eTestResourceProviderAggregates(ctx context.Context, _ client.Client) error { log := logf.FromContext(ctx) log.Info("Running resource provider aggregates endpoint e2e test") config, err := conf.GetConfig[e2eRootConfig]() diff --git a/internal/shim/placement/handle_resource_provider_allocations_e2e.go b/internal/shim/placement/handle_resource_provider_allocations_e2e.go index 2d749e285..a63c8cb4f 100644 --- a/internal/shim/placement/handle_resource_provider_allocations_e2e.go +++ b/internal/shim/placement/handle_resource_provider_allocations_e2e.go @@ -11,6 +11,7 @@ import ( "net/http" "github.com/cobaltcore-dev/cortex/pkg/conf" + "sigs.k8s.io/controller-runtime/pkg/client" logf "sigs.k8s.io/controller-runtime/pkg/log" ) @@ -24,7 +25,7 @@ import ( // 4. GET /resource_providers — list real providers, then GET /{uuid}/allocations // on up to 3 of them to verify the endpoint works with production data. // 5. Cleanup: DELETE the test RP. -func e2eTestResourceProviderAllocations(ctx context.Context) error { +func e2eTestResourceProviderAllocations(ctx context.Context, _ client.Client) error { log := logf.FromContext(ctx) log.Info("Running resource provider allocations endpoint e2e test") config, err := conf.GetConfig[e2eRootConfig]() diff --git a/internal/shim/placement/handle_resource_provider_inventories_e2e.go b/internal/shim/placement/handle_resource_provider_inventories_e2e.go index ab4869186..354460e81 100644 --- a/internal/shim/placement/handle_resource_provider_inventories_e2e.go +++ b/internal/shim/placement/handle_resource_provider_inventories_e2e.go @@ -11,6 +11,7 @@ import ( "net/http" "github.com/cobaltcore-dev/cortex/pkg/conf" + "sigs.k8s.io/controller-runtime/pkg/client" logf "sigs.k8s.io/controller-runtime/pkg/log" ) @@ -31,7 +32,7 @@ import ( // 9. DELETE /{uuid}/inventories — bulk-delete all inventories at once. // // 10. Cleanup: DELETE the test RP and custom resource class. 
-func e2eTestResourceProviderInventories(ctx context.Context) error { +func e2eTestResourceProviderInventories(ctx context.Context, _ client.Client) error { log := logf.FromContext(ctx) log.Info("Running resource provider inventories endpoint e2e test") config, err := conf.GetConfig[e2eRootConfig]() diff --git a/internal/shim/placement/handle_resource_provider_traits_e2e.go b/internal/shim/placement/handle_resource_provider_traits_e2e.go index 641be189f..c697ca7ff 100644 --- a/internal/shim/placement/handle_resource_provider_traits_e2e.go +++ b/internal/shim/placement/handle_resource_provider_traits_e2e.go @@ -11,6 +11,7 @@ import ( "net/http" "github.com/cobaltcore-dev/cortex/pkg/conf" + "sigs.k8s.io/controller-runtime/pkg/client" logf "sigs.k8s.io/controller-runtime/pkg/log" ) @@ -25,7 +26,7 @@ import ( // 6. DELETE /{uuid}/traits — disassociate all traits from the RP. // 7. GET /{uuid}/traits — verify the trait list is empty again. // 8. Cleanup: DELETE the test RP and custom trait. -func e2eTestResourceProviderTraits(ctx context.Context) error { +func e2eTestResourceProviderTraits(ctx context.Context, _ client.Client) error { log := logf.FromContext(ctx) log.Info("Running resource provider traits endpoint e2e test") config, err := conf.GetConfig[e2eRootConfig]() diff --git a/internal/shim/placement/handle_resource_provider_usages_e2e.go b/internal/shim/placement/handle_resource_provider_usages_e2e.go index 54cb31613..c548162ba 100644 --- a/internal/shim/placement/handle_resource_provider_usages_e2e.go +++ b/internal/shim/placement/handle_resource_provider_usages_e2e.go @@ -11,6 +11,7 @@ import ( "net/http" "github.com/cobaltcore-dev/cortex/pkg/conf" + "sigs.k8s.io/controller-runtime/pkg/client" logf "sigs.k8s.io/controller-runtime/pkg/log" ) @@ -24,7 +25,7 @@ import ( // 4. GET /resource_providers — list real providers, then GET /{uuid}/usages // on up to 3 of them to verify the endpoint works with production data. // 5. Cleanup: DELETE the test RP. 
-func e2eTestResourceProviderUsages(ctx context.Context) error { +func e2eTestResourceProviderUsages(ctx context.Context, _ client.Client) error { log := logf.FromContext(ctx) log.Info("Running resource provider usages endpoint e2e test") config, err := conf.GetConfig[e2eRootConfig]() diff --git a/internal/shim/placement/handle_resource_providers.go b/internal/shim/placement/handle_resource_providers.go index 44a0a316a..f47d5067e 100644 --- a/internal/shim/placement/handle_resource_providers.go +++ b/internal/shim/placement/handle_resource_providers.go @@ -160,6 +160,8 @@ func (s *Shim) HandleCreateResourceProvider(w http.ResponseWriter, r *http.Reque } // No conflict — restore the body and forward to upstream placement. + log.Info("no conflict with existing kvm hypervisor, forwarding create resource provider request to upstream placement", + "uuid", req.Name) r.Body = io.NopCloser(bytes.NewReader(bodyBytes)) s.forward(w, r) } @@ -187,6 +189,8 @@ func (s *Shim) HandleShowResourceProvider(w http.ResponseWriter, r *http.Request err := s.List(ctx, &hvs, client.MatchingFields{idxHypervisorOpenStackId: uuid}) if apierrors.IsNotFound(err) || len(hvs.Items) == 0 { // Forward the request to placement if the hypervisor doesn't exist. + log.Info("resource provider not found in kubernetes, forwarding to upstream placement", + "uuid", uuid) s.forward(w, r) return } @@ -203,6 +207,8 @@ func (s *Shim) HandleShowResourceProvider(w http.ResponseWriter, r *http.Request } // Translate the hypervisor to a resource provider response. 
+ log.Info("resource provider found in kubernetes, returning translated kvm hypervisor", + "uuid", uuid, "hypervisor", hvs.Items[0].Name) s.writeJSON(w, http.StatusOK, translateToResourceProvider(hvs.Items[0])) } @@ -252,6 +258,8 @@ func (s *Shim) HandleUpdateResourceProvider(w http.ResponseWriter, r *http.Reque err = s.List(ctx, &hvs, client.MatchingFields{idxHypervisorOpenStackId: uuid}) if apierrors.IsNotFound(err) || len(hvs.Items) == 0 { // Forward the request to placement if the hypervisor doesn't exist. + log.Info("resource provider not found in kubernetes, forwarding to upstream placement", + "uuid", uuid) r.Body = io.NopCloser(bytes.NewReader(bodyBytes)) s.forward(w, r) return @@ -285,6 +293,8 @@ func (s *Shim) HandleUpdateResourceProvider(w http.ResponseWriter, r *http.Reque // If we get here, the request is valid but doesn't actually change anything, // so we can just return the current state of the resource provider. + log.Info("update to kvm hypervisor resource provider has no effect, returning current state", + "uuid", uuid, "name", hv.Name, "parentProviderUUID", hv.Status.HypervisorID) s.writeJSON(w, http.StatusOK, translateToResourceProvider(hv)) } @@ -309,6 +319,8 @@ func (s *Shim) HandleDeleteResourceProvider(w http.ResponseWriter, r *http.Reque err := s.List(ctx, &hvs, client.MatchingFields{idxHypervisorOpenStackId: uuid}) if apierrors.IsNotFound(err) || len(hvs.Items) == 0 { // Forward the request to placement if the hypervisor doesn't exist. + log.Info("resource provider not found in kubernetes, forwarding to upstream placement", + "uuid", uuid) s.forward(w, r) return } @@ -326,6 +338,7 @@ func (s *Shim) HandleDeleteResourceProvider(w http.ResponseWriter, r *http.Reque // KVM hypervisor resources are immutable to the extent that they cannot be // deleted, so we return a 409 Conflict to match the behavior of placement. 
+ log.Error(nil, "attempt to delete a kvm hypervisor resource provider", "uuid", uuid) http.Error(w, "cannot delete a kvm hypervisor resource provider", http.StatusConflict) } @@ -376,6 +389,12 @@ func (s *Shim) HandleListResourceProviders(w http.ResponseWriter, r *http.Reques http.Error(w, "Internal Server Error", http.StatusInternalServerError) return } + var uuids []string + for _, rp := range upstreamList.ResourceProviders { + uuids = append(uuids, rp.UUID) + } + log.Info("fetched resource providers from upstream placement", + "count", len(upstreamList.ResourceProviders), "uuids", uuids) // Fetch all KVM hypervisors from Kubernetes. query := r.URL.Query() @@ -385,6 +404,12 @@ func (s *Shim) HandleListResourceProviders(w http.ResponseWriter, r *http.Reques http.Error(w, "Internal Server Error", http.StatusInternalServerError) return } + uuids = nil + for _, hv := range hvs.Items { + uuids = append(uuids, hv.Status.HypervisorID) + } + log.Info("fetched hypervisors from kubernetes", + "count", len(hvs.Items), "uuids", uuids) // Post-filter by query parameters. 
filtered := hvs.Items @@ -441,6 +466,11 @@ func (s *Shim) HandleListResourceProviders(w http.ResponseWriter, r *http.Reques merged = append(merged, rp) } + log.Info("merged resource providers from upstream placement and kubernetes", + "upstreamCount", len(upstreamList.ResourceProviders), + "kubernetesCount", len(filtered), + "mergedCount", len(merged), + ) s.writeJSON(w, http.StatusOK, listResourceProvidersResponse{ ResourceProviders: merged, }) diff --git a/internal/shim/placement/handle_resource_providers_e2e.go b/internal/shim/placement/handle_resource_providers_e2e.go index 3f62d7419..31cdb87c5 100644 --- a/internal/shim/placement/handle_resource_providers_e2e.go +++ b/internal/shim/placement/handle_resource_providers_e2e.go @@ -7,16 +7,22 @@ import ( "bytes" "context" "encoding/json" + "errors" "fmt" "net/http" "github.com/cobaltcore-dev/cortex/pkg/conf" + hv1 "github.com/cobaltcore-dev/openstack-hypervisor-operator/api/v1" + "github.com/gophercloud/gophercloud/v2" + "sigs.k8s.io/controller-runtime/pkg/client" logf "sigs.k8s.io/controller-runtime/pkg/log" ) // e2eTestResourceProviders tests the /resource_providers and // /resource_providers/{uuid} endpoints. // +// Phase 1 — VMware path (passthrough to upstream placement): +// // 1. Pre-cleanup: DELETE any leftover test resource provider (ignore 404). // 2. GET /resource_providers — list all providers and verify the response. // 3. POST /resource_providers — create a test provider with a fixed UUID. @@ -24,7 +30,17 @@ import ( // 5. PUT /resource_providers/{uuid} — update the provider's name. // 6. DELETE /resource_providers/{uuid} — remove the test provider. // 7. GET /resource_providers/{uuid} — confirm deletion returns 404. -func e2eTestResourceProviders(ctx context.Context) error { +// +// Phase 2 — KVM path (backed by Hypervisor CRD): +// +// 1. Discover a KVM hypervisor with a non-empty OpenStack ID. +// 2. GET /resource_providers/{kvmUUID} — show translated resource provider. +// 3. 
PUT /resource_providers/{kvmUUID} — idempotent update with same name → 200. +// 4. PUT /resource_providers/{kvmUUID} — name change → 409. +// 5. DELETE /resource_providers/{kvmUUID} — protected → 409. +// 6. POST /resource_providers — name collision with KVM hypervisor → 409. +// 7. GET /resource_providers — list includes KVM provider (merge test). +func e2eTestResourceProviders(ctx context.Context, cl client.Client) error { log := logf.FromContext(ctx) log.Info("Running resource providers endpoint e2e test") config, err := conf.GetConfig[e2eRootConfig]() @@ -40,6 +56,26 @@ func e2eTestResourceProviders(ctx context.Context) error { } log.Info("Successfully created openstack client for resource providers e2e test") + // ==================== Phase 1: VMware path ==================== + + log.Info("=== VMware path: passthrough resource provider tests ===") + if err := e2eVMwareResourceProviders(ctx, sc); err != nil { + return fmt.Errorf("VMware path: %w", err) + } + + // ==================== Phase 2: KVM path ==================== + + log.Info("=== KVM path: hypervisor-backed resource provider tests ===") + if err := e2eKVMResourceProviders(ctx, sc, cl); err != nil { + return fmt.Errorf("KVM path: %w", err) + } + + return nil +} + +func e2eVMwareResourceProviders(ctx context.Context, sc *gophercloud.ServiceClient) error { + log := logf.FromContext(ctx) + const testRPUUID = "e2e10000-0000-0000-0000-000000000001" const testRPName = "cortex-e2e-test-rp" const testRPNameUpdated = "cortex-e2e-test-rp-updated" @@ -254,6 +290,217 @@ func e2eTestResourceProviders(ctx context.Context) error { return nil } +func e2eKVMResourceProviders(ctx context.Context, sc *gophercloud.ServiceClient, cl client.Client) error { + log := logf.FromContext(ctx) + + // Discover a KVM hypervisor with a non-empty OpenStack ID. 
+ var hvs hv1.HypervisorList + if err := cl.List(ctx, &hvs); err != nil { + log.Error(err, "failed to list hypervisors for KVM path") + return err + } + var kvmHV *hv1.Hypervisor + for i := range hvs.Items { + if hvs.Items[i].Status.HypervisorID != "" { + kvmHV = &hvs.Items[i] + break + } + } + if kvmHV == nil { + log.Info("No KVM hypervisors with OpenStack ID found, skipping KVM path tests") + return nil + } + kvmUUID := kvmHV.Status.HypervisorID + kvmName := kvmHV.Name + log.Info("Using KVM hypervisor for e2e tests", "uuid", kvmUUID, "name", kvmName) + + // Test GET /resource_providers/{kvmUUID} → 200 with translated RP. + log.Info("Testing GET /resource_providers/{uuid} for KVM hypervisor", "uuid", kvmUUID) + req, err := http.NewRequestWithContext(ctx, + http.MethodGet, sc.Endpoint+"/resource_providers/"+kvmUUID, http.NoBody) + if err != nil { + return fmt.Errorf("failed to create GET request for KVM RP: %w", err) + } + req.Header.Set("X-Auth-Token", sc.TokenID) + req.Header.Set("OpenStack-API-Version", "placement 1.20") + req.Header.Set("Accept", "application/json") + resp, err := sc.HTTPClient.Do(req) + if err != nil { + return fmt.Errorf("failed to send GET request for KVM RP: %w", err) + } + defer resp.Body.Close() + if resp.StatusCode != http.StatusOK { + return fmt.Errorf("GET /resource_providers/%s: expected 200, got %d", kvmUUID, resp.StatusCode) + } + var showRP struct { + UUID string `json:"uuid"` + Name string `json:"name"` + Links []struct { + Href string `json:"href"` + Rel string `json:"rel"` + } `json:"links"` + } + if err := json.NewDecoder(resp.Body).Decode(&showRP); err != nil { + return fmt.Errorf("failed to decode GET KVM RP response: %w", err) + } + if showRP.UUID != kvmUUID { + return fmt.Errorf("GET KVM RP: uuid = %q, want %q", showRP.UUID, kvmUUID) + } + if showRP.Name != kvmName { + return fmt.Errorf("GET KVM RP: name = %q, want %q", showRP.Name, kvmName) + } + if len(showRP.Links) == 0 { + return errors.New("GET KVM RP: expected 
non-empty links array") + } + log.Info("Successfully retrieved KVM resource provider", + "uuid", showRP.UUID, "name", showRP.Name, "links", len(showRP.Links)) + + // Test PUT /resource_providers/{kvmUUID} with same name → 200. + log.Info("Testing PUT /resource_providers/{uuid} with same name for KVM hypervisor", + "uuid", kvmUUID, "name", kvmName) + body, err := json.Marshal(map[string]string{"name": kvmName}) + if err != nil { + return fmt.Errorf("failed to marshal PUT body: %w", err) + } + req, err = http.NewRequestWithContext(ctx, + http.MethodPut, sc.Endpoint+"/resource_providers/"+kvmUUID, bytes.NewReader(body)) + if err != nil { + return fmt.Errorf("failed to create PUT request for KVM RP: %w", err) + } + req.Header.Set("X-Auth-Token", sc.TokenID) + req.Header.Set("OpenStack-API-Version", "placement 1.20") + req.Header.Set("Content-Type", "application/json") + req.Header.Set("Accept", "application/json") + resp, err = sc.HTTPClient.Do(req) + if err != nil { + return fmt.Errorf("failed to send PUT request for KVM RP: %w", err) + } + defer resp.Body.Close() + if resp.StatusCode != http.StatusOK { + return fmt.Errorf("PUT /resource_providers/%s (same name): expected 200, got %d", kvmUUID, resp.StatusCode) + } + log.Info("Successfully performed idempotent PUT on KVM resource provider", "uuid", kvmUUID) + + // Test PUT /resource_providers/{kvmUUID} with different name → 409. 
+ log.Info("Testing PUT /resource_providers/{uuid} with different name for KVM hypervisor", + "uuid", kvmUUID) + body, err = json.Marshal(map[string]string{"name": "cortex-e2e-kvm-renamed"}) + if err != nil { + return fmt.Errorf("failed to marshal PUT body: %w", err) + } + req, err = http.NewRequestWithContext(ctx, + http.MethodPut, sc.Endpoint+"/resource_providers/"+kvmUUID, bytes.NewReader(body)) + if err != nil { + return fmt.Errorf("failed to create PUT request for KVM RP rename: %w", err) + } + req.Header.Set("X-Auth-Token", sc.TokenID) + req.Header.Set("OpenStack-API-Version", "placement 1.20") + req.Header.Set("Content-Type", "application/json") + req.Header.Set("Accept", "application/json") + resp, err = sc.HTTPClient.Do(req) + if err != nil { + return fmt.Errorf("failed to send PUT rename request for KVM RP: %w", err) + } + defer resp.Body.Close() + if resp.StatusCode != http.StatusConflict { + return fmt.Errorf("PUT /resource_providers/%s (rename): expected 409, got %d", kvmUUID, resp.StatusCode) + } + log.Info("Correctly received 409 on KVM resource provider rename", "uuid", kvmUUID) + + // Test DELETE /resource_providers/{kvmUUID} → 409. 
+ log.Info("Testing DELETE /resource_providers/{uuid} for KVM hypervisor", "uuid", kvmUUID) + req, err = http.NewRequestWithContext(ctx, + http.MethodDelete, sc.Endpoint+"/resource_providers/"+kvmUUID, http.NoBody) + if err != nil { + return fmt.Errorf("failed to create DELETE request for KVM RP: %w", err) + } + req.Header.Set("X-Auth-Token", sc.TokenID) + req.Header.Set("OpenStack-API-Version", "placement 1.20") + resp, err = sc.HTTPClient.Do(req) + if err != nil { + return fmt.Errorf("failed to send DELETE request for KVM RP: %w", err) + } + defer resp.Body.Close() + if resp.StatusCode != http.StatusConflict { + return fmt.Errorf("DELETE /resource_providers/%s: expected 409, got %d", kvmUUID, resp.StatusCode) + } + log.Info("Correctly received 409 on KVM resource provider delete", "uuid", kvmUUID) + + // Test POST /resource_providers with KVM hypervisor name → 409. + log.Info("Testing POST /resource_providers with KVM hypervisor name", "name", kvmName) + body, err = json.Marshal(map[string]string{ + "name": kvmName, + "uuid": "e2e10000-0000-0000-0000-000000000099", + }) + if err != nil { + return fmt.Errorf("failed to marshal POST body: %w", err) + } + req, err = http.NewRequestWithContext(ctx, + http.MethodPost, sc.Endpoint+"/resource_providers", bytes.NewReader(body)) + if err != nil { + return fmt.Errorf("failed to create POST request for KVM name collision: %w", err) + } + req.Header.Set("X-Auth-Token", sc.TokenID) + req.Header.Set("OpenStack-API-Version", "placement 1.20") + req.Header.Set("Content-Type", "application/json") + req.Header.Set("Accept", "application/json") + resp, err = sc.HTTPClient.Do(req) + if err != nil { + return fmt.Errorf("failed to send POST request for KVM name collision: %w", err) + } + defer resp.Body.Close() + if resp.StatusCode != http.StatusConflict { + return fmt.Errorf("POST /resource_providers (KVM name): expected 409, got %d", resp.StatusCode) + } + log.Info("Correctly received 409 on POST with KVM hypervisor name", "name", 
kvmName) + + // Test GET /resource_providers → list includes KVM provider. + log.Info("Testing GET /resource_providers includes KVM hypervisor in merged list") + req, err = http.NewRequestWithContext(ctx, + http.MethodGet, sc.Endpoint+"/resource_providers", http.NoBody) + if err != nil { + return fmt.Errorf("failed to create GET list request: %w", err) + } + req.Header.Set("X-Auth-Token", sc.TokenID) + req.Header.Set("OpenStack-API-Version", "placement 1.20") + req.Header.Set("Accept", "application/json") + resp, err = sc.HTTPClient.Do(req) + if err != nil { + return fmt.Errorf("failed to send GET list request: %w", err) + } + defer resp.Body.Close() + if resp.StatusCode != http.StatusOK { + return fmt.Errorf("GET /resource_providers: expected 200, got %d", resp.StatusCode) + } + var listResp struct { + ResourceProviders []struct { + UUID string `json:"uuid"` + Name string `json:"name"` + } `json:"resource_providers"` + } + if err := json.NewDecoder(resp.Body).Decode(&listResp); err != nil { + return fmt.Errorf("failed to decode list response: %w", err) + } + found := false + for _, rp := range listResp.ResourceProviders { + if rp.UUID == kvmUUID { + if rp.Name != kvmName { + return fmt.Errorf("list KVM RP: name = %q, want %q", rp.Name, kvmName) + } + found = true + break + } + } + if !found { + return fmt.Errorf("GET /resource_providers: KVM hypervisor %s not found in merged list (%d providers)", + kvmUUID, len(listResp.ResourceProviders)) + } + log.Info("Successfully verified KVM hypervisor in merged resource provider list", + "uuid", kvmUUID, "totalProviders", len(listResp.ResourceProviders)) + + return nil +} + func init() { e2eTests = append(e2eTests, e2eTest{name: "resource_providers", run: e2eTestResourceProviders}) } diff --git a/internal/shim/placement/handle_root_e2e.go b/internal/shim/placement/handle_root_e2e.go index ce352f467..2e558705a 100644 --- a/internal/shim/placement/handle_root_e2e.go +++ b/internal/shim/placement/handle_root_e2e.go @@ -9,13 
+9,14 @@ import ( "net/http" "github.com/cobaltcore-dev/cortex/pkg/conf" + "sigs.k8s.io/controller-runtime/pkg/client" logf "sigs.k8s.io/controller-runtime/pkg/log" ) // e2eTestGetRoot verifies basic connectivity to the placement shim. // It sends a GET request to the root endpoint (/) and checks that the shim // responds with a 2xx status code, confirming the service is reachable. -func e2eTestGetRoot(ctx context.Context) error { +func e2eTestGetRoot(ctx context.Context, _ client.Client) error { log := logf.FromContext(ctx) log.Info("Running root endpoint e2e test") config, err := conf.GetConfig[e2eRootConfig]() diff --git a/internal/shim/placement/handle_traits_e2e.go b/internal/shim/placement/handle_traits_e2e.go index 69325216f..dabb6d74e 100644 --- a/internal/shim/placement/handle_traits_e2e.go +++ b/internal/shim/placement/handle_traits_e2e.go @@ -10,6 +10,7 @@ import ( "net/http" "github.com/cobaltcore-dev/cortex/pkg/conf" + "sigs.k8s.io/controller-runtime/pkg/client" logf "sigs.k8s.io/controller-runtime/pkg/log" ) @@ -20,7 +21,7 @@ import ( // 3. GET /traits/{name} — retrieve 5 individual existing traits by name. // 4. PUT /traits/{name} — create a custom test trait (CUSTOM_CORTEX_...). // 5. DELETE /traits/{name} — remove the custom test trait to clean up. 
-func e2eTestTraits(ctx context.Context) error { +func e2eTestTraits(ctx context.Context, _ client.Client) error { log := logf.FromContext(ctx) log.Info("Running traits endpoint e2e test") config, err := conf.GetConfig[e2eRootConfig]() diff --git a/internal/shim/placement/handle_usages_e2e.go b/internal/shim/placement/handle_usages_e2e.go index c4f055e5f..c7ac8c965 100644 --- a/internal/shim/placement/handle_usages_e2e.go +++ b/internal/shim/placement/handle_usages_e2e.go @@ -10,6 +10,7 @@ import ( "net/http" "github.com/cobaltcore-dev/cortex/pkg/conf" + "sigs.k8s.io/controller-runtime/pkg/client" logf "sigs.k8s.io/controller-runtime/pkg/log" ) @@ -20,7 +21,7 @@ import ( // resource usages and verify each returns a successful response. // // This test is read-only and does not create any resources. -func e2eTestUsages(ctx context.Context) error { +func e2eTestUsages(ctx context.Context, _ client.Client) error { log := logf.FromContext(ctx) log.Info("Running usages endpoint e2e test") config, err := conf.GetConfig[e2eRootConfig]() diff --git a/internal/shim/placement/shim.go b/internal/shim/placement/shim.go index f44979bbf..1e9ef5319 100644 --- a/internal/shim/placement/shim.go +++ b/internal/shim/placement/shim.go @@ -288,7 +288,7 @@ func (s *Shim) SetupWithManager(ctx context.Context, mgr ctrl.Manager) (err erro if !ok { return errors.New("provided client must be a multicluster client") } - if err := indexFields(ctx, mcl); err != nil { + if err := IndexFields(ctx, mcl); err != nil { return fmt.Errorf("failed to set up indexes: %w", err) } bldr := multicluster.BuildController(mcl, mgr) @@ -356,6 +356,15 @@ func (s *Shim) forwardWithHook(w http.ResponseWriter, r *http.Request, hook func // Copy all incoming headers. upstreamReq.Header = r.Header.Clone() + // When a hook will inspect the response body, remove Accept-Encoding + // so the upstream returns uncompressed data. 
Go's Transport would + // normally handle this automatically, but we're forwarding the + // downstream client's explicit Accept-Encoding, which bypasses the + // auto-decompression in net/http. + if hook != nil { + upstreamReq.Header.Del("Accept-Encoding") + } + pattern, _ := ctx.Value(routePatternKey).(string) start := time.Now() resp, err := s.httpClient.Do(upstreamReq) //nolint:gosec // G704: intentional reverse proxy diff --git a/internal/shim/placement/shim_e2e.go b/internal/shim/placement/shim_e2e.go index 7945fb77a..e7f9e30f3 100644 --- a/internal/shim/placement/shim_e2e.go +++ b/internal/shim/placement/shim_e2e.go @@ -12,6 +12,7 @@ import ( "github.com/cobaltcore-dev/cortex/pkg/sso" "github.com/gophercloud/gophercloud/v2" "github.com/gophercloud/gophercloud/v2/openstack" + "sigs.k8s.io/controller-runtime/pkg/client" logf "sigs.k8s.io/controller-runtime/pkg/log" ) @@ -81,7 +82,7 @@ func makeE2EServiceClient(ctx context.Context, rc e2eRootConfig) (*gophercloud.S // e2eTest is a named end-to-end test registered by handler e2e files. type e2eTest struct { name string - run func(ctx context.Context) error + run func(ctx context.Context, cl client.Client) error } // e2eTests is populated by init() functions in the handle_*_e2e.go files. @@ -89,7 +90,7 @@ var e2eTests []e2eTest // RunE2E executes end-to-end tests for all placement shim handlers. // It stops on the first failure and returns the error. 
-func RunE2E(ctx context.Context) error { +func RunE2E(ctx context.Context, cl client.Client) error { log := logf.FromContext(ctx) log.Info("Running e2e test(s)", "count", len(e2eTests)) totalStart := time.Now() @@ -98,7 +99,7 @@ func RunE2E(ctx context.Context) error { "index", i+1, "total", len(e2eTests), "name", test.name) start := time.Now() testCtx := logf.IntoContext(ctx, log.WithName(test.name)) - if err := test.run(testCtx); err != nil { + if err := test.run(testCtx, cl); err != nil { log.Error(err, "FAIL e2e test", "index", i+1, "total", len(e2eTests), "name", test.name, "took_ms", time.Since(start).Milliseconds()) From 0b8b3b8a1d60e6d7e6265539613ebfcf1d1c6ba0 Mon Sep 17 00:00:00 2001 From: Philipp Matthes Date: Wed, 22 Apr 2026 14:03:15 +0200 Subject: [PATCH 4/5] Add features.enableResourceProviders feature flag Gate all 5 resource provider handlers behind a config flag so KVM-specific logic (hypervisor lookups, merged listings, 409 conflicts) can be toggled per deployment. When the flag is off (the default), handlers forward to upstream placement as a pure passthrough. The guard is placed defensively at the top of each handler, before any body reads or hypervisor lookups. 
--- .../bundles/cortex-placement-shim/values.yaml | 2 + .../placement/handle_resource_providers.go | 25 +++++ .../handle_resource_providers_test.go | 103 +++++++++++++++++- internal/shim/placement/shim.go | 13 +++ 4 files changed, 141 insertions(+), 2 deletions(-) diff --git a/helm/bundles/cortex-placement-shim/values.yaml b/helm/bundles/cortex-placement-shim/values.yaml index b7e7281f0..443d52a9c 100644 --- a/helm/bundles/cortex-placement-shim/values.yaml +++ b/helm/bundles/cortex-placement-shim/values.yaml @@ -41,6 +41,8 @@ cortex-shim: osProjectName: osUserDomainName: osProjectDomainName: + features: + enableResourceProviders: false auth: tokenCacheTTL: "5m" policies: diff --git a/internal/shim/placement/handle_resource_providers.go b/internal/shim/placement/handle_resource_providers.go index f47d5067e..1ee6a94b9 100644 --- a/internal/shim/placement/handle_resource_providers.go +++ b/internal/shim/placement/handle_resource_providers.go @@ -119,6 +119,11 @@ func (s *Shim) HandleCreateResourceProvider(w http.ResponseWriter, r *http.Reque ctx := r.Context() log := logf.FromContext(ctx) + if !s.config.Features.EnableResourceProviders { + s.forward(w, r) + return + } + // Buffer the body so we can decode it and still forward the original // bytes to upstream placement. bodyBytes, err := io.ReadAll(r.Body) @@ -184,6 +189,11 @@ func (s *Shim) HandleShowResourceProvider(w http.ResponseWriter, r *http.Request return } + if !s.config.Features.EnableResourceProviders { + s.forward(w, r) + return + } + // Try to find the hypervisor in kubernetes. 
var hvs hv1.HypervisorList err := s.List(ctx, &hvs, client.MatchingFields{idxHypervisorOpenStackId: uuid}) @@ -236,6 +246,11 @@ func (s *Shim) HandleUpdateResourceProvider(w http.ResponseWriter, r *http.Reque return } + if !s.config.Features.EnableResourceProviders { + s.forward(w, r) + return + } + bodyBytes, err := io.ReadAll(r.Body) if err != nil { log.Error(err, "failed to read request body") @@ -314,6 +329,11 @@ func (s *Shim) HandleDeleteResourceProvider(w http.ResponseWriter, r *http.Reque return } + if !s.config.Features.EnableResourceProviders { + s.forward(w, r) + return + } + // Try to find the hypervisor in kubernetes. var hvs hv1.HypervisorList err := s.List(ctx, &hvs, client.MatchingFields{idxHypervisorOpenStackId: uuid}) @@ -371,6 +391,11 @@ func (s *Shim) HandleListResourceProviders(w http.ResponseWriter, r *http.Reques ctx := r.Context() log := logf.FromContext(ctx) + if !s.config.Features.EnableResourceProviders { + s.forward(w, r) + return + } + s.forwardWithHook(w, r, func(w http.ResponseWriter, resp *http.Response) { if resp.StatusCode != http.StatusOK { for k, vs := range resp.Header { diff --git a/internal/shim/placement/handle_resource_providers_test.go b/internal/shim/placement/handle_resource_providers_test.go index a4d6dbf76..b91e42da2 100644 --- a/internal/shim/placement/handle_resource_providers_test.go +++ b/internal/shim/placement/handle_resource_providers_test.go @@ -96,8 +96,11 @@ func newTestShimWithHypervisors(t *testing.T, upstreamStatus int, upstreamBody s t.Cleanup(upstream.Close) down, up := newTestTimers() return &Shim{ - Client: newFakeClient(t, hvs...), - config: config{PlacementURL: upstream.URL}, + Client: newFakeClient(t, hvs...), + config: config{ + PlacementURL: upstream.URL, + Features: featuresConfig{EnableResourceProviders: true}, + }, httpClient: upstream.Client(), maxBodyLogSize: 4096, downstreamRequestTimer: down, @@ -869,6 +872,102 @@ func TestHandleDeleteResourceProvider(t *testing.T) { }) } +// 
--------------------------------------------------------------------------- +// Feature flag tests +// --------------------------------------------------------------------------- + +func TestHandleResourceProviders_FeatureFlagOff(t *testing.T) { + hv1Obj := &hv1.Hypervisor{ + ObjectMeta: metav1.ObjectMeta{Name: "hv-flagtest"}, + Status: hv1.HypervisorStatus{HypervisorID: validUUID}, + } + + newFlagOffShim := func(t *testing.T, upstreamStatus int, upstreamBody string) *Shim { + t.Helper() + upstream := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(upstreamStatus) + if _, err := w.Write([]byte(upstreamBody)); err != nil { + t.Errorf("failed to write upstream body: %v", err) + } + })) + t.Cleanup(upstream.Close) + down, up := newTestTimers() + return &Shim{ + Client: newFakeClient(t, hv1Obj), + config: config{ + PlacementURL: upstream.URL, + Features: featuresConfig{EnableResourceProviders: false}, + }, + httpClient: upstream.Client(), + maxBodyLogSize: 4096, + downstreamRequestTimer: down, + upstreamRequestTimer: up, + } + } + + t.Run("create forwards to upstream", func(t *testing.T) { + s := newFlagOffShim(t, http.StatusCreated, `{"uuid":"new","name":"hv-flagtest"}`) + body := `{"name":"hv-flagtest"}` + req := httptest.NewRequest(http.MethodPost, "/resource_providers", strings.NewReader(body)) + w := httptest.NewRecorder() + s.HandleCreateResourceProvider(w, req) + if w.Code != http.StatusCreated { + t.Fatalf("status = %d, want %d (flag off should forward, not 409)", w.Code, http.StatusCreated) + } + }) + + t.Run("show forwards to upstream", func(t *testing.T) { + s := newFlagOffShim(t, http.StatusOK, `{"uuid":"`+validUUID+`","name":"upstream-rp"}`) + w := serveHandler(t, http.MethodGet, "/resource_providers/{uuid}", + s.HandleShowResourceProvider, "/resource_providers/"+validUUID) + if w.Code != http.StatusOK { + t.Fatalf("status = %d, want %d", w.Code, 
http.StatusOK) + } + if !strings.Contains(w.Body.String(), "upstream-rp") { + t.Errorf("expected upstream body, got %q", w.Body.String()) + } + }) + + t.Run("update forwards to upstream", func(t *testing.T) { + s := newFlagOffShim(t, http.StatusOK, `{"uuid":"`+validUUID+`","name":"different-name"}`) + body := `{"name":"different-name"}` + req := httptest.NewRequest(http.MethodPut, "/resource_providers/"+validUUID, strings.NewReader(body)) + mux := http.NewServeMux() + mux.HandleFunc("PUT /resource_providers/{uuid}", s.HandleUpdateResourceProvider) + w := httptest.NewRecorder() + mux.ServeHTTP(w, req) + if w.Code != http.StatusOK { + t.Fatalf("status = %d, want %d (flag off should forward, not 409)", w.Code, http.StatusOK) + } + }) + + t.Run("delete forwards to upstream", func(t *testing.T) { + s := newFlagOffShim(t, http.StatusNoContent, "") + w := serveHandler(t, http.MethodDelete, "/resource_providers/{uuid}", + s.HandleDeleteResourceProvider, "/resource_providers/"+validUUID) + if w.Code != http.StatusNoContent { + t.Fatalf("status = %d, want %d (flag off should forward, not 409)", w.Code, http.StatusNoContent) + } + }) + + t.Run("list forwards to upstream without merge", func(t *testing.T) { + upstreamBody := `{"resource_providers":[{"uuid":"upstream-uuid","name":"upstream-rp","generation":1,"links":[]}]}` + s := newFlagOffShim(t, http.StatusOK, upstreamBody) + w := serveHandler(t, http.MethodGet, "/resource_providers", + s.HandleListResourceProviders, "/resource_providers") + if w.Code != http.StatusOK { + t.Fatalf("status = %d, want %d", w.Code, http.StatusOK) + } + if !strings.Contains(w.Body.String(), "upstream-uuid") { + t.Errorf("expected upstream body passthrough, got %q", w.Body.String()) + } + if strings.Contains(w.Body.String(), validUUID) { + t.Errorf("should not contain k8s hypervisor UUID when flag is off, got %q", w.Body.String()) + } + }) +} + // --------------------------------------------------------------------------- // Helpers // 
--------------------------------------------------------------------------- diff --git a/internal/shim/placement/shim.go b/internal/shim/placement/shim.go index 1e9ef5319..caab3ceab 100644 --- a/internal/shim/placement/shim.go +++ b/internal/shim/placement/shim.go @@ -48,6 +48,16 @@ type requestIDContextKey struct{} // header value through the request lifecycle for tracing. var requestIDKey = requestIDContextKey{} +// featuresConfig holds feature flags that can enable or disable specific +// shim behaviors. All flags default to off (false). +type featuresConfig struct { + // EnableResourceProviders enables the KVM-specific resource provider + // logic (hypervisor lookups, merged listings, 409 conflicts). When + // false, all resource provider handlers forward to upstream placement + // as a pure passthrough. + EnableResourceProviders bool `json:"enableResourceProviders,omitempty"` +} + // config holds configuration for the placement shim. type config struct { // SSO is an optional configuration for the certificates the http client @@ -85,6 +95,9 @@ type config struct { // Kubernetes resource.Quantity string (e.g. "4Ki"). Defaults to "4Ki" // when unset or empty. MaxBodyLogSize string `json:"maxBodyLogSize,omitempty"` + // Features holds feature flags for enabling or disabling specific + // shim behaviors. + Features featuresConfig `json:"features"` } // validate checks the config for required fields and returns an error if the From ec22b0adedb1ec6caed7df4e26c4098afc80d4d4 Mon Sep 17 00:00:00 2001 From: Philipp Matthes Date: Wed, 22 Apr 2026 14:22:04 +0200 Subject: [PATCH 5/5] Address CodeRabbit review feedback - Move feature flag guard before UUID validation in Show/Update/Delete so passthrough mode does not return local 400s for invalid UUIDs. - Fix self-parenting in translateToResourceProvider: root KVM providers now have ParentProviderUUID nil instead of pointing at themselves. The update handler rejects any non-nil parent_provider_uuid for KVM. 
- Add UUID collision check in HandleCreateResourceProvider: POSTs with a UUID matching an existing KVM hypervisor now return 409. - Return 400 (not 500) for malformed resources query filters, add negative amount and overflow validation for MEMORY_MB/DISK_GB. - Buffer JSON encoding in writeJSON before committing the HTTP status so encode failures correctly return 500. - Guard KVM e2e tests with the enableResourceProviders feature flag to avoid mutating upstream state when the flag is off. --- .../placement/handle_resource_providers.go | 86 ++++++++++++------- .../handle_resource_providers_e2e.go | 10 ++- .../handle_resource_providers_test.go | 21 +++-- internal/shim/placement/shim_io.go | 10 ++- 4 files changed, 86 insertions(+), 41 deletions(-) diff --git a/internal/shim/placement/handle_resource_providers.go b/internal/shim/placement/handle_resource_providers.go index 1ee6a94b9..18118d90a 100644 --- a/internal/shim/placement/handle_resource_providers.go +++ b/internal/shim/placement/handle_resource_providers.go @@ -9,6 +9,7 @@ import ( "encoding/json" "fmt" "io" + "math" "net/http" "strconv" "strings" @@ -48,16 +49,13 @@ type resourceProviderLink struct { } // translateToResourceProvider constructs a resourceProvider from a Hypervisor. -// For the purposes of this shim, we treat each hypervisor as a root -// provider with no parent, so both ParentProviderUUID and RootProviderUUID -// are set to the hypervisor's UUID. +// KVM hypervisors are root providers with no parent. 
func translateToResourceProvider(hv hv1.Hypervisor) resourceProvider { return resourceProvider{ - Generation: hv.Generation, - UUID: hv.Status.HypervisorID, - Name: hv.Name, - ParentProviderUUID: &hv.Status.HypervisorID, - RootProviderUUID: &hv.Status.HypervisorID, + Generation: hv.Generation, + UUID: hv.Status.HypervisorID, + Name: hv.Name, + RootProviderUUID: &hv.Status.HypervisorID, Links: []resourceProviderLink{ { Rel: "self", @@ -91,10 +89,7 @@ func translateToResourceProvider(hv hv1.Hypervisor) resourceProvider { // POST /resource_providers. type createResourceProviderRequest struct { Name string `json:"name"` - - // Other fields are not relevant for the shim since we don't actually - // create resource providers in Kubernetes. We just check for name - // collisions with openstack placement. + UUID string `json:"uuid,omitempty"` } // HandleCreateResourceProvider handles POST /resource_providers requests. @@ -164,9 +159,26 @@ func (s *Shim) HandleCreateResourceProvider(w http.ResponseWriter, r *http.Reque return } + // Check UUID collision with existing KVM hypervisors. + if req.UUID != "" { + var hvsByUUID hv1.HypervisorList + err = s.List(ctx, &hvsByUUID, client.MatchingFields{idxHypervisorOpenStackId: req.UUID}) + if err != nil && !apierrors.IsNotFound(err) { + log.Error(err, "failed to list hypervisors with OpenStack ID index") + http.Error(w, "Internal Server Error", http.StatusInternalServerError) + return + } + if len(hvsByUUID.Items) > 0 { + log.Error(nil, "attempt to create a resource provider that conflicts with a kvm hypervisor UUID", + "uuid", req.UUID) + http.Error(w, "conflict with an existing kvm hypervisor resource provider", http.StatusConflict) + return + } + } + // No conflict — restore the body and forward to upstream placement. 
log.Info("no conflict with existing kvm hypervisor, forwarding create resource provider request to upstream placement", - "uuid", req.Name) + "name", req.Name) r.Body = io.NopCloser(bytes.NewReader(bodyBytes)) s.forward(w, r) } @@ -184,16 +196,17 @@ func (s *Shim) HandleCreateResourceProvider(w http.ResponseWriter, r *http.Reque func (s *Shim) HandleShowResourceProvider(w http.ResponseWriter, r *http.Request) { ctx := r.Context() log := logf.FromContext(ctx) - uuid, ok := requiredUUIDPathParam(w, r, "uuid") - if !ok { - return - } if !s.config.Features.EnableResourceProviders { s.forward(w, r) return } + uuid, ok := requiredUUIDPathParam(w, r, "uuid") + if !ok { + return + } + // Try to find the hypervisor in kubernetes. var hvs hv1.HypervisorList err := s.List(ctx, &hvs, client.MatchingFields{idxHypervisorOpenStackId: uuid}) @@ -241,16 +254,17 @@ type updateResourceProviderRequest struct { func (s *Shim) HandleUpdateResourceProvider(w http.ResponseWriter, r *http.Request) { ctx := r.Context() log := logf.FromContext(ctx) - uuid, ok := requiredUUIDPathParam(w, r, "uuid") - if !ok { - return - } if !s.config.Features.EnableResourceProviders { s.forward(w, r) return } + uuid, ok := requiredUUIDPathParam(w, r, "uuid") + if !ok { + return + } + bodyBytes, err := io.ReadAll(r.Body) if err != nil { log.Error(err, "failed to read request body") @@ -298,10 +312,10 @@ func (s *Shim) HandleUpdateResourceProvider(w http.ResponseWriter, r *http.Reque http.Error(w, "cannot change the name of a kvm hypervisor resource provider", http.StatusConflict) return } - // The hypervisor's parent is also immutable, so if the request tries to - // change it, we return a 409 Conflict. 
- if req.ParentProviderUUID != nil && *req.ParentProviderUUID != hv.Status.HypervisorID { - log.Error(nil, "attempt to change the parent of a kvm hypervisor resource provider", "uuid", uuid, "currentParent", hv.Status.HypervisorID, "requestedParent", *req.ParentProviderUUID) + // KVM hypervisors are root providers with no parent. Any attempt to set a + // parent is rejected. + if req.ParentProviderUUID != nil { + log.Error(nil, "attempt to set parent on a kvm hypervisor resource provider", "uuid", uuid, "requestedParent", *req.ParentProviderUUID) http.Error(w, "cannot change the parent of a kvm hypervisor resource provider", http.StatusConflict) return } @@ -324,16 +338,17 @@ func (s *Shim) HandleUpdateResourceProvider(w http.ResponseWriter, r *http.Reque func (s *Shim) HandleDeleteResourceProvider(w http.ResponseWriter, r *http.Request) { ctx := r.Context() log := logf.FromContext(ctx) - uuid, ok := requiredUUIDPathParam(w, r, "uuid") - if !ok { - return - } if !s.config.Features.EnableResourceProviders { s.forward(w, r) return } + uuid, ok := requiredUUIDPathParam(w, r, "uuid") + if !ok { + return + } + // Try to find the hypervisor in kubernetes. 
var hvs hv1.HypervisorList err := s.List(ctx, &hvs, client.MatchingFields{idxHypervisorOpenStackId: uuid}) @@ -457,8 +472,8 @@ func (s *Shim) HandleListResourceProviders(w http.ResponseWriter, r *http.Reques var err error filtered, err = filterHypervisorsByResources(ctx, filtered, v) if err != nil { - log.Error(err, "failed to filter hypervisors by resources") - http.Error(w, "Internal Server Error", http.StatusInternalServerError) + log.Info("invalid resources query parameter", "error", err) + http.Error(w, "invalid resources query parameter: "+err.Error(), http.StatusBadRequest) return } } @@ -676,6 +691,9 @@ func filterHypervisorsByResources(ctx context.Context, hvs []hv1.Hypervisor, raw if err != nil { return nil, fmt.Errorf("invalid amount in resources token %q: %w", part, err) } + if amount < 0 { + return nil, fmt.Errorf("negative amount in resources token %q", part) + } mappedResourceName := "" var mappedResourceQuantity *resource.Quantity switch kv[0] { @@ -683,9 +701,15 @@ func filterHypervisorsByResources(ctx context.Context, hvs []hv1.Hypervisor, raw mappedResourceName = "cpu" mappedResourceQuantity = resource.NewQuantity(amount, resource.DecimalSI) case "MEMORY_MB": + if amount > math.MaxInt64/(1024*1024) { + return nil, fmt.Errorf("amount overflows bytes in resources token %q", part) + } mappedResourceName = "memory" mappedResourceQuantity = resource.NewQuantity(amount*1024*1024, resource.DecimalSI) case "DISK_GB": + if amount > math.MaxInt64/(1024*1024*1024) { + return nil, fmt.Errorf("amount overflows bytes in resources token %q", part) + } mappedResourceName = "disk" mappedResourceQuantity = resource.NewQuantity(amount*1024*1024*1024, resource.DecimalSI) default: diff --git a/internal/shim/placement/handle_resource_providers_e2e.go b/internal/shim/placement/handle_resource_providers_e2e.go index 31cdb87c5..fdd0091d2 100644 --- a/internal/shim/placement/handle_resource_providers_e2e.go +++ b/internal/shim/placement/handle_resource_providers_e2e.go @@ 
-65,9 +65,13 @@ func e2eTestResourceProviders(ctx context.Context, cl client.Client) error { // ==================== Phase 2: KVM path ==================== - log.Info("=== KVM path: hypervisor-backed resource provider tests ===") - if err := e2eKVMResourceProviders(ctx, sc, cl); err != nil { - return fmt.Errorf("KVM path: %w", err) + if !config.Features.EnableResourceProviders { + log.Info("Skipping KVM resource provider e2e tests because enableResourceProviders is false") + } else { + log.Info("=== KVM path: hypervisor-backed resource provider tests ===") + if err := e2eKVMResourceProviders(ctx, sc, cl); err != nil { + return fmt.Errorf("KVM path: %w", err) + } } return nil diff --git a/internal/shim/placement/handle_resource_providers_test.go b/internal/shim/placement/handle_resource_providers_test.go index b91e42da2..9b43bf818 100644 --- a/internal/shim/placement/handle_resource_providers_test.go +++ b/internal/shim/placement/handle_resource_providers_test.go @@ -426,8 +426,8 @@ func TestTranslateToResourceProvider(t *testing.T) { if rp.Generation != 3 { t.Errorf("Generation = %d, want 3", rp.Generation) } - if rp.ParentProviderUUID == nil || *rp.ParentProviderUUID != validUUID { - t.Errorf("ParentProviderUUID = %v, want %q", rp.ParentProviderUUID, validUUID) + if rp.ParentProviderUUID != nil { + t.Errorf("ParentProviderUUID = %v, want nil (root provider)", rp.ParentProviderUUID) } if rp.RootProviderUUID == nil || *rp.RootProviderUUID != validUUID { t.Errorf("RootProviderUUID = %v, want %q", rp.RootProviderUUID, validUUID) @@ -636,12 +636,12 @@ func TestHandleListResourceProviders_Filters(t *testing.T) { } }) - t.Run("invalid resources returns 500", func(t *testing.T) { + t.Run("invalid resources returns 400", func(t *testing.T) { s := newTestShimWithHypervisors(t, http.StatusOK, emptyUpstream, hv1Obj) w := serveHandler(t, http.MethodGet, "/resource_providers", s.HandleListResourceProviders, "/resource_providers?resources=INVALID") - if w.Code != 
http.StatusInternalServerError { - t.Fatalf("status = %d, want %d", w.Code, http.StatusInternalServerError) + if w.Code != http.StatusBadRequest { + t.Fatalf("status = %d, want %d", w.Code, http.StatusBadRequest) } }) } @@ -674,6 +674,17 @@ func TestHandleCreateResourceProvider(t *testing.T) { } }) + t.Run("uuid conflict with existing hypervisor", func(t *testing.T) { + s := newTestShimWithHypervisors(t, http.StatusCreated, `{}`, hv1Obj) + body := `{"name":"different-name","uuid":"` + validUUID + `"}` + req := httptest.NewRequest(http.MethodPost, "/resource_providers", strings.NewReader(body)) + w := httptest.NewRecorder() + s.HandleCreateResourceProvider(w, req) + if w.Code != http.StatusConflict { + t.Fatalf("status = %d, want %d", w.Code, http.StatusConflict) + } + }) + t.Run("missing name returns 400", func(t *testing.T) { s := newTestShimWithHypervisors(t, http.StatusCreated, `{}`) body := `{"name":""}` diff --git a/internal/shim/placement/shim_io.go b/internal/shim/placement/shim_io.go index 604087cec..98d5ba0bc 100644 --- a/internal/shim/placement/shim_io.go +++ b/internal/shim/placement/shim_io.go @@ -84,10 +84,16 @@ func (w *shimResponseWriter) Write(b []byte) (int, error) { // writeJSON serializes v as JSON and writes it to w with the given HTTP status // code. On encoding failure it sends a 500 Internal Server Error instead. func (s *Shim) writeJSON(w http.ResponseWriter, statusCode int, v any) { + var buf bytes.Buffer + if err := json.NewEncoder(&buf).Encode(v); err != nil { + logf.Log.Error(err, "failed to encode JSON response") + http.Error(w, "Internal Server Error", http.StatusInternalServerError) + return + } w.Header().Set("Content-Type", "application/json") w.WriteHeader(statusCode) - if err := json.NewEncoder(w).Encode(v); err != nil { - logf.Log.Error(err, "failed to encode JSON response") + if _, err := w.Write(buf.Bytes()); err != nil { + logf.Log.Error(err, "failed to write JSON response") } }