Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
181 changes: 118 additions & 63 deletions pkg/app/AppService.go
Original file line number Diff line number Diff line change
Expand Up @@ -3246,6 +3246,72 @@ func (impl *AppServiceImpl) UpdateCdWorkflowRunnerByACDObject(app *v1alpha1.Appl
return nil
}

const kedaAutoscaling = "kedaAutoscaling"
const HorizontalPodAutoscaler = "HorizontalPodAutoscaler"
const fullnameOverride = "fullnameOverride"
const nameOverride = "nameOverride"
const enabled = "enabled"
const replicaCount = "replicaCount"

func (impl *AppServiceImpl) getAutoScalingReplicaCount(templateMap map[string]interface{}, appName string) *util2.HpaResourceRequest {
hasOverride := false
if _, ok := templateMap[fullnameOverride]; ok {
appNameOverride := templateMap[fullnameOverride].(string)
if len(appNameOverride) > 0 {
appName = appNameOverride
hasOverride = true
}
}
if !hasOverride {
if _, ok := templateMap[nameOverride]; ok {
nameOverride := templateMap[nameOverride].(string)
if len(nameOverride) > 0 {
appName = fmt.Sprintf("%s-%s", appName, nameOverride)
}
}
}
hpaResourceRequest := &util2.HpaResourceRequest{}
hpaResourceRequest.Version = ""
hpaResourceRequest.Group = autoscaling.ServiceName
hpaResourceRequest.Kind = HorizontalPodAutoscaler
impl.logger.Infow("getAutoScalingReplicaCount", "hpaResourceRequest", hpaResourceRequest)
if _, ok := templateMap[kedaAutoscaling]; ok {
as := templateMap[kedaAutoscaling]
asd := as.(map[string]interface{})
if _, ok := asd[enabled]; ok {
impl.logger.Infow("getAutoScalingReplicaCount", "hpaResourceRequest", hpaResourceRequest)
enable := asd[enabled].(bool)
if enable {
hpaResourceRequest.IsEnable = enable
hpaResourceRequest.ReqReplicaCount = templateMap[replicaCount].(float64)
hpaResourceRequest.ReqMaxReplicas = asd["maxReplicaCount"].(float64)
hpaResourceRequest.ReqMinReplicas = asd["minReplicaCount"].(float64)
hpaResourceRequest.ResourceName = fmt.Sprintf("%s-%s-%s", "keda-hpa", appName, "keda")
impl.logger.Infow("getAutoScalingReplicaCount", "hpaResourceRequest", hpaResourceRequest)
return hpaResourceRequest
}
}
}

if _, ok := templateMap[autoscaling.ServiceName]; ok {
as := templateMap[autoscaling.ServiceName]
asd := as.(map[string]interface{})
if _, ok := asd[enabled]; ok {
enable := asd[enabled].(bool)
if enable {
hpaResourceRequest.IsEnable = asd[enabled].(bool)
hpaResourceRequest.ReqReplicaCount = templateMap[replicaCount].(float64)
hpaResourceRequest.ReqMaxReplicas = asd["MaxReplicas"].(float64)
hpaResourceRequest.ReqMinReplicas = asd["MinReplicas"].(float64)
hpaResourceRequest.ResourceName = fmt.Sprintf("%s-%s", appName, "hpa")
return hpaResourceRequest
}
}
}
return hpaResourceRequest

}

