Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 57 additions & 21 deletions cmd/shim/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/certwatcher"
"sigs.k8s.io/controller-runtime/pkg/cluster"
Expand Down Expand Up @@ -112,11 +113,41 @@ func main() {
ctrl.SetLogger(zap.New(zap.UseFlagOptions(&opts)))

// Custom entrypoint for placement shim e2e tests.
// Spins up a minimal manager with a multicluster client so that e2e
// tests can access the controller-runtime cache for hypervisor lookups.
if runPlacementShimE2E {
if err := placement.RunE2E(ctx); err != nil {
mgrCtx, mgrCancel := context.WithCancel(ctx)

mgr, err := ctrl.NewManager(restConfig, ctrl.Options{
Scheme: scheme,
Metrics: metricsserver.Options{BindAddress: "0"},
HealthProbeBindAddress: "",
})
if err != nil {
setupLog.Error(err, "unable to start e2e manager")
os.Exit(1)
}
multiclusterClient := setupMulticlusterClient(mgrCtx, mgr, restConfig)
if err := placement.IndexFields(mgrCtx, multiclusterClient); err != nil {
setupLog.Error(err, "unable to set up e2e field indexes")
os.Exit(1)
}
go func() {
if err := mgr.Start(mgrCtx); err != nil {
setupLog.Error(err, "e2e manager exited with error")
}
}()
if !mgr.GetCache().WaitForCacheSync(ctx) {
setupLog.Error(nil, "e2e cache sync failed")
mgrCancel()
os.Exit(1)
}
if err := placement.RunE2E(ctx, multiclusterClient); err != nil {
setupLog.Error(err, "E2E tests failed")
mgrCancel()
os.Exit(1)
}
mgrCancel()
os.Exit(0)
}

Expand Down Expand Up @@ -223,26 +254,7 @@ func main() {
os.Exit(1)
}

homeCluster, err := cluster.New(restConfig, func(o *cluster.Options) { o.Scheme = scheme })
if err != nil {
setupLog.Error(err, "unable to create home cluster")
os.Exit(1)
}
if err := mgr.Add(homeCluster); err != nil {
setupLog.Error(err, "unable to add home cluster")
os.Exit(1)
}
multiclusterClient := &multicluster.Client{
HomeCluster: homeCluster,
HomeRestConfig: restConfig,
HomeScheme: scheme,
ResourceRouters: multicluster.DefaultResourceRouters,
}
multiclusterClientConfig := conf.GetConfigOrDie[multicluster.ClientConfig]()
if err := multiclusterClient.InitFromConf(ctx, mgr, multiclusterClientConfig); err != nil {
setupLog.Error(err, "unable to initialize multicluster client")
os.Exit(1)
}
multiclusterClient := setupMulticlusterClient(ctx, mgr, restConfig)

// Our custom monitoring registry can add prometheus labels to all metrics.
// This is useful to distinguish metrics from different deployments.
Expand Down Expand Up @@ -307,3 +319,27 @@ func main() {
os.Exit(1)
}
}

func setupMulticlusterClient(ctx context.Context, mgr manager.Manager, restConfig *rest.Config) *multicluster.Client {
homeCluster, err := cluster.New(restConfig, func(o *cluster.Options) { o.Scheme = scheme })
if err != nil {
setupLog.Error(err, "unable to create home cluster")
os.Exit(1)
}
if err := mgr.Add(homeCluster); err != nil {
setupLog.Error(err, "unable to add home cluster")
os.Exit(1)
}
mcl := &multicluster.Client{
HomeCluster: homeCluster,
HomeRestConfig: restConfig,
HomeScheme: scheme,
ResourceRouters: multicluster.DefaultResourceRouters,
}
mclConfig := conf.GetConfigOrDie[multicluster.ClientConfig]()
if err := mcl.InitFromConf(ctx, mgr, mclConfig); err != nil {
setupLog.Error(err, "unable to initialize multicluster client")
os.Exit(1)
}
return mcl
}
2 changes: 2 additions & 0 deletions helm/bundles/cortex-placement-shim/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ cortex-shim:
osProjectName:
osUserDomainName:
osProjectDomainName:
features:
enableResourceProviders: false
auth:
tokenCacheTTL: "5m"
policies:
Expand Down
24 changes: 2 additions & 22 deletions internal/shim/placement/field_index.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,6 @@ const (
// of the hypervisor in OpenStack. This also corresponds to the uuid of the
// resource provider in OpenStack Placement.
idxHypervisorOpenStackId = "status.hypervisorId"
// idxStatusAggUUIDs is the name of the index for looking up hypervisors
// by the UUIDs of the aggregates they belong to.
idxStatusAggUUIDs = "status.aggregates[*].uuid"
// idxHypervisorKubernetesId is the name of the index for looking up
// hypervisors by their uid in Kubernetes.
idxHypervisorKubernetesId = "metadata.uid"
Expand All @@ -31,9 +28,9 @@ const (
idxHypervisorName = "metadata.name"
)

// indexFields indexes all fields that are needed by the shim to quickly
// IndexFields indexes all fields that are needed by the shim to quickly
// look up objects from the controller-runtime cache.
func indexFields(ctx context.Context, mcl *multicluster.Client) error {
func IndexFields(ctx context.Context, mcl *multicluster.Client) error {
log := logf.FromContext(ctx)
log.Info("Setting up field indexes for the multicluster client")
h, hl := &hv1.Hypervisor{}, &hv1.HypervisorList{}
Expand Down Expand Up @@ -80,22 +77,5 @@ func indexFields(ctx context.Context, mcl *multicluster.Client) error {
}
log.Info("Successfully set up index for hypervisor name")

if err := mcl.IndexField(ctx, h, hl, idxStatusAggUUIDs, func(obj client.Object) []string {
hv, ok := obj.(*hv1.Hypervisor)
if !ok {
log.Error(errors.New("unexpected type"), "object", obj)
return nil
}
var uuids []string
for _, agg := range hv.Status.Aggregates {
uuids = append(uuids, agg.UUID)
}
return uuids
}); err != nil {
log.Error(err, "failed to set up index for hypervisor aggregates")
return err
}
log.Info("Successfully set up index for hypervisor aggregates")

return nil
}
76 changes: 9 additions & 67 deletions internal/shim/placement/field_index_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,15 +110,14 @@ func TestIndexFields_RegistersAllIndexes(t *testing.T) {
cc := &captureCache{}
mcl := buildClient(t, cc)

if err := indexFields(context.Background(), mcl); err != nil {
t.Fatalf("indexFields: %v", err)
if err := IndexFields(context.Background(), mcl); err != nil {
t.Fatalf("IndexFields: %v", err)
}

wantFields := []string{
idxHypervisorOpenStackId,
idxHypervisorKubernetesId,
idxHypervisorName,
idxStatusAggUUIDs,
}
if len(cc.calls) != len(wantFields) {
t.Fatalf("got %d IndexField calls, want %d", len(cc.calls), len(wantFields))
Expand All @@ -139,7 +138,7 @@ func TestIndexFields_PropagatesError(t *testing.T) {
cc := &captureCache{err: wantErr}
mcl := buildClient(t, cc)

err := indexFields(context.Background(), mcl)
err := IndexFields(context.Background(), mcl)
if err == nil {
t.Fatal("expected error, got nil")
}
Expand All @@ -151,8 +150,8 @@ func TestIndexFields_PropagatesError(t *testing.T) {
func TestExtractor_HypervisorOpenStackId(t *testing.T) {
cc := &captureCache{}
mcl := buildClient(t, cc)
if err := indexFields(context.Background(), mcl); err != nil {
t.Fatalf("indexFields: %v", err)
if err := IndexFields(context.Background(), mcl); err != nil {
t.Fatalf("IndexFields: %v", err)
}
fn := extractorByField(t, cc.calls, idxHypervisorOpenStackId)

Expand Down Expand Up @@ -192,8 +191,8 @@ func TestExtractor_HypervisorOpenStackId(t *testing.T) {
func TestExtractor_HypervisorKubernetesId(t *testing.T) {
cc := &captureCache{}
mcl := buildClient(t, cc)
if err := indexFields(context.Background(), mcl); err != nil {
t.Fatalf("indexFields: %v", err)
if err := IndexFields(context.Background(), mcl); err != nil {
t.Fatalf("IndexFields: %v", err)
}
fn := extractorByField(t, cc.calls, idxHypervisorKubernetesId)

Expand Down Expand Up @@ -233,8 +232,8 @@ func TestExtractor_HypervisorKubernetesId(t *testing.T) {
func TestExtractor_HypervisorName(t *testing.T) {
cc := &captureCache{}
mcl := buildClient(t, cc)
if err := indexFields(context.Background(), mcl); err != nil {
t.Fatalf("indexFields: %v", err)
if err := IndexFields(context.Background(), mcl); err != nil {
t.Fatalf("IndexFields: %v", err)
}
fn := extractorByField(t, cc.calls, idxHypervisorName)

Expand Down Expand Up @@ -271,63 +270,6 @@ func TestExtractor_HypervisorName(t *testing.T) {
}
}

func TestExtractor_StatusAggUUIDs(t *testing.T) {
cc := &captureCache{}
mcl := buildClient(t, cc)
if err := indexFields(context.Background(), mcl); err != nil {
t.Fatalf("indexFields: %v", err)
}
fn := extractorByField(t, cc.calls, idxStatusAggUUIDs)

tests := []struct {
name string
obj client.Object
want []string
}{
{
name: "multiple aggregates",
obj: &hv1.Hypervisor{
Status: hv1.HypervisorStatus{
Aggregates: []hv1.Aggregate{
{Name: "agg-a", UUID: "uuid-a"},
{Name: "agg-b", UUID: "uuid-b"},
},
},
},
want: []string{"uuid-a", "uuid-b"},
},
{
name: "single aggregate",
obj: &hv1.Hypervisor{
Status: hv1.HypervisorStatus{
Aggregates: []hv1.Aggregate{
{Name: "agg-x", UUID: "uuid-x"},
},
},
},
want: []string{"uuid-x"},
},
{
name: "no aggregates",
obj: &hv1.Hypervisor{},
want: nil,
},
{
name: "wrong type",
obj: &corev1.ConfigMap{},
want: nil,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := fn(tt.obj)
if !strSliceEqual(got, tt.want) {
t.Errorf("got %v, want %v", got, tt.want)
}
})
}
}

func strSliceEqual(a, b []string) bool {
if len(a) == 0 && len(b) == 0 {
return true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"net/http"

"github.com/cobaltcore-dev/cortex/pkg/conf"
"sigs.k8s.io/controller-runtime/pkg/client"
logf "sigs.k8s.io/controller-runtime/pkg/log"
)

Expand All @@ -25,7 +26,7 @@ import (
// - allocation_requests is non-empty (at least one candidate found).
// - provider_summaries contains the test RP's UUID.
// 4. Cleanup: DELETE the test RP and custom resource class.
func e2eTestAllocationCandidates(ctx context.Context) error {
func e2eTestAllocationCandidates(ctx context.Context, _ client.Client) error {
log := logf.FromContext(ctx)
log.Info("Running allocation candidates endpoint e2e test")
config, err := conf.GetConfig[e2eRootConfig]()
Expand Down
3 changes: 2 additions & 1 deletion internal/shim/placement/handle_allocations_e2e.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"net/http"

"github.com/cobaltcore-dev/cortex/pkg/conf"
"sigs.k8s.io/controller-runtime/pkg/client"
logf "sigs.k8s.io/controller-runtime/pkg/log"
)

Expand All @@ -30,7 +31,7 @@ import (
// 7. GET /allocations/{consumer2} — verify the second allocation exists.
// 8. DELETE /allocations/{consumer} — remove allocations for both consumers.
// 9. Cleanup: DELETE the test RP and custom resource class.
func e2eTestAllocations(ctx context.Context) error {
func e2eTestAllocations(ctx context.Context, _ client.Client) error {
log := logf.FromContext(ctx)
log.Info("Running allocations endpoint e2e test")
config, err := conf.GetConfig[e2eRootConfig]()
Expand Down
3 changes: 2 additions & 1 deletion internal/shim/placement/handle_reshaper_e2e.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"net/http"

"github.com/cobaltcore-dev/cortex/pkg/conf"
"sigs.k8s.io/controller-runtime/pkg/client"
logf "sigs.k8s.io/controller-runtime/pkg/log"
)

Expand All @@ -30,7 +31,7 @@ import (
// 7. Verify allocation: GET /allocations/{consumer} — expect it to reference
// RP-B (not RP-A).
// 8. Cleanup: DELETE allocation, both RPs, and custom resource class.
func e2eTestReshaper(ctx context.Context) error {
func e2eTestReshaper(ctx context.Context, _ client.Client) error {
log := logf.FromContext(ctx)
log.Info("Running reshaper endpoint e2e test")
config, err := conf.GetConfig[e2eRootConfig]()
Expand Down
3 changes: 2 additions & 1 deletion internal/shim/placement/handle_resource_classes_e2e.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"net/http"

"github.com/cobaltcore-dev/cortex/pkg/conf"
"sigs.k8s.io/controller-runtime/pkg/client"
logf "sigs.k8s.io/controller-runtime/pkg/log"
)

Expand All @@ -23,7 +24,7 @@ import (
// 5. GET /resource_classes/{name} — verify the custom class now exists.
// 6. DELETE /resource_classes/{name} — remove the custom class.
// 7. GET /resource_classes/{name} — confirm deletion returns 404.
func e2eTestResourceClasses(ctx context.Context) error {
func e2eTestResourceClasses(ctx context.Context, _ client.Client) error {
log := logf.FromContext(ctx)
log.Info("Running resource classes endpoint e2e test")
config, err := conf.GetConfig[e2eRootConfig]()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"slices"

"github.com/cobaltcore-dev/cortex/pkg/conf"
"sigs.k8s.io/controller-runtime/pkg/client"
logf "sigs.k8s.io/controller-runtime/pkg/log"
)

Expand All @@ -26,7 +27,7 @@ import (
// 6. PUT /{uuid}/aggregates — clear aggregates by sending an empty list.
// 7. GET /{uuid}/aggregates — verify aggregates are empty after clear.
// 8. Cleanup: DELETE the test RP (also runs via deferred cleanup on failure).
func e2eTestResourceProviderAggregates(ctx context.Context) error {
func e2eTestResourceProviderAggregates(ctx context.Context, _ client.Client) error {
log := logf.FromContext(ctx)
log.Info("Running resource provider aggregates endpoint e2e test")
config, err := conf.GetConfig[e2eRootConfig]()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"net/http"

"github.com/cobaltcore-dev/cortex/pkg/conf"
"sigs.k8s.io/controller-runtime/pkg/client"
logf "sigs.k8s.io/controller-runtime/pkg/log"
)

Expand All @@ -24,7 +25,7 @@ import (
// 4. GET /resource_providers — list real providers, then GET /{uuid}/allocations
// on up to 3 of them to verify the endpoint works with production data.
// 5. Cleanup: DELETE the test RP.
func e2eTestResourceProviderAllocations(ctx context.Context) error {
func e2eTestResourceProviderAllocations(ctx context.Context, _ client.Client) error {
log := logf.FromContext(ctx)
log.Info("Running resource provider allocations endpoint e2e test")
config, err := conf.GetConfig[e2eRootConfig]()
Expand Down
Loading
Loading