Skip to content

Commit e5a7e6c

Browse files
committed
[BestEffortFIFO] Implement Sticky ClusterQueue Head Policy
1 parent 0d6a4ae commit e5a7e6c

File tree

2 files changed

+247
-2
lines changed

2 files changed

+247
-2
lines changed

pkg/cache/queue/cluster_queue.go

Lines changed: 35 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,22 @@ var (
5656
realClock = clock.RealClock{}
5757
)
5858

59+
type stickyWorkload struct {
60+
workloadName workload.Reference
61+
}
62+
63+
func (s *stickyWorkload) matches(workload workload.Reference) bool {
64+
return s.workloadName == workload
65+
}
66+
67+
func (s *stickyWorkload) clear() {
68+
s.workloadName = ""
69+
}
70+
71+
func (s *stickyWorkload) set(workload workload.Reference) {
72+
s.workloadName = workload
73+
}
74+
5975
type ClusterQueue struct {
6076
hierarchy.ClusterQueue[*cohort]
6177
name kueue.ClusterQueueReference
@@ -90,6 +106,8 @@ type ClusterQueue struct {
90106

91107
afsEntryPenalties *utilmaps.SyncMap[utilqueue.LocalQueueReference, corev1.ResourceList]
92108
localQueuesInClusterQueue map[utilqueue.LocalQueueReference]bool
109+
110+
sw *stickyWorkload
93111
}
94112

95113
func (c *ClusterQueue) GetName() kueue.ClusterQueueReference {
@@ -111,7 +129,8 @@ func newClusterQueue(ctx context.Context, client client.Client, cq *kueue.Cluste
111129
}
112130

113131
func newClusterQueueImpl(ctx context.Context, client client.Client, wo workload.Ordering, clock clock.Clock, fsResWeights map[corev1.ResourceName]float64, enableAdmissionFs bool, afsEntryPenalties *utilmaps.SyncMap[utilqueue.LocalQueueReference, corev1.ResourceList]) *ClusterQueue {
114-
lessFunc := queueOrderingFunc(ctx, client, wo, fsResWeights, enableAdmissionFs, afsEntryPenalties)
132+
sw := stickyWorkload{}
133+
lessFunc := queueOrderingFunc(ctx, client, wo, fsResWeights, enableAdmissionFs, afsEntryPenalties, &sw)
115134
return &ClusterQueue{
116135
heap: *heap.New(workloadKey, lessFunc),
117136
inadmissibleWorkloads: make(map[workload.Reference]*workload.Info),
@@ -121,6 +140,7 @@ func newClusterQueueImpl(ctx context.Context, client client.Client, wo workload.
121140
clock: clock,
122141
afsEntryPenalties: afsEntryPenalties,
123142
localQueuesInClusterQueue: make(map[utilqueue.LocalQueueReference]bool),
143+
sw: &sw,
124144
}
125145
}
126146

@@ -342,6 +362,7 @@ func (c *ClusterQueue) Pop() *workload.Info {
342362
}
343363

344364
c.popCycle++
365+
c.sw.clear()
345366
if c.heap.Len() == 0 {
346367
c.inflight = nil
347368
return nil
@@ -453,6 +474,10 @@ func (c *ClusterQueue) Active() bool {
453474
// The workload should not be reinserted if it's already in the ClusterQueue.
454475
// Returns true if the workload was inserted.
455476
func (c *ClusterQueue) RequeueIfNotPresent(wInfo *workload.Info, reason RequeueReason) bool {
477+
if reason == RequeueReasonPendingPreemption && c.queueingStrategy == kueue.BestEffortFIFO {
478+
c.sw.set(workload.Key(wInfo.Obj))
479+
}
480+
456481
if c.queueingStrategy == kueue.StrictFIFO {
457482
return c.requeueIfNotPresent(wInfo, reason != RequeueReasonNamespaceMismatch)
458483
}
@@ -463,7 +488,7 @@ func (c *ClusterQueue) RequeueIfNotPresent(wInfo *workload.Info, reason RequeueR
463488
// to sort workloads. The function sorts workloads based on their priority.
464489
// When priorities are equal, it uses the workload's creation or eviction
465490
// time.
466-
func queueOrderingFunc(ctx context.Context, c client.Client, wo workload.Ordering, fsResWeights map[corev1.ResourceName]float64, enableAdmissionFs bool, afsEntryPenalties *utilmaps.SyncMap[utilqueue.LocalQueueReference, corev1.ResourceList]) func(a, b *workload.Info) bool {
491+
func queueOrderingFunc(ctx context.Context, c client.Client, wo workload.Ordering, fsResWeights map[corev1.ResourceName]float64, enableAdmissionFs bool, afsEntryPenalties *utilmaps.SyncMap[utilqueue.LocalQueueReference, corev1.ResourceList], sticky *stickyWorkload) func(a, b *workload.Info) bool {
467492
log := ctrl.LoggerFrom(ctx)
468493
return func(a, b *workload.Info) bool {
469494
if enableAdmissionFs {
@@ -482,6 +507,14 @@ func queueOrderingFunc(ctx context.Context, c client.Client, wo workload.Orderin
482507
}
483508
}
484509
}
510+
511+
if sticky.matches(workload.Key(a.Obj)) {
512+
return true
513+
}
514+
if sticky.matches(workload.Key(b.Obj)) {
515+
return false
516+
}
517+
485518
p1 := utilpriority.Priority(a.Obj)
486519
p2 := utilpriority.Priority(b.Obj)
487520

test/integration/singlecluster/scheduler/fairsharing/fair_sharing_test.go

Lines changed: 212 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -649,6 +649,218 @@ var _ = ginkgo.Describe("Scheduler", ginkgo.Ordered, ginkgo.ContinueOnFailure, f
649649
})
650650
})
651651

652+
// kueue#7101
653+
ginkgo.When("ClusterQueue head is ineligible for admission due to DominantResourceShare", func() {
654+
var (
655+
cqp1 *kueue.ClusterQueue
656+
cqp2 *kueue.ClusterQueue
657+
)
658+
ginkgo.BeforeEach(func() {
659+
createCohort(testing.MakeCohort("cohort-a").
660+
Parent("root-cohort").
661+
FairWeight(resource.MustParse("1")).
662+
ResourceGroup(
663+
*testing.MakeFlavorQuotas("flavor1").Resource(corev1.ResourceCPU, "9").Obj(),
664+
).
665+
Obj())
666+
667+
fungibility := kueue.FlavorFungibility{
668+
WhenCanBorrow: kueue.TryNextFlavor,
669+
WhenCanPreempt: kueue.TryNextFlavor,
670+
}
671+
preemption := kueue.ClusterQueuePreemption{
672+
ReclaimWithinCohort: kueue.PreemptionPolicyAny,
673+
WithinClusterQueue: kueue.PreemptionPolicyLowerPriority,
674+
}
675+
676+
cqp1 = createQueue(testing.MakeClusterQueue("cq-p1").
677+
Cohort("cohort-a").
678+
FairWeight(resource.MustParse("1")).
679+
ResourceGroup(
680+
*testing.MakeFlavorQuotas("flavor1").Resource(corev1.ResourceCPU, "0").Obj(),
681+
).
682+
FlavorFungibility(fungibility).
683+
Preemption(preemption).
684+
Obj())
685+
686+
cqp2 = createQueue(testing.MakeClusterQueue("cq-p2").
687+
Cohort("cohort-a").
688+
FairWeight(resource.MustParse("1")).
689+
ResourceGroup(
690+
*testing.MakeFlavorQuotas("flavor1").Resource(corev1.ResourceCPU, "0").Obj(),
691+
).
692+
FlavorFungibility(fungibility).
693+
Preemption(preemption).
694+
Obj())
695+
_ = features.SetEnable(features.FlavorFungibilityImplicitPreferenceDefault, true)
696+
})
697+
ginkgo.AfterEach(func() {
698+
_ = features.SetEnable(features.FlavorFungibilityImplicitPreferenceDefault, false)
699+
})
700+
701+
ginkgo.It("workload of size 5 preempts using LessThanInitialShare policy and admits", func() {
702+
ginkgo.By("Create workloads in queue1")
703+
for range 4 {
704+
createWorkload("cq-p1", "2")
705+
}
706+
util.ExpectReservingActiveWorkloadsMetric(cqp1, 4)
707+
708+
ginkgo.By("Create workload in queue2")
709+
createWorkload("cq-p2", "5")
710+
711+
ginkgo.By("Complete preemption")
712+
util.FinishEvictionOfWorkloadsInCQ(ctx, k8sClient, cqp1, 2)
713+
714+
ginkgo.By("Expect active workloads")
715+
util.ExpectReservingActiveWorkloadsMetric(cqp1, 2)
716+
util.ExpectReservingActiveWorkloadsMetric(cqp2, 1)
717+
util.ExpectClusterQueueWeightedShareMetric(cqp1, 445)
718+
util.ExpectClusterQueueWeightedShareMetric(cqp2, 556)
719+
})
720+
721+
ginkgo.It("workload of size 5 admits with inadmissible higher priority workload at ClusterQueue head", func() {
722+
ginkgo.By("Create workloads in queue1")
723+
for range 4 {
724+
createWorkload("cq-p1", "2")
725+
}
726+
util.ExpectReservingActiveWorkloadsMetric(cqp1, 4)
727+
728+
ginkgo.By("Create workloads in queue2")
729+
createWorkloadWithPriority("cq-p2", "6", 999)
730+
731+
ginkgo.By("Verify doesn't admit")
732+
util.ExpectReservingActiveWorkloadsMetric(cqp2, 0)
733+
734+
ginkgo.By("Create admissible workload in queue2")
735+
createWorkloadWithPriority("cq-p2", "5", 0)
736+
737+
ginkgo.By("Complete preemption")
738+
util.FinishEvictionOfWorkloadsInCQ(ctx, k8sClient, cqp1, 2)
739+
740+
ginkgo.By("Expect active workloads")
741+
util.ExpectReservingActiveWorkloadsMetric(cqp1, 2)
742+
util.ExpectReservingActiveWorkloadsMetric(cqp2, 1)
743+
util.ExpectClusterQueueWeightedShareMetric(cqp1, 445)
744+
util.ExpectClusterQueueWeightedShareMetric(cqp2, 556)
745+
})
746+
747+
ginkgo.It("workload of size 4 admits with inadmissible higher priority workload at ClusterQueue head", func() {
748+
ginkgo.By("Create workloads in queue1")
749+
for range 4 {
750+
createWorkload("cq-p1", "2")
751+
}
752+
util.ExpectReservingActiveWorkloadsMetric(cqp1, 4)
753+
754+
ginkgo.By("Create workload in queue2")
755+
createWorkloadWithPriority("cq-p2", "6", 999)
756+
757+
ginkgo.By("Verify doesn't admit")
758+
util.ExpectReservingActiveWorkloadsMetric(cqp2, 0)
759+
760+
ginkgo.By("Create admissible workload in queue2")
761+
createWorkloadWithPriority("cq-p2", "4", 0)
762+
763+
ginkgo.By("Complete preemption")
764+
util.FinishEvictionOfWorkloadsInCQ(ctx, k8sClient, cqp1, 2)
765+
766+
ginkgo.By("Expect active workloads")
767+
util.ExpectReservingActiveWorkloadsMetric(cqp1, 2)
768+
util.ExpectReservingActiveWorkloadsMetric(cqp2, 1)
769+
util.ExpectClusterQueueWeightedShareMetric(cqp1, 445)
770+
util.ExpectClusterQueueWeightedShareMetric(cqp2, 445)
771+
})
772+
773+
ginkgo.It("workload admits when several higher priority blocking workloads in front", func() {
774+
ginkgo.By("Create workloads in queue1")
775+
for range 4 {
776+
createWorkload("cq-p1", "2")
777+
}
778+
util.ExpectReservingActiveWorkloadsMetric(cqp1, 4)
779+
780+
ginkgo.By("Create workloads in queue2")
781+
createWorkloadWithPriority("cq-p2", "7", 999)
782+
createWorkloadWithPriority("cq-p2", "6", 999)
783+
784+
ginkgo.By("Verify don't admit")
785+
util.ExpectReservingActiveWorkloadsMetric(cqp2, 0)
786+
787+
ginkgo.By("Create admissible workload in queue2")
788+
createWorkloadWithPriority("cq-p2", "5", 0)
789+
790+
ginkgo.By("Complete preemption")
791+
util.FinishEvictionOfWorkloadsInCQ(ctx, k8sClient, cqp1, 2)
792+
793+
ginkgo.By("Expect active workloads")
794+
util.ExpectReservingActiveWorkloadsMetric(cqp1, 2)
795+
util.ExpectReservingActiveWorkloadsMetric(cqp2, 1)
796+
util.ExpectClusterQueueWeightedShareMetric(cqp1, 445)
797+
util.ExpectClusterQueueWeightedShareMetric(cqp2, 556)
798+
})
799+
800+
// kueue#6929
801+
ginkgo.When("ClusterQueue head has inadmissible workload", func() {
802+
var (
803+
cq1 *kueue.ClusterQueue
804+
cq2 *kueue.ClusterQueue
805+
)
806+
ginkgo.BeforeEach(func() {
807+
fungibility := kueue.FlavorFungibility{
808+
WhenCanBorrow: kueue.TryNextFlavor,
809+
WhenCanPreempt: kueue.TryNextFlavor,
810+
}
811+
preemption := kueue.ClusterQueuePreemption{
812+
ReclaimWithinCohort: kueue.PreemptionPolicyAny,
813+
WithinClusterQueue: kueue.PreemptionPolicyLowerPriority,
814+
}
815+
816+
cq1 = createQueue(testing.MakeClusterQueue("cq1").
817+
Cohort("root").
818+
FairWeight(resource.MustParse("1")).
819+
ResourceGroup(
820+
*testing.MakeFlavorQuotas("flavor1").Resource(corev1.ResourceCPU, "3").Obj(),
821+
).
822+
FlavorFungibility(fungibility).
823+
Preemption(preemption).
824+
Obj())
825+
826+
cq2 = createQueue(testing.MakeClusterQueue("cq2").
827+
Cohort("root").
828+
FairWeight(resource.MustParse("1")).
829+
ResourceGroup(
830+
*testing.MakeFlavorQuotas("flavor1").Resource(corev1.ResourceCPU, "0").Obj(),
831+
).
832+
FlavorFungibility(fungibility).
833+
Preemption(preemption).
834+
Obj())
835+
})
836+
837+
// cq2 first admits two size-1 workloads. A priority-999 workload of size 4 is
// then created in cq1 and is expected not to admit; the priority-0 workload
// of size 3 created in cq1 afterwards must still get admitted once the two
// cq2 workloads have been evicted.
ginkgo.It("workload which fits behind ClusterQueue head is able to admit", func() {
	ginkgo.By("Creating borrowing workloads in queue2")
	createWorkload("cq2", "1")
	createWorkload("cq2", "1")
	util.ExpectReservingActiveWorkloadsMetric(cq2, 2)

	// The step text names queue1 because the workload is created in cq1.
	ginkgo.By("Create inadmissible workload in queue1")
	createWorkloadWithPriority("cq1", "4", 999)

	ginkgo.By("Verify doesn't admit")
	util.ExpectReservingActiveWorkloadsMetric(cq1, 0)

	// Likewise created in cq1, behind the blocked head.
	ginkgo.By("Create admissible workload in queue1")
	createWorkloadWithPriority("cq1", "3", 0)

	ginkgo.By("Complete preemption")
	util.FinishEvictionOfWorkloadsInCQ(ctx, k8sClient, cq2, 2)

	ginkgo.By("Expect active workloads")
	util.ExpectReservingActiveWorkloadsMetric(cq1, 1)
	util.ExpectReservingActiveWorkloadsMetric(cq2, 0)
	util.ExpectClusterQueueWeightedShareMetric(cq1, 0)
	util.ExpectClusterQueueWeightedShareMetric(cq2, 0)
})
861+
})
862+
})
863+
652864
ginkgo.When("Using AdmissionFairSharing at ClusterQueue level", func() {
653865
var (
654866
cq1 *kueue.ClusterQueue

0 commit comments

Comments
 (0)