Google Cloud AI Platform Operators
Google Cloud AI Platform (formerly known as ML Engine) can be used to train machine learning models at scale, host trained models in the cloud, and make predictions on new data. It provides a collection of tools for training, evaluating, and tuning machine learning models, and can also be used to deploy a trained model, make predictions, and manage various model versions.
The legacy versions of AI Platform Training, AI Platform Prediction, AI Platform Pipelines, and AI Platform Data Labeling Service are deprecated and will no longer be available on Google Cloud after their shutdown date. All the functionality of legacy AI Platform and new features are available on the Vertex AI platform.
Prerequisite tasks
To use these operators, you must do a few things:
Select or create a Cloud Platform project using the Cloud Console.
Enable billing for your project, as described in the Google Cloud documentation.
Enable the API, as described in the Cloud Console documentation.
Install API libraries via pip.
pip install 'apache-airflow[google]'
Detailed information is available for Installation.
Launching a Job
To start a machine learning operation with AI Platform, you must launch a training job. This creates a virtual machine that can run the code specified in the trainer file, which contains the main application code. A job can be initiated with the MLEngineStartTrainingJobOperator.
This operator is deprecated. Please use CreateCustomPythonPackageTrainingJobOperator instead.
create_custom_python_package_training_job = CreateCustomPythonPackageTrainingJobOperator(
task_id="create_custom_python_package_training_job",
staging_bucket=f"gs://{CUSTOM_PYTHON_GCS_BUCKET_NAME}",
display_name=PACKAGE_DISPLAY_NAME,
python_package_gcs_uri=PYTHON_PACKAGE_GCS_URI,
python_module_name=PYTHON_MODULE_NAME,
container_uri=TRAIN_IMAGE,
model_serving_container_image_uri=DEPLOY_IMAGE,
bigquery_destination=f"bq://{PROJECT_ID}",
# run params
dataset_id=tabular_dataset_id,
model_display_name=MODEL_DISPLAY_NAME,
replica_count=REPLICA_COUNT,
machine_type=MACHINE_TYPE,
accelerator_type=ACCELERATOR_TYPE,
accelerator_count=ACCELERATOR_COUNT,
training_fraction_split=TRAINING_FRACTION_SPLIT,
validation_fraction_split=VALIDATION_FRACTION_SPLIT,
test_fraction_split=TEST_FRACTION_SPLIT,
region=REGION,
project_id=PROJECT_ID,
)
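The snippet above is an excerpt from a larger example DAG, so the import is omitted. Assuming a recent version of the apache-airflow-providers-google package, the operator can be imported from the Vertex AI custom job module (the path may differ in older releases):
from airflow.providers.google.cloud.operators.vertex_ai.custom_job import (
    CreateCustomPythonPackageTrainingJobOperator,
)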
Creating a model
A model is a container that can hold multiple model versions. A new model can be created through the MLEngineCreateModelOperator. The model field should be defined with a dictionary containing information about the model; name is a required field in this dictionary.
This operator is deprecated. In Vertex AI, a model is created as a result of running any of the operators that create training jobs, for example CreateCustomPythonPackageTrainingJobOperator. The result of running such an operator is a ready-to-use model saved in the Model Registry.
create_custom_python_package_training_job = CreateCustomPythonPackageTrainingJobOperator(
task_id="create_custom_python_package_training_job",
staging_bucket=f"gs://{CUSTOM_PYTHON_GCS_BUCKET_NAME}",
display_name=PACKAGE_DISPLAY_NAME,
python_package_gcs_uri=PYTHON_PACKAGE_GCS_URI,
python_module_name=PYTHON_MODULE_NAME,
container_uri=TRAIN_IMAGE,
model_serving_container_image_uri=DEPLOY_IMAGE,
bigquery_destination=f"bq://{PROJECT_ID}",
# run params
dataset_id=tabular_dataset_id,
model_display_name=MODEL_DISPLAY_NAME,
replica_count=REPLICA_COUNT,
machine_type=MACHINE_TYPE,
accelerator_type=ACCELERATOR_TYPE,
accelerator_count=ACCELERATOR_COUNT,
training_fraction_split=TRAINING_FRACTION_SPLIT,
validation_fraction_split=VALIDATION_FRACTION_SPLIT,
test_fraction_split=TEST_FRACTION_SPLIT,
region=REGION,
project_id=PROJECT_ID,
)
Getting a model
The MLEngineGetModelOperator can be used to obtain a previously created model. To obtain the correct model, model_name must be defined in the operator.
This operator is deprecated. Please use GetModelOperator instead.
get_model = GetModelOperator(
task_id="get_model", region=REGION, project_id=PROJECT_ID, model_id=model_id_v1
)
You can use Jinja templating with the project_id and model_id fields to dynamically determine their values. The result is saved to XCom, allowing it to be used by other operators. In this case, the BashOperator is used to print the model information.
get_model_result = BashOperator(
bash_command=f"echo {get_model.output}",
task_id="get_model_result",
)
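These two snippets are also excerpts; assuming a recent Google provider release, the operators used in them can be imported as follows (module paths may vary across versions):
from airflow.operators.bash import BashOperator
from airflow.providers.google.cloud.operators.vertex_ai.model_service import GetModelOperator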
Creating model versions
A model version is a subset of the model container where the code runs. A new version of the model can be created through the MLEngineCreateVersionOperator. The model must be specified by model_name, and the version parameter should contain a dictionary of all the information about the version. Within the version parameter's dictionary, the name field is required.
This operator is deprecated. Please use CreateCustomPythonPackageTrainingJobOperator instead. In this case, a new version of an existing model can be created by passing the existing model ID in the parent_model parameter when running the training job. This ensures that a new version of the model is trained instead of a new model being created.
create_custom_python_package_training_job = CreateCustomPythonPackageTrainingJobOperator(
task_id="create_custom_python_package_training_job",
staging_bucket=f"gs://{CUSTOM_PYTHON_GCS_BUCKET_NAME}",
display_name=PACKAGE_DISPLAY_NAME,
python_package_gcs_uri=PYTHON_PACKAGE_GCS_URI,
python_module_name=PYTHON_MODULE_NAME,
container_uri=TRAIN_IMAGE,
model_serving_container_image_uri=DEPLOY_IMAGE,
bigquery_destination=f"bq://{PROJECT_ID}",
# run params
dataset_id=tabular_dataset_id,
model_display_name=MODEL_DISPLAY_NAME,
replica_count=REPLICA_COUNT,
machine_type=MACHINE_TYPE,
accelerator_type=ACCELERATOR_TYPE,
accelerator_count=ACCELERATOR_COUNT,
training_fraction_split=TRAINING_FRACTION_SPLIT,
validation_fraction_split=VALIDATION_FRACTION_SPLIT,
test_fraction_split=TEST_FRACTION_SPLIT,
region=REGION,
project_id=PROJECT_ID,
)
The CreateCustomPythonPackageTrainingJobOperator can also be used to create more versions with varying parameters.
create_custom_python_package_training_job_v2 = CreateCustomPythonPackageTrainingJobOperator(
task_id="create_custom_python_package_training_job_v2",
staging_bucket=f"gs://{CUSTOM_PYTHON_GCS_BUCKET_NAME}",
display_name=PACKAGE_DISPLAY_NAME,
python_package_gcs_uri=PYTHON_PACKAGE_GCS_URI,
python_module_name=PYTHON_MODULE_NAME,
container_uri=TRAIN_IMAGE,
model_serving_container_image_uri=DEPLOY_IMAGE,
bigquery_destination=f"bq://{PROJECT_ID}",
parent_model=model_id_v1,
# run params
dataset_id=tabular_dataset_id,
model_display_name=MODEL_DISPLAY_NAME,
replica_count=REPLICA_COUNT,
machine_type=MACHINE_TYPE,
accelerator_type=ACCELERATOR_TYPE,
accelerator_count=ACCELERATOR_COUNT,
training_fraction_split=TRAINING_FRACTION_SPLIT,
validation_fraction_split=VALIDATION_FRACTION_SPLIT,
test_fraction_split=TEST_FRACTION_SPLIT,
region=REGION,
project_id=PROJECT_ID,
)
Managing model versions
By default, the model code will run using the default model version. You can set the model version through the MLEngineSetDefaultVersionOperator by specifying the model_name and version_name parameters.
This operator is deprecated. Please use SetDefaultVersionOnModelOperator instead. The model version to be set as default can be passed in the model_id parameter, in the format projects/{project}/locations/{location}/models/{model_id}@{version_id} or projects/{project}/locations/{location}/models/{model_id}@{version_alias}. By default, the first model version created is marked as the default.
set_default_version = SetDefaultVersionOnModelOperator(
task_id="set_default_version",
project_id=PROJECT_ID,
region=REGION,
model_id=model_id_v2,
)
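The example above passes a model ID pulled from a previous task. As a minimal sketch of the fully qualified format described above, the version to promote can also be pinned explicitly; the model number 1234567890 and version 2 below are placeholders, not values from the example DAG:
set_default_version_explicit = SetDefaultVersionOnModelOperator(
    task_id="set_default_version_explicit",
    project_id=PROJECT_ID,
    region=REGION,
    # Fully qualified model ID pinned to a specific version (placeholder values)
    model_id=f"projects/{PROJECT_ID}/locations/{REGION}/models/1234567890@2",
)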
To list the model versions available, use the MLEngineListVersionsOperator while specifying the model_name parameter.
This operator is deprecated. Please use ListModelVersionsOperator instead. You can pass the name of the desired model in the model_id parameter. If the model ID is passed with a version alias, the operator will output all the versions available for this model.
list_model_versions = ListModelVersionsOperator(
task_id="list_model_versions", region=REGION, project_id=PROJECT_ID, model_id=model_id_v2
)
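As with GetModelOperator above, the result is saved to XCom, so it can be inspected by downstream tasks. A minimal sketch reusing the BashOperator pattern from the earlier example (the task list_model_versions_result is illustrative and not part of the original example DAG):
list_model_versions_result = BashOperator(
    # Print the list of model versions pulled from XCom
    bash_command=f"echo {list_model_versions.output}",
    task_id="list_model_versions_result",
)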
Making predictions
A Google Cloud AI Platform prediction job can be started with the MLEngineStartBatchPredictionJobOperator. To specify the model origin, you need to provide either the model_name, the uri, or both the model_name and version_name. If you do not provide a version_name, the operator will use the default model version.
This operator is deprecated. Please use CreateBatchPredictionJobOperator instead.
create_batch_prediction_job = CreateBatchPredictionJobOperator(
task_id="create_batch_prediction_job",
job_display_name=JOB_DISPLAY_NAME,
model_name=model_id_v2,
predictions_format="bigquery",
bigquery_source=BQ_SOURCE,
bigquery_destination_prefix=f"bq://{PROJECT_ID}",
region=REGION,
project_id=PROJECT_ID,
machine_type=MACHINE_TYPE,
)
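The example above reads instances from and writes predictions to BigQuery. The operator mirrors the Vertex AI SDK's batch prediction parameters, so a Cloud Storage based job should also be possible; the following is a sketch only, assuming JSONL instance files in a bucket (the GCS paths are placeholders, and parameter availability may vary by provider version):
from airflow.providers.google.cloud.operators.vertex_ai.batch_prediction_job import (
    CreateBatchPredictionJobOperator,
)

create_batch_prediction_job_gcs = CreateBatchPredictionJobOperator(
    task_id="create_batch_prediction_job_gcs",
    job_display_name=JOB_DISPLAY_NAME,
    model_name=model_id_v2,
    # JSONL instances read from and predictions written to GCS (placeholder paths)
    instances_format="jsonl",
    predictions_format="jsonl",
    gcs_source=f"gs://{CUSTOM_PYTHON_GCS_BUCKET_NAME}/batch-inputs/*.jsonl",
    gcs_destination_prefix=f"gs://{CUSTOM_PYTHON_GCS_BUCKET_NAME}/batch-outputs",
    region=REGION,
    project_id=PROJECT_ID,
    machine_type=MACHINE_TYPE,
)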
Cleaning up
A model version can be deleted with the MLEngineDeleteVersionOperator by providing the version_name and model_name parameters.
This operator is deprecated. Please use DeleteModelVersionOperator instead. Note that the default version of a model cannot be deleted.
delete_model_version_1 = DeleteModelVersionOperator(
task_id="delete_model_version_1",
project_id=PROJECT_ID,
region=REGION,
model_id=model_id_v2,
trigger_rule=TriggerRule.ALL_DONE,
)
You can also delete a model with the MLEngineDeleteModelOperator by providing the model_name parameter.
This operator is deprecated. Please use DeleteModelOperator instead.
delete_model = DeleteModelOperator(
task_id="delete_model",
project_id=PROJECT_ID,
region=REGION,
model_id=model_id_v1,
trigger_rule=TriggerRule.ALL_DONE,
)
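Like the other snippets on this page, the cleanup examples omit imports. Assuming a recent Google provider release, the following imports cover them (module paths may differ in older versions):
from airflow.providers.google.cloud.operators.vertex_ai.model_service import (
    DeleteModelOperator,
    DeleteModelVersionOperator,
)
from airflow.utils.trigger_rule import TriggerRule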
Evaluating a model
This functionality is deprecated. All the functionality of the legacy MLEngine service and new features are available on the Vertex AI platform. To create and view model evaluations, please check the documentation: Evaluate models using Vertex AI.