Skip to content

Commit f8d2887

Browse files
authored
[BestEffortFIFO] Implement Sticky ClusterQueue Head Policy (#7157)
* [BestEffortFIFO] Implement Sticky ClusterQueue Head Policy * assert using ExpectAdmittedWorkloadsTotalMetric * FinishEvictionOfAnyWorkloadsInCq method * fix flakiness; new test case
1 parent dc263e0 commit f8d2887

File tree

3 files changed

+371
-21
lines changed

3 files changed

+371
-21
lines changed

pkg/cache/queue/cluster_queue.go

Lines changed: 46 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,28 @@ var (
5656
realClock = clock.RealClock{}
5757
)
5858

59+
// stickyWorkload records the workload at the head of a ClusterQueue that
60+
// is currently preempting other workloads. It is only set for queues
61+
// using the BestEffortFIFO policy, and prevents skipped-over ineligible
62+
// workloads from going back to the head of the queue. A workload is
63+
// considered sticky until it is admitted, unschedulable, or deleted.
64+
// See Kueue#6929 and Kueue#7101 for motivation.
65+
type stickyWorkload struct {
66+
// workloadName is the key of the sticky workload; the empty string
// means that no workload is currently sticky.
workloadName workload.Reference
67+
}
68+
69+
func (s *stickyWorkload) matches(workload workload.Reference) bool {
70+
return s.workloadName == workload
71+
}
72+
73+
func (s *stickyWorkload) clear() {
74+
s.workloadName = ""
75+
}
76+
77+
func (s *stickyWorkload) set(workload workload.Reference) {
78+
s.workloadName = workload
79+
}
80+
5981
type ClusterQueue struct {
6082
hierarchy.ClusterQueue[*cohort]
6183
name kueue.ClusterQueueReference
@@ -90,6 +112,8 @@ type ClusterQueue struct {
90112

91113
afsEntryPenalties *utilmaps.SyncMap[utilqueue.LocalQueueReference, corev1.ResourceList]
92114
localQueuesInClusterQueue map[utilqueue.LocalQueueReference]bool
115+
116+
sw *stickyWorkload
93117
}
94118

95119
func (c *ClusterQueue) GetName() kueue.ClusterQueueReference {
@@ -111,7 +135,8 @@ func newClusterQueue(ctx context.Context, client client.Client, cq *kueue.Cluste
111135
}
112136

113137
func newClusterQueueImpl(ctx context.Context, client client.Client, wo workload.Ordering, clock clock.Clock, fsResWeights map[corev1.ResourceName]float64, enableAdmissionFs bool, afsEntryPenalties *utilmaps.SyncMap[utilqueue.LocalQueueReference, corev1.ResourceList]) *ClusterQueue {
114-
lessFunc := queueOrderingFunc(ctx, client, wo, fsResWeights, enableAdmissionFs, afsEntryPenalties)
138+
sw := stickyWorkload{}
139+
lessFunc := queueOrderingFunc(ctx, client, wo, fsResWeights, enableAdmissionFs, afsEntryPenalties, &sw)
115140
return &ClusterQueue{
116141
heap: *heap.New(workloadKey, lessFunc),
117142
inadmissibleWorkloads: make(map[workload.Reference]*workload.Info),
@@ -121,6 +146,7 @@ func newClusterQueueImpl(ctx context.Context, client client.Client, wo workload.
121146
clock: clock,
122147
afsEntryPenalties: afsEntryPenalties,
123148
localQueuesInClusterQueue: make(map[utilqueue.LocalQueueReference]bool),
149+
sw: &sw,
124150
}
125151
}
126152

@@ -223,6 +249,9 @@ func (c *ClusterQueue) delete(w *kueue.Workload) {
223249
delete(c.inadmissibleWorkloads, key)
224250
c.heap.Delete(key)
225251
c.forgetInflightByKey(key)
252+
if c.sw.matches(key) {
253+
c.sw.clear()
254+
}
226255
}
227256

228257
// DeleteFromLocalQueue removes all workloads belonging to this queue from
@@ -453,6 +482,13 @@ func (c *ClusterQueue) Active() bool {
453482
// The workload should not be reinserted if it's already in the ClusterQueue.
454483
// Returns true if the workload was inserted.
455484
func (c *ClusterQueue) RequeueIfNotPresent(wInfo *workload.Info, reason RequeueReason) bool {
485+
// when preemptions are in-progress, we keep attempting to
486+
// schedule the same workload for BestEffortFIFO queues. See
487+
// documentation of stickyWorkload for more details
488+
if reason == RequeueReasonPendingPreemption && c.queueingStrategy == kueue.BestEffortFIFO {
489+
c.sw.set(workload.Key(wInfo.Obj))
490+
}
491+
456492
if c.queueingStrategy == kueue.StrictFIFO {
457493
return c.requeueIfNotPresent(wInfo, reason != RequeueReasonNamespaceMismatch)
458494
}
@@ -463,7 +499,7 @@ func (c *ClusterQueue) RequeueIfNotPresent(wInfo *workload.Info, reason RequeueR
463499
// to sort workloads. The function sorts workloads based on their priority.
464500
// When priorities are equal, it uses the workload's creation or eviction
465501
// time.
466-
func queueOrderingFunc(ctx context.Context, c client.Client, wo workload.Ordering, fsResWeights map[corev1.ResourceName]float64, enableAdmissionFs bool, afsEntryPenalties *utilmaps.SyncMap[utilqueue.LocalQueueReference, corev1.ResourceList]) func(a, b *workload.Info) bool {
502+
func queueOrderingFunc(ctx context.Context, c client.Client, wo workload.Ordering, fsResWeights map[corev1.ResourceName]float64, enableAdmissionFs bool, afsEntryPenalties *utilmaps.SyncMap[utilqueue.LocalQueueReference, corev1.ResourceList], sw *stickyWorkload) func(a, b *workload.Info) bool {
467503
log := ctrl.LoggerFrom(ctx)
468504
return func(a, b *workload.Info) bool {
469505
if enableAdmissionFs {
@@ -482,6 +518,14 @@ func queueOrderingFunc(ctx context.Context, c client.Client, wo workload.Orderin
482518
}
483519
}
484520
}
521+
522+
if sw.matches(workload.Key(a.Obj)) {
523+
return true
524+
}
525+
if sw.matches(workload.Key(b.Obj)) {
526+
return false
527+
}
528+
485529
p1 := utilpriority.Priority(a.Obj)
486530
p2 := utilpriority.Priority(b.Obj)
487531

0 commit comments

Comments
 (0)