Amazon SageMaker¶
Amazon SageMaker is a fully managed machine learning service. With Amazon SageMaker, data scientists and developers can quickly build and train machine learning models, and then deploy them into a production-ready hosted environment.
Airflow provides operators to create and interact with SageMaker Jobs and Pipelines.
Prerequisite Tasks¶
To use these operators, you must do a few things:
Create necessary resources using AWS Console or AWS CLI.
Install API libraries via pip.
pip install 'apache-airflow[amazon]'
Detailed information is available in the Installation documentation.
Operators¶
Create an Amazon SageMaker processing job¶
To create an Amazon SageMaker processing job to sanitize your dataset, you can use SageMakerProcessingOperator.
preprocess_raw_data = SageMakerProcessingOperator(
    task_id="preprocess_raw_data",
    config=test_setup["processing_config"],
)
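The snippet above does not show what the processing configuration contains. As a minimal sketch, assuming the operator forwards `config` to the SageMaker `CreateProcessingJob` API, a dictionary of the following shape could be used. The role ARN, image URI, bucket, and job name are hypothetical placeholders, not values from this guide.

```python
# Hypothetical placeholders; replace them with resources from your own account.
ROLE_ARN = "arn:aws:iam::123456789012:role/example-sagemaker-role"
IMAGE_URI = "123456789012.dkr.ecr.us-east-1.amazonaws.com/example-preprocess:latest"
BUCKET = "example-bucket"

# Keys follow the CreateProcessingJob request structure.
processing_config = {
    "ProcessingJobName": "example-preprocess-job",
    "RoleArn": ROLE_ARN,
    "AppSpecification": {"ImageUri": IMAGE_URI},
    "ProcessingInputs": [
        {
            "InputName": "raw",
            "S3Input": {
                "S3Uri": f"s3://{BUCKET}/raw/",
                "LocalPath": "/opt/ml/processing/input",
                "S3DataType": "S3Prefix",
                "S3InputMode": "File",
            },
        }
    ],
    "ProcessingOutputConfig": {
        "Outputs": [
            {
                "OutputName": "clean",
                "S3Output": {
                    "S3Uri": f"s3://{BUCKET}/clean/",
                    "LocalPath": "/opt/ml/processing/output",
                    "S3UploadMode": "EndOfJob",
                },
            }
        ]
    },
    "ProcessingResources": {
        "ClusterConfig": {
            "InstanceCount": 1,
            "InstanceType": "ml.m5.large",
            "VolumeSizeInGB": 1,
        }
    },
    # Cap the runtime so a stuck job does not run indefinitely.
    "StoppingCondition": {"MaxRuntimeInSeconds": 600},
}
```

A dictionary like this would be stored under `test_setup["processing_config"]` in the example above.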
Create an Amazon SageMaker training job¶
To create an Amazon SageMaker training job, you can use SageMakerTrainingOperator.
train_model = SageMakerTrainingOperator(
    task_id="train_model",
    config=test_setup["training_config"],
)
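For illustration, a training configuration could be assembled with a small helper such as the one below. This is a sketch that assumes the dictionary mirrors the SageMaker `CreateTrainingJob` request; `build_training_config` and every value passed to it are hypothetical.

```python
def build_training_config(job_name: str, image_uri: str, role_arn: str, bucket: str) -> dict:
    """Return a CreateTrainingJob-style request dict with illustrative values."""
    return {
        "TrainingJobName": job_name,
        "RoleArn": role_arn,
        "AlgorithmSpecification": {
            "TrainingImage": image_uri,
            "TrainingInputMode": "File",
        },
        # The SageMaker API expects hyperparameter values as strings.
        "HyperParameters": {"epochs": "10", "learning_rate": "0.01"},
        "InputDataConfig": [
            {
                "ChannelName": "train",
                "DataSource": {
                    "S3DataSource": {
                        "S3DataType": "S3Prefix",
                        "S3Uri": f"s3://{bucket}/train/",
                        "S3DataDistributionType": "FullyReplicated",
                    }
                },
            }
        ],
        "OutputDataConfig": {"S3OutputPath": f"s3://{bucket}/output/"},
        "ResourceConfig": {
            "InstanceCount": 1,
            "InstanceType": "ml.m5.large",
            "VolumeSizeInGB": 1,
        },
        "StoppingCondition": {"MaxRuntimeInSeconds": 3600},
    }


# Hypothetical names; none of these come from the original guide.
training_config = build_training_config(
    job_name="example-training-job",
    image_uri="123456789012.dkr.ecr.us-east-1.amazonaws.com/example-train:latest",
    role_arn="arn:aws:iam::123456789012:role/example-sagemaker-role",
    bucket="example-bucket",
)
```

Building the dictionary in a function keeps the job name and bucket in one place, which helps when several tasks in the same DAG need to agree on them.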
Create an Amazon SageMaker model¶
To create an Amazon SageMaker model, you can use SageMakerModelOperator.
create_model = SageMakerModelOperator(
    task_id="create_model",
    config=test_setup["model_config"],
)
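A model configuration usually points at an inference image and at the artifact produced by a training job. The sketch below assumes the `CreateModel` request shape and the default SageMaker layout for training output (`<S3OutputPath>/<job name>/output/model.tar.gz`); the helper `model_data_url` and all names are hypothetical.

```python
def model_data_url(bucket: str, training_job_name: str) -> str:
    """Build the S3 location where a training job is assumed to store its artifact."""
    return f"s3://{bucket}/output/{training_job_name}/output/model.tar.gz"


# Keys follow the CreateModel request structure; values are placeholders.
model_config = {
    "ModelName": "example-model",
    "ExecutionRoleArn": "arn:aws:iam::123456789012:role/example-sagemaker-role",
    "PrimaryContainer": {
        "Image": "123456789012.dkr.ecr.us-east-1.amazonaws.com/example-inference:latest",
        "ModelDataUrl": model_data_url("example-bucket", "example-training-job"),
    },
}
```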
Start a hyperparameter tuning job¶
To start a hyperparameter tuning job for an Amazon SageMaker model, you can use SageMakerTuningOperator.
tune_model = SageMakerTuningOperator(
    task_id="tune_model",
    config=test_setup["tuning_config"],
)
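A tuning configuration combines a search strategy with a training job template. The following is a minimal sketch, assuming the dictionary follows the `CreateHyperParameterTuningJob` request; the metric name, parameter range, and resource names are hypothetical and depend on the algorithm you train.

```python
BUCKET = "example-bucket"  # hypothetical bucket name

tuning_config = {
    "HyperParameterTuningJobName": "example-tuning-job",
    "HyperParameterTuningJobConfig": {
        "Strategy": "Bayesian",
        # The metric must be one that the training image actually emits.
        "HyperParameterTuningJobObjective": {
            "Type": "Maximize",
            "MetricName": "test:accuracy",
        },
        "ResourceLimits": {
            "MaxNumberOfTrainingJobs": 4,
            "MaxParallelTrainingJobs": 2,
        },
        # Range bounds are passed as strings in the SageMaker API.
        "ParameterRanges": {
            "IntegerParameterRanges": [
                {"Name": "k", "MinValue": "1", "MaxValue": "64"},
            ],
        },
    },
    # Template shared by every training job the tuner launches.
    "TrainingJobDefinition": {
        "StaticHyperParameters": {"predictor_type": "classifier"},
        "AlgorithmSpecification": {
            "TrainingImage": "123456789012.dkr.ecr.us-east-1.amazonaws.com/example-train:latest",
            "TrainingInputMode": "File",
        },
        "RoleArn": "arn:aws:iam::123456789012:role/example-sagemaker-role",
        "InputDataConfig": [
            {
                "ChannelName": "train",
                "DataSource": {
                    "S3DataSource": {
                        "S3DataType": "S3Prefix",
                        "S3Uri": f"s3://{BUCKET}/train/",
                        "S3DataDistributionType": "FullyReplicated",
                    }
                },
            }
        ],
        "OutputDataConfig": {"S3OutputPath": f"s3://{BUCKET}/tuning-output/"},
        "ResourceConfig": {
            "InstanceCount": 1,
            "InstanceType": "ml.m5.large",
            "VolumeSizeInGB": 1,
        },
        "StoppingCondition": {"MaxRuntimeInSeconds": 3600},
    },
}
```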
Delete an Amazon SageMaker model¶
To delete an Amazon SageMaker model, you can use SageMakerDeleteModelOperator.
delete_model = SageMakerDeleteModelOperator(
    task_id="delete_model",
    config={"ModelName": test_setup["model_name"]},
)
Create an Amazon SageMaker transform job¶
To create an Amazon SageMaker transform job, you can use SageMakerTransformOperator.
test_model = SageMakerTransformOperator(
    task_id="test_model",
    config=test_setup["transform_config"],
)
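A batch transform configuration names an existing model, an input location, and an output location. The sketch below assumes the `CreateTransformJob` request shape; all names and S3 paths are hypothetical.

```python
# Keys follow the CreateTransformJob request structure; values are placeholders.
transform_config = {
    "TransformJobName": "example-transform-job",
    # Must match the name of a model that already exists in SageMaker.
    "ModelName": "example-model",
    "TransformInput": {
        "DataSource": {
            "S3DataSource": {
                "S3DataType": "S3Prefix",
                "S3Uri": "s3://example-bucket/test/",
            }
        },
        "ContentType": "text/csv",
        # Treat each line of the input files as one record.
        "SplitType": "Line",
    },
    "TransformOutput": {"S3OutputPath": "s3://example-bucket/predictions/"},
    "TransformResources": {"InstanceCount": 1, "InstanceType": "ml.m5.large"},
}
```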
Create an Amazon SageMaker endpoint config job¶
To create an Amazon SageMaker endpoint config job, you can use SageMakerEndpointConfigOperator.
configure_endpoint = SageMakerEndpointConfigOperator(
    task_id="configure_endpoint",
    config=test_setup["endpoint_config_config"],
)
Create an Amazon SageMaker endpoint job¶
To create an Amazon SageMaker endpoint, you can use SageMakerEndpointOperator.
deploy_endpoint = SageMakerEndpointOperator(
    task_id="deploy_endpoint",
    config=test_setup["deploy_endpoint_config"],
)
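The endpoint configuration and the endpoint refer to each other by name, so it can help to define the two dictionaries together. This sketch assumes the `CreateEndpointConfig` and `CreateEndpoint` request shapes; the three names and the instance type are hypothetical.

```python
# Hypothetical resource names shared by both configurations.
MODEL_NAME = "example-model"
ENDPOINT_CONFIG_NAME = "example-endpoint-config"
ENDPOINT_NAME = "example-endpoint"

# CreateEndpointConfig-style request: which model to serve and on what hardware.
endpoint_config_config = {
    "EndpointConfigName": ENDPOINT_CONFIG_NAME,
    "ProductionVariants": [
        {
            "VariantName": "AllTraffic",
            "ModelName": MODEL_NAME,
            "InitialInstanceCount": 1,
            "InstanceType": "ml.t2.medium",
        }
    ],
}

# CreateEndpoint-style request: deploy the configuration defined above.
deploy_endpoint_config = {
    "EndpointName": ENDPOINT_NAME,
    "EndpointConfigName": ENDPOINT_CONFIG_NAME,
}
```

Sharing the constants avoids a mismatch between the configuration name used at creation time and the one referenced at deployment time.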
Start an Amazon SageMaker pipeline execution¶
To trigger an execution run for an already-defined Amazon SageMaker pipeline, you can use SageMakerStartPipelineOperator.
start_pipeline1 = SageMakerStartPipelineOperator(
    task_id="start_pipeline1",
    pipeline_name=test_setup["pipeline_name"],
)
Stop an Amazon SageMaker pipeline execution¶
To stop an Amazon SageMaker pipeline execution that is currently running, you can use SageMakerStopPipelineOperator.
stop_pipeline1 = SageMakerStopPipelineOperator(
    task_id="stop_pipeline1",
    pipeline_exec_arn=start_pipeline1.output,
)
Register a SageMaker Model Version¶
To register a model version, you can use SageMakerRegisterModelVersionOperator.
The result of executing this operator is a model package.
A model package is a reusable abstraction over model artifacts that packages everything needed for inference.
It consists of an inference specification, which defines the inference image to use, and the location of the model weights.
A model package group is a collection of model packages.
You can use this operator to add a new version and model package to the group on every DAG run.
register_model = SageMakerRegisterModelVersionOperator(
    task_id="register_model",
    image_uri=test_setup["inference_code_image"],
    model_url=test_setup["model_trained_weights"],
    package_group_name=test_setup["model_package_group_name"],
)
Sensors¶
Wait on an Amazon SageMaker training job state¶
To check the state of an Amazon SageMaker training job until it reaches a terminal state, you can use SageMakerTrainingSensor.
await_training = SageMakerTrainingSensor(
    task_id="await_training",
    job_name=test_setup["training_job_name"],
)
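To make "terminal state" concrete, the sketch below polls a status source until the job leaves its in-flight states. It is not the sensor's implementation: `wait_for_terminal_state` is a hypothetical helper, and the status sets are an assumption based on the training job statuses documented for the SageMaker API (`InProgress`, `Stopping`, `Completed`, `Stopped`, `Failed`).

```python
import time
from typing import Callable

# Assumed status sets for training jobs.
NON_TERMINAL_STATES = {"InProgress", "Stopping"}
FAILED_STATES = {"Failed"}


def wait_for_terminal_state(
    get_status: Callable[[], str],
    poke_interval: float = 0.0,
    max_pokes: int = 10,
) -> str:
    """Poll get_status until a terminal state is seen, then return it."""
    for _ in range(max_pokes):
        status = get_status()
        if status in FAILED_STATES:
            raise RuntimeError(f"job ended in failed state: {status}")
        if status not in NON_TERMINAL_STATES:
            return status
        # Still running: wait before asking again.
        time.sleep(poke_interval)
    raise TimeoutError(f"no terminal state after {max_pokes} checks")


# Usage with a canned sequence of statuses instead of a real AWS call.
statuses = iter(["InProgress", "InProgress", "Completed"])
final_status = wait_for_terminal_state(lambda: next(statuses))
```

In a real DAG the sensor handles the polling; the helper only illustrates the decision made on each check.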
Wait on an Amazon SageMaker transform job state¶
To check the state of an Amazon SageMaker transform job until it reaches a terminal state, you can use SageMakerTransformSensor.
await_transform = SageMakerTransformSensor(
    task_id="await_transform",
    job_name=test_setup["transform_job_name"],
)
Wait on an Amazon SageMaker tuning job state¶
To check the state of an Amazon SageMaker hyperparameter tuning job until it reaches a terminal state, you can use SageMakerTuningSensor.
await_tuning = SageMakerTuningSensor(
    task_id="await_tuning",
    job_name=test_setup["tuning_job_name"],
)
Wait on an Amazon SageMaker endpoint state¶
To check the state of an Amazon SageMaker endpoint until it reaches a terminal state, you can use SageMakerEndpointSensor.
await_endpoint = SageMakerEndpointSensor(
    task_id="await_endpoint",
    endpoint_name=test_setup["endpoint_name"],
)
Wait on an Amazon SageMaker pipeline execution state¶
To check the state of an Amazon SageMaker pipeline execution until it reaches a terminal state, you can use SageMakerPipelineSensor.
# start_pipeline2 is assumed to be a SageMakerStartPipelineOperator task defined elsewhere in the DAG.
await_pipeline2 = SageMakerPipelineSensor(
    task_id="await_pipeline2",
    pipeline_exec_arn=start_pipeline2.output,
)