Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions api/external/cinder/messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package api
import (
"log/slog"

"github.com/cobaltcore-dev/cortex/api/scheduling"
"github.com/cobaltcore-dev/cortex/internal/scheduling/lib"
)

Expand All @@ -30,8 +31,11 @@ type ExternalSchedulerRequest struct {
Weights map[string]float64 `json:"weights"`
// The name of the pipeline to execute.
Pipeline string `json:"pipeline"`
// Options configure the pipeline behavior for this scheduling call.
Options scheduling.Options `json:"options,omitempty"`
}

func (r ExternalSchedulerRequest) GetOptions() scheduling.Options { return r.Options }
func (r ExternalSchedulerRequest) GetHosts() []string {
hosts := make([]string, len(r.Hosts))
for i, host := range r.Hosts {
Expand Down
4 changes: 4 additions & 0 deletions api/external/ironcore/messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,18 @@ import (
"log/slog"

ironcorev1alpha1 "github.com/cobaltcore-dev/cortex/api/external/ironcore/v1alpha1"
"github.com/cobaltcore-dev/cortex/api/scheduling"
"github.com/cobaltcore-dev/cortex/internal/scheduling/lib"
)

type MachinePipelineRequest struct {
// The available machine pools.
Pools []ironcorev1alpha1.MachinePool `json:"pools"`
// Options configure the pipeline behavior for this scheduling call.
Options scheduling.Options `json:"options,omitempty"`
}

func (r MachinePipelineRequest) GetOptions() scheduling.Options { return r.Options }
func (r MachinePipelineRequest) GetHosts() []string {
hosts := make([]string, len(r.Pools))
for i, host := range r.Pools {
Expand Down
4 changes: 4 additions & 0 deletions api/external/manila/messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package api
import (
"log/slog"

"github.com/cobaltcore-dev/cortex/api/scheduling"
"github.com/cobaltcore-dev/cortex/internal/scheduling/lib"
)

Expand All @@ -30,8 +31,11 @@ type ExternalSchedulerRequest struct {
Weights map[string]float64 `json:"weights"`
// The name of the pipeline to execute.
Pipeline string `json:"pipeline"`
// Options configure the pipeline behavior for this scheduling call.
Options scheduling.Options `json:"options,omitempty"`
}

func (r ExternalSchedulerRequest) GetOptions() scheduling.Options { return r.Options }
func (r ExternalSchedulerRequest) GetHosts() []string {
hosts := make([]string, len(r.Hosts))
for i, host := range r.Hosts {
Expand Down
7 changes: 7 additions & 0 deletions api/external/nova/messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"log/slog"
"strings"

"github.com/cobaltcore-dev/cortex/api/scheduling"
"github.com/cobaltcore-dev/cortex/api/v1alpha1"
"github.com/cobaltcore-dev/cortex/internal/scheduling/lib"
)
Expand Down Expand Up @@ -37,8 +38,14 @@ type ExternalSchedulerRequest struct {

// The name of the pipeline to execute.
Pipeline string `json:"pipeline"`

// Options configure the pipeline behavior for this scheduling call.
// Set by the caller (CR controller, failover controller, Nova).
// Nova does not set these; Cortex fills in config-derived defaults server-side.
Options scheduling.Options `json:"options,omitempty"`
}

func (r ExternalSchedulerRequest) GetOptions() scheduling.Options { return r.Options }
func (r ExternalSchedulerRequest) GetHosts() []string {
hosts := make([]string, len(r.Hosts))
for i, host := range r.Hosts {
Expand Down
4 changes: 4 additions & 0 deletions api/external/pods/messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package pods
import (
"log/slog"

"github.com/cobaltcore-dev/cortex/api/scheduling"
"github.com/cobaltcore-dev/cortex/internal/scheduling/lib"
corev1 "k8s.io/api/core/v1"
)
Expand All @@ -15,8 +16,11 @@ type PodPipelineRequest struct {
Nodes []corev1.Node `json:"nodes"`
// The pod to be scheduled.
Pod corev1.Pod `json:"pod"`
// Options configure the pipeline behavior for this scheduling call.
Options scheduling.Options `json:"options,omitempty"`
}

func (r PodPipelineRequest) GetOptions() scheduling.Options { return r.Options }
func (r PodPipelineRequest) GetHosts() []string {
hosts := make([]string, len(r.Nodes))
for i, host := range r.Nodes {
Expand Down
46 changes: 46 additions & 0 deletions api/scheduling/options.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
// Copyright SAP SE
// SPDX-License-Identifier: Apache-2.0

package scheduling

import (
"errors"

"github.com/cobaltcore-dev/cortex/api/v1alpha1"
)

// Options configure the behavior of a single pipeline run at call time.
// These are distinct from per-step YAML options (FilterWeigherPipelineStepOpts),
// which are static and set when the pipeline is initialized.
type Options struct {
// ReadOnly means the pipeline run does not modify shared scheduling state (reservations,
// history, inflight records). Concurrent read-only runs are safe under a shared read lock.
// Note: the controller may still write the Decision status after Run() regardless of this flag.
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems inconsistent. Shouldn't we draw the line here? Read-only requests create or modify NO resources and are purely to calculate host candidates for constraints.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are use cases, we can discuss live

ReadOnly bool `json:"read_only,omitempty"`
// LockReservations prevents reservation unlocking, e.g. in the capacity filter.
// Set when finding hosts for new reservations (failover, CR) to see true available capacity.
LockReservations bool `json:"lock_reservations,omitempty"`
// AssumeEmptyHosts treats all hosts as having no running VMs.
AssumeEmptyHosts bool `json:"assume_empty_hosts,omitempty"`
// IgnoredReservationTypes lists reservation types the capacity filter skips entirely.
IgnoredReservationTypes []v1alpha1.ReservationType `json:"ignored_reservation_types,omitempty"`
// MaxCandidates limits the number of hosts returned after weighing. 0 means no limit.
MaxCandidates int `json:"max_candidates,omitempty"`

// SkipHistory skips recording the placement decision in placement history.
// Defaults to false so Nova requests record history without needing to set this explicitly.
SkipHistory bool `json:"skip_history,omitempty"`
// CreateInflight creates pessimistic blocking reservations for all returned candidates.
CreateInflight bool `json:"create_inflight,omitempty"`
}

// Validate checks for mutually exclusive or inconsistent option combinations.
func (o Options) Validate() error {
if o.ReadOnly && !o.SkipHistory {
return errors.New("read-only runs must not write scheduling history: set SkipHistory=true")
}
if o.ReadOnly && o.CreateInflight {
return errors.New("ReadOnly and CreateInflight are mutually exclusive: read-only runs must not create inflight reservations")
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Error message starts with a capitalized field name, violating the lowercase error message guideline.

"ReadOnly and CreateInflight are mutually exclusive..." starts with the capitalized field name ReadOnly, which will be flagged by ST1005/errcheck linters. As per coding guidelines, error messages must always be lowercase.

✏️ Proposed fix
-	return errors.New("ReadOnly and CreateInflight are mutually exclusive: read-only runs must not create inflight reservations")
+	return errors.New("readOnly and createInflight are mutually exclusive: read-only runs must not create inflight reservations")
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@api/scheduling/options.go` at line 43, Change the error string to start with
a lowercase rune to satisfy ST1005: replace the current errors.New(...) message
that begins "ReadOnly and CreateInflight..." with a message that begins
lowercase (e.g., "readOnly and CreateInflight are mutually exclusive: read-only
runs must not create inflight reservations" or use hyphenated field names like
"read-only and create-inflight are mutually exclusive...") in the same return
statement so the error text in options.go uses a lowercase first letter.

}
return nil
}
34 changes: 34 additions & 0 deletions api/scheduling/options_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
// Copyright SAP SE
// SPDX-License-Identifier: Apache-2.0

package scheduling

import "testing"

func TestOptions_Validate(t *testing.T) {
tests := []struct {
name string
opts Options
wantErr bool
}{
{"zero value is valid", Options{}, false},
{"write run, history recorded by default", Options{}, false},
{"write run with inflight", Options{CreateInflight: true}, false},
{"read-only run, skipping history", Options{ReadOnly: true, SkipHistory: true}, false},
{"ReadOnly without SkipHistory is invalid", Options{ReadOnly: true}, true},
{"ReadOnly + CreateInflight is invalid", Options{ReadOnly: true, CreateInflight: true}, true},
{"ReadOnly + both invalid", Options{ReadOnly: true, CreateInflight: true}, true},
}
Comment on lines +14 to +21
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Duplicate test cases and the CreateInflight validation rule is never exercised.

Two separate problems:

  1. Duplicate inputs: lines 14–15 are both Options{}, and lines 19–20 are both Options{ReadOnly: true, CreateInflight: true} — neither pair adds coverage.

  2. Unexercised rule: in both lines 19 and 20, SkipHistory defaults to false, so Validate() returns early on the first check (ReadOnly && !SkipHistory). The second rule — ReadOnly && CreateInflight — is never actually reached. To test it, a case with SkipHistory: true is needed so the first guard passes.

✏️ Suggested fix
-		{"zero value is valid", Options{}, false},
-		{"write run, history recorded by default", Options{}, false},
-		{"write run with inflight", Options{CreateInflight: true}, false},
-		{"read-only run, skipping history", Options{ReadOnly: true, SkipHistory: true}, false},
-		{"ReadOnly without SkipHistory is invalid", Options{ReadOnly: true}, true},
-		{"ReadOnly + CreateInflight is invalid", Options{ReadOnly: true, CreateInflight: true}, true},
-		{"ReadOnly + both invalid", Options{ReadOnly: true, CreateInflight: true}, true},
+		{"zero value is valid", Options{}, false},
+		{"write run with inflight", Options{CreateInflight: true}, false},
+		{"read-only run, skipping history", Options{ReadOnly: true, SkipHistory: true}, false},
+		{"ReadOnly without SkipHistory is invalid", Options{ReadOnly: true}, true},
+		// Tests the second rule: CreateInflight is incompatible with ReadOnly even when SkipHistory is set.
+		{"ReadOnly + CreateInflight is invalid", Options{ReadOnly: true, SkipHistory: true, CreateInflight: true}, true},
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
{"zero value is valid", Options{}, false},
{"write run, history recorded by default", Options{}, false},
{"write run with inflight", Options{CreateInflight: true}, false},
{"read-only run, skipping history", Options{ReadOnly: true, SkipHistory: true}, false},
{"ReadOnly without SkipHistory is invalid", Options{ReadOnly: true}, true},
{"ReadOnly + CreateInflight is invalid", Options{ReadOnly: true, CreateInflight: true}, true},
{"ReadOnly + both invalid", Options{ReadOnly: true, CreateInflight: true}, true},
}
{"zero value is valid", Options{}, false},
{"write run with inflight", Options{CreateInflight: true}, false},
{"read-only run, skipping history", Options{ReadOnly: true, SkipHistory: true}, false},
{"ReadOnly without SkipHistory is invalid", Options{ReadOnly: true}, true},
// Tests the second rule: CreateInflight is incompatible with ReadOnly even when SkipHistory is set.
{"ReadOnly + CreateInflight is invalid", Options{ReadOnly: true, SkipHistory: true, CreateInflight: true}, true},
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@api/scheduling/options_test.go` around lines 14 - 21, Remove the duplicate
test rows and add a case that actually exercises the CreateInflight validation:
update the table of test cases for Options so only one zero-value entry remains
(remove the duplicated Options{}), remove the duplicated Options{ReadOnly: true,
CreateInflight: true} row, and add a new test case using Options{ReadOnly: true,
CreateInflight: true, SkipHistory: true} so Validate() passes the first ReadOnly
&& !SkipHistory guard and exercises the ReadOnly && CreateInflight rule in
Options.Validate().


for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
err := tt.opts.Validate()
if tt.wantErr && err == nil {
t.Error("expected error, got nil")
}
if !tt.wantErr && err != nil {
t.Errorf("expected no error, got %v", err)
}
})
}
}
5 changes: 0 additions & 5 deletions api/v1alpha1/pipeline_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,11 +78,6 @@ type PipelineSpec struct {
// +kubebuilder:validation:Optional
Description string `json:"description,omitempty"`

// If this pipeline should create history objects.
// When this is false, the pipeline will still process requests.
// +kubebuilder:default=false
CreateHistory bool `json:"createHistory,omitempty"`

// If this pipeline should ignore host preselection and gather all
// available placement candidates before applying filters, instead of
// relying on a pre-filtered set and weights.
Expand Down
6 changes: 0 additions & 6 deletions helm/library/cortex/files/crds/cortex.cloud_pipelines.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -58,12 +58,6 @@ spec:
spec:
description: spec defines the desired state of Pipeline
properties:
createHistory:
default: false
description: |-
If this pipeline should create history objects.
When this is false, the pipeline will still process requests.
type: boolean
description:
description: An optional description of the pipeline, helping understand
its purpose.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"context"
"encoding/json"
"errors"
"fmt"
"sync"
"time"

Expand Down Expand Up @@ -74,10 +73,6 @@ func (c *FilterWeigherPipelineController) ProcessNewDecisionFromAPI(ctx context.
c.processMu.Lock()
defer c.processMu.Unlock()

pipelineConf, ok := c.PipelineConfigs[decision.Spec.PipelineRef.Name]
if !ok {
return fmt.Errorf("pipeline %s not configured", decision.Spec.PipelineRef.Name)
}
err := c.process(ctx, decision)
if err != nil {
meta.SetStatusCondition(&decision.Status.Conditions, metav1.Condition{
Expand All @@ -94,11 +89,6 @@ func (c *FilterWeigherPipelineController) ProcessNewDecisionFromAPI(ctx context.
Message: "pipeline run succeeded",
})
}
if pipelineConf.Spec.CreateHistory {
if upsertErr := c.HistoryManager.CreateOrUpdateHistory(ctx, decision, nil, err); upsertErr != nil {
ctrl.LoggerFrom(ctx).Error(upsertErr, "failed to create/update history")
}
}
return err
}

Expand All @@ -122,6 +112,11 @@ func (c *FilterWeigherPipelineController) process(ctx context.Context, decision
}

result, err := pipeline.Run(request)
if !request.Options.SkipHistory {
if upsertErr := c.HistoryManager.CreateOrUpdateHistory(ctx, decision, nil, err); upsertErr != nil {
log.Error(upsertErr, "failed to create/update history")
}
}
if err != nil {
log.Error(err, "failed to run pipeline")
return err
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client/fake"

api "github.com/cobaltcore-dev/cortex/api/external/cinder"
"github.com/cobaltcore-dev/cortex/api/scheduling"
"github.com/cobaltcore-dev/cortex/api/v1alpha1"

"github.com/cobaltcore-dev/cortex/internal/scheduling/lib"
Expand Down Expand Up @@ -46,6 +47,7 @@ func TestFilterWeigherPipelineController_Reconcile(t *testing.T) {
},
Weights: map[string]float64{"cinder-volume-1": 1.0, "cinder-volume-2": 0.5},
Pipeline: "test-pipeline",
Options: scheduling.Options{SkipHistory: true},
}

cinderRaw, err := json.Marshal(cinderRequest)
Expand Down Expand Up @@ -281,7 +283,6 @@ func TestFilterWeigherPipelineController_ProcessNewDecisionFromAPI(t *testing.T)
Spec: v1alpha1.PipelineSpec{
Type: v1alpha1.PipelineTypeFilterWeigher,
SchedulingDomain: v1alpha1.SchedulingDomainCinder,
CreateHistory: true,
Filters: []v1alpha1.FilterSpec{},
Weighers: []v1alpha1.WeigherSpec{},
},
Expand Down Expand Up @@ -315,7 +316,6 @@ func TestFilterWeigherPipelineController_ProcessNewDecisionFromAPI(t *testing.T)
Spec: v1alpha1.PipelineSpec{
Type: v1alpha1.PipelineTypeFilterWeigher,
SchedulingDomain: v1alpha1.SchedulingDomainCinder,
CreateHistory: false,
Filters: []v1alpha1.FilterSpec{},
Weighers: []v1alpha1.WeigherSpec{},
},
Expand Down Expand Up @@ -369,14 +369,13 @@ func TestFilterWeigherPipelineController_ProcessNewDecisionFromAPI(t *testing.T)
Spec: v1alpha1.PipelineSpec{
Type: v1alpha1.PipelineTypeFilterWeigher,
SchedulingDomain: v1alpha1.SchedulingDomainCinder,
CreateHistory: true,
Filters: []v1alpha1.FilterSpec{},
Weighers: []v1alpha1.WeigherSpec{},
},
},
createHistory: true,
expectError: true,
expectHistoryCreated: true,
expectHistoryCreated: false,
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Assert the no-history expectation on error paths.

expectHistoryCreated: false for the nil-raw case is not currently enforced, because the test only checks for zero History objects in the !tt.expectError branch. If ProcessNewDecisionFromAPI() accidentally creates history before returning the error, this case will still pass.

Also applies to: 434-450

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@internal/scheduling/cinder/filter_weigher_pipeline_controller_test.go` at
line 378, The test currently only asserts zero History when !tt.expectError, so
add an explicit assertion of history count == 0 whenever tt.expectHistoryCreated
is false (regardless of tt.expectError) after calling
ProcessNewDecisionFromAPI(); i.e. move or duplicate the expectHistoryCreated
check out of the !tt.expectError branch so the test fails if
ProcessNewDecisionFromAPI() creates History even when it returns an error. Apply
the same change for the other case group around the lines that correspond to the
second nil-raw check (the block covering the additional assertions referenced in
the comment).

expectResult: false,
},
}
Expand Down Expand Up @@ -413,6 +412,16 @@ func TestFilterWeigherPipelineController_ProcessNewDecisionFromAPI(t *testing.T)
controller.Pipelines[tt.pipelineConfig.Name] = initResult.Pipeline
}

if tt.decision.Spec.CinderRaw != nil {
req := cinderRequest
req.Options = scheduling.Options{SkipHistory: !tt.createHistory}
raw, marshalErr := json.Marshal(req)
if marshalErr != nil {
t.Fatalf("Failed to marshal request with options: %v", marshalErr)
}
tt.decision.Spec.CinderRaw = &runtime.RawExtension{Raw: raw}
}

err := controller.ProcessNewDecisionFromAPI(context.Background(), tt.decision)

if tt.expectError && err == nil {
Expand Down
20 changes: 20 additions & 0 deletions internal/scheduling/lib/filter_weigher_pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (

type FilterWeigherPipeline[RequestType FilterWeigherPipelineRequest] interface {
// Run the scheduling pipeline with the given request.
// Call-time options are read from request.GetOptions().
Run(request RequestType) (v1alpha1.DecisionResult, error)
}

Expand Down Expand Up @@ -263,6 +264,10 @@ func (s *filterWeigherPipeline[RequestType]) sortHostsByWeights(weights map[stri

// Evaluate the pipeline and return a list of hosts in order of preference.
func (p *filterWeigherPipeline[RequestType]) Run(request RequestType) (v1alpha1.DecisionResult, error) {
opts := request.GetOptions()
if err := opts.Validate(); err != nil {
return v1alpha1.DecisionResult{}, err
}
slogArgs := request.GetTraceLogArgs()
slogArgsAny := make([]any, 0, len(slogArgs))
for _, arg := range slogArgs {
Expand Down Expand Up @@ -297,6 +302,21 @@ func (p *filterWeigherPipeline[RequestType]) Run(request RequestType) (v1alpha1.
hosts := p.sortHostsByWeights(outWeights)
traceLog.Info("scheduler: sorted hosts", "hosts", hosts)

if opts.MaxCandidates > 0 && len(hosts) > opts.MaxCandidates {
traceLog.Info("scheduler: trimming candidate list", "maxCandidates", opts.MaxCandidates, "before", len(hosts))
hosts = hosts[:opts.MaxCandidates]
// Drop trimmed hosts from outWeights so AggregatedOutWeights stays consistent.
kept := make(map[string]struct{}, len(hosts))
for _, h := range hosts {
kept[h] = struct{}{}
}
for host := range outWeights {
if _, ok := kept[host]; !ok {
delete(outWeights, host)
}
}
}

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please provide logging here so we see what's going on.

// Collect some metrics about the pipeline execution.
go p.monitor.observePipelineResult(request, hosts)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,11 @@

package lib

import "log/slog"
import (
"log/slog"

"github.com/cobaltcore-dev/cortex/api/scheduling"
)

type FilterWeigherPipelineRequest interface {
// Get the hosts that went in the pipeline.
Expand All @@ -21,4 +25,6 @@ type FilterWeigherPipelineRequest interface {
// Get logging args to be used in the step's trace log.
// Usually, this will be the request context including the request ID.
GetTraceLogArgs() []slog.Attr
// Get the call-time options for this pipeline run.
GetOptions() scheduling.Options
}
Loading