Amazon SageMaker¶
Amazon SageMaker is a fully managed machine learning service. With Amazon SageMaker, data scientists and developers can quickly build and train machine learning models, and then deploy them into a production-ready hosted environment.
Airflow provides operators to create and interact with SageMaker Jobs and Pipelines.
Prerequisite Tasks¶
To use these operators, you must do a few things:
Create necessary resources using AWS Console or AWS CLI.
Install API libraries via pip.
pip install 'apache-airflow[amazon]'Detailed information is available Installation of Airflow®
Generic Parameters¶
- aws_conn_id
- Reference to Amazon Web Services Connection ID. If this parameter is set to - Nonethen the default boto3 behaviour is used without a connection lookup. Otherwise use the credentials stored in the Connection. Default:- aws_default
- region_name
- AWS Region Name. If this parameter is set to - Noneor omitted then region_name from AWS Connection Extra Parameter will be used. Otherwise use the specified value instead of the connection value. Default:- None
- verify
- Whether or not to verify SSL certificates. - False- Do not validate SSL certificates.
- path/to/cert/bundle.pem - A filename of the CA cert bundle to use. You can specify this argument if you want to use a different CA cert bundle than the one used by botocore. 
 - If this parameter is set to - Noneor is omitted then verify from AWS Connection Extra Parameter will be used. Otherwise use the specified value instead of the connection value. Default:- None
