Google Cloud Vertex AI Operators

Google Cloud Vertex AI brings AutoML and AI Platform together into a unified API, client library, and user interface. AutoML lets you train models on image, tabular, text, and video datasets without writing code, while custom training in AI Platform lets you run your own training code. With Vertex AI, both AutoML training and custom training are available options. Whichever option you choose, you can save models, deploy models, and request predictions with Vertex AI.

Creating Datasets

To create a Google Vertex AI dataset you can use CreateDatasetOperator. The operator returns the dataset ID in XCom under the dataset_id key.

airflow/providers/google/cloud/example_dags/example_vertex_ai.py[source]

create_image_dataset_job = CreateDatasetOperator(
    task_id="image_dataset",
    dataset=IMAGE_DATASET,
    region=REGION,
    project_id=PROJECT_ID,
)
create_tabular_dataset_job = CreateDatasetOperator(
    task_id="tabular_dataset",
    dataset=TABULAR_DATASET,
    region=REGION,
    project_id=PROJECT_ID,
)
create_text_dataset_job = CreateDatasetOperator(
    task_id="text_dataset",
    dataset=TEXT_DATASET,
    region=REGION,
    project_id=PROJECT_ID,
)
create_video_dataset_job = CreateDatasetOperator(
    task_id="video_dataset",
    dataset=VIDEO_DATASET,
    region=REGION,
    project_id=PROJECT_ID,
)
create_time_series_dataset_job = CreateDatasetOperator(
    task_id="time_series_dataset",
    dataset=TIME_SERIES_DATASET,
    region=REGION,
    project_id=PROJECT_ID,
)
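The dataset arguments above (IMAGE_DATASET, TABULAR_DATASET, and so on) are dicts following the Vertex AI Dataset resource format. As a hedged illustration, a minimal body for the image case might look like the following; the display name is a placeholder, and the schema URI is the standard Vertex AI image dataset schema:

```python
# Minimal Dataset body for CreateDatasetOperator (image case).
# display_name is a placeholder; metadata_schema_uri points at the
# standard Vertex AI image dataset metadata schema.
IMAGE_DATASET = {
    "display_name": "example-image-dataset",
    "metadata_schema_uri": (
        "gs://google-cloud-aiplatform/schema/dataset/metadata/image_1.0.0.yaml"
    ),
}
```

The tabular, text, video, and time-series variants differ only in their display name and metadata schema file.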

After creating a dataset, you can import data into it using ImportDataOperator.

airflow/providers/google/cloud/example_dags/example_vertex_ai.py[source]

import_data_job = ImportDataOperator(
    task_id="import_data",
    dataset_id=create_image_dataset_job.output['dataset_id'],
    region=REGION,
    project_id=PROJECT_ID,
    import_configs=TEST_IMPORT_CONFIG,
)
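The TEST_IMPORT_CONFIG used above is a list of ImportDataConfig dicts. A sketch for importing single-label image classification data from GCS could look like this; the bucket path is a placeholder:

```python
# One ImportDataConfig per source; Vertex AI accepts a list of them.
IMPORT_CONFIG = [
    {
        # JSONL file(s) listing image URIs and labels; placeholder path.
        "gcs_source": {"uris": ["gs://example-bucket/image-data/import.jsonl"]},
        # Standard schema for single-label image classification imports.
        "import_schema_uri": (
            "gs://google-cloud-aiplatform/schema/dataset/ioformat/"
            "image_classification_single_label_io_format_1.0.0.yaml"
        ),
    }
]
```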

To export a dataset you can use ExportDataOperator.

airflow/providers/google/cloud/example_dags/example_vertex_ai.py[source]

export_data_job = ExportDataOperator(
    task_id="export_data",
    dataset_id=create_image_dataset_job.output['dataset_id'],
    region=REGION,
    project_id=PROJECT_ID,
    export_config=TEST_EXPORT_CONFIG,
)
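The export_config argument is an ExportDataConfig dict. A minimal sketch exporting to a GCS folder, with a placeholder bucket path, could be:

```python
# ExportDataConfig: data and annotations are written under this GCS prefix.
EXPORT_CONFIG = {
    "gcs_destination": {"output_uri_prefix": "gs://example-bucket/exports"},
}
```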

To delete a dataset you can use DeleteDatasetOperator.

airflow/providers/google/cloud/example_dags/example_vertex_ai.py[source]

delete_dataset_job = DeleteDatasetOperator(
    task_id="delete_dataset",
    dataset_id=create_text_dataset_job.output['dataset_id'],
    region=REGION,
    project_id=PROJECT_ID,
)

To get a dataset you can use GetDatasetOperator.

airflow/providers/google/cloud/example_dags/example_vertex_ai.py[source]

get_dataset = GetDatasetOperator(
    task_id="get_dataset",
    project_id=PROJECT_ID,
    region=REGION,
    dataset_id=create_tabular_dataset_job.output['dataset_id'],
)

To get a list of datasets you can use ListDatasetsOperator.

airflow/providers/google/cloud/example_dags/example_vertex_ai.py[source]

list_dataset_job = ListDatasetsOperator(
    task_id="list_dataset",
    region=REGION,
    project_id=PROJECT_ID,
)

To update a dataset you can use UpdateDatasetOperator.

airflow/providers/google/cloud/example_dags/example_vertex_ai.py[source]

update_dataset_job = UpdateDatasetOperator(
    task_id="update_dataset",
    project_id=PROJECT_ID,
    region=REGION,
    dataset_id=create_video_dataset_job.output['dataset_id'],
    dataset=DATASET_TO_UPDATE,
    update_mask=TEST_UPDATE_MASK,
)
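UpdateDatasetOperator takes a partial Dataset body plus a field mask naming which fields to change. A minimal hedged sketch that only renames the dataset:

```python
# Partial Dataset body: only the fields being changed need to be present.
DATASET_TO_UPDATE = {"display_name": "updated-video-dataset"}
# FieldMask listing which fields of the body to apply.
UPDATE_MASK = {"paths": ["display_name"]}
```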

Creating Training Jobs

To create a Google Vertex AI training job you can use one of three operators: CreateCustomContainerTrainingJobOperator, CreateCustomPythonPackageTrainingJobOperator, or CreateCustomTrainingJobOperator. Each of them waits for the operation to complete, and the result of each is a model trained with the code you supply.

Preparation step

For each operator you must first prepare and create a dataset, then pass its ID to the operator's dataset_id parameter.

How to run Container Training Job CreateCustomContainerTrainingJobOperator

Before running this job, create a Docker image with your training script inside. Documentation on how to create such an image is available at https://cloud.google.com/vertex-ai/docs/training/create-custom-container. Then pass the image URI to the container_uri parameter. You can also specify the command to execute in the container created from this image via the command parameter.

airflow/providers/google/cloud/example_dags/example_vertex_ai.py[source]

create_custom_container_training_job = CreateCustomContainerTrainingJobOperator(
    task_id="custom_container_task",
    staging_bucket=STAGING_BUCKET,
    display_name=f"train-housing-container-{DISPLAY_NAME}",
    container_uri=CUSTOM_CONTAINER_URI,
    model_serving_container_image_uri=MODEL_SERVING_CONTAINER_URI,
    # run params
    dataset_id=DATASET_ID,
    command=["python3", "task.py"],
    model_display_name=f"container-housing-model-{DISPLAY_NAME}",
    replica_count=REPLICA_COUNT,
    machine_type=MACHINE_TYPE,
    accelerator_type=ACCELERATOR_TYPE,
    accelerator_count=ACCELERATOR_COUNT,
    training_fraction_split=TRAINING_FRACTION_SPLIT,
    validation_fraction_split=VALIDATION_FRACTION_SPLIT,
    test_fraction_split=TEST_FRACTION_SPLIT,
    region=REGION,
    project_id=PROJECT_ID,
)
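The task.py named in command above is your own entry point baked into the container. Vertex AI custom training exposes the artifact output location through the AIP_MODEL_DIR environment variable; a hypothetical minimal script, where the "training" is a trivial stand-in, might be:

```python
import json
import os


def train_and_save(model_dir: str) -> str:
    """Placeholder training loop: 'train' a trivial model and save it."""
    model = {"weights": [0.1, 0.2], "bias": 0.0}  # stand-in for real training
    os.makedirs(model_dir, exist_ok=True)
    path = os.path.join(model_dir, "model.json")
    with open(path, "w") as f:
        json.dump(model, f)
    return path


if __name__ == "__main__":
    # Vertex AI sets AIP_MODEL_DIR for custom training jobs; fall back
    # to a local directory when run outside the service.
    train_and_save(os.environ.get("AIP_MODEL_DIR", "/tmp/model"))
```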

How to run Python Package Training Job CreateCustomPythonPackageTrainingJobOperator

Before running this job, create a Python package with your training script inside. Documentation on how to create one is available at https://cloud.google.com/vertex-ai/docs/training/create-python-pre-built-container. Then pass the GCS URI of the package to the python_package_gcs_uri parameter, and set the python_module_name parameter to the name of the module that runs your training task.

airflow/providers/google/cloud/example_dags/example_vertex_ai.py[source]

create_custom_python_package_training_job = CreateCustomPythonPackageTrainingJobOperator(
    task_id="python_package_task",
    staging_bucket=STAGING_BUCKET,
    display_name=f"train-housing-py-package-{DISPLAY_NAME}",
    python_package_gcs_uri=PYTHON_PACKAGE_GCS_URI,
    python_module_name=PYTHON_MODULE_NAME,
    container_uri=CONTAINER_URI,
    model_serving_container_image_uri=MODEL_SERVING_CONTAINER_URI,
    # run params
    dataset_id=DATASET_ID,
    model_display_name=f"py-package-housing-model-{DISPLAY_NAME}",
    replica_count=REPLICA_COUNT,
    machine_type=MACHINE_TYPE,
    accelerator_type=ACCELERATOR_TYPE,
    accelerator_count=ACCELERATOR_COUNT,
    training_fraction_split=TRAINING_FRACTION_SPLIT,
    validation_fraction_split=VALIDATION_FRACTION_SPLIT,
    test_fraction_split=TEST_FRACTION_SPLIT,
    region=REGION,
    project_id=PROJECT_ID,
)

How to run Training Job CreateCustomTrainingJobOperator

For this job, pass the path to your local training script via the script_path parameter.

airflow/providers/google/cloud/example_dags/example_vertex_ai.py[source]

create_custom_training_job = CreateCustomTrainingJobOperator(
    task_id="custom_task",
    staging_bucket=STAGING_BUCKET,
    display_name=f"train-housing-custom-{DISPLAY_NAME}",
    script_path=LOCAL_TRAINING_SCRIPT_PATH,
    container_uri=CONTAINER_URI,
    requirements=["gcsfs==0.7.1"],
    model_serving_container_image_uri=MODEL_SERVING_CONTAINER_URI,
    # run params
    dataset_id=DATASET_ID,
    replica_count=1,
    model_display_name=f"custom-housing-model-{DISPLAY_NAME}",
    sync=False,
    region=REGION,
    project_id=PROJECT_ID,
)

You can get a list of Training Jobs using ListCustomTrainingJobOperator.

airflow/providers/google/cloud/example_dags/example_vertex_ai.py[source]

list_custom_training_job = ListCustomTrainingJobOperator(
    task_id="list_custom_training_job",
    region=REGION,
    project_id=PROJECT_ID,
)

If you wish to delete a Custom Training Job you can use DeleteCustomTrainingJobOperator.

airflow/providers/google/cloud/example_dags/example_vertex_ai.py[source]

delete_custom_training_job = DeleteCustomTrainingJobOperator(
    task_id="delete_custom_training_job",
    training_pipeline_id=TRAINING_PIPELINE_ID,
    custom_job_id=CUSTOM_JOB_ID,
    region=REGION,
    project_id=PROJECT_ID,
)

Creating AutoML Training Jobs

To create a Google Vertex AI AutoML training job you can use one of five operators: CreateAutoMLForecastingTrainingJobOperator, CreateAutoMLImageTrainingJobOperator, CreateAutoMLTabularTrainingJobOperator, CreateAutoMLTextTrainingJobOperator, or CreateAutoMLVideoTrainingJobOperator. Each of them waits for the operation to complete, and the result of each is a trained model.

How to run AutoML Forecasting Training Job CreateAutoMLForecastingTrainingJobOperator

Before running this job, you must prepare and create a TimeSeries dataset, then pass its ID to the operator's dataset_id parameter.

airflow/providers/google/cloud/example_dags/example_vertex_ai.py[source]

create_auto_ml_forecasting_training_job = CreateAutoMLForecastingTrainingJobOperator(
    task_id="auto_ml_forecasting_task",
    display_name=f"auto-ml-forecasting-{DISPLAY_NAME}",
    optimization_objective="minimize-rmse",
    column_specs=COLUMN_SPECS,
    # run params
    dataset_id=DATASET_ID,
    target_column=TEST_TARGET_COLUMN,
    time_column=TEST_TIME_COLUMN,
    time_series_identifier_column=TEST_TIME_SERIES_IDENTIFIER_COLUMN,
    available_at_forecast_columns=[TEST_TIME_COLUMN],
    unavailable_at_forecast_columns=[TEST_TARGET_COLUMN],
    time_series_attribute_columns=["city", "zip_code", "county"],
    forecast_horizon=30,
    context_window=30,
    data_granularity_unit="day",
    data_granularity_count=1,
    weight_column=None,
    budget_milli_node_hours=1000,
    model_display_name=f"auto-ml-forecasting-model-{DISPLAY_NAME}",
    predefined_split_column_name=None,
    region=REGION,
    project_id=PROJECT_ID,
)
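The column_specs mapping tells AutoML forecasting how to treat each input column. A hedged sketch consistent with the attribute columns used above; the time and target column names are assumptions, and the type strings follow the google-cloud-aiplatform column_specs convention:

```python
# Maps column name -> AutoML column type for the forecasting job.
COLUMN_SPECS = {
    "date": "timestamp",     # would be TEST_TIME_COLUMN
    "sales": "numeric",      # would be TEST_TARGET_COLUMN
    "city": "categorical",
    "zip_code": "categorical",
    "county": "categorical",
}
```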

How to run AutoML Image Training Job CreateAutoMLImageTrainingJobOperator

Before running this job, you must prepare and create an Image dataset, then pass its ID to the operator's dataset_id parameter.

airflow/providers/google/cloud/example_dags/example_vertex_ai.py[source]

create_auto_ml_image_training_job = CreateAutoMLImageTrainingJobOperator(
    task_id="auto_ml_image_task",
    display_name=f"auto-ml-image-{DISPLAY_NAME}",
    dataset_id=DATASET_ID,
    prediction_type="classification",
    multi_label=False,
    model_type="CLOUD",
    training_fraction_split=0.6,
    validation_fraction_split=0.2,
    test_fraction_split=0.2,
    budget_milli_node_hours=8000,
    model_display_name=f"auto-ml-image-model-{DISPLAY_NAME}",
    disable_early_stopping=False,
    region=REGION,
    project_id=PROJECT_ID,
)

How to run AutoML Tabular Training Job CreateAutoMLTabularTrainingJobOperator

Before running this job, you must prepare and create a Tabular dataset, then pass its ID to the operator's dataset_id parameter.

airflow/providers/google/cloud/example_dags/example_vertex_ai.py[source]

create_auto_ml_tabular_training_job = CreateAutoMLTabularTrainingJobOperator(
    task_id="auto_ml_tabular_task",
    display_name=f"auto-ml-tabular-{DISPLAY_NAME}",
    optimization_prediction_type="classification",
    column_transformations=COLUMN_TRANSFORMATIONS,
    dataset_id=DATASET_ID,
    target_column="Adopted",
    training_fraction_split=0.8,
    validation_fraction_split=0.1,
    test_fraction_split=0.1,
    model_display_name="adopted-prediction-model",
    disable_early_stopping=False,
    region=REGION,
    project_id=PROJECT_ID,
)
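The column_transformations argument is a list of single-key dicts, each naming a transformation type and the column it applies to. A hedged sketch; the column names are assumptions for illustration:

```python
# One entry per input column; the outer key selects the AutoML
# transformation ("categorical", "numeric", "text", "timestamp", ...).
COLUMN_TRANSFORMATIONS = [
    {"categorical": {"column_name": "Type"}},
    {"numeric": {"column_name": "Age"}},
    {"text": {"column_name": "Description"}},
]
```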

How to run AutoML Text Training Job CreateAutoMLTextTrainingJobOperator

Before running this job, you must prepare and create a Text dataset, then pass its ID to the operator's dataset_id parameter.

airflow/providers/google/cloud/example_dags/example_vertex_ai.py[source]

create_auto_ml_text_training_job = CreateAutoMLTextTrainingJobOperator(
    task_id="auto_ml_text_task",
    display_name=f"auto-ml-text-{DISPLAY_NAME}",
    prediction_type="classification",
    multi_label=False,
    dataset_id=DATASET_ID,
    model_display_name=f"auto-ml-text-model-{DISPLAY_NAME}",
    training_fraction_split=0.7,
    validation_fraction_split=0.2,
    test_fraction_split=0.1,
    sync=True,
    region=REGION,
    project_id=PROJECT_ID,
)

How to run AutoML Video Training Job CreateAutoMLVideoTrainingJobOperator

Before running this job, you must prepare and create a Video dataset, then pass its ID to the operator's dataset_id parameter.

airflow/providers/google/cloud/example_dags/example_vertex_ai.py[source]

create_auto_ml_video_training_job = CreateAutoMLVideoTrainingJobOperator(
    task_id="auto_ml_video_task",
    display_name=f"auto-ml-video-{DISPLAY_NAME}",
    prediction_type="classification",
    model_type="CLOUD",
    dataset_id=DATASET_ID,
    model_display_name=f"auto-ml-video-model-{DISPLAY_NAME}",
    region=REGION,
    project_id=PROJECT_ID,
)

You can get a list of AutoML Training Jobs using ListAutoMLTrainingJobOperator.

airflow/providers/google/cloud/example_dags/example_vertex_ai.py[source]

list_auto_ml_training_job = ListAutoMLTrainingJobOperator(
    task_id="list_auto_ml_training_job",
    region=REGION,
    project_id=PROJECT_ID,
)

If you wish to delete an AutoML Training Job you can use DeleteAutoMLTrainingJobOperator.

airflow/providers/google/cloud/example_dags/example_vertex_ai.py[source]

delete_auto_ml_training_job = DeleteAutoMLTrainingJobOperator(
    task_id="delete_auto_ml_training_job",
    training_pipeline_id=TRAINING_PIPELINE_ID,
    region=REGION,
    project_id=PROJECT_ID,
)

Creating Batch Prediction Jobs

To create a Google Vertex AI Batch Prediction Job you can use CreateBatchPredictionJobOperator. The operator returns the batch prediction job ID in XCom under the batch_prediction_job_id key.

airflow/providers/google/cloud/example_dags/example_vertex_ai.py[source]

create_batch_prediction_job = CreateBatchPredictionJobOperator(
    task_id="create_batch_prediction_job",
    job_display_name=JOB_DISPLAY_NAME,
    model_name=MODEL_NAME,
    predictions_format="csv",
    bigquery_source=BIGQUERY_SOURCE,
    gcs_destination_prefix=GCS_DESTINATION_PREFIX,
    model_parameters=MODEL_PARAMETERS,
    region=REGION,
    project_id=PROJECT_ID,
)
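The source, destination, and model arguments above are plain strings in Vertex AI's resource formats. A hedged sketch of what they might look like; all project, dataset, bucket, and model IDs here are placeholders:

```python
# Fully qualified BigQuery table in "bq://" form (placeholder names).
BIGQUERY_SOURCE = "bq://example-project.example_dataset.example_table"
# GCS folder where the CSV prediction files are written (placeholder bucket).
GCS_DESTINATION_PREFIX = "gs://example-bucket/batch-output"
# Full resource name of a previously trained model (placeholder IDs).
MODEL_NAME = "projects/example-project/locations/us-central1/models/1234567890"
```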

To delete a batch prediction job you can use DeleteBatchPredictionJobOperator.

airflow/providers/google/cloud/example_dags/example_vertex_ai.py[source]

delete_batch_prediction_job = DeleteBatchPredictionJobOperator(
    task_id="delete_batch_prediction_job",
    batch_prediction_job_id=create_batch_prediction_job.output['batch_prediction_job_id'],
    region=REGION,
    project_id=PROJECT_ID,
)

To get a list of batch prediction jobs you can use ListBatchPredictionJobsOperator.

airflow/providers/google/cloud/example_dags/example_vertex_ai.py[source]

list_batch_prediction_job = ListBatchPredictionJobsOperator(
    task_id="list_batch_prediction_jobs",
    region=REGION,
    project_id=PROJECT_ID,
)

Creating an Endpoint Service

To create a Google Vertex AI endpoint you can use CreateEndpointOperator. The operator returns the endpoint ID in XCom under the endpoint_id key.

airflow/providers/google/cloud/example_dags/example_vertex_ai.py[source]

create_endpoint = CreateEndpointOperator(
    task_id="create_endpoint",
    endpoint=ENDPOINT_CONF,
    region=REGION,
    project_id=PROJECT_ID,
)

After creating an endpoint, you can deploy a model to it using DeployModelOperator.

airflow/providers/google/cloud/example_dags/example_vertex_ai.py[source]

deploy_model = DeployModelOperator(
    task_id="deploy_model",
    endpoint_id=create_endpoint.output['endpoint_id'],
    deployed_model=DEPLOYED_MODEL,
    traffic_split={'0': 100},
    region=REGION,
    project_id=PROJECT_ID,
)
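The deployed_model argument describes the model and the serving hardware, while traffic_split routes request traffic among models on the endpoint; the special key "0" stands for the model being deployed in this call. A hedged sketch, where the resource name and machine type are placeholders:

```python
DEPLOYED_MODEL = {
    # Full resource name of the model to deploy (placeholder IDs).
    "model": "projects/example-project/locations/us-central1/models/1234567890",
    "display_name": "example-deployed-model",
    # Dedicated serving compute; the machine type is an assumption.
    "dedicated_resources": {
        "machine_spec": {"machine_type": "n1-standard-2"},
        "min_replica_count": 1,
        "max_replica_count": 1,
    },
}
# Route 100% of traffic to the newly deployed model; "0" is replaced by
# the deployed model's ID once deployment succeeds.
TRAFFIC_SPLIT = {"0": 100}
```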

To undeploy a model you can use UndeployModelOperator.

airflow/providers/google/cloud/example_dags/example_vertex_ai.py[source]

undeploy_model = UndeployModelOperator(
    task_id="undeploy_model",
    endpoint_id=create_endpoint.output['endpoint_id'],
    deployed_model_id=deploy_model.output['deployed_model_id'],
    region=REGION,
    project_id=PROJECT_ID,
)

To delete an endpoint you can use DeleteEndpointOperator.

airflow/providers/google/cloud/example_dags/example_vertex_ai.py[source]

delete_endpoint = DeleteEndpointOperator(
    task_id="delete_endpoint",
    endpoint_id=create_endpoint.output['endpoint_id'],
    region=REGION,
    project_id=PROJECT_ID,
)

To get a list of endpoints you can use ListEndpointsOperator.

airflow/providers/google/cloud/example_dags/example_vertex_ai.py[source]

list_endpoints = ListEndpointsOperator(
    task_id="list_endpoints",
    region=REGION,
    project_id=PROJECT_ID,
)

Creating Hyperparameter Tuning Jobs

To create a Google Vertex AI hyperparameter tuning job you can use CreateHyperparameterTuningJobOperator. The operator returns the hyperparameter tuning job ID in XCom under the hyperparameter_tuning_job_id key.

airflow/providers/google/cloud/example_dags/example_vertex_ai.py[source]

create_hyperparameter_tuning_job = CreateHyperparameterTuningJobOperator(
    task_id="create_hyperparameter_tuning_job",
    staging_bucket=STAGING_BUCKET,
    display_name=f"horses-humans-hyptertune-{DISPLAY_NAME}",
    worker_pool_specs=[
        {
            "machine_spec": {
                "machine_type": MACHINE_TYPE,
                "accelerator_type": ACCELERATOR_TYPE,
                "accelerator_count": ACCELERATOR_COUNT,
            },
            "replica_count": REPLICA_COUNT,
            "container_spec": {
                "image_uri": f"gcr.io/{PROJECT_ID}/horse-human:hypertune",
            },
        }
    ],
    sync=False,
    region=REGION,
    project_id=PROJECT_ID,
    parameter_spec={
        'learning_rate': aiplatform.hyperparameter_tuning.DoubleParameterSpec(
            min=0.01, max=1, scale='log'
        ),
        'momentum': aiplatform.hyperparameter_tuning.DoubleParameterSpec(min=0, max=1, scale='linear'),
        'num_neurons': aiplatform.hyperparameter_tuning.DiscreteParameterSpec(
            values=[64, 128, 512], scale='linear'
        ),
    },
    metric_spec={
        'accuracy': 'maximize',
    },
    max_trial_count=15,
    parallel_trial_count=3,
)

To delete a hyperparameter tuning job you can use DeleteHyperparameterTuningJobOperator.

airflow/providers/google/cloud/example_dags/example_vertex_ai.py[source]

delete_hyperparameter_tuning_job = DeleteHyperparameterTuningJobOperator(
    task_id="delete_hyperparameter_tuning_job",
    project_id=PROJECT_ID,
    region=REGION,
    hyperparameter_tuning_job_id=create_hyperparameter_tuning_job.output["hyperparameter_tuning_job_id"],
)

To get a hyperparameter tuning job you can use GetHyperparameterTuningJobOperator.

airflow/providers/google/cloud/example_dags/example_vertex_ai.py[source]

get_hyperparameter_tuning_job = GetHyperparameterTuningJobOperator(
    task_id="get_hyperparameter_tuning_job",
    project_id=PROJECT_ID,
    region=REGION,
    hyperparameter_tuning_job_id=create_hyperparameter_tuning_job.output["hyperparameter_tuning_job_id"],
)

To get a list of hyperparameter tuning jobs you can use ListHyperparameterTuningJobOperator.

airflow/providers/google/cloud/example_dags/example_vertex_ai.py[source]

list_hyperparameter_tuning_job = ListHyperparameterTuningJobOperator(
    task_id="list_hyperparameter_tuning_job",
    region=REGION,
    project_id=PROJECT_ID,
)

Creating a Model Service

To upload a Google Vertex AI model you can use UploadModelOperator. The operator returns the model ID in XCom under the model_id key.

airflow/providers/google/cloud/example_dags/example_vertex_ai.py[source]

upload_model = UploadModelOperator(
    task_id="upload_model",
    region=REGION,
    project_id=PROJECT_ID,
    model=MODEL_OBJ,
)
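The model argument to UploadModelOperator is a Model resource dict pointing at exported artifacts and a serving container. A hedged sketch; the GCS path is a placeholder, and the image URI assumes one of the prebuilt Vertex AI serving containers (pick one matching your framework):

```python
MODEL_OBJ = {
    "display_name": "example-model",
    # GCS folder containing the saved model artifacts (placeholder path).
    "artifact_uri": "gs://example-bucket/model-output",
    "container_spec": {
        # Assumed prebuilt TensorFlow 2 CPU serving image.
        "image_uri": "us-docker.pkg.dev/vertex-ai/prediction/tf2-cpu.2-8:latest",
        "command": [],
        "args": [],
    },
}
```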

To export a model you can use ExportModelOperator.

airflow/providers/google/cloud/example_dags/example_vertex_ai.py[source]

export_model = ExportModelOperator(
    task_id="export_model",
    project_id=PROJECT_ID,
    region=REGION,
    model_id=upload_model.output["model_id"],
    output_config=MODEL_OUTPUT_CONFIG,
)

To delete a model you can use DeleteModelOperator.

airflow/providers/google/cloud/example_dags/example_vertex_ai.py[source]

delete_model = DeleteModelOperator(
    task_id="delete_model",
    project_id=PROJECT_ID,
    region=REGION,
    model_id=upload_model.output["model_id"],
)

To get a list of models you can use ListModelsOperator.

airflow/providers/google/cloud/example_dags/example_vertex_ai.py[source]

list_models = ListModelsOperator(
    task_id="list_models",
    region=REGION,
    project_id=PROJECT_ID,
)

Reference

For further information, see the Google Cloud Vertex AI documentation: https://cloud.google.com/vertex-ai/docs
