Google Cloud AI Platform Operators
Google Cloud AI Platform (formerly known as ML Engine) is a collection of tools for training, evaluating, and tuning machine learning models at scale. It can also host trained models in the cloud, make predictions for new data, and manage multiple model versions.
Prerequisite tasks
To use these operators, you must do a few things:
Select or create a Cloud Platform project using the Cloud Console.
Enable billing for your project, as described in the Google Cloud documentation.
Enable the API, as described in the Cloud Console documentation.
Install API libraries via pip.
pip install 'apache-airflow[google]'
For detailed information, see Installation.
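One way to confirm that the install step took effect is to check whether the Google provider package can be located. The sketch below uses only the standard library; is_installed is a hypothetical helper name, and the module path airflow.providers.google is an assumption about where the provider is installed.

```python
import importlib.util


def is_installed(module_name: str) -> bool:
    """Return True if module_name can be located (parent packages get imported)."""
    try:
        return importlib.util.find_spec(module_name) is not None
    except ImportError:
        # Raised when a parent package (for example "airflow") is missing.
        return False


# Assumed module path for the Google provider package.
print(is_installed("airflow.providers.google"))
```

If this prints False, the pip command above most likely ran in a different Python environment from the one Airflow uses.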
Launching a Job
To start a machine learning operation with AI Platform, you must launch a training job.
This creates a virtual machine that can run code specified in the trainer file, which
contains the main application code. A job can be initiated with the
MLEngineStartTrainingJobOperator.
training = MLEngineStartTrainingJobOperator(
    task_id="training",
    project_id=PROJECT_ID,
    region="us-central1",
    job_id="training-job-{{ ts_nodash }}-{{ params.model_name }}",
    runtime_version="1.15",
    python_version="3.7",
    job_dir=JOB_DIR,
    package_uris=[TRAINER_URI],
    training_python_module=TRAINER_PY_MODULE,
    training_args=[],
    labels={"job_type": "training"},
)
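The job_id above is a Jinja template that Airflow renders at run time. As a rough illustration of the resulting string, the sketch below reproduces the rendering by hand with the standard library. It assumes that ts_nodash has the form YYYYMMDDTHHMMSS; render_job_id and the sample model name are hypothetical and are not part of Airflow.

```python
from datetime import datetime, timezone


def render_job_id(logical_date: datetime, model_name: str) -> str:
    """Hand-rolled stand-in for rendering the job_id template."""
    # Assumption: ts_nodash is the run timestamp without dashes, colons,
    # or time zone information.
    ts_nodash = logical_date.strftime("%Y%m%dT%H%M%S")
    return f"training-job-{ts_nodash}-{model_name}"


job_id = render_job_id(
    datetime(2024, 1, 2, 3, 4, 5, tzinfo=timezone.utc),
    "my_model",  # hypothetical value of params.model_name
)
print(job_id)
```

Because the timestamp differs between runs, each DAG run submits a job with a distinct ID.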
Creating a model
A model is a container that can hold multiple model versions. A new model can be created through the
MLEngineCreateModelOperator.
The model field should be defined with a dictionary containing the information about the model.
name is a required field in this dictionary.
create_model = MLEngineCreateModelOperator(
    task_id="create-model",
    project_id=PROJECT_ID,
    model={
        "name": MODEL_NAME,
    },
)
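Since name is the only required key, it can help to build the model dictionary through a small function that rejects an empty name before the task is ever scheduled. The following is a minimal sketch; build_model_body is a hypothetical helper, and the optional description and labels keys are assumptions about the fields that the AI Platform model resource accepts.

```python
from typing import Optional


def build_model_body(
    name: str,
    description: Optional[str] = None,
    labels: Optional[dict] = None,
) -> dict:
    """Build a dictionary to pass as the operator's model argument."""
    if not name:
        raise ValueError("The model dictionary requires a non-empty 'name'.")
    body = {"name": name}
    # Optional keys below are assumptions, not taken from the text above.
    if description is not None:
        body["description"] = description
    if labels:
        body["labels"] = dict(labels)
    return body


model = build_model_body("my_model", description="Demo model", labels={"team": "ml"})
print(model)
```

The returned dictionary could then be passed as model=model in place of the inline literal.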
Getting a model
The MLEngineGetModelOperator can be used to obtain a previously created model. To obtain the correct model,
model_name must be defined in the operator.
get_model = MLEngineGetModelOperator(
    task_id="get-model",
    project_id=PROJECT_ID,
    model_name=MODEL_NAME,
)
You can use Jinja templating with the project_id and model_name fields to dynamically determine their values.
The result is saved to XCom, allowing it to be used by other operators. In this case, the
BashOperator is used to print the model information.
get_model_result = BashOperator(
    bash_command=f"echo {get_model.output}",
    task_id="get-model-result",
)
Creating model versions
A model version is a subset of the model container where the code runs. A new version of the model can be created
through the MLEngineCreateVersionOperator.
The model must be specified by model_name, and the version parameter should contain a dictionary of
all the information about the version. Within the version parameter’s dictionary, the name field is
required.
create_version_v1 = MLEngineCreateVersionOperator(
    task_id="create-version-v1",
    project_id=PROJECT_ID,
    model_name=MODEL_NAME,
    version={
        "name": "v1",
        "description": "First-version",
        "deployment_uri": JOB_DIR,
        "runtime_version": "1.15",
        "machineType": "mls1-c1-m2",
        "framework": "TENSORFLOW",
        "pythonVersion": "3.7",
    },
)
The MLEngineCreateVersionOperator can also be used to create more versions with varying parameters.
create_version_v2 = MLEngineCreateVersionOperator(
    task_id="create-version-v2",
    project_id=PROJECT_ID,
    model_name=MODEL_NAME,
    version={
        "name": "v2",
        "description": "Second version",
        "deployment_uri": JOB_DIR,
        "runtime_version": "1.15",
        "machineType": "mls1-c1-m2",
        "framework": "TENSORFLOW",
        "pythonVersion": "3.7",
    },
)
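The two version dictionaries above differ only in name and description. One possible way to avoid repeating the shared settings is to derive each dictionary from a common base, as in the sketch below. BASE_VERSION and make_version are hypothetical names, and the bucket path is a placeholder standing in for JOB_DIR.

```python
# Settings shared by every version in the examples above.
BASE_VERSION = {
    "deployment_uri": "gs://example-bucket/job-dir",  # placeholder for JOB_DIR
    "runtime_version": "1.15",
    "machineType": "mls1-c1-m2",
    "framework": "TENSORFLOW",
    "pythonVersion": "3.7",
}


def make_version(name: str, description: str, **overrides) -> dict:
    """Return a new version dictionary; overrides replace shared settings."""
    if not name:
        raise ValueError("The version dictionary requires a non-empty 'name'.")
    # A fresh dictionary is built on every call, so BASE_VERSION is never mutated.
    return {"name": name, "description": description, **BASE_VERSION, **overrides}


version_v1 = make_version("v1", "First-version")
version_v2 = make_version("v2", "Second version")
print(version_v1)
print(version_v2)
```

Each result could be passed as the version argument of its own MLEngineCreateVersionOperator task.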
Managing model versions
By default, the model code will run using the default model version. You can set the default model version through the
MLEngineSetDefaultVersionOperator by specifying the model_name and version_name parameters.
set_defaults_version = MLEngineSetDefaultVersionOperator(
    task_id="set-default-version",
    project_id=PROJECT_ID,
    model_name=MODEL_NAME,
    version_name="v2",
)
To list the model versions available, use the
MLEngineListVersionsOperator while specifying the model_name parameter.
list_version = MLEngineListVersionsOperator(
    task_id="list-version",
    project_id=PROJECT_ID,
    model_name=MODEL_NAME,
)
You can use Jinja templating with the project_id and model_name fields to dynamically determine their values.
The result is saved to XCom, allowing it to be used by other operators. In this case, the
BashOperator is used to print the version information.
list_version_result = BashOperator(
    bash_command=f"echo {list_version.output}",
    task_id="list-version-result",
)
Making predictions
A Google Cloud AI Platform prediction job can be started with the
MLEngineStartBatchPredictionJobOperator.
To specify the model origin, you need to provide either the model_name, the uri, or the model_name together with the
version_name. If you do not provide the version_name, the operator will use the default model version.
prediction = MLEngineStartBatchPredictionJobOperator(
    task_id="prediction",
    project_id=PROJECT_ID,
    job_id="prediction-{{ ts_nodash }}-{{ params.model_name }}",
    region="us-central1",
    model_name=MODEL_NAME,
    data_format="TEXT",
    input_paths=[PREDICTION_INPUT],
    output_path=PREDICTION_OUTPUT,
    labels={"job_type": "prediction"},
)
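The model origin rule described above can be expressed as a small decision function. The sketch below is an illustration of that rule only, not the operator's actual code: resolve_model_origin is a hypothetical helper, and the key names (uri, modelName, versionName) and the resource path layout are assumptions modeled on AI Platform's REST naming.

```python
from typing import Optional


def resolve_model_origin(
    project_id: str,
    model_name: Optional[str] = None,
    version_name: Optional[str] = None,
    uri: Optional[str] = None,
) -> dict:
    """Return the single key/value pair that identifies the model origin."""
    if uri:
        if model_name or version_name:
            raise ValueError("Provide either uri or model_name/version_name, not both.")
        return {"uri": uri}
    if not model_name:
        raise ValueError("Provide uri, model_name, or model_name with version_name.")
    # Assumed layout: projects/<project>/models/<model>[/versions/<version>]
    origin = f"projects/{project_id}/models/{model_name}"
    if version_name:
        return {"versionName": f"{origin}/versions/{version_name}"}
    # No version given: the default model version is used.
    return {"modelName": origin}


print(resolve_model_origin("my-project", model_name="my_model"))
print(resolve_model_origin("my-project", model_name="my_model", version_name="v1"))
print(resolve_model_origin("my-project", uri="gs://example-bucket/model"))
```

Checking the combination up front gives a clear error when, for example, a version_name is supplied without a model_name.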
Cleaning up
A model version can be deleted with the
MLEngineDeleteVersionOperator by specifying the version_name and model_name parameters.
delete_version_v1 = MLEngineDeleteVersionOperator(
    task_id="delete-version-v1",
    project_id=PROJECT_ID,
    model_name=MODEL_NAME,
    version_name="v1",
    trigger_rule=TriggerRule.ALL_DONE,
)
You can also delete a model with the
MLEngineDeleteModelOperator by providing the model_name parameter.
delete_model = MLEngineDeleteModelOperator(
    task_id="delete-model",
    project_id=PROJECT_ID,
    model_name=MODEL_NAME,
    delete_contents=True,
    trigger_rule=TriggerRule.ALL_DONE,
)
Evaluating a model
To evaluate a prediction and model, specify a metric function to generate a summary and customize the evaluation of the model. This function receives a dictionary derived from a JSON record in the batch prediction result, then returns a tuple of metrics.
def get_metric_fn_and_keys():
    """
    Gets metric function and keys used to generate summary
    """

    def normalize_value(inst: dict):
        val = float(inst["output_layer"][0])
        return tuple([val])  # returns a tuple.

    return normalize_value, ["val"]  # key order must match.
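To make the role of the metric function concrete, the sketch below applies it to a few hand-written prediction records and averages each metric by key. This is a local approximation under stated assumptions: summarize and the sample records are hypothetical, and the real summary is produced by the evaluation pipeline rather than by this function.

```python
def get_metric_fn_and_keys():
    """Same metric function and keys as above, repeated so the sketch runs alone."""

    def normalize_value(inst: dict):
        val = float(inst["output_layer"][0])
        return (val,)

    return normalize_value, ["val"]


def summarize(instances: list, metric_fn, metric_keys: list) -> dict:
    """Average each metric over all prediction instances."""
    if not instances:
        raise ValueError("No prediction instances to summarize.")
    totals = [0.0] * len(metric_keys)
    for inst in instances:
        # The tuple order returned by metric_fn must match metric_keys.
        for i, value in enumerate(metric_fn(inst)):
            totals[i] += value
    return {key: total / len(instances) for key, total in zip(metric_keys, totals)}


# Hypothetical prediction records, one dictionary per JSON line.
predictions = [
    {"output_layer": [1.0]},
    {"output_layer": [2.0]},
    {"output_layer": [3.0]},
]
metric_fn, metric_keys = get_metric_fn_and_keys()
summary = summarize(predictions, metric_fn, metric_keys)
print(summary)
```

The pairing of tuple positions with key names is why the key order must match the order of the returned metrics.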
To evaluate a prediction and model, it’s useful to have a function to validate the summary result. This function receives a dictionary of the averaged metrics that the function above generated. It raises an exception if the validation fails and the pipeline should not proceed.
from math import ceil


def validate_err_and_count(summary: dict) -> dict:
    """
    Validate summary result
    """
    # Averaged metric produced by the metric function above.
    val = summary.get("val", 0)
    # generate_model_predict_input_data() is assumed to be defined elsewhere.
    initial_values = generate_model_predict_input_data()
    initial_summary = sum(initial_values) / len(initial_values)
    multiplier = ceil(val / initial_summary)
    if multiplier != 2:
        raise ValueError(f"Multiplier is not equal 2; multiplier: {multiplier}")
    return summary
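The acceptance rule in the validation function can be traced by hand: the averaged prediction is divided by the average of the input data, the ratio is rounded up, and the result must equal 2. The sketch below isolates that arithmetic. multiplier_for and the sample inputs are hypothetical; the real input values come from generate_model_predict_input_data().

```python
from math import ceil


def multiplier_for(summary_val: float, initial_values: list) -> int:
    """Ratio of the averaged prediction to the averaged input, rounded up."""
    initial_summary = sum(initial_values) / len(initial_values)
    return ceil(summary_val / initial_summary)


# Hypothetical input data with a mean of 2.0.
inputs = [1, 2, 3]

for val in (4.0, 3.5, 1.0):
    multiplier = multiplier_for(val, inputs)
    print(val, multiplier, "passes" if multiplier == 2 else "fails")
```

Because the ratio is rounded up, any ratio greater than 1 and at most 2 passes the check, so the validation tolerates predictions somewhat below an exact doubling.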
Prediction results and a model summary can be generated through a function such as
create_evaluate_ops.
It makes predictions using the specified inputs and then summarizes and validates the result. The
functions created above should be passed in through the metric_fn_and_keys and validate_fn fields.
evaluate_prediction, evaluate_summary, evaluate_validation = mlengine_operator_utils.create_evaluate_ops(
    task_prefix="evaluate-ops",
    data_format="TEXT",
    input_paths=[PREDICTION_INPUT],
    prediction_path=PREDICTION_OUTPUT,
    metric_fn_and_keys=get_metric_fn_and_keys(),
    validate_fn=validate_err_and_count,
    batch_prediction_job_id="evaluate-ops-{{ ts_nodash }}-{{ params.model_name }}",
    project_id=PROJECT_ID,
    region="us-central1",
    dataflow_options={
        "project": PROJECT_ID,
        "tempLocation": SUMMARY_TMP,
        "stagingLocation": SUMMARY_STAGING,
    },
    model_name=MODEL_NAME,
    version_name="v1",
    py_interpreter="python3",
)