Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions helm/bundles/cortex-nova/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,22 @@ cortex-scheduling-controllers:
enableReportUsage: true
# When false, the endpoint returns HTTP 503.
enableReportCapacity: true
# Maps flavor group IDs to resource flag configs; "*" acts as catch-all.
# Controls handlesCommitments, hasCapacity, hasQuota per resource type for each group.
flavorGroupResourceConfig:
"*": # catch-all (unknown groups)
ram:
handlesCommitments: false
hasCapacity: true
hasQuota: false
cores:
handlesCommitments: false
hasCapacity: true
hasQuota: false
instances:
handlesCommitments: false
hasCapacity: true
hasQuota: false
# OvercommitMappings is a list of mappings that map hypervisor traits to
# overcommit ratios. Note that this list is applied in order, so if there
# are multiple mappings applying to the same hypervisors, the last mapping
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,8 +189,8 @@ ProcessLoop:
break ProcessLoop
}

if !commitments.FlavorGroupAcceptsCommitments(&flavorGroup) {
failedReason = commitments.FlavorGroupCommitmentRejectionReason(&flavorGroup)
if !api.config.ResourceConfigForGroup(flavorGroupName).RAM.HandlesCommitments {
failedReason = fmt.Sprintf("flavor group %q is not configured to handle commitments", flavorGroupName)
rollback = true
break ProcessLoop
}
Expand Down Expand Up @@ -247,6 +247,10 @@ ProcessLoop:
cr.Name = crName
if _, err := controllerutil.CreateOrUpdate(ctx, api.client, cr, func() error {
applyCRSpec(cr, stateDesired, allowRejection)
if cr.Annotations == nil {
cr.Annotations = make(map[string]string)
}
cr.Annotations[v1alpha1.AnnotationCreatorRequestID] = reservations.GlobalRequestIDFromContext(ctx)
Comment on lines +250 to +253
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Rollback is incomplete after introducing annotation mutation.

At lines 250–253, the handler mutates cr.Annotations, but snapshots preserve only Spec. If a batch rolls back, the previous annotation state is not restored (and can be lost on delete/recreate), so rollback is no longer all-or-nothing.

Proposed fix (snapshot + restore annotations)
 import (
 	"context"
 	"encoding/json"
 	"errors"
 	"fmt"
+	"maps"
 	"net/http"
 	"sort"
 	"strings"
 	"time"
@@
 type crSnapshot struct {
 	crName     string
 	prevSpec   *v1alpha1.CommittedResourceSpec
+	prevAnnotations map[string]string
 	wasDeleted bool
 }
@@
 				} else {
 					specCopy := existing.Spec
 					snap.prevSpec = &specCopy
+					if existing.Annotations != nil {
+						snap.prevAnnotations = make(map[string]string, len(existing.Annotations))
+						maps.Copy(snap.prevAnnotations, existing.Annotations)
+					}
 				}
@@
 	if snap.wasDeleted {
 		if snap.prevSpec == nil {
 			return // was absent before deletion attempt; nothing to undo
 		}
 		cr := &v1alpha1.CommittedResource{}
 		cr.Name = snap.crName
 		cr.Spec = *snap.prevSpec
+		if snap.prevAnnotations != nil {
+			cr.Annotations = make(map[string]string, len(snap.prevAnnotations))
+			maps.Copy(cr.Annotations, snap.prevAnnotations)
+		}
 		if err := k8sClient.Create(ctx, cr); client.IgnoreAlreadyExists(err) != nil {
 			logger.Error(err, "failed to re-create CommittedResource CRD during rollback", "name", snap.crName)
 		}
 		return
 	}
@@
 	cr.Spec = *snap.prevSpec
+	if snap.prevAnnotations != nil {
+		cr.Annotations = make(map[string]string, len(snap.prevAnnotations))
+		maps.Copy(cr.Annotations, snap.prevAnnotations)
+	} else {
+		cr.Annotations = nil
+	}
 	if err := k8sClient.Update(ctx, cr); err != nil {
 		logger.Error(err, "failed to restore CommittedResource CRD spec during rollback", "name", snap.crName)
 	}
 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/scheduling/reservations/commitments/api/change_commitments.go`
around lines 250 - 253, The handler mutates cr.Annotations (setting
v1alpha1.AnnotationCreatorRequestID via
reservations.GlobalRequestIDFromContext(ctx)) but snapshots only capture Spec so
rollbacks miss restoring annotations; update the snapshot/restore logic to
include Annotations (deep-copy the map) when creating a snapshot and restore
that map on rollback so the previous annotation state is fully reverted; ensure
any snapshot struct or methods that reference the resource snapshot (e.g., where
Spec is captured) are extended to store and restore Annotations and use safe nil
checks and map copies when applying the restored annotations back to cr.

return nil
}); err != nil {
failedReason = fmt.Sprintf("commitment %s: failed to write CommittedResource CRD: %v", commitment.UUID, err)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,9 @@ func newE2EEnv(t *testing.T, flavors []*TestFlavor, infoVersion int64, scheduler
cfg := commitments.DefaultAPIConfig()
cfg.WatchTimeout = metav1.Duration{Duration: 5 * time.Second}
cfg.WatchPollInterval = metav1.Duration{Duration: 100 * time.Millisecond}
cfg.FlavorGroupResourceConfig = map[string]commitments.FlavorGroupResourcesConfig{
"*": {RAM: commitments.ResourceTypeConfig{HandlesCommitments: true, HasCapacity: true}},
}
api := NewAPIWithConfig(k8sClient, cfg, nil)
mux := http.NewServeMux()
api.Init(mux, prometheus.NewRegistry(), log.Log)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,9 @@ func TestHandleChangeCommitments(t *testing.T) {
cfg := commitments.DefaultAPIConfig()
cfg.WatchTimeout = metav1.Duration{}
cfg.WatchPollInterval = metav1.Duration{Duration: 100 * time.Millisecond}
cfg.FlavorGroupResourceConfig = map[string]commitments.FlavorGroupResourcesConfig{
"*": {RAM: commitments.ResourceTypeConfig{HandlesCommitments: true, HasCapacity: true}},
}
return &cfg
}(),
ExpectedAPIResponse: newAPIResponse("timeout reached while processing commitment changes"),
Expand Down Expand Up @@ -709,7 +712,16 @@ func newCRTestEnv(t *testing.T, tc CommitmentChangeTestCase) *CRTestEnv {
if tc.CustomConfig != nil {
api = NewAPIWithConfig(wrapped, *tc.CustomConfig, nil)
} else {
api = NewAPI(wrapped)
// Default test config: all flavor groups accept RAM commitments via wildcard.
cfg := commitments.DefaultAPIConfig()
cfg.FlavorGroupResourceConfig = map[string]commitments.FlavorGroupResourcesConfig{
"*": {
RAM: commitments.ResourceTypeConfig{HandlesCommitments: true, HasCapacity: true},
Cores: commitments.ResourceTypeConfig{HasCapacity: true},
Instances: commitments.ResourceTypeConfig{HasCapacity: true},
},
}
api = NewAPIWithConfig(wrapped, cfg, nil)
}
mux := http.NewServeMux()
registry := prometheus.NewRegistry()
Expand Down
38 changes: 17 additions & 21 deletions internal/scheduling/reservations/commitments/api/info.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,11 +113,7 @@ func (api *HTTPAPI) buildServiceInfo(ctx context.Context, logger logr.Logger) (l
// Build resources map
resources := make(map[liquid.ResourceName]liquid.ResourceInfo)
for groupName, groupData := range flavorGroups {
// Determine if this group accepts commitments (requires fixed RAM/core ratio)
handlesCommitments := commitments.FlavorGroupAcceptsCommitments(&groupData)

// All flavor groups are registered for usage reporting.
// Only those with a fixed RAM/core ratio have HandlesCommitments=true.
resCfg := api.config.ResourceConfigForGroup(groupName)

flavorNames := make([]string, 0, len(groupData.Flavors))
for _, flavor := range groupData.Flavors {
Expand Down Expand Up @@ -157,12 +153,12 @@ func (api *HTTPAPI) buildServiceInfo(ctx context.Context, logger logr.Logger) (l
groupData.SmallestFlavor.MemoryMB,
flavorListStr,
),
Unit: ramUnit, // Non-standard unit: multiples of smallest flavor RAM
Unit: ramUnit,
Topology: liquid.AZAwareTopology,
NeedsResourceDemand: false,
HasCapacity: true, // We report capacity via /commitments/v1/report-capacity
HasQuota: false,
HandlesCommitments: handlesCommitments, // Only groups with fixed ratio accept commitments
HasCapacity: resCfg.RAM.HasCapacity,
HasQuota: resCfg.RAM.HasQuota,
HandlesCommitments: resCfg.RAM.HandlesCommitments,
Attributes: attrsJSON,
}

Expand All @@ -173,13 +169,13 @@ func (api *HTTPAPI) buildServiceInfo(ctx context.Context, logger logr.Logger) (l
"CPU cores (usable by: %s)",
flavorListStr,
),
Unit: liquid.UnitNone, // Countable unit (omitted in JSON = "1")
Topology: liquid.AZAwareTopology, // Same topology as RAM
Unit: liquid.UnitNone,
Topology: liquid.AZAwareTopology,
NeedsResourceDemand: false,
HasCapacity: true, // We report capacity (as 0 for now)
HasQuota: false, // No quota enforcement
HandlesCommitments: false, // Cores are derived from RAM commitments
Attributes: attrsJSON, // Same attributes (ratio info)
HasCapacity: resCfg.Cores.HasCapacity,
HasQuota: resCfg.Cores.HasQuota,
HandlesCommitments: resCfg.Cores.HandlesCommitments,
Attributes: attrsJSON,
}

// === 3. Instances Resource ===
Expand All @@ -189,13 +185,13 @@ func (api *HTTPAPI) buildServiceInfo(ctx context.Context, logger logr.Logger) (l
"instances (usable by: %s)",
flavorListStr,
),
Unit: liquid.UnitNone, // Countable unit (omitted in JSON = "1")
Topology: liquid.AZAwareTopology, // Same topology as RAM
Unit: liquid.UnitNone,
Topology: liquid.AZAwareTopology,
NeedsResourceDemand: false,
HasCapacity: true, // We report capacity (as 0 for now)
HasQuota: false, // No quota enforcement
HandlesCommitments: false, // Instances are derived from RAM commitments
Attributes: attrsJSON, // Same attributes
HasCapacity: resCfg.Instances.HasCapacity,
HasQuota: resCfg.Instances.HasQuota,
HandlesCommitments: resCfg.Instances.HandlesCommitments,
Attributes: attrsJSON,
}

logger.V(1).Info("registered flavor group resources",
Expand Down
67 changes: 32 additions & 35 deletions internal/scheduling/reservations/commitments/api/info_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"testing"

"github.com/cobaltcore-dev/cortex/api/v1alpha1"
commitments "github.com/cobaltcore-dev/cortex/internal/scheduling/reservations/commitments"
"github.com/sapcc/go-api-declarations/liquid"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
Expand Down Expand Up @@ -137,11 +138,9 @@ func TestHandleInfo_InvalidFlavorMemory(t *testing.T) {
}
}

func TestHandleInfo_HasCapacityEqualsHandlesCommitments(t *testing.T) {
// Test that ALL flavor groups get resources created:
// - Three resources are created per group: _ram, _cores, _instances
// - Only _ram of groups with FIXED ratio has HandlesCommitments=true
// - All resources have HasCapacity=true
func TestHandleInfo_ResourceFlagsFromConfig(t *testing.T) {
// Test that resource flags (HandlesCommitments, HasCapacity, HasQuota) are read from config,
// not derived from flavor group metadata. Both groups get resources regardless of ratio.
scheme := runtime.NewScheme()
if err := v1alpha1.AddToScheme(scheme); err != nil {
t.Fatalf("failed to add scheme: %v", err)
Expand All @@ -150,29 +149,25 @@ func TestHandleInfo_HasCapacityEqualsHandlesCommitments(t *testing.T) {
// Create flavor groups knowledge with both fixed and variable ratio groups
features := []map[string]interface{}{
{
// Group with fixed ratio - should accept commitments
// Creates 3 resources: _ram, _cores, _instances
"name": "hana_fixed",
"flavors": []map[string]interface{}{
{"name": "hana_c4_m16", "vcpus": 4, "memoryMB": 16384, "diskGB": 50},
{"name": "hana_c8_m32", "vcpus": 8, "memoryMB": 32768, "diskGB": 100},
},
"largestFlavor": map[string]interface{}{"name": "hana_c8_m32", "vcpus": 8, "memoryMB": 32768, "diskGB": 100},
"smallestFlavor": map[string]interface{}{"name": "hana_c4_m16", "vcpus": 4, "memoryMB": 16384, "diskGB": 50},
"ramCoreRatio": 4096, // Fixed: 4096 MiB per vCPU for all flavors
"ramCoreRatio": 4096,
},
{
// Group with variable ratio - should NOT accept commitments
// Will be SKIPPED entirely (no resources created)
"name": "v2_variable",
"flavors": []map[string]interface{}{
{"name": "v2_c4_m8", "vcpus": 4, "memoryMB": 8192, "diskGB": 50}, // 2048 MiB/vCPU
{"name": "v2_c4_m64", "vcpus": 4, "memoryMB": 65536, "diskGB": 100}, // 16384 MiB/vCPU
{"name": "v2_c4_m8", "vcpus": 4, "memoryMB": 8192, "diskGB": 50},
{"name": "v2_c4_m64", "vcpus": 4, "memoryMB": 65536, "diskGB": 100},
},
"largestFlavor": map[string]interface{}{"name": "v2_c4_m64", "vcpus": 4, "memoryMB": 65536, "diskGB": 100},
"smallestFlavor": map[string]interface{}{"name": "v2_c4_m8", "vcpus": 4, "memoryMB": 8192, "diskGB": 50},
"ramCoreRatioMin": 2048, // Variable: min ratio
"ramCoreRatioMax": 16384, // Variable: max ratio
"ramCoreRatioMin": 2048,
"ramCoreRatioMax": 16384,
},
}

Expand All @@ -199,7 +194,21 @@ func TestHandleInfo_HasCapacityEqualsHandlesCommitments(t *testing.T) {
WithObjects(knowledge).
Build()

api := NewAPI(k8sClient)
// hana_fixed: ram accepts commitments; v2_variable: nothing accepts commitments
cfg := commitments.DefaultAPIConfig()
cfg.FlavorGroupResourceConfig = map[string]commitments.FlavorGroupResourcesConfig{
"hana_fixed": {
RAM: commitments.ResourceTypeConfig{HandlesCommitments: true, HasCapacity: true},
Cores: commitments.ResourceTypeConfig{HasCapacity: true},
Instances: commitments.ResourceTypeConfig{HasCapacity: true},
},
"*": {
RAM: commitments.ResourceTypeConfig{HasCapacity: true},
Cores: commitments.ResourceTypeConfig{HasCapacity: true},
Instances: commitments.ResourceTypeConfig{HasCapacity: true},
},
}
api := NewAPIWithConfig(k8sClient, cfg, nil)

req := httptest.NewRequest(http.MethodGet, "/commitments/v1/info", http.NoBody)
w := httptest.NewRecorder()
Expand All @@ -217,14 +226,10 @@ func TestHandleInfo_HasCapacityEqualsHandlesCommitments(t *testing.T) {
t.Fatalf("failed to decode response: %v", err)
}

// Verify we have 6 resources (3 per flavor group, both groups included)
// hana_fixed generates: _ram, _cores, _instances
// v2_variable generates: _ram, _cores, _instances
if len(serviceInfo.Resources) != 6 {
t.Fatalf("expected 6 resources (3 per flavor group), got %d", len(serviceInfo.Resources))
}

// Test RAM resource: hw_version_hana_fixed_ram
ramResource, ok := serviceInfo.Resources["hw_version_hana_fixed_ram"]
if !ok {
t.Fatal("expected hw_version_hana_fixed_ram resource to exist")
Expand All @@ -233,10 +238,9 @@ func TestHandleInfo_HasCapacityEqualsHandlesCommitments(t *testing.T) {
t.Error("hw_version_hana_fixed_ram: expected HasCapacity=true")
}
if !ramResource.HandlesCommitments {
t.Error("hw_version_hana_fixed_ram: expected HandlesCommitments=true (RAM is primary commitment resource)")
t.Error("hw_version_hana_fixed_ram: expected HandlesCommitments=true (set in config)")
}

// Test Cores resource: hw_version_hana_fixed_cores
coresResource, ok := serviceInfo.Resources["hw_version_hana_fixed_cores"]
if !ok {
t.Fatal("expected hw_version_hana_fixed_cores resource to exist")
Expand All @@ -245,10 +249,9 @@ func TestHandleInfo_HasCapacityEqualsHandlesCommitments(t *testing.T) {
t.Error("hw_version_hana_fixed_cores: expected HasCapacity=true")
}
if coresResource.HandlesCommitments {
t.Error("hw_version_hana_fixed_cores: expected HandlesCommitments=false (cores are derived)")
t.Error("hw_version_hana_fixed_cores: expected HandlesCommitments=false")
}

// Test Instances resource: hw_version_hana_fixed_instances
instancesResource, ok := serviceInfo.Resources["hw_version_hana_fixed_instances"]
if !ok {
t.Fatal("expected hw_version_hana_fixed_instances resource to exist")
Expand All @@ -257,40 +260,34 @@ func TestHandleInfo_HasCapacityEqualsHandlesCommitments(t *testing.T) {
t.Error("hw_version_hana_fixed_instances: expected HasCapacity=true")
}
if instancesResource.HandlesCommitments {
t.Error("hw_version_hana_fixed_instances: expected HandlesCommitments=false (instances are derived)")
t.Error("hw_version_hana_fixed_instances: expected HandlesCommitments=false")
}

// Variable ratio group DOES have resources now, but HandlesCommitments=false for RAM
// v2_variable is covered by "*" wildcard: HasCapacity=true, HandlesCommitments=false
v2RamResource, ok := serviceInfo.Resources["hw_version_v2_variable_ram"]
if !ok {
t.Fatal("expected hw_version_v2_variable_ram resource to exist (all groups included)")
t.Fatal("expected hw_version_v2_variable_ram resource to exist")
}
if !v2RamResource.HasCapacity {
t.Error("hw_version_v2_variable_ram: expected HasCapacity=true")
}
if v2RamResource.HandlesCommitments {
t.Error("hw_version_v2_variable_ram: expected HandlesCommitments=false (variable ratio)")
t.Error("hw_version_v2_variable_ram: expected HandlesCommitments=false (not in config)")
}

v2CoresResource, ok := serviceInfo.Resources["hw_version_v2_variable_cores"]
if !ok {
t.Fatal("expected hw_version_v2_variable_cores resource to exist (all groups included)")
t.Fatal("expected hw_version_v2_variable_cores resource to exist")
}
if !v2CoresResource.HasCapacity {
t.Error("hw_version_v2_variable_cores: expected HasCapacity=true")
}
if v2CoresResource.HandlesCommitments {
t.Error("hw_version_v2_variable_cores: expected HandlesCommitments=false")
}

v2InstancesResource, ok := serviceInfo.Resources["hw_version_v2_variable_instances"]
if !ok {
t.Fatal("expected hw_version_v2_variable_instances resource to exist (all groups included)")
t.Fatal("expected hw_version_v2_variable_instances resource to exist")
}
if !v2InstancesResource.HasCapacity {
t.Error("hw_version_v2_variable_instances: expected HasCapacity=true")
}
if v2InstancesResource.HandlesCommitments {
t.Error("hw_version_v2_variable_instances: expected HandlesCommitments=false")
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,11 @@ func (r *CommittedResourceController) Reconcile(ctx context.Context, req ctrl.Re
return ctrl.Result{}, client.IgnoreNotFound(err)
}

ctx = WithNewGlobalRequestID(ctx)
if creatorReq := cr.Annotations[v1alpha1.AnnotationCreatorRequestID]; creatorReq != "" {
ctx = WithGlobalRequestID(ctx, creatorReq)
} else {
ctx = WithNewGlobalRequestID(ctx)
}
logger := LoggerFromContext(ctx).WithValues(
"component", "committed-resource-controller",
"committedResource", req.Name,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -360,27 +360,13 @@ func TestCommittedResourceController_PlacementFailure(t *testing.T) {
expectedReason: "Reserving",
expectRequeue: true,
},
{
name: "guaranteed AllowRejection=true: rejects on failure, no retry",
state: v1alpha1.CommitmentStatusGuaranteed,
allowRejection: true,
expectedReason: "Rejected",
expectRequeue: false,
},
{
name: "confirmed AllowRejection=true: rejects on failure, no retry",
state: v1alpha1.CommitmentStatusConfirmed,
allowRejection: true,
expectedReason: "Rejected",
expectRequeue: false,
},
{
name: "guaranteed AllowRejection=false: retries on failure",
state: v1alpha1.CommitmentStatusGuaranteed,
allowRejection: false,
expectedReason: "Reserving",
expectRequeue: true,
},
{
name: "confirmed AllowRejection=false: retries on failure",
state: v1alpha1.CommitmentStatusConfirmed,
Expand Down Expand Up @@ -503,31 +489,6 @@ func TestCommittedResourceController_BadSpec(t *testing.T) {
}
}

// TestCommittedResourceController_Idempotent verifies that reconciling the
// same CommittedResource repeatedly converges: exactly one child reservation
// exists after three rounds and the Accepted condition stays set.
func TestCommittedResourceController_Idempotent(t *testing.T) {
	testScheme := newCRTestScheme(t)
	resource := newTestCommittedResource("test-cr", v1alpha1.CommitmentStatusConfirmed)
	cl := newCRTestClient(testScheme, resource, newTestFlavorKnowledge())
	ctrl := &CommittedResourceController{Client: cl, Scheme: testScheme, Conf: CommittedResourceControllerConfig{}}

	// Round 1: creates reservation, waits for placement.
	if _, err := ctrl.Reconcile(context.Background(), reconcileReq(resource.Name)); err != nil {
		t.Fatalf("reconcile 1: %v", err)
	}
	// Simulate reservation controller setting Ready=True.
	setChildReservationsReady(t, cl, resource.Spec.CommitmentUUID)
	// Rounds 2 and 3: accepts, then stays accepted.
	for _, round := range []int{2, 3} {
		if _, err := ctrl.Reconcile(context.Background(), reconcileReq(resource.Name)); err != nil {
			t.Fatalf("reconcile %d: %v", round, err)
		}
	}

	if n := countChildReservations(t, cl, resource.Spec.CommitmentUUID); n != 1 {
		t.Errorf("expected 1 child reservation after 3 reconciles (idempotency), got %d", n)
	}
	assertCondition(t, cl, resource.Name, metav1.ConditionTrue, "Accepted")
}

// ============================================================================
// Tests: checkChildReservationStatus generation guard
// ============================================================================
Expand Down
Loading
Loading