Google Cloud VertexAI Operators

Google Cloud Vertex AI brings AutoML and AI Platform together into a unified API, client library, and user interface. AutoML lets you train models on image, tabular, text, and video datasets without writing code, while training in AI Platform lets you run custom training code. With Vertex AI, both AutoML training and custom training are available options. Whichever option you choose for training, you can save models, deploy models, and request predictions with Vertex AI.

Creating Datasets

To create a Google Vertex AI dataset you can use CreateDatasetOperator. The operator returns the dataset id in XCom under the dataset_id key.

airflow/providers/google/cloud/example_dags/example_vertex_ai.py[source]

create_image_dataset_job = CreateDatasetOperator(
    task_id="image_dataset",
    dataset=IMAGE_DATASET,
    region=REGION,
    project_id=PROJECT_ID,
)
create_tabular_dataset_job = CreateDatasetOperator(
    task_id="tabular_dataset",
    dataset=TABULAR_DATASET,
    region=REGION,
    project_id=PROJECT_ID,
)
create_text_dataset_job = CreateDatasetOperator(
    task_id="text_dataset",
    dataset=TEXT_DATASET,
    region=REGION,
    project_id=PROJECT_ID,
)
create_video_dataset_job = CreateDatasetOperator(
    task_id="video_dataset",
    dataset=VIDEO_DATASET,
    region=REGION,
    project_id=PROJECT_ID,
)
create_time_series_dataset_job = CreateDatasetOperator(
    task_id="time_series_dataset",
    dataset=TIME_SERIES_DATASET,
    region=REGION,
    project_id=PROJECT_ID,
)

After creating a dataset you can import data into it using ImportDataOperator.

airflow/providers/google/cloud/example_dags/example_vertex_ai.py[source]

import_data_job = ImportDataOperator(
    task_id="import_data",
    dataset_id=create_image_dataset_job.output['dataset_id'],
    region=REGION,
    project_id=PROJECT_ID,
    import_configs=TEST_IMPORT_CONFIG,
)
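The import_configs argument is a list of import configurations. A minimal sketch, assuming a hypothetical GCS manifest and an image single-label classification schema:

from google.cloud.aiplatform import schema

# Hypothetical config: gcs_source points at the data manifest, and the
# import schema must match the type of the dataset created above.
TEST_IMPORT_CONFIG = [
    {
        "data_item_labels": {"source": "example"},
        "import_schema_uri": schema.dataset.ioformat.image.single_label_classification,
        "gcs_source": {"uris": ["gs://my-bucket/image-data.csv"]},
    },
]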

To export a dataset you can use ExportDataOperator.

airflow/providers/google/cloud/example_dags/example_vertex_ai.py[source]

export_data_job = ExportDataOperator(
    task_id="export_data",
    dataset_id=create_image_dataset_job.output['dataset_id'],
    region=REGION,
    project_id=PROJECT_ID,
    export_config=TEST_EXPORT_CONFIG,
)
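The export_config argument describes where the exported data should go. A minimal sketch with a hypothetical bucket:

# Hypothetical config: exported files are written under this GCS prefix.
TEST_EXPORT_CONFIG = {
    "gcs_destination": {"output_uri_prefix": "gs://my-bucket/exports"},
}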

To delete a dataset you can use DeleteDatasetOperator.

airflow/providers/google/cloud/example_dags/example_vertex_ai.py[source]

delete_dataset_job = DeleteDatasetOperator(
    task_id="delete_dataset",
    dataset_id=create_text_dataset_job.output['dataset_id'],
    region=REGION,
    project_id=PROJECT_ID,
)

To get a dataset you can use GetDatasetOperator.

airflow/providers/google/cloud/example_dags/example_vertex_ai.py[source]

get_dataset = GetDatasetOperator(
    task_id="get_dataset",
    project_id=PROJECT_ID,
    region=REGION,
    dataset_id=create_tabular_dataset_job.output['dataset_id'],
)

To get a list of datasets you can use ListDatasetsOperator.

airflow/providers/google/cloud/example_dags/example_vertex_ai.py[source]

list_dataset_job = ListDatasetsOperator(
    task_id="list_dataset",
    region=REGION,
    project_id=PROJECT_ID,
)

To update a dataset you can use UpdateDatasetOperator.

airflow/providers/google/cloud/example_dags/example_vertex_ai.py[source]

update_dataset_job = UpdateDatasetOperator(
    task_id="update_dataset",
    project_id=PROJECT_ID,
    region=REGION,
    dataset_id=create_video_dataset_job.output['dataset_id'],
    dataset=DATASET_TO_UPDATE,
    update_mask=TEST_UPDATE_MASK,
)
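The update_mask is a FieldMask-style dict listing which fields of dataset should actually be applied. A minimal sketch with hypothetical values that updates only the labels:

# Hypothetical update: only the fields named in update_mask are applied.
DATASET_TO_UPDATE = {"labels": {"env": "test"}}
TEST_UPDATE_MASK = {"paths": ["labels"]}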

Creating Training Jobs

To create Google Vertex AI training jobs you have three operators: CreateCustomContainerTrainingJobOperator, CreateCustomPythonPackageTrainingJobOperator, and CreateCustomTrainingJobOperator. Each of them waits for the operation to complete, and the result of each is a model trained with your code and data.

Preparation step

For each operator you must first prepare and create a dataset, then pass its id to the operator's dataset_id parameter, as shown in the sketch below.
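A minimal sketch of the wiring, reusing the tabular dataset task from above instead of a hard-coded dataset id (other run parameters omitted for brevity):

# Sketch: feed the id pushed to XCom by CreateDatasetOperator into the
# training operator's dataset_id parameter.
create_custom_training_job = CreateCustomTrainingJobOperator(
    task_id="custom_task",
    staging_bucket=STAGING_BUCKET,
    display_name=f"train-housing-custom-{DISPLAY_NAME}",
    script_path=LOCAL_TRAINING_SCRIPT_PATH,
    container_uri=CONTAINER_URI,
    model_serving_container_image_uri=MODEL_SERVING_CONTAINER_URI,
    dataset_id=create_tabular_dataset_job.output["dataset_id"],
    region=REGION,
    project_id=PROJECT_ID,
)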

How to run Container Training Job CreateCustomContainerTrainingJobOperator

Before running this job you should create a Docker image with your training script inside. Documentation on how to create such an image can be found here: https://cloud.google.com/vertex-ai/docs/training/create-custom-container. Put the image's URI in the container_uri parameter. You can also set the command to be executed in the container created from this image via the command parameter.

airflow/providers/google/cloud/example_dags/example_vertex_ai.py[source]

create_custom_container_training_job = CreateCustomContainerTrainingJobOperator(
    task_id="custom_container_task",
    staging_bucket=STAGING_BUCKET,
    display_name=f"train-housing-container-{DISPLAY_NAME}",
    container_uri=CUSTOM_CONTAINER_URI,
    model_serving_container_image_uri=MODEL_SERVING_CONTAINER_URI,
    # run params
    dataset_id=DATASET_ID,
    command=["python3", "task.py"],
    model_display_name=f"container-housing-model-{DISPLAY_NAME}",
    replica_count=REPLICA_COUNT,
    machine_type=MACHINE_TYPE,
    accelerator_type=ACCELERATOR_TYPE,
    accelerator_count=ACCELERATOR_COUNT,
    training_fraction_split=TRAINING_FRACTION_SPLIT,
    validation_fraction_split=VALIDATION_FRACTION_SPLIT,
    test_fraction_split=TEST_FRACTION_SPLIT,
    region=REGION,
    project_id=PROJECT_ID,
)
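The trained model is pushed to XCom. Assuming the operator exposes its id under a model_id key, mirroring how the dataset operators expose dataset_id, a downstream task could reference it like this:

# Assumption: the operator pushes the trained model's id to XCom
# under the "model_id" key.
model_id = create_custom_container_training_job.output["model_id"]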

How to run Python Package Training Job CreateCustomPythonPackageTrainingJobOperator

Before running this job you should create a Python package with your training script inside. Documentation on how to create one can be found here: https://cloud.google.com/vertex-ai/docs/training/create-python-pre-built-container. Put the package's GCS link in the python_package_gcs_uri parameter, and set python_module_name to the name of the module that runs your training task.

airflow/providers/google/cloud/example_dags/example_vertex_ai.py[source]

create_custom_python_package_training_job = CreateCustomPythonPackageTrainingJobOperator(
    task_id="python_package_task",
    staging_bucket=STAGING_BUCKET,
    display_name=f"train-housing-py-package-{DISPLAY_NAME}",
    python_package_gcs_uri=PYTHON_PACKAGE_GCS_URI,
    python_module_name=PYTHON_MODULE_NAME,
    container_uri=CONTAINER_URI,
    model_serving_container_image_uri=MODEL_SERVING_CONTAINER_URI,
    # run params
    dataset_id=DATASET_ID,
    model_display_name=f"py-package-housing-model-{DISPLAY_NAME}",
    replica_count=REPLICA_COUNT,
    machine_type=MACHINE_TYPE,
    accelerator_type=ACCELERATOR_TYPE,
    accelerator_count=ACCELERATOR_COUNT,
    training_fraction_split=TRAINING_FRACTION_SPLIT,
    validation_fraction_split=VALIDATION_FRACTION_SPLIT,
    test_fraction_split=TEST_FRACTION_SPLIT,
    region=REGION,
    project_id=PROJECT_ID,
)
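The two parameters called out above might look like this (hypothetical bucket and package name):

# Hypothetical values: a source distribution already uploaded to GCS,
# and the module inside it that starts the training task.
PYTHON_PACKAGE_GCS_URI = "gs://my-bucket/trainer-0.1.tar.gz"
PYTHON_MODULE_NAME = "trainer.task"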

How to run Training Job CreateCustomTrainingJobOperator

For this job you should pass the path to your local training script in the script_path parameter.

airflow/providers/google/cloud/example_dags/example_vertex_ai.py[source]

create_custom_training_job = CreateCustomTrainingJobOperator(
    task_id="custom_task",
    staging_bucket=STAGING_BUCKET,
    display_name=f"train-housing-custom-{DISPLAY_NAME}",
    script_path=LOCAL_TRAINING_SCRIPT_PATH,
    container_uri=CONTAINER_URI,
    requirements=["gcsfs==0.7.1"],
    model_serving_container_image_uri=MODEL_SERVING_CONTAINER_URI,
    # run params
    dataset_id=DATASET_ID,
    replica_count=1,
    model_display_name=f"custom-housing-model-{DISPLAY_NAME}",
    sync=False,
    region=REGION,
    project_id=PROJECT_ID,
)

You can get a list of Training Jobs using ListCustomTrainingJobOperator.

airflow/providers/google/cloud/example_dags/example_vertex_ai.py[source]

list_custom_training_job = ListCustomTrainingJobOperator(
    task_id="list_custom_training_job",
    region=REGION,
    project_id=PROJECT_ID,
)

If you wish to delete a Custom Training Job you can use DeleteCustomTrainingJobOperator.

airflow/providers/google/cloud/example_dags/example_vertex_ai.py[source]

delete_custom_training_job = DeleteCustomTrainingJobOperator(
    task_id="delete_custom_training_job",
    training_pipeline_id=TRAINING_PIPELINE_ID,
    custom_job_id=CUSTOM_JOB_ID,
    region=REGION,
    project_id=PROJECT_ID,
)

