331 changes: 161 additions & 170 deletions backend/backend/handlers/pipelines/createPipeline.py
@@ -1,7 +1,6 @@
# Copyright 2022 Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0

from distutils.command.config import config
import os
from venv import create
import boto3
@@ -11,191 +10,183 @@
from decimal import Decimal
from boto3.dynamodb.types import TypeDeserializer, TypeSerializer
from backend.common.validators import validate
dynamodb = boto3.resource('dynamodb')
cloudformation= boto3.client('cloudformation')
lambda_client = boto3.client('lambda')
import traceback

response = {
'statusCode': 200,
'body': '',
'headers': {
'Content-Type': 'application/json',
'Access-Control-Allow-Credentials': True,
'Access-Control-Allow-Origin': '*',
'Access-Control-Allow-Headers': 'Content-Type',
'Access-Control-Allow-Methods': 'OPTIONS,POST,GET'
}
}

unitTest = {
"body": {
"databaseId": "Unit_Test",
"pipelineId":"stl_to_glb_converter",
"description": "converts stl to glb",
"assetType":".stl",
"lambdaName":"stl_to_glb_converter"
}
}
unitTest['body']=json.dumps(unitTest['body'])
class CreatePipeline():

db_table_name = None
def __init__(self, dynamodb, cloudformation, lambda_client, env):
self.dynamodb = dynamodb
self.cloudformation = cloudformation
self.lambda_client = lambda_client

try:
db_table_name = os.environ["PIPELINE_STORAGE_TABLE_NAME"]
enable_pipeline_function_name = os.environ["ENABLE_PIPELINE_FUNCTION_NAME"]
enable_pipeline_function_arn = os.environ["ENABLE_PIPELINE_FUNCTION_ARN"]
s3_bucket=os.environ['S3_BUCKET']
sagemaker_bucket_name=os.environ['SAGEMAKER_BUCKET_NAME']
sagemaker_bucket_arn = os.environ['SAGEMAKER_BUCKET_ARN']
asset_bucket_arn = os.environ['ASSET_BUCKET_ARN']
lambda_role_to_attach = os.environ['ROLE_TO_ATTACH_TO_LAMBDA_PIPELINE']
lambda_pipeline_sample_function_bucket = os.environ['LAMBDA_PIPELINE_SAMPLE_FUNCTION_BUCKET']
lambda_pipeline_sample_function_key = os.environ['LAMBDA_PIPELINE_SAMPLE_FUNCTION_KEY']

except:
print("Failed Loading Environment Variables")
response['body'] = json.dumps({
"message": "Failed Loading Environment Variables"
})
response['statusCode']=500
self.db_table_name = env["PIPELINE_STORAGE_TABLE_NAME"]
self.enable_pipeline_function_name = env["ENABLE_PIPELINE_FUNCTION_NAME"]
self.enable_pipeline_function_arn = env["ENABLE_PIPELINE_FUNCTION_ARN"]
self.s3_bucket = env['S3_BUCKET']
self.sagemaker_bucket_name = env['SAGEMAKER_BUCKET_NAME']
self.sagemaker_bucket_arn = env['SAGEMAKER_BUCKET_ARN']
self.asset_bucket_arn = env['ASSET_BUCKET_ARN']
self.lambda_role_to_attach = env['ROLE_TO_ATTACH_TO_LAMBDA_PIPELINE']
self.lambda_pipeline_sample_function_bucket = env['LAMBDA_PIPELINE_SAMPLE_FUNCTION_BUCKET']
self.lambda_pipeline_sample_function_key = env['LAMBDA_PIPELINE_SAMPLE_FUNCTION_KEY']

def upload_Pipeline(body):
print("Setting Table")
table = dynamodb.Table(db_table_name)
print("Setting Time Stamp")
dtNow = datetime.datetime.utcnow().strftime('%B %d %Y - %H:%M:%S')

userResource = {
'isProvided': False,
'resourceId': ''
}
if body['containerUri'] != None:
userResource['isProvided'] = True
userResource['resourceId'] = body['containerUri']
elif body['lambdaName'] != None:
userResource['isProvided'] = True
userResource['resourceId'] = body['lambdaName']

item = {
'databaseId': body['databaseId'],
'pipelineId':body['pipelineId'],
'assetType':body['assetType'],
'outputType':body['outputType'],
'description': body['description'],
'dateCreated': json.dumps(dtNow),
'pipelineType':body['pipelineType'],
'waitForCallback': body['waitForCallback'],
'userProvidedResource': json.dumps(userResource),
'enabled':False #Not doing anything with this yet
}
self.table = dynamodb.Table(self.db_table_name)

if 'taskTimeout' in body:
item['taskTimeout'] = body['taskTimeout']

if 'taskHeartbeatTimeout' in body:
item['taskHeartbeatTimeout'] = body['taskHeartbeatTimeout']
@staticmethod
def from_env():
dynamodb = boto3.resource('dynamodb')
cloudformation= boto3.client('cloudformation')
lambda_client = boto3.client('lambda')
return CreatePipeline(
dynamodb,
cloudformation,
lambda_client,
os.environ)

table.put_item(
Item=item,
ConditionExpression='attribute_not_exists(databaseId) and attribute_not_exists(pipelineId)'
)
#If a lambda function name or ECR container URI was provided by the user, creation is not necessary
if userResource['isProvided'] == True:
return json.dumps({"message": 'Succeeded'})

def _now(self):
return datetime.datetime.utcnow().strftime('%B %d %Y - %H:%M:%S')

print("Running CFT")
if body['pipelineType']=='SageMaker':
createSagemakerPipeline(body)
elif body['pipelineType']=='Lambda':
createLambdaPipeline(body)
else:
raise ValueError("Unknown pipelineType")
def upload_Pipeline(self, body):
print("Setting Time Stamp")
dtNow = self._now()

userResource = {
'isProvided': False,
'resourceId': ''
}
if 'containerUri' in body:
userResource['isProvided'] = True
userResource['resourceId'] = body['containerUri']
elif 'lambdaName' in body:
userResource['isProvided'] = True
userResource['resourceId'] = body['lambdaName']

item = {
'databaseId': body['databaseId'],
'pipelineId':body['pipelineId'],
'assetType':body['assetType'],
'outputType':body['outputType'],
'description': body['description'],
'dateCreated': json.dumps(dtNow),
'pipelineType':body['pipelineType'],
'waitForCallback': body['waitForCallback'],
'userProvidedResource': json.dumps(userResource),
'enabled':False #Not doing anything with this yet
}

return json.dumps({"message": 'Succeeded'})
if 'taskTimeout' in body:
item['taskTimeout'] = body['taskTimeout']

def createLambdaPipeline(body):
print('Creating a lambda function')
lambda_client.create_function(
FunctionName=body['pipelineId'],
Role=lambda_role_to_attach,
PackageType='Zip',
Code={
'S3Bucket': lambda_pipeline_sample_function_bucket,
'S3Key': lambda_pipeline_sample_function_key
},
Handler='lambda_function.lambda_handler',
Runtime='python3.8'
)
if 'taskHeartbeatTimeout' in body:
item['taskHeartbeatTimeout'] = body['taskHeartbeatTimeout']

self.table.put_item(
Item=item,
ConditionExpression='attribute_not_exists(databaseId) and attribute_not_exists(pipelineId)'
)
#If a lambda function name or ECR container URI was provided by the user, creation is not necessary
if userResource['isProvided'] == True:
return json.dumps({"message": 'Succeeded'})

def readSagemakerTemplate():
s3 = boto3.resource('s3')
obj = s3.Object(s3_bucket, "cloudformation/sagemaker_notebook.yaml")
return obj.get()['Body'].read().decode('utf-8')
print("Running CFT")
if body['pipelineType']=='SageMaker':
self.createSagemakerPipeline(body)
elif body['pipelineType']=='Lambda':
self.createLambdaPipeline(body)
else:
raise ValueError("Unknown pipelineType")

def createSagemakerPipeline(body):
print('Running SageMaker CFT')
# configPath = os.environ['LAMBDA_TASK_ROOT'] + "/nested_cft/sagemaker_notebook.yaml"
# print("Looking for CFT at " + configPath)
configContent = readSagemakerTemplate()
print(configContent)
# TODO: if this stack creation fails, we need to rollback to the database saved
cft_response=cloudformation.create_stack(
StackName=body['pipelineId'],
TemplateBody=configContent,
Parameters=[
{
'ParameterKey': 'EnablePipelineLambdaFunction',
'ParameterValue': enable_pipeline_function_name,
},
{
'ParameterKey': 'EnablePipelineLambdaFunctionArn',
'ParameterValue': enable_pipeline_function_arn,
},
{
'ParameterKey': 'DatabaseId',
'ParameterValue': body['databaseId'],
},
{
'ParameterKey': 'S3Bucket',
'ParameterValue': s3_bucket,
},
{
'ParameterKey': 'SagemakerBucketName',
'ParameterValue': sagemaker_bucket_name,
},
{
'ParameterKey': 'SagemakerBucketArn',
'ParameterValue': sagemaker_bucket_arn,
},
{
'ParameterKey': 'AssetBucketArn',
'ParameterValue': asset_bucket_arn,
},
{
'ParameterKey':'PipelineName',
'ParameterValue':body['pipelineId']
},
{
'ParameterKey':'SageMakeNotebookInstanceType',
'ParameterValue':'ml.t2.medium'
}
],
Tags=[
{
'Key': 'StackController',
'Value': 'VAMS'
}
],
Capabilities=[
'CAPABILITY_IAM',
],
return json.dumps({"message": 'Succeeded'})

def createLambdaPipeline(self, body):
print('Creating a lambda function')
self.lambda_client.create_function(
FunctionName=body['pipelineId'],
Role=self.lambda_role_to_attach,
PackageType='Zip',
Code={
'S3Bucket': self.lambda_pipeline_sample_function_bucket,
'S3Key': self.lambda_pipeline_sample_function_key
},
Handler='lambda_function.lambda_handler',
Runtime='python3.8'
)

def readSagemakerTemplate(self):
s3 = boto3.resource('s3')
obj = s3.Object(self.s3_bucket, "cloudformation/sagemaker_notebook.yaml")
return obj.get()['Body'].read().decode('utf-8')

def createSagemakerPipeline(self, body):
print('Running SageMaker CFT')
# configPath = os.environ['LAMBDA_TASK_ROOT'] + "/nested_cft/sagemaker_notebook.yaml"
# print("Looking for CFT at " + configPath)
configContent = self.readSagemakerTemplate()
print(configContent)
# TODO: if this stack creation fails, we need to rollback to the database saved
cft_response=self.cloudformation.create_stack(
StackName=body['pipelineId'],
TemplateBody=configContent,
Parameters=[
{
'ParameterKey': 'EnablePipelineLambdaFunction',
'ParameterValue': self.enable_pipeline_function_name,
},
{
'ParameterKey': 'EnablePipelineLambdaFunctionArn',
'ParameterValue': self.enable_pipeline_function_arn,
},
{
'ParameterKey': 'DatabaseId',
'ParameterValue': body['databaseId'],
},
{
'ParameterKey': 'S3Bucket',
'ParameterValue': self.s3_bucket,
},
{
'ParameterKey': 'SagemakerBucketName',
'ParameterValue': self.sagemaker_bucket_name,
},
{
'ParameterKey': 'SagemakerBucketArn',
'ParameterValue': self.sagemaker_bucket_arn,
},
{
'ParameterKey': 'AssetBucketArn',
'ParameterValue': self.asset_bucket_arn,
},
{
'ParameterKey':'PipelineName',
'ParameterValue':body['pipelineId']
},
{
'ParameterKey':'SageMakeNotebookInstanceType',
'ParameterValue':'ml.t2.medium'
}
],
Tags=[
{
'Key': 'StackController',
'Value': 'VAMS'
}
],
Capabilities=[
'CAPABILITY_IAM',
],
)

def enablePipeline():
print("Starting Pipeline Enablement")
def enablePipeline(self):
print("Starting Pipeline Enablement")

def lambda_handler(event, context):
def lambda_handler(event, context, create_pipeline_fn=CreatePipeline.from_env):
print(event)
create_pipeline = create_pipeline_fn()
response = {
'statusCode': 200,
'body': '',
@@ -278,13 +269,13 @@ def lambda_handler(event, context):

print("Trying to get Data")
if 'starting' in event['body'] and event['body']['starting']=='enabling':
enablePipeline()
create_pipeline.enablePipeline()
else:
response['body'] = upload_Pipeline(event['body'])
response['body'] = create_pipeline.upload_Pipeline(event['body'])
return response
except Exception as e:
response['statusCode'] = 500
print("Error!", e.__class__, "occurred.")
print("Error!", e.__class__, "occurred.", traceback.format_exc())
if e.response['Error']['Code']=='ConditionalCheckFailedException':
response['statusCode']=500
response['body'] = json.dumps({"message":"Pipeline "+str(event['body']['pipelineId']+" already exists.")})
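
A practical effect of this refactor is that the handler's collaborators can now be exercised without real AWS resources: CreatePipeline takes its boto3 clients and environment mapping as constructor arguments, and lambda_handler accepts a factory through create_pipeline_fn. Below is a minimal sketch of a test that uses that seam; the Fake* stubs, the sample environment values, and the import path are assumptions for illustration and are not part of this change.

import json
from backend.handlers.pipelines.createPipeline import CreatePipeline  # assumed module path

class FakeTable:
    def put_item(self, Item, ConditionExpression):
        # Record the write instead of calling DynamoDB.
        self.last_item = Item

class FakeDynamoDB:
    def Table(self, name):
        return FakeTable()

# Hypothetical environment values covering the keys read in __init__.
FAKE_ENV = {
    "PIPELINE_STORAGE_TABLE_NAME": "pipelines",
    "ENABLE_PIPELINE_FUNCTION_NAME": "enablePipeline",
    "ENABLE_PIPELINE_FUNCTION_ARN": "arn:aws:lambda:us-east-1:111122223333:function:enablePipeline",
    "S3_BUCKET": "example-bucket",
    "SAGEMAKER_BUCKET_NAME": "example-sagemaker-bucket",
    "SAGEMAKER_BUCKET_ARN": "arn:aws:s3:::example-sagemaker-bucket",
    "ASSET_BUCKET_ARN": "arn:aws:s3:::example-asset-bucket",
    "ROLE_TO_ATTACH_TO_LAMBDA_PIPELINE": "arn:aws:iam::111122223333:role/example-pipeline-role",
    "LAMBDA_PIPELINE_SAMPLE_FUNCTION_BUCKET": "example-code-bucket",
    "LAMBDA_PIPELINE_SAMPLE_FUNCTION_KEY": "sample_function.zip",
}

def test_upload_pipeline_with_user_provided_lambda():
    # No cloudformation/lambda clients are needed here: a user-provided
    # lambdaName short-circuits resource creation in upload_Pipeline.
    pipeline = CreatePipeline(FakeDynamoDB(), None, None, FAKE_ENV)
    body = {
        "databaseId": "Unit_Test",
        "pipelineId": "stl_to_glb_converter",
        "assetType": ".stl",
        "outputType": ".glb",
        "description": "converts stl to glb",
        "pipelineType": "Lambda",
        "waitForCallback": "Disabled",
        "lambdaName": "stl_to_glb_converter",
    }
    assert json.loads(pipeline.upload_Pipeline(body)) == {"message": "Succeeded"}

The same object could also be injected into the handler itself with lambda_handler(event, None, create_pipeline_fn=lambda: pipeline).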