Skip to content

Commit 0d6a4ae

Browse files
authored
Auto-enable pod integration for pod-dependent frameworks (#6736)
1 parent 2b35517 commit 0d6a4ae

File tree

8 files changed

+286
-34
lines changed

8 files changed

+286
-34
lines changed

pkg/controller/jobframework/integrationmanager.go

Lines changed: 45 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,10 @@ type IntegrationCallbacks struct {
8181
// The job's MultiKueue adapter (optional)
8282
MultiKueueAdapter MultiKueueAdapter
8383
// The list of integration that need to be enabled along with the current one.
84+
// Deprecated: Use ImplicitlyEnabledFrameworkNames instead.
8485
DependencyList []string
86+
// The list of integrations implicitly enabled as dependencies of the integration.
87+
ImplicitlyEnabledFrameworkNames []string
8588
}
8689

8790
func (i *IntegrationCallbacks) getGVK() schema.GroupVersionKind {
@@ -100,11 +103,13 @@ func (i *IntegrationCallbacks) matchingOwnerReference(ownerRef *metav1.OwnerRefe
100103
}
101104

102105
type integrationManager struct {
103-
names []string
104-
integrations map[string]IntegrationCallbacks
105-
enabledIntegrations set.Set[string]
106-
externalIntegrations map[string]runtime.Object
107-
mu sync.RWMutex
106+
names []string
107+
integrations map[string]IntegrationCallbacks
108+
enabledIntegrations set.Set[string]
109+
externalIntegrations map[string]runtime.Object
110+
implicitlyEnabledIntegrations sets.Set[string]
111+
gvkToName map[schema.GroupVersionKind]string
112+
mu sync.RWMutex
108113
}
109114

110115
var manager integrationManager
@@ -132,6 +137,11 @@ func (m *integrationManager) register(name string, cb IntegrationCallbacks) erro
132137
m.integrations[name] = cb
133138
m.names = append(m.names, name)
134139

140+
if m.gvkToName == nil {
141+
m.gvkToName = make(map[schema.GroupVersionKind]string)
142+
}
143+
m.gvkToName[cb.getGVK()] = name
144+
135145
return nil
136146
}
137147

@@ -323,6 +333,15 @@ func GetIntegrationByGVK(gvk schema.GroupVersionKind) (IntegrationCallbacks, boo
323333
return IntegrationCallbacks{}, false
324334
}
325335

336+
// HasImplicitlyEnabledFramework returns true if the given GVK maps to an implicitly enabled framework.
337+
func HasImplicitlyEnabledFramework(gvk schema.GroupVersionKind) bool {
338+
name, found := manager.gvkToName[gvk]
339+
if !found {
340+
return false
341+
}
342+
return manager.implicitlyEnabledIntegrations.Has(name)
343+
}
344+
326345
func ownerReferenceMatchingGVK(ownerRef *metav1.OwnerReference, gvk schema.GroupVersionKind) bool {
327346
apiVersion, kind := gvk.ToAPIVersionAndKind()
328347
return ownerRef.APIVersion == apiVersion && ownerRef.Kind == kind
@@ -370,3 +389,24 @@ func GetMultiKueueAdapters(enabledIntegrations sets.Set[string]) (map[string]Mul
370389
}
371390
return ret, nil
372391
}
392+
393+
func (m *integrationManager) collectImplicitlyEnabledIntegrations(enabledFrameworks sets.Set[string]) sets.Set[string] {
394+
result := sets.New[string]()
395+
for frameworkName := range enabledFrameworks {
396+
callbacks, found := m.get(frameworkName)
397+
if !found {
398+
continue
399+
}
400+
401+
for _, implicitFramework := range callbacks.ImplicitlyEnabledFrameworkNames {
402+
if !enabledFrameworks.Has(implicitFramework) {
403+
result.Insert(implicitFramework)
404+
}
405+
}
406+
}
407+
return result
408+
}
409+
410+
func (m *integrationManager) setImplicitlyEnabledIntegrations(implicitlyIntegrations sets.Set[string]) {
411+
m.implicitlyEnabledIntegrations = implicitlyIntegrations
412+
}

pkg/controller/jobframework/integrationmanager_test.go

Lines changed: 60 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -458,14 +458,14 @@ func TestEnabledIntegrationsDependencies(t *testing.T) {
458458
enabled: []string{"i1"},
459459
wantError: errIntegrationNotFound,
460460
},
461-
"dependecncy not enabled": {
461+
"dependency not enabled": {
462462
integrationsDependencies: map[string][]string{
463463
"i1": {"i2"},
464464
},
465465
enabled: []string{"i1"},
466466
wantError: errDependencyIntegrationNotEnabled,
467467
},
468-
"dependecncy not found": {
468+
"dependency not found": {
469469
integrationsDependencies: map[string][]string{
470470
"i1": {"i2"},
471471
},
@@ -486,8 +486,8 @@ func TestEnabledIntegrationsDependencies(t *testing.T) {
486486
manager := integrationManager{
487487
integrations: map[string]IntegrationCallbacks{},
488488
}
489-
for inegration, deps := range tc.integrationsDependencies {
490-
manager.integrations[inegration] = IntegrationCallbacks{
489+
for integration, deps := range tc.integrationsDependencies {
490+
manager.integrations[integration] = IntegrationCallbacks{
491491
DependencyList: deps,
492492
}
493493
}
@@ -498,3 +498,59 @@ func TestEnabledIntegrationsDependencies(t *testing.T) {
498498
})
499499
}
500500
}
501+
502+
func TestImplicitlyEnabledIntegrations(t *testing.T) {
503+
cases := map[string]struct {
504+
integrationsImplicit map[string][]string
505+
enabled []string
506+
wantImplicit []string
507+
}{
508+
"empty": {},
509+
"no implicit dependencies": {
510+
integrationsImplicit: map[string][]string{
511+
"i1": nil,
512+
},
513+
enabled: []string{"i1"},
514+
wantImplicit: []string{},
515+
},
516+
"single implicit dependency": {
517+
integrationsImplicit: map[string][]string{
518+
"i1": {"i2"},
519+
},
520+
enabled: []string{"i1"},
521+
wantImplicit: []string{"i2"},
522+
},
523+
"multiple implicit dependencies": {
524+
integrationsImplicit: map[string][]string{
525+
"i1": {"i2", "i3"},
526+
},
527+
enabled: []string{"i1"},
528+
wantImplicit: []string{"i2", "i3"},
529+
},
530+
"nested implicit dependencies": {
531+
integrationsImplicit: map[string][]string{
532+
"i1": {"i2", "i3"},
533+
"i2": {"i3"},
534+
"i3": nil,
535+
},
536+
enabled: []string{"i1", "i2"},
537+
wantImplicit: []string{"i3"},
538+
},
539+
}
540+
for tcName, tc := range cases {
541+
t.Run(tcName, func(t *testing.T) {
542+
mgr := integrationManager{
543+
integrations: map[string]IntegrationCallbacks{},
544+
}
545+
for integration, implicitDeps := range tc.integrationsImplicit {
546+
mgr.integrations[integration] = IntegrationCallbacks{
547+
ImplicitlyEnabledFrameworkNames: implicitDeps,
548+
}
549+
}
550+
gotImplicit := mgr.collectImplicitlyEnabledIntegrations(sets.New(tc.enabled...))
551+
if diff := cmp.Diff(sets.New(tc.wantImplicit...), gotImplicit); diff != "" {
552+
t.Errorf("Unexpected implicitly enabled integrations (-want +got):\n%s", diff)
553+
}
554+
})
555+
}
556+
}

pkg/controller/jobframework/setup.go

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,11 @@ func SetupControllers(ctx context.Context, mgr ctrl.Manager, log logr.Logger, op
5656
func (m *integrationManager) setupControllers(ctx context.Context, mgr ctrl.Manager, log logr.Logger, opts ...Option) error {
5757
options := ProcessOptions(opts...)
5858

59-
if err := m.checkEnabledListDependencies(options.EnabledFrameworks); err != nil {
59+
implicitlyEnabledIntegrations := m.collectImplicitlyEnabledIntegrations(options.EnabledFrameworks)
60+
m.setImplicitlyEnabledIntegrations(implicitlyEnabledIntegrations)
61+
allEnabledIntegrations := options.EnabledFrameworks.Union(implicitlyEnabledIntegrations)
62+
63+
if err := m.checkEnabledListDependencies(allEnabledIntegrations); err != nil {
6064
return fmt.Errorf("check enabled frameworks list: %w", err)
6165
}
6266

@@ -69,7 +73,7 @@ func (m *integrationManager) setupControllers(ctx context.Context, mgr ctrl.Mana
6973
logger := log.WithValues("jobFrameworkName", name)
7074
fwkNamePrefix := fmt.Sprintf("jobFrameworkName %q", name)
7175

72-
if options.EnabledFrameworks.Has(name) {
76+
if allEnabledIntegrations.Has(name) {
7377
if cb.CanSupportIntegration != nil {
7478
if canSupport, err := cb.CanSupportIntegration(opts...); !canSupport || err != nil {
7579
return fmt.Errorf("failed to configure reconcilers: %w", err)
@@ -178,8 +182,10 @@ func restMappingExists(mgr ctrl.Manager, gvk schema.GroupVersionKind) error {
178182
// Note that the second argument, "indexer" needs to be the fieldIndexer obtained from the Manager.
179183
func SetupIndexes(ctx context.Context, indexer client.FieldIndexer, opts ...Option) error {
180184
options := ProcessOptions(opts...)
185+
186+
allEnabledIntegrations := options.EnabledFrameworks.Union(manager.collectImplicitlyEnabledIntegrations(options.EnabledFrameworks))
181187
return ForEachIntegration(func(name string, cb IntegrationCallbacks) error {
182-
if options.EnabledFrameworks.Has(name) {
188+
if allEnabledIntegrations.Has(name) {
183189
if err := cb.SetupIndexes(ctx, indexer); err != nil {
184190
return fmt.Errorf("jobFrameworkName %q: %w", name, err)
185191
}

pkg/controller/jobs/deployment/deployment_controller.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -38,13 +38,13 @@ const (
3838

3939
func init() {
4040
utilruntime.Must(jobframework.RegisterIntegration(FrameworkName, jobframework.IntegrationCallbacks{
41-
SetupIndexes: SetupIndexes,
42-
NewReconciler: jobframework.NewNoopReconcilerFactory(gvk),
43-
GVK: gvk,
44-
SetupWebhook: SetupWebhook,
45-
JobType: &appsv1.Deployment{},
46-
AddToScheme: appsv1.AddToScheme,
47-
DependencyList: []string{"pod"},
41+
SetupIndexes: SetupIndexes,
42+
NewReconciler: jobframework.NewNoopReconcilerFactory(gvk),
43+
GVK: gvk,
44+
SetupWebhook: SetupWebhook,
45+
JobType: &appsv1.Deployment{},
46+
AddToScheme: appsv1.AddToScheme,
47+
ImplicitlyEnabledFrameworkNames: []string{"pod"},
4848
}))
4949
}
5050

pkg/controller/jobs/leaderworkerset/leaderworkerset_controller.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -38,14 +38,14 @@ const (
3838

3939
func init() {
4040
utilruntime.Must(jobframework.RegisterIntegration(FrameworkName, jobframework.IntegrationCallbacks{
41-
SetupIndexes: SetupIndexes,
42-
NewReconciler: NewReconciler,
43-
NewAdditionalReconcilers: []jobframework.ReconcilerFactory{NewPodReconciler},
44-
SetupWebhook: SetupWebhook,
45-
JobType: &leaderworkersetv1.LeaderWorkerSet{},
46-
AddToScheme: leaderworkersetv1.AddToScheme,
47-
DependencyList: []string{"pod"},
48-
GVK: gvk,
41+
SetupIndexes: SetupIndexes,
42+
NewReconciler: NewReconciler,
43+
NewAdditionalReconcilers: []jobframework.ReconcilerFactory{NewPodReconciler},
44+
SetupWebhook: SetupWebhook,
45+
JobType: &leaderworkersetv1.LeaderWorkerSet{},
46+
AddToScheme: leaderworkersetv1.AddToScheme,
47+
ImplicitlyEnabledFrameworkNames: []string{"pod"},
48+
GVK: gvk,
4949
}))
5050
}
5151

pkg/controller/jobs/pod/pod_controller.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -565,6 +565,11 @@ func (p *Pod) Skip() bool {
565565
if v, ok := p.pod.GetLabels()[constants.ManagedByKueueLabelKey]; p.isFound && (!ok || v != constants.ManagedByKueueLabelValue) {
566566
return true
567567
}
568+
if jobframework.HasImplicitlyEnabledFramework(p.pod.GroupVersionKind()) &&
569+
p.pod.GetAnnotations()[podconstants.SuspendedByParentAnnotation] == "" {
570+
ctrl.Log.V(3).Info("Pod Integration was implicitly enabled but object lacks parent annotation, skipping")
571+
return true
572+
}
568573
return false
569574
}
570575

pkg/controller/jobs/statefulset/statefulset_controller.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -38,13 +38,13 @@ const (
3838

3939
func init() {
4040
utilruntime.Must(jobframework.RegisterIntegration(FrameworkName, jobframework.IntegrationCallbacks{
41-
SetupIndexes: SetupIndexes,
42-
NewReconciler: NewReconciler,
43-
SetupWebhook: SetupWebhook,
44-
JobType: &appsv1.StatefulSet{},
45-
AddToScheme: appsv1.AddToScheme,
46-
DependencyList: []string{"pod"},
47-
GVK: gvk,
41+
SetupIndexes: SetupIndexes,
42+
NewReconciler: NewReconciler,
43+
SetupWebhook: SetupWebhook,
44+
JobType: &appsv1.StatefulSet{},
45+
AddToScheme: appsv1.AddToScheme,
46+
ImplicitlyEnabledFrameworkNames: []string{"pod"},
47+
GVK: gvk,
4848
}))
4949
}
5050

0 commit comments

Comments
 (0)