func (impl *AppServiceImpl) autoscalingCheckBeforeTrigger(ctx context.Context, appName string, namespace string, merged []byte, overrideRequest *bean.ValuesOverrideRequest) []byte {
//pipeline := overrideRequest.Pipeline
var appId = overrideRequest.AppId
Expand All @@ -3258,77 +3324,66 @@ func (impl *AppServiceImpl) autoscalingCheckBeforeTrigger(ctx context.Context, a
if err != nil {
return merged
}
if _, ok := templateMap[autoscaling.ServiceName]; ok {
as := templateMap[autoscaling.ServiceName]
asd := as.(map[string]interface{})
isEnable := false
if _, ok := asd["enabled"]; ok {
isEnable = asd["enabled"].(bool)
}
if isEnable {
reqReplicaCount := templateMap["replicaCount"].(float64)
reqMaxReplicas := asd["MaxReplicas"].(float64)
reqMinReplicas := asd["MinReplicas"].(float64)
version := ""
group := autoscaling.ServiceName
kind := "HorizontalPodAutoscaler"
resourceName := fmt.Sprintf("%s-%s", appName, "hpa")
resourceManifest := make(map[string]interface{})
if IsAcdApp(appDeploymentType) {
query := &application2.ApplicationResourceRequest{
Name: &appName,
Version: &version,
Group: &group,
Kind: &kind,
ResourceName: &resourceName,
Namespace: &namespace,
}
recv, err := impl.acdClient.GetResource(ctx, query)
impl.logger.Debugw("resource manifest get replica count", "response", recv)
if err != nil {
impl.logger.Errorw("ACD Get Resource API Failed", "err", err)
middleware.AcdGetResourceCounter.WithLabelValues(strconv.Itoa(appId), namespace, appName).Inc()
return merged
}
if recv != nil && len(*recv.Manifest) > 0 {
err := json.Unmarshal([]byte(*recv.Manifest), &resourceManifest)
if err != nil {
impl.logger.Errorw("unmarshal failed for hpa check", "err", err)
return merged
}
}
} else {
version = "v2beta2"
k8sResource, err := impl.k8sApplicationService.GetResource(ctx, &k8s.ResourceRequestBean{ClusterId: clusterId,
K8sRequest: &application3.K8sRequestBean{ResourceIdentifier: application3.ResourceIdentifier{Name: resourceName,
Namespace: namespace, GroupVersionKind: schema.GroupVersionKind{Group: group, Kind: kind, Version: version}}}})
if err != nil {
impl.logger.Errorw("error occurred while fetching resource for app", "resourceName", resourceName, "err", err)
return merged
}
resourceManifest = k8sResource.Manifest.Object
}
if len(resourceManifest) > 0 {
statusMap := resourceManifest["status"].(map[string]interface{})
currentReplicaVal := statusMap["currentReplicas"]
currentReplicaCount, err := util2.ParseFloatNumber(currentReplicaVal)
if err != nil {
impl.logger.Errorw("error occurred while parsing replica count", "currentReplicas", currentReplicaVal, "err", err)
return merged
}

reqReplicaCount = impl.fetchRequiredReplicaCount(currentReplicaCount, reqMaxReplicas, reqMinReplicas)
templateMap["replicaCount"] = reqReplicaCount
merged, err = json.Marshal(&templateMap)
hpaResourceRequest := impl.getAutoScalingReplicaCount(templateMap, appName)
impl.logger.Debugw("autoscalingCheckBeforeTrigger", "hpaResourceRequest", hpaResourceRequest)
if hpaResourceRequest.IsEnable {
resourceManifest := make(map[string]interface{})
if IsAcdApp(appDeploymentType) {
query := &application2.ApplicationResourceRequest{
Name: &appName,
Version: &hpaResourceRequest.Version,
Group: &hpaResourceRequest.Group,
Kind: &hpaResourceRequest.Kind,
ResourceName: &hpaResourceRequest.ResourceName,
Namespace: &namespace,
}
recv, err := impl.acdClient.GetResource(ctx, query)
impl.logger.Debugw("resource manifest get replica count", "response", recv)
if err != nil {
impl.logger.Errorw("ACD Get Resource API Failed", "err", err)
middleware.AcdGetResourceCounter.WithLabelValues(strconv.Itoa(appId), namespace, appName).Inc()
return merged
}
if recv != nil && len(*recv.Manifest) > 0 {
err := json.Unmarshal([]byte(*recv.Manifest), &resourceManifest)
if err != nil {
impl.logger.Errorw("marshaling failed for hpa check", "err", err)
impl.logger.Errorw("unmarshal failed for hpa check", "err", err)
return merged
}
}
} else {
impl.logger.Errorw("autoscaling is not enabled", "pipelineId", pipelineId)
version := "v2beta2"
k8sResource, err := impl.k8sApplicationService.GetResource(ctx, &k8s.ResourceRequestBean{ClusterId: clusterId,
K8sRequest: &application3.K8sRequestBean{ResourceIdentifier: application3.ResourceIdentifier{Name: hpaResourceRequest.ResourceName,
Namespace: namespace, GroupVersionKind: schema.GroupVersionKind{Group: hpaResourceRequest.Group, Kind: hpaResourceRequest.Kind, Version: version}}}})
if err != nil {
impl.logger.Errorw("error occurred while fetching resource for app", "resourceName", hpaResourceRequest.ResourceName, "err", err)
return merged
}
resourceManifest = k8sResource.Manifest.Object
}
if len(resourceManifest) > 0 {
statusMap := resourceManifest["status"].(map[string]interface{})
currentReplicaVal := statusMap["currentReplicas"]
currentReplicaCount, err := util2.ParseFloatNumber(currentReplicaVal)
if err != nil {
impl.logger.Errorw("error occurred while parsing replica count", "currentReplicas", currentReplicaVal, "err", err)
return merged
}

reqReplicaCount := impl.fetchRequiredReplicaCount(currentReplicaCount, hpaResourceRequest.ReqMaxReplicas, hpaResourceRequest.ReqMinReplicas)
templateMap["replicaCount"] = reqReplicaCount
merged, err = json.Marshal(&templateMap)
if err != nil {
impl.logger.Errorw("marshaling failed for hpa check", "err", err)
return merged
}
}
} else {
impl.logger.Errorw("autoscaling is not enabled", "pipelineId", pipelineId)
}

//check for custom chart support
if autoscalingEnabledPath, ok := templateMap[bean2.CustomAutoScalingEnabledPathKey]; ok {
if deploymentType == models.DEPLOYMENTTYPE_STOP {
Expand Down
11 changes: 11 additions & 0 deletions util/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,3 +294,14 @@ func InterfaceToFloat(resp interface{}) float64 {
}
return dat
}

type HpaResourceRequest struct {
ResourceName string
ReqReplicaCount float64
ReqMaxReplicas float64
ReqMinReplicas float64
IsEnable bool
Group string
Version string
Kind string
}