Skip to content
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions helm-chart/kuberay-operator/templates/_helpers.tpl
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,22 @@ Create the name of the service account to use
{{- end -}}
{{- end -}}


{{/*
Build the operator's --feature-gates argument from .Values.featureGates.
Each list entry is rendered as "<name>=<enabled>" and the entries are joined with commas,
e.g. "--feature-gates=RayClusterStatusConditions=false".
No flag is emitted when .Values.featureGates is empty or unset.
*/}}
{{- define "kuberay.featureGates" -}}
{{- $features := "" }}
{{- range .Values.featureGates }}
{{- $str := printf "%s=%t," .name .enabled }}
{{- $features = print $features $str }}
{{- end }}
{{- with .Values.featureGates }}
--feature-gates={{ $features | trimSuffix "," }}
{{- end }}
{{- end }}


{{/*
Create a template to ensure consistency for Role and ClusterRole.
*/}}
Expand Down
1 change: 1 addition & 0 deletions helm-chart/kuberay-operator/templates/deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ spec:
- /manager
args:
{{- $argList := list -}}
{{- $argList = append $argList (include "kuberay.featureGates" . | trim) -}}
{{- if .Values.batchScheduler.enabled -}}
{{- $argList = append $argList "--enable-batch-scheduler" -}}
{{- end -}}
Expand Down
5 changes: 5 additions & 0 deletions helm-chart/kuberay-operator/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,11 @@ readinessProbe:
batchScheduler:
enabled: false

featureGates:
- name: RayClusterStatusConditions
enabled: false


