Google Cloud AI Platform Operators

Google Cloud AI Platform (formerly known as ML Engine) is a collection of tools for training, evaluating, and tuning machine learning models at scale. It can also host trained models in the cloud, make predictions for new data, and manage multiple model versions.

Prerequisite tasks

To use these operators, you must do a few things:

Launching a Job

To start a machine learning operation with AI Platform, you must launch a training job. This creates a virtual machine that can run code specified in the trainer file, which contains the main application code. A job can be initiated with the MLEngineStartTrainingJobOperator.

airflow/providers/google/cloud/example_dags/example_mlengine.py[source]

training = MLEngineStartTrainingJobOperator(
    task_id="training",
    project_id=PROJECT_ID,
    region="us-central1",
    job_id="training-job-{{ ts_nodash }}-{{ params.model_name }}",
    runtime_version="1.15",
    python_version="3.7",
    job_dir=JOB_DIR,
    package_uris=[TRAINER_URI],
    training_python_module=TRAINER_PY_MODULE,
    training_args=[],
    labels={"job_type": "training"},
    hyperparameters=hyperparams,
)
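For orientation, the operator arguments above correspond roughly to the fields of an AI Platform training job request. The following is a minimal sketch of that mapping, assuming the REST field names jobId, trainingInput, packageUris, pythonModule, and so on. build_training_request is a hypothetical helper written for illustration, not part of Airflow, and the bucket and module names are placeholders. The request the operator actually sends may differ in detail.

```python
from typing import Any, Dict, List, Optional


def build_training_request(
    job_id: str,
    region: str,
    package_uris: List[str],
    python_module: str,
    training_args: Optional[List[str]] = None,
    job_dir: Optional[str] = None,
    runtime_version: Optional[str] = None,
    python_version: Optional[str] = None,
    labels: Optional[Dict[str, str]] = None,
) -> Dict[str, Any]:
    """Hypothetical helper: assemble a training job request as a plain dict."""
    training_input: Dict[str, Any] = {
        "region": region,
        "packageUris": package_uris,
        "pythonModule": python_module,
        "args": training_args or [],
    }
    # Optional fields are included only when a value was supplied.
    optional = {
        "jobDir": job_dir,
        "runtimeVersion": runtime_version,
        "pythonVersion": python_version,
    }
    training_input.update({k: v for k, v in optional.items() if v is not None})

    request: Dict[str, Any] = {"jobId": job_id, "trainingInput": training_input}
    if labels:
        request["labels"] = labels
    return request


request = build_training_request(
    job_id="training_job_20240101T000000",
    region="us-central1",
    package_uris=["gs://example-bucket/trainer-0.1.tar.gz"],  # placeholder URI
    python_module="trainer.task",  # placeholder module name
    runtime_version="1.15",
    python_version="3.7",
    labels={"job_type": "training"},
)
print(request["trainingInput"]["pythonModule"])
```

Keeping the request as a plain dictionary makes it easy to inspect or log before a job is submitted.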

Creating a model

A model is a container that can hold multiple model versions. A new model can be created through the MLEngineCreateModelOperator. The model field should be defined with a dictionary containing the information about the model. name is a required field in this dictionary.

airflow/providers/google/cloud/example_dags/example_mlengine.py[source]

create_model = MLEngineCreateModelOperator(
    task_id="create-model",
    project_id=PROJECT_ID,
    model={
        "name": MODEL_NAME,
    },
)

Getting a model

The MLEngineGetModelOperator can be used to obtain a model previously created. To obtain the correct model, model_name must be defined in the operator.

airflow/providers/google/cloud/example_dags/example_mlengine.py[source]

get_model = MLEngineGetModelOperator(
    task_id="get-model",
    project_id=PROJECT_ID,
    model_name=MODEL_NAME,
)

You can use Jinja templating with the project_id and model_name fields to determine their values dynamically. The result is saved to XCom, so other operators can use it. In this example, a BashOperator prints the model information.

airflow/providers/google/cloud/example_dags/example_mlengine.py[source]

get_model_result = BashOperator(
    bash_command=f"echo {get_model.output}",
    task_id="get-model-result",
)

Creating model versions

A model version is a subset of the model container where the code runs. A new version of the model can be created through the MLEngineCreateVersionOperator. The model must be specified by model_name, and the version parameter should contain a dictionary of all the information about the version. Within the version parameter’s dictionary, the name field is required.

airflow/providers/google/cloud/example_dags/example_mlengine.py[source]

create_version = MLEngineCreateVersionOperator(
    task_id="create-version",
    project_id=PROJECT_ID,
    model_name=MODEL_NAME,
    version={
        "name": "v1",
        "description": "First-version",
        "deployment_uri": f'{JOB_DIR}/keras_export/',
        "runtime_version": "1.15",
        "machineType": "mls1-c1-m2",
        "framework": "TENSORFLOW",
        "pythonVersion": "3.7",
    },
)

The MLEngineCreateVersionOperator can also be used to create more versions with varying parameters.

airflow/providers/google/cloud/example_dags/example_mlengine.py[source]

create_version_2 = MLEngineCreateVersionOperator(
    task_id="create-version-2",
    project_id=PROJECT_ID,
    model_name=MODEL_NAME,
    version={
        "name": "v2",
        "description": "Second version",
        "deployment_uri": SAVED_MODEL_PATH,
        "runtime_version": "1.15",
        "machineType": "mls1-c1-m2",
        "framework": "TENSORFLOW",
        "pythonVersion": "3.7",
    },
)

Managing model versions

By default, the model code runs using the model's default version. You can change which version is the default through the MLEngineSetDefaultVersionOperator by specifying the model_name and version_name parameters.

airflow/providers/google/cloud/example_dags/example_mlengine.py[source]

set_defaults_version = MLEngineSetDefaultVersionOperator(
    task_id="set-default-version",
    project_id=PROJECT_ID,
    model_name=MODEL_NAME,
    version_name="v2",
)
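The relationship between a model, its versions, and its default version can be pictured with a small in-memory stand-in. This is only a toy sketch: ToyModel and its methods are hypothetical names that do not call AI Platform, and the rule that the first version created becomes the default is an assumption made for this illustration.

```python
from typing import Dict, Optional


class ToyModel:
    """In-memory stand-in for a model that holds named versions (illustration only)."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.versions: Dict[str, dict] = {}
        self.default_version: Optional[str] = None

    def create_version(self, version: dict) -> None:
        # Mirrors the requirement that the version dictionary has a name field.
        if "name" not in version:
            raise ValueError("version dictionary requires a 'name' field")
        self.versions[version["name"]] = version
        # Assumption for this sketch: the first version becomes the default.
        if self.default_version is None:
            self.default_version = version["name"]

    def set_default_version(self, version_name: str) -> None:
        if version_name not in self.versions:
            raise KeyError(f"unknown version: {version_name}")
        self.default_version = version_name

    def resolve(self, version_name: Optional[str] = None) -> dict:
        # Fall back to the default version when none is requested.
        return self.versions[version_name or self.default_version]


model = ToyModel("example_model")
model.create_version({"name": "v1", "description": "First version"})
model.create_version({"name": "v2", "description": "Second version"})
default_before = model.resolve()["name"]

model.set_default_version("v2")
default_after = model.resolve()["name"]
print(default_before, default_after)
```

Requests that name a version explicitly are unaffected by the change of default, which is why resolve accepts an optional version_name.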

To list the model versions available, use the MLEngineListVersionsOperator while specifying the model_name parameter.

airflow/providers/google/cloud/example_dags/example_mlengine.py[source]

list_version = MLEngineListVersionsOperator(
    task_id="list-version",
    project_id=PROJECT_ID,
    model_name=MODEL_NAME,
)

You can use Jinja templating with the project_id and model_name fields to determine their values dynamically. The result is saved to XCom, so other operators can use it. In this example, a BashOperator prints the version information.

airflow/providers/google/cloud/example_dags/example_mlengine.py[source]

list_version_result = BashOperator(
    bash_command=f"echo {list_version.output}",
    task_id="list-version-result",
)

Making predictions

A Google Cloud AI Platform batch prediction job can be started with the MLEngineStartBatchPredictionJobOperator. To specify the model origin, provide exactly one of the following: model_name alone, uri alone, or model_name together with version_name. If you provide model_name without version_name, the operator uses the model's default version.

airflow/providers/google/cloud/example_dags/example_mlengine.py[source]

prediction = MLEngineStartBatchPredictionJobOperator(
    task_id="prediction",
    project_id=PROJECT_ID,
    job_id="prediction-{{ ts_nodash }}-{{ params.model_name }}",
    region="us-central1",
    model_name=MODEL_NAME,
    data_format="TEXT",
    input_paths=[PREDICTION_INPUT],
    output_path=PREDICTION_OUTPUT,
    labels={"job_type": "prediction"},
)
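The model-origin rule described above can be sketched as a small function. This illustrates the rule only and is not the operator's own code: resolve_model_origin is a hypothetical helper, and the returned keys (uri, modelName, versionName) assume the field names of the AI Platform prediction input.

```python
from typing import Dict, Optional


def resolve_model_origin(
    project_id: str,
    model_name: Optional[str] = None,
    version_name: Optional[str] = None,
    uri: Optional[str] = None,
) -> Dict[str, str]:
    """Hypothetical helper: pick exactly one model origin for a prediction job."""
    if uri:
        # A URI already identifies the model, so naming a model too is ambiguous.
        if model_name or version_name:
            raise ValueError("Ambiguous model origin: pass uri or model_name, not both")
        return {"uri": uri}
    if not model_name:
        raise ValueError("Missing model origin: pass uri or model_name")

    origin = f"projects/{project_id}/models/{model_name}"
    if version_name:
        return {"versionName": f"{origin}/versions/{version_name}"}
    # No version given: the model's default version is used.
    return {"modelName": origin}


by_model = resolve_model_origin("example-project", model_name="example_model")
by_version = resolve_model_origin(
    "example-project", model_name="example_model", version_name="v1"
)
by_uri = resolve_model_origin("example-project", uri="gs://example-bucket/saved_model/")
print(by_model, by_version, by_uri)
```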

Cleaning up

A model version can be deleted with the MLEngineDeleteVersionOperator by specifying the version_name and model_name parameters.

airflow/providers/google/cloud/example_dags/example_mlengine.py[source]

delete_version = MLEngineDeleteVersionOperator(
    task_id="delete-version", project_id=PROJECT_ID, model_name=MODEL_NAME, version_name="v1"
)

You can also delete a model with the MLEngineDeleteModelOperator by providing the model_name parameter.

airflow/providers/google/cloud/example_dags/example_mlengine.py[source]

delete_model = MLEngineDeleteModelOperator(
    task_id="delete-model", project_id=PROJECT_ID, model_name=MODEL_NAME, delete_contents=True
)

Evaluating a model

To evaluate a prediction and model, specify a metric function to generate a summary and customize the evaluation of the model. This function receives a dictionary parsed from a JSON record in the batch prediction result and returns a tuple of metrics.

airflow/providers/google/cloud/example_dags/example_mlengine.py[source]

def get_metric_fn_and_keys():
    """
    Gets metric function and keys used to generate summary
    """

    def normalize_value(inst: dict):
        val = float(inst['dense_4'][0])
        return (val,)  # returns a one-element tuple.

    return normalize_value, ['val']  # key order must match.
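To make the role of the metric function concrete, the sketch below applies it to a few sample prediction records and averages the results into a summary. It is a local stand-in under stated assumptions: the prediction results are taken to be newline-delimited JSON records with a dense_4 key (as in the function above), summarize is a hypothetical helper, and the sample values are invented. In a real pipeline this aggregation would not run in local Python like this.

```python
import json
from typing import Callable, Dict, Iterable, List, Tuple


def get_metric_fn_and_keys() -> Tuple[Callable[[dict], tuple], List[str]]:
    """Same shape as the function above: a metric function plus its keys."""

    def normalize_value(inst: dict) -> tuple:
        return (float(inst["dense_4"][0]),)

    return normalize_value, ["val"]


def summarize(
    json_lines: Iterable[str],
    metric_fn: Callable[[dict], tuple],
    metric_keys: List[str],
) -> Dict[str, float]:
    """Hypothetical helper: average each metric over all prediction records."""
    totals = [0.0] * len(metric_keys)
    count = 0
    for line in json_lines:
        metrics = metric_fn(json.loads(line))
        # The tuple order must match the key order.
        for i, value in enumerate(metrics):
            totals[i] += value
        count += 1
    if count == 0:
        raise ValueError("no prediction results to summarize")

    summary: Dict[str, float] = {
        key: total / count for key, total in zip(metric_keys, totals)
    }
    summary["count"] = count
    return summary


# Invented sample records standing in for a batch prediction output file.
sample_results = ['{"dense_4": [0.25]}', '{"dense_4": [0.75]}']
metric_fn, metric_keys = get_metric_fn_and_keys()
summary = summarize(sample_results, metric_fn, metric_keys)
print(summary)  # {'val': 0.5, 'count': 2}
```

The resulting dictionary, with one averaged entry per key plus a count, is the kind of summary a validation function can then check.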

To evaluate a prediction and model, it’s useful to have a function to validate the summary result. This function receives a dictionary of the averaged metrics the function above generated. It then raises an exception if a task fails or should not proceed.

airflow/providers/google/cloud/example_dags/example_mlengine.py[source]

def validate_err_and_count(summary: dict) -> dict:
    """
    Validate summary result
    """
    if summary['val'] > 1:
        raise ValueError(f'Too high val>1; summary={summary}')
    if summary['val'] < 0:
        raise ValueError(f'Too low val<0; summary={summary}')
    if summary['count'] != 20:
        raise ValueError(f'Invalid value count != 20; summary={summary}')
    return summary

Prediction results and a model summary can be generated through a function such as create_evaluate_ops. It makes predictions using the specified inputs and then summarizes and validates the result. The functions created above should be passed in through the metric_fn_and_keys and validate_fn fields.

airflow/providers/google/cloud/example_dags/example_mlengine.py[source]

evaluate_prediction, evaluate_summary, evaluate_validation = mlengine_operator_utils.create_evaluate_ops(
    task_prefix="evaluate-ops",
    data_format="TEXT",
    input_paths=[PREDICTION_INPUT],
    prediction_path=PREDICTION_OUTPUT,
    metric_fn_and_keys=get_metric_fn_and_keys(),
    validate_fn=validate_err_and_count,
    batch_prediction_job_id="evaluate-ops-{{ ts_nodash }}-{{ params.model_name }}",
    project_id=PROJECT_ID,
    region="us-central1",
    dataflow_options={
        'project': PROJECT_ID,
        'tempLocation': SUMMARY_TMP,
        'stagingLocation': SUMMARY_STAGING,
    },
    model_name=MODEL_NAME,
    version_name="v1",
    py_interpreter="python3",
)

Reference

For further information, look at:
