Skip to content

Commit 9486673

Browse files
authored
Prevent admitting inactive workloads (#7913)
* Prevent admitting inactive workloads * Address lint finding & review comment * Yet another linter fix
1 parent cd629c0 commit 9486673

File tree

5 files changed

+138
-6
lines changed

5 files changed

+138
-6
lines changed

pkg/cache/queue/manager.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -445,6 +445,12 @@ func (m *Manager) AddOrUpdateWorkload(w *kueue.Workload, opts ...workload.InfoOp
445445
}
446446

447447
func (m *Manager) AddOrUpdateWorkloadWithoutLock(w *kueue.Workload, opts ...workload.InfoOption) error {
448+
if !workload.IsActive(w) {
449+
return fmt.Errorf("workload %q is inactive and can't be added to a LocalQueue", w.Name)
450+
}
451+
if workload.HasQuotaReservation(w) {
452+
return fmt.Errorf("workload %q already has quota reserved and can't be added to a LocalQueue", w.Name)
453+
}
448454
qKey := queue.KeyFromWorkload(w)
449455
q := m.localQueues[qKey]
450456
if q == nil {

pkg/controller/core/workload_controller.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1149,8 +1149,10 @@ func (h *resourceUpdatesHandler) queueReconcileForPending(ctx context.Context, q
11491149
continue
11501150
}
11511151

1152-
if err = h.r.queues.AddOrUpdateWorkload(wlCopy); err != nil {
1153-
log.V(2).Info("ignored an error for now", "error", err)
1152+
if workload.IsActive(wlCopy) && !workload.HasQuotaReservation(wlCopy) {
1153+
if err = h.r.queues.AddOrUpdateWorkload(wlCopy); err != nil {
1154+
log.V(2).Info("ignored an error for now", "error", err)
1155+
}
11541156
}
11551157
}
11561158
}

pkg/util/testing/core.go

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -60,16 +60,23 @@ func CheckEventRecordedFor(ctx context.Context, k8sClient client.Client,
6060
len(events.Items), eventReason, eventType, eventMessage, ref.Namespace)
6161
}
6262

63-
// HasEventAppeared returns if an event has been emitted
64-
func HasEventAppeared(ctx context.Context, k8sClient client.Client, event corev1.Event) (bool, error) {
63+
// HasMatchingEventAppeared returns if an event has been emitted
64+
func HasMatchingEventAppeared(ctx context.Context, k8sClient client.Client, matcher func(*corev1.Event) bool) (bool, error) {
6565
events := &corev1.EventList{}
6666
if err := k8sClient.List(ctx, events, &client.ListOptions{}); err != nil {
6767
return false, err
6868
}
6969
for _, item := range events.Items {
70-
if item.Reason == event.Reason && item.Type == event.Type && item.Message == event.Message {
70+
if matcher(&item) {
7171
return true, nil
7272
}
7373
}
7474
return false, nil
7575
}
76+
77+
// HasEventAppeared returns if an event has been emitted
78+
func HasEventAppeared(ctx context.Context, k8sClient client.Client, event corev1.Event) (bool, error) {
79+
return HasMatchingEventAppeared(ctx, k8sClient, func(item *corev1.Event) bool {
80+
return item.Reason == event.Reason && item.Type == event.Type && item.Message == event.Message
81+
})
82+
}

test/integration/singlecluster/controller/core/suite_test.go

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,10 @@ import (
2929
config "sigs.k8s.io/kueue/apis/config/v1beta2"
3030
qcache "sigs.k8s.io/kueue/pkg/cache/queue"
3131
schdcache "sigs.k8s.io/kueue/pkg/cache/scheduler"
32+
"sigs.k8s.io/kueue/pkg/constants"
3233
"sigs.k8s.io/kueue/pkg/controller/core"
3334
"sigs.k8s.io/kueue/pkg/controller/core/indexer"
35+
"sigs.k8s.io/kueue/pkg/scheduler"
3436
"sigs.k8s.io/kueue/pkg/webhooks"
3537
"sigs.k8s.io/kueue/test/integration/framework"
3638
"sigs.k8s.io/kueue/test/util"
@@ -63,12 +65,31 @@ var _ = ginkgo.AfterSuite(func() {
6365
fwk.Teardown()
6466
})
6567

68+
// managerSetupOpts collects the optional behaviors of the test manager setup.
type managerSetupOpts struct {
	// runScheduler enables starting the kueue scheduler alongside the controllers.
	runScheduler bool
}

// managerSetupOption mutates managerSetupOpts; pass options into
// managerAndControllerSetup to customize the setup.
type managerSetupOption func(*managerSetupOpts)

// runScheduler is a managerSetupOption that turns the scheduler on.
func runScheduler(o *managerSetupOpts) {
	o.runScheduler = true
}
77+
6678
// managerSetup wires up the core controllers with the default (nil) configuration.
func managerSetup(ctx context.Context, mgr manager.Manager) {
	setup := managerAndControllerSetup(nil)
	setup(ctx, mgr)
}
6981

70-
func managerAndControllerSetup(controllersCfg *config.Configuration) framework.ManagerSetup {
82+
func managerAndSchedulerSetup(ctx context.Context, mgr manager.Manager) {
83+
managerAndControllerSetup(nil, runScheduler)(ctx, mgr)
84+
}
85+
86+
func managerAndControllerSetup(controllersCfg *config.Configuration, options ...managerSetupOption) framework.ManagerSetup {
7187
return func(ctx context.Context, mgr manager.Manager) {
88+
var opts managerSetupOpts
89+
for _, opt := range options {
90+
opt(&opts)
91+
}
92+
7293
err := indexer.Setup(ctx, mgr.GetFieldIndexer())
7394
gomega.Expect(err).NotTo(gomega.HaveOccurred())
7495

@@ -88,5 +109,11 @@ func managerAndControllerSetup(controllersCfg *config.Configuration) framework.M
88109

89110
failedCtrl, err := core.SetupControllers(mgr, queues, cCache, controllersCfg)
90111
gomega.Expect(err).ToNot(gomega.HaveOccurred(), "controller", failedCtrl)
112+
113+
if opts.runScheduler {
114+
sched := scheduler.New(queues, cCache, mgr.GetClient(), mgr.GetEventRecorderFor(constants.AdmissionName))
115+
err = sched.Start(ctx)
116+
gomega.Expect(err).NotTo(gomega.HaveOccurred())
117+
}
91118
}
92119
}

test/integration/singlecluster/controller/core/workload_controller_test.go

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323
"github.com/onsi/ginkgo/v2"
2424
"github.com/onsi/gomega"
2525
corev1 "k8s.io/api/core/v1"
26+
nodev1 "k8s.io/api/node/v1"
2627
apimeta "k8s.io/apimachinery/pkg/api/meta"
2728
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2829
"k8s.io/utils/ptr"
@@ -612,6 +613,95 @@ var _ = ginkgo.Describe("Workload controller", ginkgo.Ordered, ginkgo.ContinueOn
612613
})
613614
})
614615

616+
var _ = ginkgo.Describe("Workload controller interaction with scheduler", ginkgo.Ordered, ginkgo.ContinueOnFailure, func() {
617+
var (
618+
ns *corev1.Namespace
619+
clusterQueue *kueue.ClusterQueue
620+
localQueue *kueue.LocalQueue
621+
wl *kueue.Workload
622+
)
623+
624+
ginkgo.BeforeAll(func() {
625+
fwk.StartManager(ctx, cfg, managerAndSchedulerSetup)
626+
})
627+
628+
ginkgo.AfterAll(func() {
629+
fwk.StopManager(ctx)
630+
})
631+
632+
ginkgo.When("workload's runtime class is changed", func() {
633+
var flavor *kueue.ResourceFlavor
634+
var runtimeClass *nodev1.RuntimeClass
635+
636+
const runtimeClassName = "test-kueue-class"
637+
638+
ginkgo.BeforeEach(func() {
639+
ns = util.CreateNamespaceFromPrefixWithLog(ctx, k8sClient, "core-workload-")
640+
flavor = utiltestingapi.MakeResourceFlavor(flavorOnDemand).Obj()
641+
util.MustCreate(ctx, k8sClient, flavor)
642+
clusterQueue = utiltestingapi.MakeClusterQueue("cluster-queue").
643+
ResourceGroup(*utiltestingapi.MakeFlavorQuotas(flavorOnDemand).Resource(corev1.ResourceCPU, "5", "5").Obj()).
644+
Cohort("cohort").
645+
Obj()
646+
util.MustCreate(ctx, k8sClient, clusterQueue)
647+
localQueue = utiltestingapi.MakeLocalQueue("queue", ns.Name).ClusterQueue(clusterQueue.Name).Obj()
648+
util.MustCreate(ctx, k8sClient, localQueue)
649+
runtimeClass = utiltesting.MakeRuntimeClass(runtimeClassName, "rc-handler-1").Obj()
650+
util.MustCreate(ctx, k8sClient, runtimeClass)
651+
})
652+
653+
ginkgo.AfterEach(func() {
654+
gomega.Expect(util.DeleteNamespace(ctx, k8sClient, ns)).To(gomega.Succeed())
655+
util.ExpectObjectToBeDeleted(ctx, k8sClient, clusterQueue, true)
656+
util.ExpectObjectToBeDeleted(ctx, k8sClient, flavor, true)
657+
util.ExpectObjectToBeDeleted(ctx, k8sClient, runtimeClass, true)
658+
})
659+
660+
ginkgo.It("should not temporarily admit an inactive workload", func() {
661+
ginkgo.By("creating an inactive workload", func() {
662+
wl = utiltestingapi.MakeWorkload("wl1", ns.Name).
663+
Queue(kueue.LocalQueueName(localQueue.Name)).
664+
Request(corev1.ResourceCPU, "1").
665+
RuntimeClass(runtimeClassName).
666+
Active(false).
667+
Obj()
668+
util.MustCreate(ctx, k8sClient, wl)
669+
670+
wlKey := client.ObjectKeyFromObject(wl)
671+
gomega.Eventually(func(g gomega.Gomega) {
672+
g.Expect(k8sClient.Get(ctx, wlKey, wl)).To(gomega.Succeed())
673+
}, util.Timeout, util.Interval).Should(gomega.Succeed())
674+
})
675+
676+
ginkgo.By("changing the runtime class", func() {
677+
gomega.Eventually(func(g gomega.Gomega) {
678+
runtimeClassKey := client.ObjectKeyFromObject(runtimeClass)
679+
g.Expect(k8sClient.Get(ctx, runtimeClassKey, runtimeClass)).To(gomega.Succeed())
680+
if runtimeClass.ObjectMeta.Annotations == nil {
681+
runtimeClass.ObjectMeta.Annotations = map[string]string{}
682+
}
683+
runtimeClass.ObjectMeta.Annotations["foo"] = "bar"
684+
g.Expect(k8sClient.Update(ctx, runtimeClass)).To(gomega.Succeed())
685+
}, util.Timeout, util.Interval).Should(gomega.Succeed())
686+
})
687+
688+
ginkgo.By("checking no 'quota reseved' event appearing for the workload", func() {
689+
gomega.Consistently(func(g gomega.Gomega) {
690+
found, err := utiltesting.HasMatchingEventAppeared(ctx, k8sClient, func(e *corev1.Event) bool {
691+
return e.Reason == "QuotaReserved" &&
692+
e.Type == corev1.EventTypeNormal &&
693+
e.InvolvedObject.Kind == "Workload" &&
694+
e.InvolvedObject.Name == wl.Name &&
695+
e.InvolvedObject.Namespace == wl.Namespace
696+
})
697+
g.Expect(err).NotTo(gomega.HaveOccurred())
698+
g.Expect(found).To(gomega.BeTrue())
699+
}, util.ShortTimeout, util.Interval).ShouldNot(gomega.Succeed())
700+
})
701+
})
702+
})
703+
})
704+
615705
var _ = ginkgo.Describe("Workload controller with resource retention", ginkgo.Ordered, ginkgo.ContinueOnFailure, func() {
616706
ginkgo.When("manager is setup with tiny retention period", func() {
617707
var (

0 commit comments

Comments
 (0)