Skip to content

Commit f4fa7f7

Browse files
Improved RayJob Logic - bugfix on Lifecycled RayJobs causing Webhook to panic
len(spec.ClusterSelector)>0 && spec.RayClusterSpec != nil -> validation error len(spec.ClusterSelector)>0 && spec.RayClusterSpec == nil - valid len(spec.ClusterSelector)==0 && spec.ClusterSpec == nil -> validation error len(spec.ClusterSelector)==0 && spec.ClusterSpec != nil -> valid + perform additional validation
1 parent baf4c23 commit f4fa7f7

File tree

6 files changed

+129
-6
lines changed

6 files changed

+129
-6
lines changed

pkg/controller/jobs/rayjob/rayjob_controller.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@ type RayJob rayv1.RayJob
7878

7979
var _ jobframework.GenericJob = (*RayJob)(nil)
8080
var _ jobframework.JobWithManagedBy = (*RayJob)(nil)
81+
var _ jobframework.JobWithSkip = (*RayJob)(nil)
8182

8283
func (j *RayJob) Object() client.Object {
8384
return (*rayv1.RayJob)(j)
@@ -100,6 +101,12 @@ func (j *RayJob) Suspend() {
100101
j.Spec.Suspend = true
101102
}
102103

104+
func (j *RayJob) Skip(ctx context.Context) bool {
105+
// Skip reconciliation for RayJobs that use clusterSelector to reference existing clusters.
106+
// These jobs are not managed by Kueue.
107+
return len(j.Spec.ClusterSelector) > 0
108+
}
109+
103110
func (j *RayJob) GVK() schema.GroupVersionKind {
104111
return gvk
105112
}

pkg/controller/jobs/rayjob/rayjob_webhook.go

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -110,17 +110,34 @@ func (w *RayJobWebhook) validateCreate(ctx context.Context, job *rayv1.RayJob) (
110110
if w.manageJobsWithoutQueueName || jobframework.QueueName(kueueJob) != "" {
111111
spec := &job.Spec
112112
specPath := field.NewPath("spec")
113+
hasClusterSelector := len(spec.ClusterSelector) > 0
114+
hasRayClusterSpec := spec.RayClusterSpec != nil
115+
116+
// Validate the combination of clusterSelector and RayClusterSpec
117+
if hasClusterSelector && hasRayClusterSpec {
118+
// len(spec.ClusterSelector)>0 && spec.RayClusterSpec != nil -> validation error
119+
allErrors = append(allErrors, field.Invalid(specPath.Child("clusterSelector"), spec.ClusterSelector, "a kueue managed job should not use an existing cluster"))
120+
return allErrors, nil
121+
}
122+
123+
if hasClusterSelector && !hasRayClusterSpec {
124+
// len(spec.ClusterSelector)>0 && spec.RayClusterSpec == nil -> valid (skip validation)
125+
// RayJobs using existing clusters are not managed by Kueue
126+
return allErrors, nil
127+
}
128+
129+
if !hasClusterSelector && !hasRayClusterSpec {
130+
// len(spec.ClusterSelector)==0 && spec.RayClusterSpec == nil -> validation error
131+
allErrors = append(allErrors, field.Required(specPath.Child("rayClusterSpec"), "rayClusterSpec is required for Kueue-managed jobs that don't use clusterSelector"))
132+
return allErrors, nil
133+
}
134+
// len(spec.ClusterSelector)==0 && spec.RayClusterSpec != nil -> valid + perform additional validation
113135

114136
// Should always delete the cluster after the job has ended, otherwise it will continue to the queue's resources.
115137
if !spec.ShutdownAfterJobFinishes {
116138
allErrors = append(allErrors, field.Invalid(specPath.Child("shutdownAfterJobFinishes"), spec.ShutdownAfterJobFinishes, "a kueue managed job should delete the cluster after finishing"))
117139
}
118140

119-
// Should not want existing cluster. Kueue (workload) should be able to control the admission of the actual work, not only the trigger.
120-
if len(spec.ClusterSelector) > 0 {
121-
allErrors = append(allErrors, field.Invalid(specPath.Child("clusterSelector"), spec.ClusterSelector, "a kueue managed job should not use an existing cluster"))
122-
}
123-
124141
clusterSpec := spec.RayClusterSpec
125142
clusterSpecPath := specPath.Child("rayClusterSpec")
126143

pkg/controller/jobs/rayjob/rayjob_webhook_test.go

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,25 @@ func TestValidateCreate(t *testing.T) {
144144
Obj(),
145145
wantErr: nil,
146146
},
147+
148+
"valid managed - has cluster selector but no RayClusterSpec": {
149+
job: testingrayutil.MakeJob("job", "ns").Queue("queue").
150+
ClusterSelector(map[string]string{
151+
"k1": "v1",
152+
}).
153+
RayClusterSpec(nil).
154+
Obj(),
155+
localQueueDefaulting: false,
156+
wantErr: nil,
157+
},
158+
"invalid managed - no cluster selector and no RayClusterSpec": {
159+
job: testingrayutil.MakeJob("job", "ns").Queue("queue").
160+
RayClusterSpec(nil).
161+
Obj(),
162+
wantErr: field.ErrorList{
163+
field.Required(field.NewPath("spec", "rayClusterSpec"), "rayClusterSpec is required for Kueue-managed jobs that don't use clusterSelector"),
164+
}.ToAggregate(),
165+
},
147166
"invalid unmanaged - local queue default": {
148167
job: testingrayutil.MakeJob("job", "ns").
149168
ShutdownAfterJobFinishes(false).
@@ -168,7 +187,7 @@ func TestValidateCreate(t *testing.T) {
168187
field.Invalid(field.NewPath("spec", "shutdownAfterJobFinishes"), false, "a kueue managed job should delete the cluster after finishing"),
169188
}.ToAggregate(),
170189
},
171-
"invalid managed - has cluster selector": {
190+
"invalid managed - has cluster selector and RayClusterSpec": {
172191
job: testingrayutil.MakeJob("job", "ns").Queue("queue").
173192
ClusterSelector(map[string]string{
174193
"k1": "v1",

pkg/util/testingjobs/rayjob/wrappers.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,11 @@ func (j *JobWrapper) WithEnableAutoscaling(value *bool) *JobWrapper {
147147
return j
148148
}
149149

150+
func (j *JobWrapper) RayClusterSpec(spec *rayv1.RayClusterSpec) *JobWrapper {
151+
j.Spec.RayClusterSpec = spec
152+
return j
153+
}
154+
150155
func (j *JobWrapper) WithWorkerGroups(workers ...rayv1.WorkerGroupSpec) *JobWrapper {
151156
j.Spec.RayClusterSpec.WorkerGroupSpecs = workers
152157
return j

test/integration/singlecluster/controller/jobs/rayjob/rayjob_controller_test.go

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -681,4 +681,30 @@ var _ = ginkgo.Describe("Job controller with preemption enabled", ginkgo.Ordered
681681
g.Expect(createdJob.Spec.Suspend).Should(gomega.BeFalse())
682682
}, util.Timeout, util.Interval).Should(gomega.Succeed())
683683
})
684+
685+
ginkgo.It("Should skip reconciliation for RayJobs with clusterSelector", func() {
686+
ginkgo.By("Creating a RayJob with clusterSelector and queue label")
687+
job := testingrayjob.MakeJob("rayjob-with-selector", ns.Name).
688+
Queue("test-queue").
689+
ClusterSelector(map[string]string{"ray.io/cluster": "existing-cluster"}).
690+
Suspend(false).
691+
Obj()
692+
util.MustCreate(ctx, k8sClient, job)
693+
694+
ginkgo.By("Checking that no workload is created for this job and the job remains unchanged")
695+
lookupKey := types.NamespacedName{
696+
Name: workloadrayjob.GetWorkloadNameForRayJob(job.Name, job.UID),
697+
Namespace: ns.Name,
698+
}
699+
createdWorkload := &kueue.Workload{}
700+
createdJob := &rayv1.RayJob{}
701+
702+
gomega.Consistently(func(g gomega.Gomega) {
703+
err := k8sClient.Get(ctx, lookupKey, createdWorkload)
704+
g.Expect(err).Should(gomega.HaveOccurred())
705+
g.Expect(client.IgnoreNotFound(err)).Should(gomega.Succeed())
706+
g.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(job), createdJob)).Should(gomega.Succeed())
707+
g.Expect(createdJob.Spec.Suspend).Should(gomega.BeFalse())
708+
}, util.ConsistentDuration, util.Interval).Should(gomega.Succeed())
709+
})
684710
})

test/integration/singlecluster/webhook/jobs/rayjob_webhook_test.go

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,5 +61,54 @@ var _ = ginkgo.Describe("RayJob Webhook", func() {
6161
gomega.Expect(err).Should(gomega.HaveOccurred())
6262
gomega.Expect(err).Should(testing.BeForbiddenError())
6363
})
64+
65+
ginkgo.It("should reject RayJob with clusterSelector and RayClusterSpec", func() {
66+
job := testingjob.MakeJob("rayjob-with-both", ns.Name).
67+
Queue("queue-name").
68+
ClusterSelector(map[string]string{"ray.io/cluster": "existing-cluster"}).
69+
Obj()
70+
// clusterSelector + RayClusterSpec -> validation error
71+
err := k8sClient.Create(ctx, job)
72+
gomega.Expect(err).Should(gomega.HaveOccurred())
73+
gomega.Expect(err).Should(testing.BeForbiddenError())
74+
})
75+
76+
ginkgo.It("should allow RayJob with clusterSelector but no RayClusterSpec", func() {
77+
job := testingjob.MakeJob("rayjob-selector-only", ns.Name).
78+
Queue("queue-name").
79+
ClusterSelector(map[string]string{"ray.io/cluster": "existing-cluster"}).
80+
RayClusterSpec(nil).
81+
Obj()
82+
// clusterSelector + nil RayClusterSpec -> valid (not managed by Kueue)
83+
gomega.Expect(k8sClient.Create(ctx, job)).Should(gomega.Succeed())
84+
})
85+
86+
ginkgo.It("should reject RayJob with queue label but no clusterSelector and no RayClusterSpec", func() {
87+
job := testingjob.MakeJob("rayjob-nil-clusterspec", ns.Name).
88+
Queue("queue-name").
89+
RayClusterSpec(nil).
90+
Obj()
91+
// no clusterSelector + nil RayClusterSpec -> validation error
92+
err := k8sClient.Create(ctx, job)
93+
gomega.Expect(err).Should(gomega.HaveOccurred())
94+
gomega.Expect(err).Should(testing.BeForbiddenError())
95+
})
96+
97+
ginkgo.It("should allow RayJob with clusterSelector and no queue label", func() {
98+
job := testingjob.MakeJob("rayjob-with-selector-no-queue", ns.Name).
99+
ClusterSelector(map[string]string{"ray.io/cluster": "existing-cluster"}).
100+
Obj()
101+
// No queue label -> not managed by Kueue, validation skipped
102+
gomega.Expect(k8sClient.Create(ctx, job)).Should(gomega.Succeed())
103+
})
104+
105+
ginkgo.It("should allow valid RayJob with RayClusterSpec and no clusterSelector", func() {
106+
job := testingjob.MakeJob("rayjob-valid", ns.Name).
107+
Queue("queue-name").
108+
Obj()
109+
// no clusterSelector + RayClusterSpec present -> valid, full validation runs
110+
// MakeJob() creates a valid RayClusterSpec by default
111+
gomega.Expect(k8sClient.Create(ctx, job)).Should(gomega.Succeed())
112+
})
64113
})
65114
})

0 commit comments

Comments
 (0)