-
Notifications
You must be signed in to change notification settings - Fork 6
feat: adding pipeline options #799
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
36b7d94
ad66112
4a7bc9e
d298e75
54639d1
c2d0fc2
4de31f9
c2f0b56
7a014b2
cc9b6e6
4886d37
3b8cf87
b6a6139
037f74c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,46 @@ | ||
| // Copyright SAP SE | ||
| // SPDX-License-Identifier: Apache-2.0 | ||
|
|
||
| package scheduling | ||
|
|
||
| import ( | ||
| "errors" | ||
|
|
||
| "github.com/cobaltcore-dev/cortex/api/v1alpha1" | ||
| ) | ||
|
|
||
| // Options configure the behavior of a single pipeline run at call time. | ||
| // These are distinct from per-step YAML options (FilterWeigherPipelineStepOpts), | ||
| // which are static and set when the pipeline is initialized. | ||
| type Options struct { | ||
| // ReadOnly means the pipeline run does not modify shared scheduling state (reservations, | ||
| // history, inflight records). Concurrent read-only runs are safe under a shared read lock. | ||
| // Note: the controller may still write the Decision status after Run() regardless of this flag. | ||
| ReadOnly bool `json:"read_only,omitempty"` | ||
| // LockReservations prevents reservation unlocking, e.g. in the capacity filter. | ||
| // Set when finding hosts for new reservations (failover, CR) to see true available capacity. | ||
| LockReservations bool `json:"lock_reservations,omitempty"` | ||
| // AssumeEmptyHosts treats all hosts as having no running VMs. | ||
| AssumeEmptyHosts bool `json:"assume_empty_hosts,omitempty"` | ||
| // IgnoredReservationTypes lists reservation types the capacity filter skips entirely. | ||
| IgnoredReservationTypes []v1alpha1.ReservationType `json:"ignored_reservation_types,omitempty"` | ||
| // MaxCandidates limits the number of hosts returned after weighing. 0 means no limit. | ||
| MaxCandidates int `json:"max_candidates,omitempty"` | ||
|
|
||
| // SkipHistory skips recording the placement decision in placement history. | ||
| // Defaults to false so Nova requests record history without needing to set this explicitly. | ||
| SkipHistory bool `json:"skip_history,omitempty"` | ||
| // CreateInflight creates pessimistic blocking reservations for all returned candidates. | ||
| CreateInflight bool `json:"create_inflight,omitempty"` | ||
| } | ||
|
|
||
| // Validate checks for mutually exclusive or inconsistent option combinations. | ||
| func (o Options) Validate() error { | ||
| if o.ReadOnly && !o.SkipHistory { | ||
| return errors.New("read-only runs must not write scheduling history: set SkipHistory=true") | ||
| } | ||
| if o.ReadOnly && o.CreateInflight { | ||
| return errors.New("ReadOnly and CreateInflight are mutually exclusive: read-only runs must not create inflight reservations") | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Error message starts with a capitalized field name, violating the lowercase error message guideline.
✏️ Proposed fix- return errors.New("ReadOnly and CreateInflight are mutually exclusive: read-only runs must not create inflight reservations")
+ return errors.New("readOnly and createInflight are mutually exclusive: read-only runs must not create inflight reservations")🤖 Prompt for AI Agents |
||
| } | ||
| return nil | ||
| } | ||
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| @@ -0,0 +1,34 @@ | ||||||||||||||||||||||||||||||
| // Copyright SAP SE | ||||||||||||||||||||||||||||||
| // SPDX-License-Identifier: Apache-2.0 | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| package scheduling | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| import "testing" | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| func TestOptions_Validate(t *testing.T) { | ||||||||||||||||||||||||||||||
| tests := []struct { | ||||||||||||||||||||||||||||||
| name string | ||||||||||||||||||||||||||||||
| opts Options | ||||||||||||||||||||||||||||||
| wantErr bool | ||||||||||||||||||||||||||||||
| }{ | ||||||||||||||||||||||||||||||
| {"zero value is valid", Options{}, false}, | ||||||||||||||||||||||||||||||
| {"write run, history recorded by default", Options{}, false}, | ||||||||||||||||||||||||||||||
| {"write run with inflight", Options{CreateInflight: true}, false}, | ||||||||||||||||||||||||||||||
| {"read-only run, skipping history", Options{ReadOnly: true, SkipHistory: true}, false}, | ||||||||||||||||||||||||||||||
| {"ReadOnly without SkipHistory is invalid", Options{ReadOnly: true}, true}, | ||||||||||||||||||||||||||||||
| {"ReadOnly + CreateInflight is invalid", Options{ReadOnly: true, CreateInflight: true}, true}, | ||||||||||||||||||||||||||||||
| {"ReadOnly + both invalid", Options{ReadOnly: true, CreateInflight: true}, true}, | ||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||
|
Comment on lines
+14
to
+21
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Duplicate test cases and the Two separate problems:
✏️ Suggested fix- {"zero value is valid", Options{}, false},
- {"write run, history recorded by default", Options{}, false},
- {"write run with inflight", Options{CreateInflight: true}, false},
- {"read-only run, skipping history", Options{ReadOnly: true, SkipHistory: true}, false},
- {"ReadOnly without SkipHistory is invalid", Options{ReadOnly: true}, true},
- {"ReadOnly + CreateInflight is invalid", Options{ReadOnly: true, CreateInflight: true}, true},
- {"ReadOnly + both invalid", Options{ReadOnly: true, CreateInflight: true}, true},
+ {"zero value is valid", Options{}, false},
+ {"write run with inflight", Options{CreateInflight: true}, false},
+ {"read-only run, skipping history", Options{ReadOnly: true, SkipHistory: true}, false},
+ {"ReadOnly without SkipHistory is invalid", Options{ReadOnly: true}, true},
+ // Tests the second rule: CreateInflight is incompatible with ReadOnly even when SkipHistory is set.
+ {"ReadOnly + CreateInflight is invalid", Options{ReadOnly: true, SkipHistory: true, CreateInflight: true}, true},📝 Committable suggestion
Suggested change
🤖 Prompt for AI Agents |
||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| for _, tt := range tests { | ||||||||||||||||||||||||||||||
| t.Run(tt.name, func(t *testing.T) { | ||||||||||||||||||||||||||||||
| err := tt.opts.Validate() | ||||||||||||||||||||||||||||||
| if tt.wantErr && err == nil { | ||||||||||||||||||||||||||||||
| t.Error("expected error, got nil") | ||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||
| if !tt.wantErr && err != nil { | ||||||||||||||||||||||||||||||
| t.Errorf("expected no error, got %v", err) | ||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||
| }) | ||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -17,6 +17,7 @@ import ( | |
| "sigs.k8s.io/controller-runtime/pkg/client/fake" | ||
|
|
||
| api "github.com/cobaltcore-dev/cortex/api/external/cinder" | ||
| "github.com/cobaltcore-dev/cortex/api/scheduling" | ||
| "github.com/cobaltcore-dev/cortex/api/v1alpha1" | ||
|
|
||
| "github.com/cobaltcore-dev/cortex/internal/scheduling/lib" | ||
|
|
@@ -46,6 +47,7 @@ func TestFilterWeigherPipelineController_Reconcile(t *testing.T) { | |
| }, | ||
| Weights: map[string]float64{"cinder-volume-1": 1.0, "cinder-volume-2": 0.5}, | ||
| Pipeline: "test-pipeline", | ||
| Options: scheduling.Options{SkipHistory: true}, | ||
| } | ||
|
|
||
| cinderRaw, err := json.Marshal(cinderRequest) | ||
|
|
@@ -281,7 +283,6 @@ func TestFilterWeigherPipelineController_ProcessNewDecisionFromAPI(t *testing.T) | |
| Spec: v1alpha1.PipelineSpec{ | ||
| Type: v1alpha1.PipelineTypeFilterWeigher, | ||
| SchedulingDomain: v1alpha1.SchedulingDomainCinder, | ||
| CreateHistory: true, | ||
| Filters: []v1alpha1.FilterSpec{}, | ||
| Weighers: []v1alpha1.WeigherSpec{}, | ||
| }, | ||
|
|
@@ -315,7 +316,6 @@ func TestFilterWeigherPipelineController_ProcessNewDecisionFromAPI(t *testing.T) | |
| Spec: v1alpha1.PipelineSpec{ | ||
| Type: v1alpha1.PipelineTypeFilterWeigher, | ||
| SchedulingDomain: v1alpha1.SchedulingDomainCinder, | ||
| CreateHistory: false, | ||
| Filters: []v1alpha1.FilterSpec{}, | ||
| Weighers: []v1alpha1.WeigherSpec{}, | ||
| }, | ||
|
|
@@ -369,14 +369,13 @@ func TestFilterWeigherPipelineController_ProcessNewDecisionFromAPI(t *testing.T) | |
| Spec: v1alpha1.PipelineSpec{ | ||
| Type: v1alpha1.PipelineTypeFilterWeigher, | ||
| SchedulingDomain: v1alpha1.SchedulingDomainCinder, | ||
| CreateHistory: true, | ||
| Filters: []v1alpha1.FilterSpec{}, | ||
| Weighers: []v1alpha1.WeigherSpec{}, | ||
| }, | ||
| }, | ||
| createHistory: true, | ||
| expectError: true, | ||
| expectHistoryCreated: true, | ||
| expectHistoryCreated: false, | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Assert the no-history expectation on error paths.
Also applies to: 434-450 🤖 Prompt for AI Agents |
||
| expectResult: false, | ||
| }, | ||
| } | ||
|
|
@@ -413,6 +412,16 @@ func TestFilterWeigherPipelineController_ProcessNewDecisionFromAPI(t *testing.T) | |
| controller.Pipelines[tt.pipelineConfig.Name] = initResult.Pipeline | ||
| } | ||
|
|
||
| if tt.decision.Spec.CinderRaw != nil { | ||
| req := cinderRequest | ||
| req.Options = scheduling.Options{SkipHistory: !tt.createHistory} | ||
| raw, marshalErr := json.Marshal(req) | ||
| if marshalErr != nil { | ||
| t.Fatalf("Failed to marshal request with options: %v", marshalErr) | ||
| } | ||
| tt.decision.Spec.CinderRaw = &runtime.RawExtension{Raw: raw} | ||
| } | ||
|
|
||
| err := controller.ProcessNewDecisionFromAPI(context.Background(), tt.decision) | ||
|
|
||
| if tt.expectError && err == nil { | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -19,6 +19,7 @@ import ( | |
|
|
||
| type FilterWeigherPipeline[RequestType FilterWeigherPipelineRequest] interface { | ||
| // Run the scheduling pipeline with the given request. | ||
| // Call-time options are read from request.GetOptions(). | ||
| Run(request RequestType) (v1alpha1.DecisionResult, error) | ||
| } | ||
|
|
||
|
|
@@ -263,6 +264,10 @@ func (s *filterWeigherPipeline[RequestType]) sortHostsByWeights(weights map[stri | |
|
|
||
| // Evaluate the pipeline and return a list of hosts in order of preference. | ||
| func (p *filterWeigherPipeline[RequestType]) Run(request RequestType) (v1alpha1.DecisionResult, error) { | ||
| opts := request.GetOptions() | ||
| if err := opts.Validate(); err != nil { | ||
| return v1alpha1.DecisionResult{}, err | ||
| } | ||
| slogArgs := request.GetTraceLogArgs() | ||
| slogArgsAny := make([]any, 0, len(slogArgs)) | ||
| for _, arg := range slogArgs { | ||
|
|
@@ -297,6 +302,21 @@ func (p *filterWeigherPipeline[RequestType]) Run(request RequestType) (v1alpha1. | |
| hosts := p.sortHostsByWeights(outWeights) | ||
| traceLog.Info("scheduler: sorted hosts", "hosts", hosts) | ||
|
|
||
| if opts.MaxCandidates > 0 && len(hosts) > opts.MaxCandidates { | ||
| traceLog.Info("scheduler: trimming candidate list", "maxCandidates", opts.MaxCandidates, "before", len(hosts)) | ||
| hosts = hosts[:opts.MaxCandidates] | ||
| // Drop trimmed hosts from outWeights so AggregatedOutWeights stays consistent. | ||
| kept := make(map[string]struct{}, len(hosts)) | ||
| for _, h := range hosts { | ||
| kept[h] = struct{}{} | ||
| } | ||
| for host := range outWeights { | ||
| if _, ok := kept[host]; !ok { | ||
| delete(outWeights, host) | ||
| } | ||
| } | ||
| } | ||
|
|
||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please provide logging here so we see what's going on. |
||
| // Collect some metrics about the pipeline execution. | ||
| go p.monitor.observePipelineResult(request, hosts) | ||
|
|
||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This seems inconsistent. Shouldn't we draw the line here? Read-only requests create or modify NO resources and are purely to calculate host candidates for constraints.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There are use cases, we can discuss live