diff --git a/api/external/cinder/messages.go b/api/external/cinder/messages.go
index 260e93815..632189629 100644
--- a/api/external/cinder/messages.go
+++ b/api/external/cinder/messages.go
@@ -6,6 +6,7 @@ package api
 import (
 	"log/slog"
 
+	"github.com/cobaltcore-dev/cortex/api/scheduling"
 	"github.com/cobaltcore-dev/cortex/internal/scheduling/lib"
 )
 
@@ -30,8 +31,11 @@ type ExternalSchedulerRequest struct {
 	Weights map[string]float64 `json:"weights"`
 	// The name of the pipeline to execute.
 	Pipeline string `json:"pipeline"`
+	// Options configure the pipeline behavior for this scheduling call.
+	Options scheduling.Options `json:"options,omitempty"`
 }
 
+func (r ExternalSchedulerRequest) GetOptions() scheduling.Options { return r.Options }
 func (r ExternalSchedulerRequest) GetHosts() []string {
 	hosts := make([]string, len(r.Hosts))
 	for i, host := range r.Hosts {
diff --git a/api/external/ironcore/messages.go b/api/external/ironcore/messages.go
index ac517f61a..25c71c0a5 100644
--- a/api/external/ironcore/messages.go
+++ b/api/external/ironcore/messages.go
@@ -7,14 +7,18 @@ import (
 	"log/slog"
 
 	ironcorev1alpha1 "github.com/cobaltcore-dev/cortex/api/external/ironcore/v1alpha1"
+	"github.com/cobaltcore-dev/cortex/api/scheduling"
 	"github.com/cobaltcore-dev/cortex/internal/scheduling/lib"
 )
 
 type MachinePipelineRequest struct {
 	// The available machine pools.
 	Pools []ironcorev1alpha1.MachinePool `json:"pools"`
+	// Options configure the pipeline behavior for this scheduling call.
+	Options scheduling.Options `json:"options,omitempty"`
 }
 
+func (r MachinePipelineRequest) GetOptions() scheduling.Options { return r.Options }
 func (r MachinePipelineRequest) GetHosts() []string {
 	hosts := make([]string, len(r.Pools))
 	for i, host := range r.Pools {
diff --git a/api/external/manila/messages.go b/api/external/manila/messages.go
index 5255a0d4f..ad0aef2f1 100644
--- a/api/external/manila/messages.go
+++ b/api/external/manila/messages.go
@@ -6,6 +6,7 @@ package api
 import (
 	"log/slog"
 
+	"github.com/cobaltcore-dev/cortex/api/scheduling"
 	"github.com/cobaltcore-dev/cortex/internal/scheduling/lib"
 )
 
@@ -30,8 +31,11 @@ type ExternalSchedulerRequest struct {
 	Weights map[string]float64 `json:"weights"`
 	// The name of the pipeline to execute.
 	Pipeline string `json:"pipeline"`
+	// Options configure the pipeline behavior for this scheduling call.
+	Options scheduling.Options `json:"options,omitempty"`
 }
 
+func (r ExternalSchedulerRequest) GetOptions() scheduling.Options { return r.Options }
 func (r ExternalSchedulerRequest) GetHosts() []string {
 	hosts := make([]string, len(r.Hosts))
 	for i, host := range r.Hosts {
diff --git a/api/external/nova/messages.go b/api/external/nova/messages.go
index e82568941..e83d37ced 100644
--- a/api/external/nova/messages.go
+++ b/api/external/nova/messages.go
@@ -9,6 +9,7 @@ import (
 	"log/slog"
 	"strings"
 
+	"github.com/cobaltcore-dev/cortex/api/scheduling"
 	"github.com/cobaltcore-dev/cortex/api/v1alpha1"
 	"github.com/cobaltcore-dev/cortex/internal/scheduling/lib"
 )
@@ -37,8 +38,14 @@ type ExternalSchedulerRequest struct {
 
 	// The name of the pipeline to execute.
 	Pipeline string `json:"pipeline"`
+
+	// Options configure the pipeline behavior for this scheduling call.
+	// Set by callers such as the CR and failover controllers.
+	// Nova does not set these; Cortex fills in config-derived defaults server-side.
+	Options scheduling.Options `json:"options,omitempty"`
 }
 
+func (r ExternalSchedulerRequest) GetOptions() scheduling.Options { return r.Options }
 func (r ExternalSchedulerRequest) GetHosts() []string {
 	hosts := make([]string, len(r.Hosts))
 	for i, host := range r.Hosts {
diff --git a/api/external/pods/messages.go b/api/external/pods/messages.go
index 3ec329b39..3d0930ef5 100644
--- a/api/external/pods/messages.go
+++ b/api/external/pods/messages.go
@@ -6,6 +6,7 @@ package pods
 import (
 	"log/slog"
 
+	"github.com/cobaltcore-dev/cortex/api/scheduling"
 	"github.com/cobaltcore-dev/cortex/internal/scheduling/lib"
 	corev1 "k8s.io/api/core/v1"
 )
@@ -15,8 +16,11 @@ type PodPipelineRequest struct {
 	Nodes []corev1.Node `json:"nodes"`
 	// The pod to be scheduled.
 	Pod corev1.Pod `json:"pod"`
+	// Options configure the pipeline behavior for this scheduling call.
+	Options scheduling.Options `json:"options,omitempty"`
 }
 
+func (r PodPipelineRequest) GetOptions() scheduling.Options { return r.Options }
 func (r PodPipelineRequest) GetHosts() []string {
 	hosts := make([]string, len(r.Nodes))
 	for i, host := range r.Nodes {
diff --git a/api/scheduling/options.go b/api/scheduling/options.go
new file mode 100644
index 000000000..c182fbdd4
--- /dev/null
+++ b/api/scheduling/options.go
@@ -0,0 +1,46 @@
+// Copyright SAP SE
+// SPDX-License-Identifier: Apache-2.0
+
+package scheduling
+
+import (
+	"errors"
+
+	"github.com/cobaltcore-dev/cortex/api/v1alpha1"
+)
+
+// Options configure the behavior of a single pipeline run at call time.
+// These are distinct from per-step YAML options (FilterWeigherPipelineStepOpts),
+// which are static and set when the pipeline is initialized.
+type Options struct {
+	// ReadOnly means the pipeline run does not modify shared scheduling state (reservations,
+	// history, inflight records). Concurrent read-only runs are safe under a shared read lock.
+	// Note: the controller may still write the Decision status after Run() regardless of this flag.
+	ReadOnly bool `json:"read_only,omitempty"`
+	// LockReservations prevents reservation unlocking, e.g. in the capacity filter.
+	// Set when finding hosts for new reservations (failover, CR) to see true available capacity.
+	LockReservations bool `json:"lock_reservations,omitempty"`
+	// AssumeEmptyHosts treats all hosts as having no running VMs.
+	AssumeEmptyHosts bool `json:"assume_empty_hosts,omitempty"`
+	// IgnoredReservationTypes lists reservation types the capacity filter skips entirely.
+	IgnoredReservationTypes []v1alpha1.ReservationType `json:"ignored_reservation_types,omitempty"`
+	// MaxCandidates limits the number of hosts returned after weighing. 0 means no limit.
+	MaxCandidates int `json:"max_candidates,omitempty"`
+
+	// SkipHistory skips recording the placement decision in placement history.
+	// Defaults to false so Nova requests record history without needing to set this explicitly.
+	SkipHistory bool `json:"skip_history,omitempty"`
+	// CreateInflight creates pessimistic blocking reservations for all returned candidates.
+	CreateInflight bool `json:"create_inflight,omitempty"`
+}
+
+// Validate checks for mutually exclusive or inconsistent option combinations.
+func (o Options) Validate() error {
+	if o.ReadOnly && !o.SkipHistory {
+		return errors.New("read-only runs must not write scheduling history: set SkipHistory=true")
+	}
+	if o.ReadOnly && o.CreateInflight {
+		return errors.New("ReadOnly and CreateInflight are mutually exclusive: read-only runs must not create inflight reservations")
+	}
+	return nil
+}
diff --git a/api/scheduling/options_test.go b/api/scheduling/options_test.go
new file mode 100644
index 000000000..870609281
--- /dev/null
+++ b/api/scheduling/options_test.go
@@ -0,0 +1,34 @@
+// Copyright SAP SE
+// SPDX-License-Identifier: Apache-2.0
+
+package scheduling
+
+import "testing"
+
+func TestOptions_Validate(t *testing.T) {
+	tests := []struct {
+		name    string
+		opts    Options
+		wantErr bool
+	}{
+		{"zero value is valid", Options{}, false},
+		{"write run, history recorded by default", Options{}, false},
+		{"write run with inflight", Options{CreateInflight: true}, false},
+		{"read-only run, skipping history", Options{ReadOnly: true, SkipHistory: true}, false},
+		{"ReadOnly without SkipHistory is invalid", Options{ReadOnly: true}, true},
+		{"ReadOnly + CreateInflight is invalid", Options{ReadOnly: true, SkipHistory: true, CreateInflight: true}, true},
+		{"ReadOnly + both invalid", Options{ReadOnly: true, CreateInflight: true}, true},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			err := tt.opts.Validate()
+			if tt.wantErr && err == nil {
+				t.Error("expected error, got nil")
+			}
+			if !tt.wantErr && err != nil {
+				t.Errorf("expected no error, got %v", err)
+			}
+		})
+	}
+}
diff --git a/api/v1alpha1/pipeline_types.go b/api/v1alpha1/pipeline_types.go
index 180db85e5..1e5cfdca3 100644
--- a/api/v1alpha1/pipeline_types.go
+++ b/api/v1alpha1/pipeline_types.go
@@ -78,11 +78,6 @@ type PipelineSpec struct {
 	// +kubebuilder:validation:Optional
 	Description string `json:"description,omitempty"`
 
-	// If this pipeline should create history objects.
-	// When this is false, the pipeline will still process requests.
-	// +kubebuilder:default=false
-	CreateHistory bool `json:"createHistory,omitempty"`
-
 	// If this pipeline should ignore host preselection and gather all
 	// available placement candidates before applying filters, instead of
 	// relying on a pre-filtered set and weights.
diff --git a/helm/library/cortex/files/crds/cortex.cloud_pipelines.yaml b/helm/library/cortex/files/crds/cortex.cloud_pipelines.yaml
index 80c5497aa..702703f6c 100644
--- a/helm/library/cortex/files/crds/cortex.cloud_pipelines.yaml
+++ b/helm/library/cortex/files/crds/cortex.cloud_pipelines.yaml
@@ -58,12 +58,6 @@ spec:
             spec:
               description: spec defines the desired state of Pipeline
              properties:
-                createHistory:
-                  default: false
-                  description: |-
-                    If this pipeline should create history objects.
-                    When this is false, the pipeline will still process requests.
-                  type: boolean
                 description:
                   description: An optional description of the pipeline, helping
                     understand its purpose.
diff --git a/internal/scheduling/cinder/filter_weigher_pipeline_controller.go b/internal/scheduling/cinder/filter_weigher_pipeline_controller.go
index 52ec37306..99abfc1ba 100644
--- a/internal/scheduling/cinder/filter_weigher_pipeline_controller.go
+++ b/internal/scheduling/cinder/filter_weigher_pipeline_controller.go
@@ -7,7 +7,6 @@ import (
 	"context"
 	"encoding/json"
 	"errors"
-	"fmt"
 	"sync"
 	"time"
 
@@ -74,10 +73,6 @@ func (c *FilterWeigherPipelineController) ProcessNewDecisionFromAPI(ctx context.
 	c.processMu.Lock()
 	defer c.processMu.Unlock()
 
-	pipelineConf, ok := c.PipelineConfigs[decision.Spec.PipelineRef.Name]
-	if !ok {
-		return fmt.Errorf("pipeline %s not configured", decision.Spec.PipelineRef.Name)
-	}
 	err := c.process(ctx, decision)
 	if err != nil {
 		meta.SetStatusCondition(&decision.Status.Conditions, metav1.Condition{
@@ -94,11 +89,6 @@ func (c *FilterWeigherPipelineController) ProcessNewDecisionFromAPI(ctx context.
 			Message: "pipeline run succeeded",
 		})
 	}
-	if pipelineConf.Spec.CreateHistory {
-		if upsertErr := c.HistoryManager.CreateOrUpdateHistory(ctx, decision, nil, err); upsertErr != nil {
-			ctrl.LoggerFrom(ctx).Error(upsertErr, "failed to create/update history")
-		}
-	}
 	return err
 }
 
@@ -122,6 +112,11 @@ func (c *FilterWeigherPipelineController) process(ctx context.Context, decision
 	}
 
 	result, err := pipeline.Run(request)
+	if !request.Options.SkipHistory {
+		if upsertErr := c.HistoryManager.CreateOrUpdateHistory(ctx, decision, nil, err); upsertErr != nil {
+			log.Error(upsertErr, "failed to create/update history")
+		}
+	}
 	if err != nil {
 		log.Error(err, "failed to run pipeline")
 		return err
diff --git a/internal/scheduling/cinder/filter_weigher_pipeline_controller_test.go b/internal/scheduling/cinder/filter_weigher_pipeline_controller_test.go
index e51dfec87..05d474b60 100644
--- a/internal/scheduling/cinder/filter_weigher_pipeline_controller_test.go
+++ b/internal/scheduling/cinder/filter_weigher_pipeline_controller_test.go
@@ -17,6 +17,7 @@ import (
 	"sigs.k8s.io/controller-runtime/pkg/client/fake"
 
 	api "github.com/cobaltcore-dev/cortex/api/external/cinder"
+	"github.com/cobaltcore-dev/cortex/api/scheduling"
 	"github.com/cobaltcore-dev/cortex/api/v1alpha1"
 
 	"github.com/cobaltcore-dev/cortex/internal/scheduling/lib"
@@ -46,6 +47,7 @@ func TestFilterWeigherPipelineController_Reconcile(t *testing.T) {
 		},
 		Weights:  map[string]float64{"cinder-volume-1": 1.0, "cinder-volume-2": 0.5},
 		Pipeline: "test-pipeline",
+		Options:  scheduling.Options{SkipHistory: true},
 	}
 
 	cinderRaw, err := json.Marshal(cinderRequest)
@@ -281,7 +283,6 @@ func TestFilterWeigherPipelineController_ProcessNewDecisionFromAPI(t *testing.T)
 				Spec: v1alpha1.PipelineSpec{
 					Type:             v1alpha1.PipelineTypeFilterWeigher,
 					SchedulingDomain: v1alpha1.SchedulingDomainCinder,
-					CreateHistory:    true,
 					Filters:          []v1alpha1.FilterSpec{},
 					Weighers:         []v1alpha1.WeigherSpec{},
 				},
@@ -315,7 +316,6 @@ func TestFilterWeigherPipelineController_ProcessNewDecisionFromAPI(t *testing.T)
 				Spec: v1alpha1.PipelineSpec{
 					Type:             v1alpha1.PipelineTypeFilterWeigher,
 					SchedulingDomain: v1alpha1.SchedulingDomainCinder,
-					CreateHistory:    false,
 					Filters:          []v1alpha1.FilterSpec{},
 					Weighers:         []v1alpha1.WeigherSpec{},
 				},
@@ -369,14 +369,13 @@ func TestFilterWeigherPipelineController_ProcessNewDecisionFromAPI(t *testing.T)
 				Spec: v1alpha1.PipelineSpec{
 					Type:             v1alpha1.PipelineTypeFilterWeigher,
 					SchedulingDomain: v1alpha1.SchedulingDomainCinder,
-					CreateHistory:    true,
 					Filters:          []v1alpha1.FilterSpec{},
 					Weighers:         []v1alpha1.WeigherSpec{},
 				},
 			},
 			createHistory:        true,
 			expectError:          true,
-			expectHistoryCreated: true,
+			expectHistoryCreated: false,
 			expectResult:         false,
 		},
 	}
@@ -413,6 +412,16 @@ func TestFilterWeigherPipelineController_ProcessNewDecisionFromAPI(t *testing.T)
 				controller.Pipelines[tt.pipelineConfig.Name] = initResult.Pipeline
 			}
 
+			if tt.decision.Spec.CinderRaw != nil {
+				req := cinderRequest
+				req.Options = scheduling.Options{SkipHistory: !tt.createHistory}
+				raw, marshalErr := json.Marshal(req)
+				if marshalErr != nil {
+					t.Fatalf("Failed to marshal request with options: %v", marshalErr)
+				}
+				tt.decision.Spec.CinderRaw = &runtime.RawExtension{Raw: raw}
+			}
+
 			err := controller.ProcessNewDecisionFromAPI(context.Background(), tt.decision)
 
 			if tt.expectError && err == nil {
diff --git a/internal/scheduling/lib/filter_weigher_pipeline.go b/internal/scheduling/lib/filter_weigher_pipeline.go
index ee769433d..d12b566ad 100644
--- a/internal/scheduling/lib/filter_weigher_pipeline.go
+++ b/internal/scheduling/lib/filter_weigher_pipeline.go
@@ -19,6 +19,7 @@
 
 type FilterWeigherPipeline[RequestType FilterWeigherPipelineRequest] interface {
 	// Run the scheduling pipeline with the given request.
+	// Call-time options are read from request.GetOptions().
 	Run(request RequestType) (v1alpha1.DecisionResult, error)
 }
@@ -263,6 +264,10 @@ func (s *filterWeigherPipeline[RequestType]) sortHostsByWeights(weights map[stri
 
 // Evaluate the pipeline and return a list of hosts in order of preference.
 func (p *filterWeigherPipeline[RequestType]) Run(request RequestType) (v1alpha1.DecisionResult, error) {
+	opts := request.GetOptions()
+	if err := opts.Validate(); err != nil {
+		return v1alpha1.DecisionResult{}, err
+	}
 	slogArgs := request.GetTraceLogArgs()
 	slogArgsAny := make([]any, 0, len(slogArgs))
 	for _, arg := range slogArgs {
@@ -297,6 +302,21 @@ func (p *filterWeigherPipeline[RequestType]) Run(request RequestType) (v1alpha1.
 	hosts := p.sortHostsByWeights(outWeights)
 	traceLog.Info("scheduler: sorted hosts", "hosts", hosts)
 
+	if opts.MaxCandidates > 0 && len(hosts) > opts.MaxCandidates {
+		traceLog.Info("scheduler: trimming candidate list", "maxCandidates", opts.MaxCandidates, "before", len(hosts))
+		hosts = hosts[:opts.MaxCandidates]
+		// Drop trimmed hosts from outWeights so AggregatedOutWeights stays consistent.
+		kept := make(map[string]struct{}, len(hosts))
+		for _, h := range hosts {
+			kept[h] = struct{}{}
+		}
+		for host := range outWeights {
+			if _, ok := kept[host]; !ok {
+				delete(outWeights, host)
+			}
+		}
+	}
+
 	// Collect some metrics about the pipeline execution.
 	go p.monitor.observePipelineResult(request, hosts)
diff --git a/internal/scheduling/lib/filter_weigher_pipeline_request.go b/internal/scheduling/lib/filter_weigher_pipeline_request.go
index 26688c358..f66431545 100644
--- a/internal/scheduling/lib/filter_weigher_pipeline_request.go
+++ b/internal/scheduling/lib/filter_weigher_pipeline_request.go
@@ -3,7 +3,11 @@
 
 package lib
 
-import "log/slog"
+import (
+	"log/slog"
+
+	"github.com/cobaltcore-dev/cortex/api/scheduling"
+)
 
 type FilterWeigherPipelineRequest interface {
 	// Get the hosts that went in the pipeline.
@@ -21,4 +25,6 @@ type FilterWeigherPipelineRequest interface {
 	// Get logging args to be used in the step's trace log.
 	// Usually, this will be the request context including the request ID.
 	GetTraceLogArgs() []slog.Attr
+	// Get the call-time options for this pipeline run.
+	GetOptions() scheduling.Options
 }
diff --git a/internal/scheduling/lib/filter_weigher_pipeline_request_test.go b/internal/scheduling/lib/filter_weigher_pipeline_request_test.go
index 87ab0d786..3b2b6d246 100644
--- a/internal/scheduling/lib/filter_weigher_pipeline_request_test.go
+++ b/internal/scheduling/lib/filter_weigher_pipeline_request_test.go
@@ -3,7 +3,11 @@
 
 package lib
 
-import "log/slog"
+import (
+	"log/slog"
+
+	"github.com/cobaltcore-dev/cortex/api/scheduling"
+)
 
 type mockFilterWeigherPipelineRequest struct {
 	WeightKeys []string
@@ -11,6 +15,7 @@ type mockFilterWeigherPipelineRequest struct {
 	Hosts    []string
 	Weights  map[string]float64
 	Pipeline string
+	Options  scheduling.Options
 }
 
 func (m mockFilterWeigherPipelineRequest) GetWeightKeys() []string { return m.WeightKeys }
@@ -18,6 +23,7 @@ func (m mockFilterWeigherPipelineRequest) GetTraceLogArgs() []slog.Attr { retu
 func (m mockFilterWeigherPipelineRequest) GetHosts() []string             { return m.Hosts }
 func (m mockFilterWeigherPipelineRequest) GetWeights() map[string]float64 { return m.Weights }
 func (m mockFilterWeigherPipelineRequest) GetPipeline() string            { return m.Pipeline }
+func (m mockFilterWeigherPipelineRequest) GetOptions() scheduling.Options { return m.Options }
 
 func (m mockFilterWeigherPipelineRequest) Filter(hosts map[string]float64) FilterWeigherPipelineRequest {
 	filteredHosts := make([]string, 0, len(hosts))
diff --git a/internal/scheduling/lib/filter_weigher_pipeline_step.go b/internal/scheduling/lib/filter_weigher_pipeline_step.go
index 26dc5de40..54816519c 100644
--- a/internal/scheduling/lib/filter_weigher_pipeline_step.go
+++ b/internal/scheduling/lib/filter_weigher_pipeline_step.go
@@ -30,6 +30,8 @@ type FilterWeigherPipelineStep[RequestType FilterWeigherPipelineRequest] interfa
 	//
 	// A traceLog is provided that contains the global request id and should
 	// be used to log the step's execution.
+	//
+	// Per-call options are available via request.GetOptions().
 	Run(traceLog *slog.Logger, request RequestType) (*FilterWeigherPipelineStepResult, error)
 }
diff --git a/internal/scheduling/lib/filter_weigher_pipeline_test.go b/internal/scheduling/lib/filter_weigher_pipeline_test.go
index 0e2775944..3a2012db7 100644
--- a/internal/scheduling/lib/filter_weigher_pipeline_test.go
+++ b/internal/scheduling/lib/filter_weigher_pipeline_test.go
@@ -7,8 +7,10 @@ import (
 	"context"
 	"log/slog"
 	"math"
+	"slices"
 	"testing"
 
+	"github.com/cobaltcore-dev/cortex/api/scheduling"
 	"github.com/cobaltcore-dev/cortex/api/v1alpha1"
 	"k8s.io/apimachinery/pkg/runtime"
 	"sigs.k8s.io/controller-runtime/pkg/client"
@@ -372,3 +374,54 @@ func TestFilterWeigherPipelineMonitor_SubPipeline(t *testing.T) {
 		t.Error("original monitor should not be modified")
 	}
 }
+
+func TestPipeline_MaxCandidates(t *testing.T) {
+	// Pipeline that passes all 4 hosts with descending weights.
+	pipeline := &filterWeigherPipeline[mockFilterWeigherPipelineRequest]{
+		filters:       map[string]Filter[mockFilterWeigherPipelineRequest]{},
+		filtersOrder:  []string{},
+		weighersOrder: []string{},
+		weighers:      map[string]Weigher[mockFilterWeigherPipelineRequest]{},
+	}
+	request := mockFilterWeigherPipelineRequest{
+		Hosts:   []string{"host1", "host2", "host3", "host4"},
+		Weights: map[string]float64{"host1": 4.0, "host2": 3.0, "host3": 2.0, "host4": 1.0},
+	}
+
+	tests := []struct {
+		name          string
+		maxCandidates int
+		wantLen       int
+		wantFirst     string
+	}{
+		{"no limit", 0, 4, "host1"},
+		{"limit to 2", 2, 2, "host1"},
+		{"limit to 1", 1, 1, "host1"},
+		{"limit larger than hosts", 10, 4, "host1"},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			req := request
+			req.Options = scheduling.Options{MaxCandidates: tt.maxCandidates}
+			result, err := pipeline.Run(req)
+			if err != nil {
+				t.Fatalf("expected no error, got %v", err)
+			}
+			if len(result.OrderedHosts) != tt.wantLen {
+				t.Errorf("expected %d hosts, got %d: %v", tt.wantLen, len(result.OrderedHosts), result.OrderedHosts)
+			}
+			if len(result.OrderedHosts) > 0 && result.OrderedHosts[0] != tt.wantFirst {
+				t.Errorf("expected first host %s, got %s", tt.wantFirst, result.OrderedHosts[0])
+			}
+			if tt.maxCandidates > 0 && len(result.OrderedHosts) <= tt.maxCandidates {
+				// AggregatedOutWeights must only contain returned hosts.
+				for host := range result.AggregatedOutWeights {
+					if !slices.Contains(result.OrderedHosts, host) {
+						t.Errorf("AggregatedOutWeights contains trimmed host %s", host)
+					}
+				}
+			}
+		})
+	}
+}
diff --git a/internal/scheduling/machines/filter_weigher_pipeline_controller.go b/internal/scheduling/machines/filter_weigher_pipeline_controller.go
index 35d51708a..29627814f 100644
--- a/internal/scheduling/machines/filter_weigher_pipeline_controller.go
+++ b/internal/scheduling/machines/filter_weigher_pipeline_controller.go
@@ -6,7 +6,6 @@ package machines
 import (
 	"context"
 	"errors"
-	"fmt"
 	"sync"
 	"time"
 
@@ -95,10 +94,6 @@ func (c *FilterWeigherPipelineController) ProcessNewMachine(ctx context.Context,
 		},
 	}
 
-	pipelineConf, ok := c.PipelineConfigs[decision.Spec.PipelineRef.Name]
-	if !ok {
-		return fmt.Errorf("pipeline %s not configured", decision.Spec.PipelineRef.Name)
-	}
 	err := c.process(ctx, decision)
 	if err != nil {
 		meta.SetStatusCondition(&decision.Status.Conditions, metav1.Condition{
@@ -115,11 +110,6 @@ func (c *FilterWeigherPipelineController) ProcessNewMachine(ctx context.Context,
 			Message: "pipeline run succeeded",
 		})
 	}
-	if pipelineConf.Spec.CreateHistory {
-		if upsertErr := c.HistoryManager.CreateOrUpdateHistory(ctx, decision, nil, err); upsertErr != nil {
-			ctrl.LoggerFrom(ctx).Error(upsertErr, "failed to create/update history")
-		}
-	}
 	return err
 }
 
@@ -145,6 +135,11 @@ func (c *FilterWeigherPipelineController) process(ctx context.Context, decision
 	// Execute the scheduling pipeline.
 	request := ironcore.MachinePipelineRequest{Pools: pools.Items}
 	result, err := pipeline.Run(request)
+	if !request.Options.SkipHistory {
+		if upsertErr := c.HistoryManager.CreateOrUpdateHistory(ctx, decision, nil, err); upsertErr != nil {
+			log.Error(upsertErr, "failed to create/update history")
+		}
+	}
 	if err != nil {
 		log.V(1).Error(err, "failed to run scheduler pipeline")
 		return errors.New("failed to run scheduler pipeline")
diff --git a/internal/scheduling/machines/filter_weigher_pipeline_controller_test.go b/internal/scheduling/machines/filter_weigher_pipeline_controller_test.go
index bc2e0722a..9431f79ed 100644
--- a/internal/scheduling/machines/filter_weigher_pipeline_controller_test.go
+++ b/internal/scheduling/machines/filter_weigher_pipeline_controller_test.go
@@ -125,6 +125,7 @@ func TestFilterWeigherPipelineController_Reconcile(t *testing.T) {
 			Pipelines: map[string]lib.FilterWeigherPipeline[ironcore.MachinePipelineRequest]{
 				"machines-scheduler": createMockPipeline(),
 			},
+			HistoryManager: lib.HistoryClient{Client: client},
 		},
 		Monitor: lib.FilterWeigherPipelineMonitor{},
 	}
@@ -322,7 +323,6 @@ func TestFilterWeigherPipelineController_ProcessNewMachine(t *testing.T) {
 				Spec: v1alpha1.PipelineSpec{
 					Type:             v1alpha1.PipelineTypeFilterWeigher,
 					SchedulingDomain: v1alpha1.SchedulingDomainMachines,
-					CreateHistory:    true,
 					Filters:          []v1alpha1.FilterSpec{},
 					Weighers:         []v1alpha1.WeigherSpec{},
 				},
@@ -356,14 +356,13 @@ func TestFilterWeigherPipelineController_ProcessNewMachine(t *testing.T) {
 				Spec: v1alpha1.PipelineSpec{
 					Type:             v1alpha1.PipelineTypeFilterWeigher,
 					SchedulingDomain: v1alpha1.SchedulingDomainMachines,
-					CreateHistory:    false,
 					Filters:          []v1alpha1.FilterSpec{},
 					Weighers:         []v1alpha1.WeigherSpec{},
 				},
 			},
 			createHistory:             false,
 			expectError:               false,
-			expectHistoryCreated:      false,
+			expectHistoryCreated:      true,
 			expectMachinePoolAssigned: true,
 			expectTargetHost:          "pool1",
 		},
@@ -403,14 +402,13 @@ func TestFilterWeigherPipelineController_ProcessNewMachine(t *testing.T) {
 				Spec: v1alpha1.PipelineSpec{
 					Type:             v1alpha1.PipelineTypeFilterWeigher,
 					SchedulingDomain: v1alpha1.SchedulingDomainMachines,
-					CreateHistory:    true,
 					Filters:          []v1alpha1.FilterSpec{},
 					Weighers:         []v1alpha1.WeigherSpec{},
 				},
 			},
 			createHistory:             true,
 			expectError:               true,
-			expectHistoryCreated:      true, // Decision is created but processing fails
+			expectHistoryCreated:      false,
 			expectMachinePoolAssigned: false,
 		},
 	}
diff --git a/internal/scheduling/manila/filter_weigher_pipeline_controller.go b/internal/scheduling/manila/filter_weigher_pipeline_controller.go
index 128b7d719..6e00593a9 100644
--- a/internal/scheduling/manila/filter_weigher_pipeline_controller.go
+++ b/internal/scheduling/manila/filter_weigher_pipeline_controller.go
@@ -7,7 +7,6 @@ import (
 	"context"
 	"encoding/json"
 	"errors"
-	"fmt"
 	"sync"
 	"time"
 
@@ -74,10 +73,6 @@ func (c *FilterWeigherPipelineController) ProcessNewDecisionFromAPI(ctx context.
 	c.processMu.Lock()
 	defer c.processMu.Unlock()
 
-	pipelineConf, ok := c.PipelineConfigs[decision.Spec.PipelineRef.Name]
-	if !ok {
-		return fmt.Errorf("pipeline %s not configured", decision.Spec.PipelineRef.Name)
-	}
 	err := c.process(ctx, decision)
 	if err != nil {
 		meta.SetStatusCondition(&decision.Status.Conditions, metav1.Condition{
@@ -94,11 +89,6 @@ func (c *FilterWeigherPipelineController) ProcessNewDecisionFromAPI(ctx context.
Message: "pipeline run succeeded", }) } - if pipelineConf.Spec.CreateHistory { - if upsertErr := c.HistoryManager.CreateOrUpdateHistory(ctx, decision, nil, err); upsertErr != nil { - ctrl.LoggerFrom(ctx).Error(upsertErr, "failed to create/update history") - } - } return err } @@ -122,6 +112,11 @@ func (c *FilterWeigherPipelineController) process(ctx context.Context, decision } result, err := pipeline.Run(request) + if !request.Options.SkipHistory { + if upsertErr := c.HistoryManager.CreateOrUpdateHistory(ctx, decision, nil, err); upsertErr != nil { + log.Error(upsertErr, "failed to create/update history") + } + } if err != nil { log.Error(err, "failed to run pipeline") return err diff --git a/internal/scheduling/manila/filter_weigher_pipeline_controller_test.go b/internal/scheduling/manila/filter_weigher_pipeline_controller_test.go index 3ca4fd7f1..11292d3f5 100644 --- a/internal/scheduling/manila/filter_weigher_pipeline_controller_test.go +++ b/internal/scheduling/manila/filter_weigher_pipeline_controller_test.go @@ -17,6 +17,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client/fake" api "github.com/cobaltcore-dev/cortex/api/external/manila" + "github.com/cobaltcore-dev/cortex/api/scheduling" "github.com/cobaltcore-dev/cortex/api/v1alpha1" "github.com/sapcc/go-bits/must" @@ -48,6 +49,7 @@ func TestFilterWeigherPipelineController_Reconcile(t *testing.T) { }, Weights: map[string]float64{"manila-share-1@backend1": 1.0, "manila-share-2@backend2": 0.5}, Pipeline: "test-pipeline", + Options: scheduling.Options{SkipHistory: true}, } manilaRaw, err := json.Marshal(manilaRequest) @@ -278,7 +280,6 @@ func TestFilterWeigherPipelineController_ProcessNewDecisionFromAPI(t *testing.T) Spec: v1alpha1.PipelineSpec{ Type: v1alpha1.PipelineTypeFilterWeigher, SchedulingDomain: v1alpha1.SchedulingDomainManila, - CreateHistory: true, Filters: []v1alpha1.FilterSpec{}, Weighers: []v1alpha1.WeigherSpec{}, }, @@ -312,7 +313,6 @@ func TestFilterWeigherPipelineController_ProcessNewDecisionFromAPI(t *testing.T) Spec: v1alpha1.PipelineSpec{ Type: v1alpha1.PipelineTypeFilterWeigher, SchedulingDomain: v1alpha1.SchedulingDomainManila, - CreateHistory: false, Filters: []v1alpha1.FilterSpec{}, Weighers: []v1alpha1.WeigherSpec{}, }, @@ -366,14 +366,13 @@ func TestFilterWeigherPipelineController_ProcessNewDecisionFromAPI(t *testing.T) Spec: v1alpha1.PipelineSpec{ Type: v1alpha1.PipelineTypeFilterWeigher, SchedulingDomain: v1alpha1.SchedulingDomainManila, - CreateHistory: true, Filters: []v1alpha1.FilterSpec{}, Weighers: []v1alpha1.WeigherSpec{}, }, }, createHistory: true, expectError: true, - expectHistoryCreated: true, + expectHistoryCreated: false, expectResult: false, }, } @@ -410,11 +409,17 @@ func TestFilterWeigherPipelineController_ProcessNewDecisionFromAPI(t *testing.T) controller.Pipelines[tt.pipelineConfig.Name] = initResult.Pipeline } - err := controller.ProcessNewDecisionFromAPI(context.Background(), tt.decision) - - if tt.expectError && err == nil { - t.Error("Expected error but got none") + if tt.decision.Spec.ManilaRaw != nil { + req := manilaRequest + req.Options = scheduling.Options{SkipHistory: !tt.createHistory} + raw, marshalErr := json.Marshal(req) + if marshalErr != nil { + t.Fatalf("Failed to marshal request with options: %v", marshalErr) + } + tt.decision.Spec.ManilaRaw = &runtime.RawExtension{Raw: raw} } + + err := controller.ProcessNewDecisionFromAPI(context.Background(), tt.decision) if !tt.expectError && err != nil { t.Errorf("Expected no error but got: %v", err) } diff --git 
a/internal/scheduling/nova/filter_weigher_pipeline_controller.go b/internal/scheduling/nova/filter_weigher_pipeline_controller.go index 279ac1c3e..7be2b5dee 100644 --- a/internal/scheduling/nova/filter_weigher_pipeline_controller.go +++ b/internal/scheduling/nova/filter_weigher_pipeline_controller.go @@ -7,7 +7,6 @@ import ( "context" "encoding/json" "errors" - "fmt" "sync" "time" @@ -38,8 +37,9 @@ type FilterWeigherPipelineController struct { // Toolbox shared between all pipeline controllers. lib.BasePipelineController[lib.FilterWeigherPipeline[api.ExternalSchedulerRequest]] - // Mutex to only allow one process at a time - processMu sync.Mutex + // Mutex to only allow one process at a time. + // Read-only runs (opts.ReadOnly == true) acquire a read lock; write runs acquire the full lock. + processMu sync.RWMutex // Monitor to pass down to all pipelines. Monitor lib.FilterWeigherPipelineMonitor @@ -54,13 +54,23 @@ func (c *FilterWeigherPipelineController) PipelineType() v1alpha1.PipelineType { // Callback executed when kubernetes asks to reconcile a decision resource. func (c *FilterWeigherPipelineController) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { - c.processMu.Lock() - defer c.processMu.Unlock() - + // Peek at the decision before acquiring the lock so we can choose the right lock type. + // Read-only runs can proceed concurrently; write runs need the exclusive lock. decision := &v1alpha1.Decision{} if err := c.Get(ctx, req.NamespacedName, decision); err != nil { return ctrl.Result{}, client.IgnoreNotFound(err) } + if c.peekReadOnly(decision) { + c.processMu.RLock() + defer c.processMu.RUnlock() + } else { + c.processMu.Lock() + defer c.processMu.Unlock() + // Re-fetch after acquiring the exclusive lock to see consistent state. + if err := c.Get(ctx, req.NamespacedName, decision); err != nil { + return ctrl.Result{}, client.IgnoreNotFound(err) + } + } old := decision.DeepCopy() if err := c.process(ctx, decision); err != nil { return ctrl.Result{}, err @@ -74,13 +84,16 @@ func (c *FilterWeigherPipelineController) Reconcile(ctx context.Context, req ctr // Process the decision from the API. Should create and return the updated decision. func (c *FilterWeigherPipelineController) ProcessNewDecisionFromAPI(ctx context.Context, decision *v1alpha1.Decision) error { - c.processMu.Lock() - defer c.processMu.Unlock() - - pipelineConf, ok := c.PipelineConfigs[decision.Spec.PipelineRef.Name] - if !ok { - return fmt.Errorf("pipeline %s not configured", decision.Spec.PipelineRef.Name) + // Read-only runs share the cached decision state; no re-fetch needed because they + // don't observe writes from concurrent exclusive-lock runs. + if c.peekReadOnly(decision) { + c.processMu.RLock() + defer c.processMu.RUnlock() + } else { + c.processMu.Lock() + defer c.processMu.Unlock() } + err := c.process(ctx, decision) if err != nil { meta.SetStatusCondition(&decision.Status.Conditions, metav1.Condition{ @@ -97,9 +110,6 @@ func (c *FilterWeigherPipelineController) ProcessNewDecisionFromAPI(ctx context. 
Message: "pipeline run succeeded", }) } - if pipelineConf.Spec.CreateHistory { - c.upsertHistory(ctx, decision, err) - } return err } @@ -167,6 +177,9 @@ func (c *FilterWeigherPipelineController) process(ctx context.Context, decision } result, err := pipeline.Run(request) + if !request.Options.SkipHistory { + c.upsertHistory(ctx, decision, err) + } if err != nil { log.Error(err, "failed to run pipeline") return err @@ -182,7 +195,19 @@ func (c *FilterWeigherPipelineController) process(ctx context.Context, decision return nil } -// The base controller will delegate the pipeline creation down to this method. +// peekReadOnly determines whether a decision should use a read lock instead of +// the exclusive write lock. Defaults to false (exclusive) on any parse error. +func (c *FilterWeigherPipelineController) peekReadOnly(decision *v1alpha1.Decision) bool { + if decision.Spec.NovaRaw == nil { + return false + } + var request api.ExternalSchedulerRequest + if err := json.Unmarshal(decision.Spec.NovaRaw.Raw, &request); err != nil { + return false + } + return request.Options.ReadOnly +} + func (c *FilterWeigherPipelineController) InitPipeline( ctx context.Context, p v1alpha1.Pipeline, diff --git a/internal/scheduling/nova/filter_weigher_pipeline_controller_test.go b/internal/scheduling/nova/filter_weigher_pipeline_controller_test.go index 752725df8..52064d0c3 100644 --- a/internal/scheduling/nova/filter_weigher_pipeline_controller_test.go +++ b/internal/scheduling/nova/filter_weigher_pipeline_controller_test.go @@ -20,6 +20,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client/fake" api "github.com/cobaltcore-dev/cortex/api/external/nova" + "github.com/cobaltcore-dev/cortex/api/scheduling" "github.com/cobaltcore-dev/cortex/api/v1alpha1" "github.com/cobaltcore-dev/cortex/internal/scheduling/lib" @@ -77,6 +78,7 @@ func TestFilterWeigherPipelineController_Reconcile(t *testing.T) { }, Weights: map[string]float64{"compute-1": 1.0, "compute-2": 0.5}, Pipeline: "test-pipeline", + Options: scheduling.Options{SkipHistory: true}, } novaRaw, err := json.Marshal(novaRequest) @@ -431,7 +433,6 @@ func TestFilterWeigherPipelineController_ProcessNewDecisionFromAPI(t *testing.T) Spec: v1alpha1.PipelineSpec{ Type: v1alpha1.PipelineTypeFilterWeigher, SchedulingDomain: v1alpha1.SchedulingDomainNova, - CreateHistory: true, Filters: []v1alpha1.FilterSpec{}, Weighers: []v1alpha1.WeigherSpec{}, }, @@ -443,7 +444,6 @@ func TestFilterWeigherPipelineController_ProcessNewDecisionFromAPI(t *testing.T) Spec: v1alpha1.PipelineSpec{ Type: v1alpha1.PipelineTypeFilterWeigher, SchedulingDomain: v1alpha1.SchedulingDomainNova, - CreateHistory: true, Filters: []v1alpha1.FilterSpec{}, Weighers: []v1alpha1.WeigherSpec{}, }, @@ -480,7 +480,6 @@ func TestFilterWeigherPipelineController_ProcessNewDecisionFromAPI(t *testing.T) Spec: v1alpha1.PipelineSpec{ Type: v1alpha1.PipelineTypeFilterWeigher, SchedulingDomain: v1alpha1.SchedulingDomainNova, - CreateHistory: false, Filters: []v1alpha1.FilterSpec{}, Weighers: []v1alpha1.WeigherSpec{}, }, @@ -492,7 +491,6 @@ func TestFilterWeigherPipelineController_ProcessNewDecisionFromAPI(t *testing.T) Spec: v1alpha1.PipelineSpec{ Type: v1alpha1.PipelineTypeFilterWeigher, SchedulingDomain: v1alpha1.SchedulingDomainNova, - CreateHistory: false, Filters: []v1alpha1.FilterSpec{}, Weighers: []v1alpha1.WeigherSpec{}, }, @@ -528,7 +526,7 @@ func TestFilterWeigherPipelineController_ProcessNewDecisionFromAPI(t *testing.T) expectResult: false, expectHistoryCreated: false, expectUpdatedStatus: false, - 
errorContains: "pipeline nonexistent-pipeline not configured", + errorContains: "pipeline not found or not ready", }, { name: "decision without novaRaw spec", @@ -552,7 +550,6 @@ func TestFilterWeigherPipelineController_ProcessNewDecisionFromAPI(t *testing.T) Spec: v1alpha1.PipelineSpec{ Type: v1alpha1.PipelineTypeFilterWeigher, SchedulingDomain: v1alpha1.SchedulingDomainNova, - CreateHistory: true, Filters: []v1alpha1.FilterSpec{}, Weighers: []v1alpha1.WeigherSpec{}, }, @@ -564,7 +561,6 @@ func TestFilterWeigherPipelineController_ProcessNewDecisionFromAPI(t *testing.T) Spec: v1alpha1.PipelineSpec{ Type: v1alpha1.PipelineTypeFilterWeigher, SchedulingDomain: v1alpha1.SchedulingDomainNova, - CreateHistory: true, Filters: []v1alpha1.FilterSpec{}, Weighers: []v1alpha1.WeigherSpec{}, }, @@ -573,7 +569,7 @@ func TestFilterWeigherPipelineController_ProcessNewDecisionFromAPI(t *testing.T) createHistory: true, expectError: true, expectResult: false, - expectHistoryCreated: true, + expectHistoryCreated: false, expectUpdatedStatus: false, errorContains: "no novaRaw spec defined", }, @@ -602,7 +598,6 @@ func TestFilterWeigherPipelineController_ProcessNewDecisionFromAPI(t *testing.T) Spec: v1alpha1.PipelineSpec{ Type: v1alpha1.PipelineTypeFilterWeigher, SchedulingDomain: v1alpha1.SchedulingDomainNova, - CreateHistory: true, Filters: []v1alpha1.FilterSpec{}, Weighers: []v1alpha1.WeigherSpec{}, }, @@ -611,7 +606,7 @@ func TestFilterWeigherPipelineController_ProcessNewDecisionFromAPI(t *testing.T) createHistory: true, expectError: true, expectResult: false, - expectHistoryCreated: true, + expectHistoryCreated: false, expectUpdatedStatus: false, errorContains: "pipeline not found or not ready", }, @@ -640,7 +635,6 @@ func TestFilterWeigherPipelineController_ProcessNewDecisionFromAPI(t *testing.T) Spec: v1alpha1.PipelineSpec{ Type: v1alpha1.PipelineTypeFilterWeigher, SchedulingDomain: v1alpha1.SchedulingDomainNova, - CreateHistory: true, Filters: []v1alpha1.FilterSpec{}, Weighers: []v1alpha1.WeigherSpec{}, }, @@ -649,7 +643,7 @@ func TestFilterWeigherPipelineController_ProcessNewDecisionFromAPI(t *testing.T) createHistory: true, expectError: true, expectResult: false, - expectHistoryCreated: true, + expectHistoryCreated: false, expectUpdatedStatus: false, errorContains: "pipeline not found or not ready", }, @@ -697,6 +691,16 @@ func TestFilterWeigherPipelineController_ProcessNewDecisionFromAPI(t *testing.T) controller.Pipelines[tt.pipeline.Name] = initResult.Pipeline } + if tt.decision.Spec.NovaRaw != nil { + req := novaRequest + req.Options = scheduling.Options{SkipHistory: !tt.createHistory} + raw, marshalErr := json.Marshal(req) + if marshalErr != nil { + t.Fatalf("Failed to marshal request with options: %v", marshalErr) + } + tt.decision.Spec.NovaRaw = &runtime.RawExtension{Raw: raw} + } + // Call the method under test err := controller.ProcessNewDecisionFromAPI(context.Background(), tt.decision) @@ -779,6 +783,7 @@ func TestFilterWeigherPipelineController_IgnorePreselection(t *testing.T) { }, Weights: map[string]float64{"original-host-1": 1.0, "original-host-2": 0.5}, Pipeline: "test-pipeline", + Options: scheduling.Options{SkipHistory: true}, } novaRaw, err := json.Marshal(novaRequest) @@ -864,7 +869,6 @@ func TestFilterWeigherPipelineController_IgnorePreselection(t *testing.T) { Spec: v1alpha1.PipelineSpec{ Type: v1alpha1.PipelineTypeFilterWeigher, SchedulingDomain: v1alpha1.SchedulingDomainNova, - CreateHistory: false, IgnorePreselection: tt.ignorePreselection, Filters: []v1alpha1.FilterSpec{}, 
 				Weighers:           []v1alpha1.WeigherSpec{},
@@ -928,3 +932,74 @@ func TestFilterWeigherPipelineController_IgnorePreselection(t *testing.T) {
 
 // Error variable for testing
 var errGathererFailed = errors.New("gatherer failed")
+
+func TestFilterWeigherPipelineController_PeekReadOnly(t *testing.T) {
+	makeRaw := func(readOnly bool) []byte {
+		r := api.ExternalSchedulerRequest{
+			Spec:    api.NovaObject[api.NovaSpec]{Data: api.NovaSpec{NumInstances: 1}},
+			Options: scheduling.Options{ReadOnly: readOnly},
+		}
+		raw, err := json.Marshal(r)
+		if err != nil {
+			panic(err)
+		}
+		return raw
+	}
+
+	c := &FilterWeigherPipelineController{}
+
+	tests := []struct {
+		name     string
+		decision *v1alpha1.Decision
+		want     bool
+	}{
+		{
+			name: "nil NovaRaw defaults to exclusive lock",
+			decision: &v1alpha1.Decision{
+				Spec: v1alpha1.DecisionSpec{
+					PipelineRef: corev1.ObjectReference{Name: "test-pipeline"},
+				},
+			},
+			want: false,
+		},
+		{
+			name: "invalid JSON defaults to exclusive lock",
+			decision: &v1alpha1.Decision{
+				Spec: v1alpha1.DecisionSpec{
+					PipelineRef: corev1.ObjectReference{Name: "test-pipeline"},
+					NovaRaw:     &runtime.RawExtension{Raw: []byte("not-json")},
+				},
+			},
+			want: false,
+		},
+		{
+			name: "ReadOnly=false uses exclusive lock",
+			decision: &v1alpha1.Decision{
+				Spec: v1alpha1.DecisionSpec{
+					PipelineRef: corev1.ObjectReference{Name: "test-pipeline"},
+					NovaRaw:     &runtime.RawExtension{Raw: makeRaw(false)},
+				},
+			},
+			want: false,
+		},
+		{
+			name: "ReadOnly=true uses read lock",
+			decision: &v1alpha1.Decision{
+				Spec: v1alpha1.DecisionSpec{
+					PipelineRef: corev1.ObjectReference{Name: "test-pipeline"},
+					NovaRaw:     &runtime.RawExtension{Raw: makeRaw(true)},
+				},
+			},
+			want: true,
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			got := c.peekReadOnly(tt.decision)
+			if got != tt.want {
+				t.Errorf("expected peekReadOnly = %v, got %v", tt.want, got)
+			}
+		})
+	}
+}
diff --git a/internal/scheduling/nova/integration_test.go b/internal/scheduling/nova/integration_test.go
index f4bb38a79..1b84601a7 100644
--- a/internal/scheduling/nova/integration_test.go
+++ b/internal/scheduling/nova/integration_test.go
@@ -252,6 +252,7 @@ func NewIntegrationTestServer(t *testing.T, pipelineConfig PipelineConfig, objec
 			Client:          k8sClient,
 			Pipelines:       make(map[string]lib.FilterWeigherPipeline[novaapi.ExternalSchedulerRequest]),
 			PipelineConfigs: make(map[string]v1alpha1.Pipeline),
+			HistoryManager:  lib.HistoryClient{Client: k8sClient},
 		},
 		Monitor: getSharedMonitor(),
 	}
diff --git a/internal/scheduling/nova/plugins/filters/filter_has_enough_capacity.go b/internal/scheduling/nova/plugins/filters/filter_has_enough_capacity.go
index b97d3e0e5..c0bbeae83 100644
--- a/internal/scheduling/nova/plugins/filters/filter_has_enough_capacity.go
+++ b/internal/scheduling/nova/plugins/filters/filter_has_enough_capacity.go
@@ -62,6 +62,7 @@ type FilterHasEnoughCapacity struct {
 //
 // Please also note that disk space is currently not considered by this filter.
 func (s *FilterHasEnoughCapacity) Run(traceLog *slog.Logger, request api.ExternalSchedulerRequest) (*lib.FilterWeigherPipelineStepResult, error) {
+	opts := request.GetOptions()
 	result := s.IncludeAllHostsFromRequest(request)
 
 	// This map holds the free resources per host.
@@ -112,7 +113,8 @@ func (s *FilterHasEnoughCapacity) Run(traceLog *slog.Logger, request api.Externa
 		}
 
 		// Check if this reservation type should be ignored
-		if slices.Contains(s.Options.IgnoredReservationTypes, reservation.Spec.Type) {
+		if slices.Contains(s.Options.IgnoredReservationTypes, reservation.Spec.Type) ||
+			slices.Contains(opts.IgnoredReservationTypes, reservation.Spec.Type) {
 			traceLog.Debug("ignoring reservation type", "type", reservation.Spec.Type, "reservation", reservation.Name)
 			continue
 		}
@@ -128,18 +130,14 @@ func (s *FilterHasEnoughCapacity) Run(traceLog *slog.Logger, request api.Externa
 		// Check if this is a CR reservation scheduling request.
 		// If so, we should NOT unlock any CR reservations to prevent overbooking.
 		// CR capacity should only be unlocked for actual VM scheduling.
-		intent, err := request.GetIntent()
 		switch {
-		case err == nil && intent == api.ReserveForCommittedResourceIntent:
-			traceLog.Debug("keeping CR reservation locked for CR reservation scheduling",
+		case opts.LockReservations || s.Options.LockReserved:
+			traceLog.Debug("keeping CR reservation locked",
 				"reservation", reservation.Name,
-				"intent", intent)
+				"lockReservations", opts.LockReservations,
+				"lockReserved", s.Options.LockReserved)
 			// Don't continue - fall through to block the resources
-		case !s.Options.LockReserved &&
-			// For committed resource reservations: unlock resources only if:
-			// 1. Project ID matches
-			// 2. ResourceGroup matches the flavor's hw_version
-			reservation.Spec.CommittedResourceReservation.ProjectID == request.Spec.Data.ProjectID &&
+		case reservation.Spec.CommittedResourceReservation.ProjectID == request.Spec.Data.ProjectID &&
 			reservation.Spec.CommittedResourceReservation.ResourceGroup == request.Spec.Data.Flavor.Data.ExtraSpecs["hw_version"]:
 			traceLog.Info("unlocking resources reserved by matching committed resource reservation with allocation",
 				"reservation", reservation.Name,
diff --git a/internal/scheduling/nova/plugins/filters/filter_has_enough_capacity_test.go b/internal/scheduling/nova/plugins/filters/filter_has_enough_capacity_test.go
index 5b026408f..1cdc800f0 100644
--- a/internal/scheduling/nova/plugins/filters/filter_has_enough_capacity_test.go
+++ b/internal/scheduling/nova/plugins/filters/filter_has_enough_capacity_test.go
@@ -4,6 +4,7 @@ package filters
 
 import (
+	"github.com/cobaltcore-dev/cortex/api/scheduling"
 	"log/slog"
 	"testing"
 
@@ -807,6 +808,44 @@ func TestFilterHasEnoughCapacity_IgnoredReservationTypes(t *testing.T) {
 	}
 }
 
+func TestFilterHasEnoughCapacity_IgnoredReservationTypes_CallTime(t *testing.T) {
+	scheme := buildTestScheme(t)
+
+	// Same two-host setup as the YAML-path test: CR on host1, Failover on host2.
+	// Each blocks 4 CPU, leaving 4 free; request needs 8 CPU so both hosts fail without ignoring.
+	hypervisors := []*hv1.Hypervisor{
+		newHypervisor("host1", "16", "8", "32Gi", "16Gi"),
+		newHypervisor("host2", "16", "8", "32Gi", "16Gi"),
+	}
+	reservations := []*v1alpha1.Reservation{
+		newCommittedReservation("cr-res", "host1", "project-X", "m1.large", "gp-1", "4", "8Gi", nil, nil),
+		newFailoverReservation("failover-res", "host2", "4", "8Gi", map[string]string{"other-vm": "host3"}),
+	}
+	request := newNovaRequest("instance-123", "project-A", "m1.large", "gp-1", 8, "16Gi", false, []string{"host1", "host2"})
+
+	objects := make([]client.Object, 0, len(hypervisors)+len(reservations))
+	for _, h := range hypervisors {
+		objects = append(objects, h.DeepCopy())
+	}
+	for _, r := range reservations {
+		objects = append(objects, r.DeepCopy())
+	}
+
+	step := &FilterHasEnoughCapacity{}
+	step.Client = fake.NewClientBuilder().WithScheme(scheme).WithObjects(objects...).Build()
+	step.Options = FilterHasEnoughCapacityOpts{LockReserved: true} // no YAML-level ignores
+
+	// Call-time: ignore CR reservations → host1 passes, host2 still blocked by failover.
+	request.Options = scheduling.Options{
+		IgnoredReservationTypes: []v1alpha1.ReservationType{v1alpha1.ReservationTypeCommittedResource},
+	}
+	result, err := step.Run(slog.Default(), request)
+	if err != nil {
+		t.Fatalf("expected no error, got %v", err)
+	}
+	assertActivations(t, result.Activations, []string{"host1"}, []string{"host2"})
+}
+
 func TestFilterHasEnoughCapacity_ReserveForCommittedResourceIntent(t *testing.T) {
 	scheme := buildTestScheme(t)
 
@@ -819,6 +858,7 @@ func TestFilterHasEnoughCapacity_ReserveForCommittedResourceIntent(t *testing.T)
 		reservations  []*v1alpha1.Reservation
 		request       api.ExternalSchedulerRequest
 		opts          FilterHasEnoughCapacityOpts
+		pipelineOpts  scheduling.Options
 		expectedHosts []string
 		filteredHosts []string
 	}{
@@ -834,8 +874,9 @@ func TestFilterHasEnoughCapacity_ReserveForCommittedResourceIntent(t *testing.T)
 			},
 			// Request with reserve_for_committed_resource intent (scheduling a new CR reservation)
 			request:       newNovaRequestWithIntent("new-reservation-uuid", "project-A", "m1.large", "gp-1", 4, "8Gi", "reserve_for_committed_resource", false, []string{"host1", "host2"}),
-			opts:          FilterHasEnoughCapacityOpts{LockReserved: false}, // Note: LockReserved is false, but intent overrides
-			expectedHosts: []string{"host2"}, // host1 blocked because existing-cr stays locked
+			opts:          FilterHasEnoughCapacityOpts{LockReserved: false},
+			pipelineOpts:  scheduling.Options{LockReservations: true},
+			expectedHosts: []string{"host2"}, // host1 blocked because existing-cr stays locked
 			filteredHosts: []string{"host1"},
 		},
 		{
@@ -867,6 +908,7 @@ func TestFilterHasEnoughCapacity_ReserveForCommittedResourceIntent(t *testing.T)
 			// Request with reserve_for_committed_resource intent
 			request:       newNovaRequestWithIntent("new-reservation-uuid", "project-A", "m1.large", "gp-1", 4, "8Gi", "reserve_for_committed_resource", false, []string{"host1", "host2"}),
 			opts:          FilterHasEnoughCapacityOpts{LockReserved: false},
+			pipelineOpts:  scheduling.Options{LockReservations: true},
 			expectedHosts: []string{"host2"},
 			filteredHosts: []string{"host1"}, // host1 blocked by other project's reservation (would be blocked anyway)
 		},
@@ -885,6 +927,7 @@ func TestFilterHasEnoughCapacity_ReserveForCommittedResourceIntent(t *testing.T)
 			// After blocking all 3 reservations (24 CPU), only 8 CPU free -> should fail
 			request:       newNovaRequestWithIntent("new-reservation-uuid", "project-A", "m1.large", "gp-1", 10, "20Gi", "reserve_for_committed_resource", false, []string{"host1"}),
 			opts:          FilterHasEnoughCapacityOpts{LockReserved: false},
+			pipelineOpts:  scheduling.Options{LockReservations: true},
 			expectedHosts: []string{},
 			filteredHosts: []string{"host1"}, // All reservations stay locked, not enough capacity
 		},
@@ -916,13 +959,14 @@ func TestFilterHasEnoughCapacity_ReserveForCommittedResourceIntent(t *testing.T)
 				newCommittedReservation("existing-cr", "host1", "project-A", "m1.large", "gp-1", "8", "16Gi", nil, nil),
 			},
 			// Request with reserve_for_committed_resource intent
-			// IgnoredReservationTypes is a safety flag that overrides everything, including intent
+			// IgnoredReservationTypes is a safety flag that overrides everything, including LockReservations
 			request: newNovaRequestWithIntent("new-reservation-uuid", "project-A", "m1.large", "gp-1", 4, "8Gi", "reserve_for_committed_resource", false, []string{"host1"}),
 			opts: FilterHasEnoughCapacityOpts{
 				LockReserved: false,
 				// IgnoredReservationTypes is a safety override - ignores CR even for CR scheduling
 				IgnoredReservationTypes: []v1alpha1.ReservationType{v1alpha1.ReservationTypeCommittedResource},
 			},
+			pipelineOpts:  scheduling.Options{LockReservations: true},
 			expectedHosts: []string{"host1"}, // CR reservation is ignored via IgnoredReservationTypes (safety override)
 			filteredHosts: []string{},
 		},
@@ -960,6 +1004,7 @@ func TestFilterHasEnoughCapacity_ReserveForCommittedResourceIntent(t *testing.T)
 			step := &FilterHasEnoughCapacity{}
 			step.Client = fake.NewClientBuilder().WithScheme(scheme).WithObjects(objects...).Build()
 			step.Options = tt.opts
+			tt.request.Options = tt.pipelineOpts
 
 			result, err := step.Run(slog.Default(), tt.request)
 			if err != nil {
diff --git a/internal/scheduling/pods/filter_weigher_pipeline_controller.go b/internal/scheduling/pods/filter_weigher_pipeline_controller.go
index 0ceee6485..aa28d988f 100644
--- a/internal/scheduling/pods/filter_weigher_pipeline_controller.go
+++ b/internal/scheduling/pods/filter_weigher_pipeline_controller.go
@@ -6,7 +6,6 @@ package pods
 import (
 	"context"
 	"errors"
-	"fmt"
 	"sync"
 	"time"
 
@@ -95,10 +94,6 @@ func (c *FilterWeigherPipelineController) ProcessNewPod(ctx context.Context, pod
 		},
 	}
 
-	pipelineConf, ok := c.PipelineConfigs[decision.Spec.PipelineRef.Name]
-	if !ok {
-		return fmt.Errorf("pipeline %s not configured", decision.Spec.PipelineRef.Name)
-	}
 	err := c.process(ctx, decision)
 	if err != nil {
 		meta.SetStatusCondition(&decision.Status.Conditions, metav1.Condition{
@@ -115,11 +110,6 @@ func (c *FilterWeigherPipelineController) ProcessNewPod(ctx context.Context, pod
 			Message: "pipeline run succeeded",
 		})
 	}
-	if pipelineConf.Spec.CreateHistory {
-		if upsertErr := c.HistoryManager.CreateOrUpdateHistory(ctx, decision, nil, err); upsertErr != nil {
-			ctrl.LoggerFrom(ctx).Error(upsertErr, "failed to create/update history")
-		}
-	}
 	return err
 }
 
@@ -159,6 +149,11 @@ func (c *FilterWeigherPipelineController) process(ctx context.Context, decision
 	// Execute the scheduling pipeline.
 	request := pods.PodPipelineRequest{Nodes: nodes.Items, Pod: *pod}
 	result, err := pipeline.Run(request)
+	if !request.Options.SkipHistory {
+		if upsertErr := c.HistoryManager.CreateOrUpdateHistory(ctx, decision, nil, err); upsertErr != nil {
+			log.Error(upsertErr, "failed to create/update history")
+		}
+	}
 	if err != nil {
 		log.V(1).Error(err, "failed to run scheduler pipeline")
 		return errors.New("failed to run scheduler pipeline")
diff --git a/internal/scheduling/pods/filter_weigher_pipeline_controller_test.go b/internal/scheduling/pods/filter_weigher_pipeline_controller_test.go
index 143ed9f83..7a8159be6 100644
--- a/internal/scheduling/pods/filter_weigher_pipeline_controller_test.go
+++ b/internal/scheduling/pods/filter_weigher_pipeline_controller_test.go
@@ -122,6 +122,7 @@ func TestFilterWeigherPipelineController_Reconcile(t *testing.T) {
 			Pipelines: map[string]lib.FilterWeigherPipeline[pods.PodPipelineRequest]{
 				"pods-scheduler": createMockPodPipeline(),
 			},
+			HistoryManager: lib.HistoryClient{Client: client},
 		},
 		Monitor: lib.FilterWeigherPipelineMonitor{},
 	}
@@ -300,7 +301,6 @@ func TestFilterWeigherPipelineController_ProcessNewPod(t *testing.T) {
 				Spec: v1alpha1.PipelineSpec{
 					Type:             v1alpha1.PipelineTypeFilterWeigher,
 					SchedulingDomain: v1alpha1.SchedulingDomainPods,
-					CreateHistory:    true,
 					Filters:          []v1alpha1.FilterSpec{},
 					Weighers:         []v1alpha1.WeigherSpec{},
 				},
@@ -334,14 +334,13 @@ func TestFilterWeigherPipelineController_ProcessNewPod(t *testing.T) {
 				Spec: v1alpha1.PipelineSpec{
 					Type:             v1alpha1.PipelineTypeFilterWeigher,
 					SchedulingDomain: v1alpha1.SchedulingDomainPods,
-					CreateHistory:    false,
 					Filters:          []v1alpha1.FilterSpec{},
 					Weighers:         []v1alpha1.WeigherSpec{},
 				},
 			},
 			createHistory:        false,
 			expectError:          false,
-			expectHistoryCreated: false,
+			expectHistoryCreated: true,
 			expectNodeAssigned:   true,
 			expectTargetHost:     "node1",
 		},
@@ -381,14 +380,13 @@ func TestFilterWeigherPipelineController_ProcessNewPod(t *testing.T) {
 				Spec: v1alpha1.PipelineSpec{
 					Type:             v1alpha1.PipelineTypeFilterWeigher,
 					SchedulingDomain: v1alpha1.SchedulingDomainPods,
-					CreateHistory:    true,
 					Filters:          []v1alpha1.FilterSpec{},
 					Weighers:         []v1alpha1.WeigherSpec{},
 				},
 			},
 			createHistory:        true,
 			expectError:          true,
-			expectHistoryCreated: true, // Decision is created but processing fails
+			expectHistoryCreated: false,
 			expectNodeAssigned:   false,
 		},
 	}
diff --git a/internal/scheduling/reservations/capacity/controller.go b/internal/scheduling/reservations/capacity/controller.go
index 37b6da4ef..100a0ffa9 100644
--- a/internal/scheduling/reservations/capacity/controller.go
+++ b/internal/scheduling/reservations/capacity/controller.go
@@ -22,6 +22,7 @@ import (
 	"sigs.k8s.io/controller-runtime/pkg/client"
 
 	schedulerapi "github.com/cobaltcore-dev/cortex/api/external/nova"
+	"github.com/cobaltcore-dev/cortex/api/scheduling"
 	"github.com/cobaltcore-dev/cortex/api/v1alpha1"
 	"github.com/cobaltcore-dev/cortex/internal/knowledge/extractor/plugins/compute"
 	"github.com/cobaltcore-dev/cortex/internal/scheduling/reservations"
@@ -264,7 +265,7 @@ func (c *Controller) probeScheduler(
 		AvailabilityZone: az,
 		Pipeline:         pipeline,
 		EligibleHosts:    eligibleHosts,
-	})
+	}, scheduling.Options{})
 	if err != nil {
 		return 0, 0, fmt.Errorf("scheduler call failed (pipeline=%s): %w", pipeline, err)
 	}
diff --git a/internal/scheduling/reservations/commitments/reservation_controller.go b/internal/scheduling/reservations/commitments/reservation_controller.go
index b65842c60..4cc545a81 100644
--- a/internal/scheduling/reservations/commitments/reservation_controller.go
+++ b/internal/scheduling/reservations/commitments/reservation_controller.go
@@ -23,6 +23,7 @@ import (
 	"sigs.k8s.io/controller-runtime/pkg/reconcile"
 
 	schedulerdelegationapi "github.com/cobaltcore-dev/cortex/api/external/nova"
+	"github.com/cobaltcore-dev/cortex/api/scheduling"
 	"github.com/cobaltcore-dev/cortex/api/v1alpha1"
 	"github.com/cobaltcore-dev/cortex/internal/scheduling/reservations"
 	"github.com/cobaltcore-dev/cortex/pkg/multicluster"
@@ -286,8 +287,17 @@ func (r *CommitmentReservationController) Reconcile(ctx context.Context, req ctr
 			"_nova_check_type": string(schedulerdelegationapi.ReserveForCommittedResourceIntent),
 		},
 	}
+	scheduleOpts := scheduling.Options{
+		ReadOnly:                false, // mutates state (reservation placement)
+		LockReservations:        true,  // don't unlock CR reservations; finding a slot, not placing a VM
+		AssumeEmptyHosts:        false,
+		IgnoredReservationTypes: nil,
+		MaxCandidates:           1,
+		SkipHistory:             true,
+		CreateInflight:          false, // not a VM placement; no pessimistic blocking needed
+	}
 
-	scheduleResp, err := r.SchedulerClient.ScheduleReservation(ctx, scheduleReq)
+	scheduleResp, err := r.SchedulerClient.ScheduleReservation(ctx, scheduleReq, scheduleOpts)
 	if err != nil {
 		logger.Error(err, "failed to schedule reservation")
 		return ctrl.Result{}, err
diff --git a/internal/scheduling/reservations/failover/integration_test.go b/internal/scheduling/reservations/failover/integration_test.go
index df1354be6..316633e3a 100644
--- a/internal/scheduling/reservations/failover/integration_test.go
+++ b/internal/scheduling/reservations/failover/integration_test.go
@@ -1119,10 +1119,10 @@ func newIntegrationTestEnv(t *testing.T, vms []VM, hypervisors []*hv1.Hypervisor
 			Client:          k8sClient,
 			Pipelines:       make(map[string]lib.FilterWeigherPipeline[novaapi.ExternalSchedulerRequest]),
 			PipelineConfigs: make(map[string]v1alpha1.Pipeline),
+			HistoryManager:  lib.HistoryClient{Client: k8sClient},
 		},
 		Monitor: getSharedMonitor(),
 	}
-
 	// Register all pipelines needed for testing
 	pipelines := []v1alpha1.Pipeline{
 		{
@@ -1309,11 +1309,10 @@ func newIntegrationTestEnvWithTraitsFilter(t *testing.T, vms []VM, hypervisors [
 			Client:          k8sClient,
 			Pipelines:       make(map[string]lib.FilterWeigherPipeline[novaapi.ExternalSchedulerRequest]),
 			PipelineConfigs: make(map[string]v1alpha1.Pipeline),
+			HistoryManager:  lib.HistoryClient{Client: k8sClient},
 		},
 		Monitor: getSharedMonitor(),
 	}
-
-
 	// Register all pipelines needed for testing (with traits filter enabled)
 	pipelines := []v1alpha1.Pipeline{
 		{
diff --git a/internal/scheduling/reservations/failover/reservation_scheduling.go b/internal/scheduling/reservations/failover/reservation_scheduling.go
index f482f3393..ed45a5cf3 100644
--- a/internal/scheduling/reservations/failover/reservation_scheduling.go
+++ b/internal/scheduling/reservations/failover/reservation_scheduling.go
@@ -10,6 +10,7 @@ import (
 	"sort"
 
 	api "github.com/cobaltcore-dev/cortex/api/external/nova"
+	"github.com/cobaltcore-dev/cortex/api/scheduling"
 	"github.com/cobaltcore-dev/cortex/api/v1alpha1"
 	"github.com/cobaltcore-dev/cortex/internal/scheduling/reservations"
 )
@@ -91,7 +92,7 @@ func (c *FailoverReservationController) queryHypervisorsFromScheduler(ctx contex
 		"eligibleHypervisors", len(eligibleHypervisors),
 		"ignoreHypervisors", ignoreHypervisors)
 
-	scheduleResp, err := c.SchedulerClient.ScheduleReservation(ctx, scheduleReq)
+	scheduleResp, err := c.SchedulerClient.ScheduleReservation(ctx, scheduleReq, scheduling.Options{LockReservations: true})
 	if err != nil {
 		logger.Error(err, "failed to schedule failover reservation", "vmUUID", vm.UUID, "pipeline", pipeline)
 		return nil, fmt.Errorf("failed to schedule failover reservation: %w", err)
@@ -222,7 +223,7 @@ func (c *FailoverReservationController) validateVMViaSchedulerEvacuation(
 		"vmCurrentHost", vm.CurrentHypervisor,
 		"pipeline", PipelineAcknowledgeFailoverReservation)
 
-	resp, err := c.SchedulerClient.ScheduleReservation(ctx, scheduleReq)
+	resp, err := c.SchedulerClient.ScheduleReservation(ctx, scheduleReq, scheduling.Options{ReadOnly: true, SkipHistory: true})
 	if err != nil {
 		logger.Error(err, "failed to validate VM for reservation host", "vmUUID", vm.UUID, "reservationHost", reservationHost)
 		return false, fmt.Errorf("failed to validate VM for reservation host: %w", err)
diff --git a/internal/scheduling/reservations/scheduler_client.go b/internal/scheduling/reservations/scheduler_client.go
index a42172ce2..c23e66354 100644
--- a/internal/scheduling/reservations/scheduler_client.go
+++ b/internal/scheduling/reservations/scheduler_client.go
@@ -12,6 +12,7 @@ import (
 	"time"
 
 	api "github.com/cobaltcore-dev/cortex/api/external/nova"
+	"github.com/cobaltcore-dev/cortex/api/scheduling"
 	"github.com/go-logr/logr"
 	logf "sigs.k8s.io/controller-runtime/pkg/log"
 )
@@ -89,7 +90,7 @@ type ScheduleReservationResponse struct {
 
 // ScheduleReservation calls the external scheduler API to find a host for a reservation.
 // The context should contain GlobalRequestID and RequestID for logging (use WithGlobalRequestID/WithRequestID).
-func (c *SchedulerClient) ScheduleReservation(ctx context.Context, req ScheduleReservationRequest) (*ScheduleReservationResponse, error) {
+func (c *SchedulerClient) ScheduleReservation(ctx context.Context, req ScheduleReservationRequest, opts scheduling.Options) (*ScheduleReservationResponse, error) {
 	logger := loggerFromContext(ctx)
 
 	// Build weights map (all zero for reservations)
@@ -115,6 +116,7 @@ func (c *SchedulerClient) ScheduleReservation(ctx context.Context, req ScheduleR
 		Pipeline: req.Pipeline,
 		Hosts:    req.EligibleHosts,
 		Weights:  weights,
+		Options:  opts,
 		Context: api.NovaRequestContext{
 			RequestID:       RequestIDFromContext(ctx),
 			GlobalRequestID: globalReqID,