Source code for airflow.providers.amazon.aws.example_dags.example_sagemaker
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

import base64
import os
import subprocess
from datetime import datetime
from tempfile import NamedTemporaryFile

import boto3

from airflow import DAG
from airflow.decorators import task
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.providers.amazon.aws.operators.sagemaker import (
    SageMakerDeleteModelOperator,
    SageMakerModelOperator,
    SageMakerProcessingOperator,
    SageMakerTrainingOperator,
    SageMakerTransformOperator,
    SageMakerTuningOperator,
)
from airflow.providers.amazon.aws.sensors.sagemaker import (
    SageMakerTrainingSensor,
    SageMakerTransformSensor,
    SageMakerTuningSensor,
)

# Project name will be used in naming the S3 buckets and various tasks.
# The dataset used in this example is identifying varieties of the Iris flower.
# For this example we are using a subset of Fisher's Iris Data Set.
# The full dataset can be found at UC Irvine's machine learning repository:
# https://archive.ics.uci.edu/ml/machine-learning-databases/iris/iris.data
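# A minimal sketch of the CSV shape this example assumes: five columns with the species
# label last, matching what the preprocessing script below expects. The constant name and
# rows here are illustrative only; the actual DATASET constant is not shown in this listing.
SAMPLE_DATASET_EXCERPT = """5.1,3.5,1.4,0.2,Iris-setosa
7.0,3.2,4.7,1.4,Iris-versicolor
6.3,3.3,6.0,2.5,Iris-virginica
"""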
# The URI of an Amazon-provided docker image for handling KNN model training.  This is a public ECR
# repo cited in public SageMaker documentation, so the account number does not need to be redacted.
# For more info see: https://docs.aws.amazon.com/sagemaker/latest/dg/ecr-us-west-2.html#knn-us-west-2.title
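# A hedged sketch of the constant described above, assuming the publicly documented
# us-west-2 registry from the linked page; look up the KNN image URI for your own region
# before using this value.
KNN_IMAGE_URI = '174872318107.dkr.ecr.us-west-2.amazonaws.com/knn'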
TUNING_CONFIG = {
    'HyperParameterTuningJobName': TUNING_JOB_NAME,
    'HyperParameterTuningJobConfig': {
        'Strategy': 'Bayesian',
        'HyperParameterTuningJobObjective': {
            'MetricName': 'test:accuracy',
            'Type': 'Maximize',
        },
        'ResourceLimits': {
            # You would bump these up in production as appropriate.
            'MaxNumberOfTrainingJobs': 1,
            'MaxParallelTrainingJobs': 1,
        },
        'ParameterRanges': {
            'CategoricalParameterRanges': [],
            'IntegerParameterRanges': [
                # Set the min and max values of the hyperparameters you want to tune.
                {
                    'Name': 'k',
                    'MinValue': '1',
                    'MaxValue': str(SAMPLE_SIZE),
                },
                {
                    'Name': 'sample_size',
                    'MinValue': '1',
                    'MaxValue': str(SAMPLE_SIZE),
                },
            ],
        },
    },
    'TrainingJobDefinition': {
        'StaticHyperParameters': {
            'predictor_type': 'classifier',
            'feature_dim': '4',
        },
        'AlgorithmSpecification': {'TrainingImage': KNN_IMAGE_URI, 'TrainingInputMode': 'File'},
        'InputDataConfig': [
            {
                'ChannelName': 'train',
                **TRAINING_DATA_SOURCE,  # type: ignore [arg-type]
            },
            {
                'ChannelName': 'test',
                **TRAINING_DATA_SOURCE,  # type: ignore [arg-type]
            },
        ],
        'OutputDataConfig': {'S3OutputPath': f's3://{S3_BUCKET}/{TRAINING_OUTPUT_S3_KEY}'},
        'ResourceConfig': RESOURCE_CONFIG,
        'StoppingCondition': TASK_TIMEOUT,
        'RoleArn': ROLE_ARN,
    },
}
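# The preprocessing script below is formatted with two container-local paths. A minimal
# sketch, assuming the conventional SageMaker Processing mount points; adjust these if
# your processing job's input/output configuration uses different local paths.
PROCESSING_LOCAL_INPUT_PATH = '/opt/ml/processing/input'
PROCESSING_LOCAL_OUTPUT_PATH = '/opt/ml/processing/output'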
# This script will be the entrypoint for the docker image which will handle preprocessing the raw data.
# NOTE: The following string must remain dedented as it is being written to a file.
PREPROCESS_SCRIPT = (
    """
import boto3
import numpy as np
import pandas as pd


def main():
    # Load the Iris dataset from {input_path}/input.csv, split it into train/test
    # subsets, and write them to {output_path}/ for the Processing Operator.

    columns = ['sepal_length', 'sepal_width', 'petal_length', 'petal_width', 'species']
    iris = pd.read_csv('{input_path}/input.csv', names=columns)

    # Process data
    iris['species'] = iris['species'].replace({{'Iris-virginica': 0, 'Iris-versicolor': 1, 'Iris-setosa': 2}})
    iris = iris[['species', 'sepal_length', 'sepal_width', 'petal_length', 'petal_width']]

    # Split into test and train data
    iris_train, iris_test = np.split(
        iris.sample(frac=1, random_state=np.random.RandomState()), [int(0.7 * len(iris))]
    )

    # Remove the "answers" from the test set
    iris_test.drop(['species'], axis=1, inplace=True)

    # Write the splits to disk
    iris_train.to_csv('{output_path}/train.csv', index=False, header=False)
    iris_test.to_csv('{output_path}/test.csv', index=False, header=False)

    print('Preprocessing Done.')


if __name__ == "__main__":
    main()
    """
).format(input_path=PROCESSING_LOCAL_INPUT_PATH, output_path=PROCESSING_LOCAL_OUTPUT_PATH)
@task
def upload_dataset_to_s3():
    """Uploads the provided dataset to a designated Amazon S3 bucket."""
    S3Hook().load_string(
        string_data=DATASET,
        bucket_name=S3_BUCKET,
        key=RAW_DATA_S3_KEY,
        replace=True,
    )


@task
def build_and_upload_docker_image():
    """
    We need a Docker image with the following requirements:
      - Has numpy, pandas, requests, and boto3 installed
      - Has our data preprocessing script mounted and set as the entry point
    """
    # Fetch and parse ECR Token to be used for the docker push
    ecr_client = boto3.client('ecr', region_name=REGION)
    token = ecr_client.get_authorization_token()
    credentials = (base64.b64decode(token['authorizationData'][0]['authorizationToken'])).decode('utf-8')
    username, password = credentials.split(':')

    with NamedTemporaryFile(mode='w+t') as preprocessing_script, NamedTemporaryFile(mode='w+t') as dockerfile:
        preprocessing_script.write(PREPROCESS_SCRIPT)
        preprocessing_script.flush()

        dockerfile.write(
            f"""
            FROM amazonlinux
            COPY {preprocessing_script.name.split('/')[2]} /preprocessing.py
            ADD credentials /credentials
            ENV AWS_SHARED_CREDENTIALS_FILE=/credentials
            RUN yum install python3 pip -y
            RUN pip3 install boto3 pandas requests
            CMD [ "python3", "/preprocessing.py"]
            """
        )
        dockerfile.flush()

        docker_build_and_push_commands = f"""
            cp /root/.aws/credentials /tmp/credentials &&
            docker build -f {dockerfile.name} -t {ECR_REPOSITORY} /tmp &&
            rm /tmp/credentials &&
            aws ecr get-login-password --region {REGION} |
            docker login --username {username} --password {password} {ECR_REPOSITORY} &&
            docker push {ECR_REPOSITORY}
            """
        docker_build = subprocess.Popen(
            docker_build_and_push_commands,
            shell=True,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
        )
        _, err = docker_build.communicate()

        if docker_build.returncode != 0:
            raise RuntimeError(err)
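# The docker build-and-push above expects ECR_REPOSITORY to be a full repository URI in
# the same region as the ECR client. A hypothetical placeholder sketch; the account ID
# and repository name below are illustrative, not taken from this listing.
REGION = 'us-west-2'
ECR_REPOSITORY = f'123456789012.dkr.ecr.{REGION}.amazonaws.com/example-sagemaker-preprocessing'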
)
# [END howto_operator_sagemaker_processing]

# [START howto_operator_sagemaker_training]
train_model = SageMakerTrainingOperator(
    task_id='train_model',
    config=TRAINING_CONFIG,
    # Waits by default, setting as False to demonstrate the Sensor below.
    wait_for_completion=False,
    do_xcom_push=False,
)
# [END howto_operator_sagemaker_training]

# [START howto_sensor_sagemaker_training]
await_training = SageMakerTrainingSensor(
    task_id='await_training',
    job_name=TRAINING_JOB_NAME,
)
# [END howto_sensor_sagemaker_training]

# [START howto_operator_sagemaker_model]
create_model = SageMakerModelOperator(
    task_id='create_model',
    config=MODEL_CONFIG,
    do_xcom_push=False,
)
# [END howto_operator_sagemaker_model]

# [START howto_operator_sagemaker_tuning]
tune_model = SageMakerTuningOperator(
    task_id='tune_model',
    config=TUNING_CONFIG,
    # Waits by default, setting as False to demonstrate the Sensor below.
    wait_for_completion=False,
    do_xcom_push=False,
)
# [END howto_operator_sagemaker_tuning]

# [START howto_sensor_sagemaker_tuning]
await_tune = SageMakerTuningSensor(
    task_id='await_tuning',
    job_name=TUNING_JOB_NAME,
)
# [END howto_sensor_sagemaker_tuning]

# [START howto_operator_sagemaker_transform]
test_model = SageMakerTransformOperator(
    task_id='test_model',
    config=TRANSFORM_CONFIG,
    # Waits by default, setting as False to demonstrate the Sensor below.
    wait_for_completion=False,
    do_xcom_push=False,
)
# [END howto_operator_sagemaker_transform]

# [START howto_sensor_sagemaker_transform]
await_transform = SageMakerTransformSensor(
    task_id='await_transform',
    job_name=TRANSFORM_JOB_NAME,
)
# [END howto_sensor_sagemaker_transform]

# Trigger rule set to "all_done" so clean up will run regardless of success on other tasks.
# [START howto_operator_sagemaker_delete_model]
delete_model = SageMakerDeleteModelOperator(
    task_id='delete_model',
    config={'ModelName': MODEL_NAME},
    trigger_rule='all_done',
)
# [END howto_operator_sagemaker_delete_model]

(
    upload_dataset_to_s3()
    >> build_and_upload_docker_image()
    >> preprocess_raw_data
    >> train_model
    >> await_training
    >> create_model
    >> tune_model
    >> await_tune
    >> test_model
    >> await_transform
    >> cleanup()
    >> delete_model
)