# Set up `securityContext` to improve Pod security.
# See https://github.com/ray-project/kuberay/blob/master/docs/guidance/pod-security.md for further guidance.
podSecurityContext: {}
Expand Down
9 changes: 7 additions & 2 deletions ray-operator/apis/ray/v1/raycluster_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,8 +167,13 @@ type RayClusterStatus struct {
type RayClusterConditionType string

const (
// HeadReady is added in a RayCluster when its Head Pod is ready for requests.
HeadReady RayClusterConditionType = "HeadReady"
// PodRunningAndReady says that the pod is running and ready.
PodRunningAndReady = "PodRunningAndReady"
)

// Condition types recorded in a RayCluster's status conditions.
const (
// HeadPodReady reports whether the RayCluster's Head Pod is ready for requests.
// It is recorded with status False when the Head Pod is not found or not ready.
HeadPodReady RayClusterConditionType = "HeadPodReady"
// RayClusterReplicaFailure is added in a RayCluster when one of its pods fails to be created or deleted.
RayClusterReplicaFailure RayClusterConditionType = "ReplicaFailure"
)
Expand Down
20 changes: 20 additions & 0 deletions ray-operator/controllers/ray/raycluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -1201,6 +1201,26 @@ func (r *RayClusterReconciler) calculateStatus(ctx context.Context, instance *ra
newInstance.Status.State = rayv1.Ready
}

// Check if the head node is running and ready by checking the head pod's status.
if features.Enabled(features.RayClusterStatusConditions) {
headPod, err := common.GetRayClusterHeadPod(ctx, r, newInstance)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's possible for GetRayClusterHeadPod to return nil, nil when no head Pod exists. We need to handle the case.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cc @rueian maybe we can return a non-nil error when no head Pod exists?

I found that we always return a non-nil error.

        headPod, err := common.GetRayClusterHeadPod(ctx, r, newInstance)
	if headPod == nil {
		// return an error
	}

if err != nil {
return nil, err
}
// GetRayClusterHeadPod can return nil, nil when pod is not found, we handle it separately.
if headPod == nil {
meta.SetStatusCondition(&newInstance.Status.Conditions, metav1.Condition{
Type: string(rayv1.HeadPodReady),
Status: metav1.ConditionFalse,
Reason: "HeadPodNotFound",
Message: "Head Pod not found",
})
} else {
replicaHeadPodReadyCondition := utils.FindPodReadyCondition(headPod, rayv1.HeadPodReady)
meta.SetStatusCondition(&newInstance.Status.Conditions, replicaHeadPodReadyCondition)
}
}

if newInstance.Spec.Suspend != nil && *newInstance.Spec.Suspend && len(runtimePods.Items) == 0 {
newInstance.Status.State = rayv1.Suspended
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1676,6 +1676,12 @@ func TestCalculateStatus(t *testing.T) {
Status: corev1.PodStatus{
PodIP: headNodeIP,
Phase: corev1.PodRunning,
Conditions: []corev1.PodCondition{
{
Type: corev1.PodReady,
Status: corev1.ConditionTrue,
},
},
},
}
runtimeObjects := []runtime.Object{headPod, headService}
Expand Down Expand Up @@ -1705,8 +1711,41 @@ func TestCalculateStatus(t *testing.T) {
assert.Nil(t, err)
assert.Empty(t, newInstance.Status.Conditions)

// Test reconcilePodsErr with the feature gate enabled
// enable feature gate for the following tests
defer features.SetFeatureGateDuringTest(t, features.RayClusterStatusConditions, true)()

// Test CheckRayHeadRunningAndReady with head pod running and ready
newInstance, _ = r.calculateStatus(ctx, testRayCluster, nil)
assert.True(t, meta.IsStatusConditionPresentAndEqual(newInstance.Status.Conditions, string(rayv1.HeadPodReady), metav1.ConditionTrue))
condition := meta.FindStatusCondition(newInstance.Status.Conditions, string(rayv1.HeadPodReady))
assert.Equal(t, metav1.ConditionTrue, condition.Status)

// Test CheckRayHeadRunningAndReady with head pod not ready
headPod.Status.Conditions = []corev1.PodCondition{
{
Type: corev1.PodReady,
Status: corev1.ConditionFalse,
},
}
runtimeObjects = []runtime.Object{headPod, headService}
fakeClient = clientFake.NewClientBuilder().WithScheme(newScheme).WithRuntimeObjects(runtimeObjects...).Build()
r.Client = fakeClient
newInstance, _ = r.calculateStatus(ctx, testRayCluster, nil)
assert.True(t, meta.IsStatusConditionPresentAndEqual(newInstance.Status.Conditions, string(rayv1.HeadPodReady), metav1.ConditionFalse))
condition = meta.FindStatusCondition(newInstance.Status.Conditions, string(rayv1.HeadPodReady))
assert.Equal(t, metav1.ConditionFalse, condition.Status)

// Test CheckRayHeadRunningAndReady with head pod not running
headPod.Status.Phase = corev1.PodFailed
runtimeObjects = []runtime.Object{headPod, headService}
fakeClient = clientFake.NewClientBuilder().WithScheme(newScheme).WithRuntimeObjects(runtimeObjects...).Build()
r.Client = fakeClient
newInstance, _ = r.calculateStatus(ctx, testRayCluster, nil)
assert.True(t, meta.IsStatusConditionPresentAndEqual(newInstance.Status.Conditions, string(rayv1.HeadPodReady), metav1.ConditionFalse))
condition = meta.FindStatusCondition(newInstance.Status.Conditions, string(rayv1.HeadPodReady))
assert.Equal(t, metav1.ConditionFalse, condition.Status)

// Test reconcilePodsErr with the feature gate enabled
newInstance, err = r.calculateStatus(ctx, testRayCluster, utils.ErrFailedCreateHeadPod)
assert.Nil(t, err)
assert.True(t, meta.IsStatusConditionPresentAndEqual(newInstance.Status.Conditions, string(rayv1.RayClusterReplicaFailure), metav1.ConditionTrue))
Expand Down
28 changes: 20 additions & 8 deletions ray-operator/controllers/ray/rayservice_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,23 +15,26 @@ import (
networkingv1 "k8s.io/api/networking/v1"

"github.com/ray-project/kuberay/ray-operator/controllers/ray/common"
"github.com/ray-project/kuberay/ray-operator/pkg/features"

cmap "github.com/orcaman/concurrent-map/v2"

"github.com/go-logr/logr"
fmtErrors "github.com/pkg/errors"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/ray-project/kuberay/ray-operator/controllers/ray/utils"

"k8s.io/client-go/tools/record"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

"github.com/ray-project/kuberay/ray-operator/controllers/ray/utils"

"k8s.io/apimachinery/pkg/runtime"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand Down Expand Up @@ -1055,13 +1058,22 @@ func (r *RayServiceReconciler) reconcileServe(ctx context.Context, rayServiceIns
// after the head pod is running and ready. Hence, some requests to the Dashboard (e.g. `UpdateDeployments`) may fail.
// This is not an issue since `UpdateDeployments` is an idempotent operation.
logger.Info("Check the head Pod status of the pending RayCluster", "RayCluster name", rayClusterInstance.Name)
if isRunningAndReady, err := r.isHeadPodRunningAndReady(ctx, rayClusterInstance); err != nil || !isRunningAndReady {
if err != nil {
logger.Error(err, "Failed to check if head Pod is running and ready!")
} else {
logger.Info("Skipping the update of Serve deployments because the Ray head Pod is not ready.")

// check the latest condition of the head Pod to see if it is ready.
if features.Enabled(features.RayClusterStatusConditions) {
if !meta.IsStatusConditionTrue(rayClusterInstance.Status.Conditions, string(rayv1.HeadPodReady)) {
logger.Info("The head Pod is not ready, requeue the resource event to avoid redundant custom resource status updates.")
return ctrl.Result{RequeueAfter: ServiceDefaultRequeueDuration}, false, nil
}
} else {
if isRunningAndReady, err := r.isHeadPodRunningAndReady(ctx, rayClusterInstance); err != nil || !isRunningAndReady {
if err != nil {
logger.Error(err, "Failed to check if head Pod is running and ready!")
} else {
logger.Info("Skipping the update of Serve deployments because the Ray head Pod is not ready.")
}
return ctrl.Result{RequeueAfter: ServiceDefaultRequeueDuration}, false, err
}
return ctrl.Result{RequeueAfter: ServiceDefaultRequeueDuration}, false, err
}

// TODO(architkulkarni): Check the RayVersion. If < 2.8.0, error.
Expand Down
30 changes: 30 additions & 0 deletions ray-operator/controllers/ray/utils/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
ctrl "sigs.k8s.io/controller-runtime"

rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
v1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
)

const (
Expand Down Expand Up @@ -71,6 +72,35 @@ func IsCreated(pod *corev1.Pod) bool {
return pod.Status.Phase != ""
}

func FindPodReadyCondition(pod *corev1.Pod, condType rayv1.RayClusterConditionType) metav1.Condition {
replicaPodReadyCondition := metav1.Condition{
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible for pod.Status.Conditions not to have corev1.PodReady? If so, we will create a condition with an empty Reason.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, you are right. I updated the logic here to start from an initial Condition:

replicaPodReadyCondition := metav1.Condition{
Type: string(condType),
Status: metav1.ConditionFalse,
Reason: rayv1.UnknownReason,
}

plus this logic

// Update the reason if it's not empty
if reason != "" {
replicaPodReadyCondition.Reason = reason
}

to prevent an empty reason from sneaking into the later SetStatusCondition call.

Type: string(condType),
Status: metav1.ConditionFalse,
}

for _, cond := range pod.Status.Conditions {
if cond.Type == corev1.PodReady {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if cond.Type == corev1.PodReady {
    var reason := cond.Reason
    if reason is empty {
        // metav1.Condition.Reason requires a non-empty value
        reason = cond.Reason
    }
    replicaPodReadyCondition = metav1.Condition{
        Type:    string(condType),
        Status:  cond.Status,
        Reason:  reason,
        Message: cond.Message,
    }
    break
}

if cond.Status == corev1.ConditionTrue {
replicaPodReadyCondition = metav1.Condition{
Type: string(condType),
Status: metav1.ConditionTrue,
Reason: v1.PodRunningAndReady, // metav1.Condition.Reason requires a non-empty value
Message: cond.Message,
}
} else {
replicaPodReadyCondition = metav1.Condition{
Type: string(condType),
Status: metav1.ConditionFalse,
Reason: cond.Reason, // PodReady condition comes with a reason when it's not ready, e.g. ContainersNotReady
Message: cond.Message,
}
break
}
}
}
return replicaPodReadyCondition
}

// IsRunningAndReady returns true if pod is in the PodRunning Phase and it has a condition of PodReady.
func IsRunningAndReady(pod *corev1.Pod) bool {
if pod.Status.Phase != corev1.PodRunning {
Expand Down
62 changes: 62 additions & 0 deletions ray-operator/controllers/ray/utils/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
corev1 "k8s.io/api/core/v1"

rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
v1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
)

func TestGetClusterDomainName(t *testing.T) {
Expand Down Expand Up @@ -265,6 +266,31 @@ func createSomePodWithCondition(typ corev1.PodConditionType, status corev1.Condi
}
}

// createRayHeadPodWithPhaseAndCondition builds a Pod fixture labeled as a Ray head node.
// The Pod's status carries the given phase and exactly one condition of type typ
// with the given status.
func createRayHeadPodWithPhaseAndCondition(phase corev1.PodPhase, typ corev1.PodConditionType, status corev1.ConditionStatus) (pod *corev1.Pod) {
	headLabels := map[string]string{
		"ray.io/node-type": string(rayv1.HeadNode),
	}
	onlyCondition := corev1.PodCondition{Type: typ, Status: status}

	pod = &corev1.Pod{
		TypeMeta:   metav1.TypeMeta{Kind: "Pod", APIVersion: "v1"},
		ObjectMeta: metav1.ObjectMeta{Namespace: "default", Name: "raycluster-sample-head", Labels: headLabels},
	}
	pod.Status.Phase = phase
	pod.Status.Conditions = []corev1.PodCondition{onlyCondition}
	return pod
}

func TestGetHeadGroupServiceAccountName(t *testing.T) {
tests := map[string]struct {
input *rayv1.RayCluster
Expand Down Expand Up @@ -588,6 +614,42 @@ env_vars:
}
}

// TestFindHeadPodReadyCondition checks that FindPodReadyCondition turns a head Pod's
// PodReady condition into a HeadPodReady condition with the matching status, for a
// ready Pod, a Pod that is not running, and a running Pod that is not ready.
func TestFindHeadPodReadyCondition(t *testing.T) {
	tests := map[string]struct {
		pod      *corev1.Pod
		expected metav1.Condition
	}{
		"condition true if Ray head pod is running and ready": {
			pod: createRayHeadPodWithPhaseAndCondition(corev1.PodRunning, corev1.PodReady, corev1.ConditionTrue),
			expected: metav1.Condition{
				Type:   string(rayv1.HeadPodReady),
				Status: metav1.ConditionTrue,
			},
		},
		"condition false if Ray head pod is not running": {
			pod: createRayHeadPodWithPhaseAndCondition(corev1.PodPending, corev1.PodReady, corev1.ConditionFalse),
			expected: metav1.Condition{
				Type:   string(rayv1.HeadPodReady),
				Status: metav1.ConditionFalse,
			},
		},
		"condition false if Ray head pod is not ready": {
			pod: createRayHeadPodWithPhaseAndCondition(corev1.PodRunning, corev1.PodReady, corev1.ConditionFalse),
			expected: metav1.Condition{
				Type:   string(rayv1.HeadPodReady),
				Status: metav1.ConditionFalse,
			},
		},
	}

	for name, tc := range tests {
		t.Run(name, func(t *testing.T) {
			replicaHeadPodReadyCondition := FindPodReadyCondition(tc.pod, v1.HeadPodReady)
			// Check every field the table declares, not only Status, so that a
			// condition emitted under the wrong Type cannot pass unnoticed.
			assert.Equal(t, tc.expected.Type, replicaHeadPodReadyCondition.Type)
			assert.Equal(t, tc.expected.Status, replicaHeadPodReadyCondition.Status)
		})
	}
}

func TestErrRayClusterReplicaFailureReason(t *testing.T) {
assert.Equal(t, RayClusterReplicaFailureReason(ErrFailedDeleteAllPods), "FailedDeleteAllPods")
assert.Equal(t, RayClusterReplicaFailureReason(ErrFailedDeleteHeadPod), "FailedDeleteHeadPod")
Expand Down