- botocore_config
- The provided dictionary is used to construct a botocore.config.Config. This configuration can be used to configure Avoid Throttling exceptions, timeouts, etc. Example, for more detail about parameters please have a look botocore.config.Config¶- { "signature_version": "unsigned", "s3": { "us_east_1_regional_endpoint": True, }, "retries": { "mode": "standard", "max_attempts": 10, }, "connect_timeout": 300, "read_timeout": 300, "tcp_keepalive": True, } - If this parameter is set to - Noneor omitted then config_kwargs from AWS Connection Extra Parameter will be used. Otherwise use the specified value instead of the connection value. Default:- None- Note - Specifying an empty dictionary, - {}, will overwrite the connection configuration for botocore.config.Config
Operators¶
Create an Amazon SageMaker processing job¶
To create an Amazon Sagemaker processing job to sanitize your dataset you can use
SageMakerProcessingOperator.
preprocess_raw_data = SageMakerProcessingOperator(
    task_id="preprocess_raw_data",
    config=test_setup["processing_config"],
)
# SageMakerProcessingOperator waits by default, setting as False to test the Sensor below.
preprocess_raw_data.wait_for_completion = False
Create an Amazon SageMaker training job¶
To create an Amazon Sagemaker training job you can use
SageMakerTrainingOperator.
train_model = SageMakerTrainingOperator(
    task_id="train_model",
    config=test_setup["training_config"],
)
Create an Amazon SageMaker model¶
To create an Amazon Sagemaker model you can use
SageMakerModelOperator.
create_model = SageMakerModelOperator(
    task_id="create_model",
    config=test_setup["model_config"],
)
Start a hyperparameter tuning job¶
To start a hyperparameter tuning job for an Amazon Sagemaker model you can use
SageMakerTuningOperator.
tune_model = SageMakerTuningOperator(
    task_id="tune_model",
    config=test_setup["tuning_config"],
)
Delete an Amazon SageMaker model¶
To delete an Amazon Sagemaker model you can use
SageMakerDeleteModelOperator.
delete_model = SageMakerDeleteModelOperator(
    task_id="delete_model",
    config={"ModelName": test_setup["model_name"]},
)
Create an Amazon SageMaker transform job¶
To create an Amazon Sagemaker transform job you can use
SageMakerTransformOperator.
test_model = SageMakerTransformOperator(
    task_id="test_model",
    config=test_setup["transform_config"],
)
Create an Amazon SageMaker endpoint config job¶
To create an Amazon Sagemaker endpoint config job you can use
SageMakerEndpointConfigOperator.
configure_endpoint = SageMakerEndpointConfigOperator(
    task_id="configure_endpoint",
    config=test_setup["endpoint_config_config"],
)
Create an Amazon SageMaker endpoint job¶
To create an Amazon Sagemaker endpoint you can use
SageMakerEndpointOperator.
deploy_endpoint = SageMakerEndpointOperator(
    task_id="deploy_endpoint",
    config=test_setup["deploy_endpoint_config"],
)
Start an Amazon SageMaker pipeline execution¶
To trigger an execution run for an already-defined Amazon Sagemaker pipeline, you can use
SageMakerStartPipelineOperator.
start_pipeline1 = SageMakerStartPipelineOperator(
    task_id="start_pipeline1",
    pipeline_name=pipeline_name,
)
Stop an Amazon SageMaker pipeline execution¶
To stop an Amazon Sagemaker pipeline execution that is currently running, you can use
SageMakerStopPipelineOperator.
stop_pipeline1 = SageMakerStopPipelineOperator(
    task_id="stop_pipeline1",
    pipeline_exec_arn=start_pipeline1.output,
)
Register a Sagemaker Model Version¶
To register a model version, you can use SageMakerRegisterModelVersionOperator.
The result of executing this operator is a model package.
A model package is a reusable model artifacts abstraction that packages all ingredients necessary for inference.
It consists of an inference specification that defines the inference image to use along with a model weights location.
A model package group is a collection of model packages.
You can use this operator to add a new version and model package to the group for every DAG run.
register_model = SageMakerRegisterModelVersionOperator(
    task_id="register_model",
    image_uri=test_setup["inference_code_image"],
    model_url=test_setup["model_trained_weights"],
    package_group_name=test_setup["model_package_group_name"],
)
Launch an AutoML experiment¶
To launch an AutoML experiment, a.k.a. SageMaker Autopilot, you can use SageMakerAutoMLOperator.
An AutoML experiment will take some input data in CSV and the column it should learn to predict,
and train models on it without needing human supervision.
The output is placed in an S3 bucket, and automatically deployed if configured for it.
automl = SageMakerAutoMLOperator(
    task_id="auto_ML",
    job_name=test_setup["auto_ml_job_name"],
    s3_input=test_setup["input_data_uri"],
    target_attribute="class",
    s3_output=test_setup["output_data_uri"],
    role_arn=test_context[ROLE_ARN_KEY],
    time_limit=30,  # will stop the job before it can do anything, but it's not the point here
)
Create an Experiment for later use¶
To create a SageMaker experiment, you can use SageMakerCreateExperimentOperator.
This creates an experiment so that it’s ready to be associated with processing, training and transform jobs.
create_experiment = SageMakerCreateExperimentOperator(
    task_id="create_experiment", name=test_setup["experiment_name"]
)
Create a SageMaker Notebook Instance¶
To create a SageMaker Notebook Instance , you can use SageMakerCreateNotebookOperator.
This creates a SageMaker Notebook Instance ready to run Jupyter notebooks.
instance = SageMakerCreateNotebookOperator(
    task_id="create_instance",
    instance_name=instance_name,
    instance_type="ml.t3.medium",
    role_arn=role_arn,
    wait_for_completion=True,
)
Stop a SageMaker Notebook Instance¶
To terminate SageMaker Notebook Instance , you can use SageMakerStopNotebookOperator.
This terminates the ML compute instance and disconnects the ML storage volume.
stop_instance = SageMakerStopNotebookOperator(
    task_id="stop_instance",
    instance_name=instance_name,
)
Start a SageMaker Notebook Instance¶
To launch a SageMaker Notebook Instance and re-attach an ML storage volume, you can use SageMakerStartNotebookOperator.
This launches a new ML compute instance with the latest version of the libraries and attached your ML storage volume.
start_instance = SageMakerStartNoteBookOperator(
    task_id="start_instance",
    instance_name=instance_name,
)
Delete a SageMaker Notebook Instance¶
To delete a SageMaker Notebook Instance, you can use SageMakerDeleteNotebookOperator.
This terminates the instance and deletes the ML storage volume and network interface associated with the instance. The instance must be stopped before it can be deleted.
delete_instance = SageMakerDeleteNotebookOperator(task_id="delete_instance", instance_name=instance_name)
Sensors¶
Wait on an Amazon SageMaker training job state¶
To check the state of an Amazon Sagemaker training job until it reaches a terminal state
you can use SageMakerTrainingSensor.
await_training = SageMakerTrainingSensor(
    task_id="await_training",
    job_name=test_setup["training_job_name"],
)
Wait on an Amazon SageMaker transform job state¶
To check the state of an Amazon Sagemaker transform job until it reaches a terminal state
you can use SageMakerTransformOperator.
await_transform = SageMakerTransformSensor(
    task_id="await_transform",
    job_name=test_setup["transform_job_name"],
)
Wait on an Amazon SageMaker tuning job state¶
To check the state of an Amazon Sagemaker hyperparameter tuning job until it reaches a terminal state
you can use SageMakerTuningSensor.
await_tuning = SageMakerTuningSensor(
    task_id="await_tuning",
    job_name=test_setup["tuning_job_name"],
)
Wait on an Amazon SageMaker endpoint state¶
To check the state of an Amazon Sagemaker endpoint until it reaches a terminal state
you can use SageMakerEndpointSensor.
await_endpoint = SageMakerEndpointSensor(
    task_id="await_endpoint",
    endpoint_name=test_setup["endpoint_name"],
)
Wait on an Amazon SageMaker pipeline execution state¶
To check the state of an Amazon Sagemaker pipeline execution until it reaches a terminal state
you can use SageMakerPipelineSensor.
await_pipeline2 = SageMakerPipelineSensor(
    task_id="await_pipeline2",
    pipeline_exec_arn=start_pipeline2.output,
)
Wait on an Amazon SageMaker AutoML experiment state¶
To check the state of an Amazon Sagemaker AutoML job until it reaches a terminal state
you can use SageMakerAutoMLSensor.
automl = SageMakerAutoMLOperator(
    task_id="auto_ML",
    job_name=test_setup["auto_ml_job_name"],
    s3_input=test_setup["input_data_uri"],
    target_attribute="class",
    s3_output=test_setup["output_data_uri"],
    role_arn=test_context[ROLE_ARN_KEY],
    time_limit=30,  # will stop the job before it can do anything, but it's not the point here
)
Wait on an Amazon SageMaker processing job state¶
To check the state of an Amazon Sagemaker processing job until it reaches a terminal state
you can use SageMakerProcessingSensor.
await_preprocess = SageMakerProcessingSensor(
    task_id="await_preprocess", job_name=test_setup["processing_job_name"]
)