Source code for airflow.providers.google.cloud.example_dags.example_vertex_ai

#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

# mypy ignore arg types (for templated fields)
# type: ignore[arg-type]

"""
Example Airflow DAG that demonstrates operators for the Google Vertex AI service on
Google Cloud Platform.

The DAGs in this module rely on the following OS environment variables:

* GCP_VERTEX_AI_BUCKET - Google Cloud Storage bucket where the model will be saved
  after the training process is finished.
* CUSTOM_CONTAINER_URI - URI of the container image that contains the model.
* PYTHON_PACKAGE_GSC_URI - GCS URI of the archived Python package with the training code.
* LOCAL_TRAINING_SCRIPT_PATH - path to the local training script.
* DATASET_ID - ID of the dataset used in the training process.
* MODEL_ID - ID of the model used in the prediction process.
* MODEL_ARTIFACT_URI - path to a GCS directory containing saved model artifacts.
"""

import os
from datetime import datetime
from uuid import uuid4

from google.cloud import aiplatform
from google.protobuf import json_format
from google.protobuf.struct_pb2 import Value

from airflow import models
from airflow.providers.google.cloud.operators.vertex_ai.auto_ml import (
    CreateAutoMLForecastingTrainingJobOperator,
    CreateAutoMLImageTrainingJobOperator,
    CreateAutoMLTabularTrainingJobOperator,
    CreateAutoMLTextTrainingJobOperator,
    CreateAutoMLVideoTrainingJobOperator,
    DeleteAutoMLTrainingJobOperator,
    ListAutoMLTrainingJobOperator,
)
from airflow.providers.google.cloud.operators.vertex_ai.batch_prediction_job import (
    CreateBatchPredictionJobOperator,
    DeleteBatchPredictionJobOperator,
    ListBatchPredictionJobsOperator,
)
from airflow.providers.google.cloud.operators.vertex_ai.custom_job import (
    CreateCustomContainerTrainingJobOperator,
    CreateCustomPythonPackageTrainingJobOperator,
    CreateCustomTrainingJobOperator,
    DeleteCustomTrainingJobOperator,
    ListCustomTrainingJobOperator,
)
from airflow.providers.google.cloud.operators.vertex_ai.dataset import (
    CreateDatasetOperator,
    DeleteDatasetOperator,
    ExportDataOperator,
    GetDatasetOperator,
    ImportDataOperator,
    ListDatasetsOperator,
    UpdateDatasetOperator,
)
from airflow.providers.google.cloud.operators.vertex_ai.endpoint_service import (
    CreateEndpointOperator,
    DeleteEndpointOperator,
    DeployModelOperator,
    ListEndpointsOperator,
    UndeployModelOperator,
)
from airflow.providers.google.cloud.operators.vertex_ai.hyperparameter_tuning_job import (
    CreateHyperparameterTuningJobOperator,
    DeleteHyperparameterTuningJobOperator,
    GetHyperparameterTuningJobOperator,
    ListHyperparameterTuningJobOperator,
)
from airflow.providers.google.cloud.operators.vertex_ai.model_service import (
    DeleteModelOperator,
    ExportModelOperator,
    ListModelsOperator,
    UploadModelOperator,
)

PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "an-id")
REGION = os.environ.get("GCP_LOCATION", "us-central1")
BUCKET = os.environ.get("GCP_VERTEX_AI_BUCKET", "vertex-ai-system-tests")
STAGING_BUCKET = f"gs://{BUCKET}"
DISPLAY_NAME = str(uuid4())  # Create random display name
CONTAINER_URI = "gcr.io/cloud-aiplatform/training/tf-cpu.2-2:latest"
CUSTOM_CONTAINER_URI = os.environ.get("CUSTOM_CONTAINER_URI", "path_to_container_with_model")
MODEL_SERVING_CONTAINER_URI = "gcr.io/cloud-aiplatform/prediction/tf2-cpu.2-2:latest"
REPLICA_COUNT = 1
MACHINE_TYPE = "n1-standard-4"
ACCELERATOR_TYPE = "ACCELERATOR_TYPE_UNSPECIFIED"
ACCELERATOR_COUNT = 0
TRAINING_FRACTION_SPLIT = 0.7
TEST_FRACTION_SPLIT = 0.15
VALIDATION_FRACTION_SPLIT = 0.15
PYTHON_PACKAGE_GCS_URI = os.environ.get("PYTHON_PACKAGE_GSC_URI", "path_to_test_model_in_arch")
PYTHON_MODULE_NAME = "aiplatform_custom_trainer_script.task"
LOCAL_TRAINING_SCRIPT_PATH = os.environ.get("LOCAL_TRAINING_SCRIPT_PATH", "path_to_training_script")
TRAINING_PIPELINE_ID = "test-training-pipeline-id"
CUSTOM_JOB_ID = "test-custom-job-id"
IMAGE_DATASET = {
    "display_name": str(uuid4()),
    "metadata_schema_uri": "gs://google-cloud-aiplatform/schema/dataset/metadata/image_1.0.0.yaml",
    "metadata": Value(string_value="test-image-dataset"),
}
TABULAR_DATASET = {
    "display_name": str(uuid4()),
    "metadata_schema_uri": "gs://google-cloud-aiplatform/schema/dataset/metadata/tabular_1.0.0.yaml",
    "metadata": Value(string_value="test-tabular-dataset"),
}
TEXT_DATASET = {
    "display_name": str(uuid4()),
    "metadata_schema_uri": "gs://google-cloud-aiplatform/schema/dataset/metadata/text_1.0.0.yaml",
    "metadata": Value(string_value="test-text-dataset"),
}
VIDEO_DATASET = {
    "display_name": str(uuid4()),
    "metadata_schema_uri": "gs://google-cloud-aiplatform/schema/dataset/metadata/video_1.0.0.yaml",
    "metadata": Value(string_value="test-video-dataset"),
}
TIME_SERIES_DATASET = {
    "display_name": str(uuid4()),
    "metadata_schema_uri": "gs://google-cloud-aiplatform/schema/dataset/metadata/time_series_1.0.0.yaml",
    "metadata": Value(string_value="test-time-series-dataset"),
}
DATASET_ID = os.environ.get("DATASET_ID", "test-dataset-id")
TEST_EXPORT_CONFIG = {"gcs_destination": {"output_uri_prefix": "gs://test-vertex-ai-bucket/exports"}}
TEST_IMPORT_CONFIG = [
    {
        "data_item_labels": {
            "test-labels-name": "test-labels-value",
        },
        "import_schema_uri": (
            "gs://google-cloud-aiplatform/schema/dataset/ioformat/image_bounding_box_io_format_1.0.0.yaml"
        ),
        "gcs_source": {
            "uris": ["gs://ucaip-test-us-central1/dataset/salads_oid_ml_use_public_unassigned.jsonl"]
        },
    },
]
DATASET_TO_UPDATE = {"display_name": "test-name"}
TEST_UPDATE_MASK = {"paths": ["displayName"]}
TEST_TIME_COLUMN = "date"
TEST_TIME_SERIES_IDENTIFIER_COLUMN = "store_name"
TEST_TARGET_COLUMN = "sale_dollars"
COLUMN_SPECS = {
    TEST_TIME_COLUMN: "timestamp",
    TEST_TARGET_COLUMN: "numeric",
    "city": "categorical",
    "zip_code": "categorical",
    "county": "categorical",
}
COLUMN_TRANSFORMATIONS = [
    {"categorical": {"column_name": "Type"}},
    {"numeric": {"column_name": "Age"}},
    {"categorical": {"column_name": "Breed1"}},
    {"categorical": {"column_name": "Color1"}},
    {"categorical": {"column_name": "Color2"}},
    {"categorical": {"column_name": "MaturitySize"}},
    {"categorical": {"column_name": "FurLength"}},
    {"categorical": {"column_name": "Vaccinated"}},
    {"categorical": {"column_name": "Sterilized"}},
    {"categorical": {"column_name": "Health"}},
    {"numeric": {"column_name": "Fee"}},
    {"numeric": {"column_name": "PhotoAmt"}},
]
MODEL_ID = os.environ.get("MODEL_ID", "test-model-id")
MODEL_ARTIFACT_URI = os.environ.get("MODEL_ARTIFACT_URI", "path_to_folder_with_model_artifacts")
MODEL_NAME = f"projects/{PROJECT_ID}/locations/{REGION}/models/{MODEL_ID}"
JOB_DISPLAY_NAME = f"temp_create_batch_prediction_job_test_{uuid4()}"
BIGQUERY_SOURCE = f"bq://{PROJECT_ID}.test_iowa_liquor_sales_forecasting_us.2021_sales_predict"
GCS_DESTINATION_PREFIX = "gs://test-vertex-ai-bucket-us/output"
MODEL_PARAMETERS = json_format.ParseDict({}, Value())
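# json_format.ParseDict({}, Value()) above builds an empty protobuf Value, i.e.
# no additional model parameters are passed to the batch prediction job.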
ENDPOINT_CONF = {
    "display_name": f"endpoint_test_{uuid4()}",
}
DEPLOYED_MODEL = {
    # format: 'projects/{project}/locations/{location}/models/{model}'
    "model": f"projects/{PROJECT_ID}/locations/{REGION}/models/{MODEL_ID}",
    "display_name": f"temp_endpoint_test_{uuid4()}",
    "dedicated_resources": {
        "machine_spec": {
            "machine_type": "n1-standard-2",
            "accelerator_type": aiplatform.gapic.AcceleratorType.NVIDIA_TESLA_K80,
            "accelerator_count": 1,
        },
        "min_replica_count": 1,
        "max_replica_count": 1,
    },
}
MODEL_OUTPUT_CONFIG = {
    "artifact_destination": {
        "output_uri_prefix": STAGING_BUCKET,
    },
    "export_format_id": "custom-trained",
}
MODEL_OBJ = {
    "display_name": f"model-{str(uuid4())}",
    "artifact_uri": MODEL_ARTIFACT_URI,
    "container_spec": {
        "image_uri": MODEL_SERVING_CONTAINER_URI,
        "command": [],
        "args": [],
        "env": [],
        "ports": [],
        "predict_route": "",
        "health_route": "",
    },
}

with models.DAG(
    "example_gcp_vertex_ai_custom_jobs",
    schedule_interval="@once",
    start_date=datetime(2021, 1, 1),
    catchup=False,
) as custom_jobs_dag:
    # [START how_to_cloud_vertex_ai_create_custom_container_training_job_operator]
    create_custom_container_training_job = CreateCustomContainerTrainingJobOperator(
        task_id="custom_container_task",
        staging_bucket=STAGING_BUCKET,
        display_name=f"train-housing-container-{DISPLAY_NAME}",
        container_uri=CUSTOM_CONTAINER_URI,
        model_serving_container_image_uri=MODEL_SERVING_CONTAINER_URI,
        # run params
        dataset_id=DATASET_ID,
        command=["python3", "task.py"],
        model_display_name=f"container-housing-model-{DISPLAY_NAME}",
        replica_count=REPLICA_COUNT,
        machine_type=MACHINE_TYPE,
        accelerator_type=ACCELERATOR_TYPE,
        accelerator_count=ACCELERATOR_COUNT,
        training_fraction_split=TRAINING_FRACTION_SPLIT,
        validation_fraction_split=VALIDATION_FRACTION_SPLIT,
        test_fraction_split=TEST_FRACTION_SPLIT,
        region=REGION,
        project_id=PROJECT_ID,
    )
    # [END how_to_cloud_vertex_ai_create_custom_container_training_job_operator]

    # [START how_to_cloud_vertex_ai_create_custom_python_package_training_job_operator]
    create_custom_python_package_training_job = CreateCustomPythonPackageTrainingJobOperator(
        task_id="python_package_task",
        staging_bucket=STAGING_BUCKET,
        display_name=f"train-housing-py-package-{DISPLAY_NAME}",
        python_package_gcs_uri=PYTHON_PACKAGE_GCS_URI,
        python_module_name=PYTHON_MODULE_NAME,
        container_uri=CONTAINER_URI,
        model_serving_container_image_uri=MODEL_SERVING_CONTAINER_URI,
        # run params
        dataset_id=DATASET_ID,
        model_display_name=f"py-package-housing-model-{DISPLAY_NAME}",
        replica_count=REPLICA_COUNT,
        machine_type=MACHINE_TYPE,
        accelerator_type=ACCELERATOR_TYPE,
        accelerator_count=ACCELERATOR_COUNT,
        training_fraction_split=TRAINING_FRACTION_SPLIT,
        validation_fraction_split=VALIDATION_FRACTION_SPLIT,
        test_fraction_split=TEST_FRACTION_SPLIT,
        region=REGION,
        project_id=PROJECT_ID,
    )
    # [END how_to_cloud_vertex_ai_create_custom_python_package_training_job_operator]

    # [START how_to_cloud_vertex_ai_create_custom_training_job_operator]
    create_custom_training_job = CreateCustomTrainingJobOperator(
        task_id="custom_task",
        staging_bucket=STAGING_BUCKET,
        display_name=f"train-housing-custom-{DISPLAY_NAME}",
        script_path=LOCAL_TRAINING_SCRIPT_PATH,
        container_uri=CONTAINER_URI,
        requirements=["gcsfs==0.7.1"],
        model_serving_container_image_uri=MODEL_SERVING_CONTAINER_URI,
        # run params
        dataset_id=DATASET_ID,
        replica_count=1,
        model_display_name=f"custom-housing-model-{DISPLAY_NAME}",
        sync=False,
        region=REGION,
        project_id=PROJECT_ID,
    )
    # [END how_to_cloud_vertex_ai_create_custom_training_job_operator]

    # [START how_to_cloud_vertex_ai_delete_custom_training_job_operator]
    delete_custom_training_job = DeleteCustomTrainingJobOperator(
        task_id="delete_custom_training_job",
        training_pipeline_id=TRAINING_PIPELINE_ID,
        custom_job_id=CUSTOM_JOB_ID,
        region=REGION,
        project_id=PROJECT_ID,
    )
    # [END how_to_cloud_vertex_ai_delete_custom_training_job_operator]

    # [START how_to_cloud_vertex_ai_list_custom_training_job_operator]
    list_custom_training_job = ListCustomTrainingJobOperator(
        task_id="list_custom_training_job",
        region=REGION,
        project_id=PROJECT_ID,
    )
    # [END how_to_cloud_vertex_ai_list_custom_training_job_operator]
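
# Many of the operator arguments above (for example project_id, region and the
# display names) are templated fields, which is why this module carries the
# `type: ignore[arg-type]` comment near the top.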

with models.DAG(
    "example_gcp_vertex_ai_dataset",
    schedule_interval="@once",
    start_date=datetime(2021, 1, 1),
    catchup=False,
) as dataset_dag:
    # [START how_to_cloud_vertex_ai_create_dataset_operator]
    create_image_dataset_job = CreateDatasetOperator(
        task_id="image_dataset",
        dataset=IMAGE_DATASET,
        region=REGION,
        project_id=PROJECT_ID,
    )
    create_tabular_dataset_job = CreateDatasetOperator(
        task_id="tabular_dataset",
        dataset=TABULAR_DATASET,
        region=REGION,
        project_id=PROJECT_ID,
    )
    create_text_dataset_job = CreateDatasetOperator(
        task_id="text_dataset",
        dataset=TEXT_DATASET,
        region=REGION,
        project_id=PROJECT_ID,
    )
    create_video_dataset_job = CreateDatasetOperator(
        task_id="video_dataset",
        dataset=VIDEO_DATASET,
        region=REGION,
        project_id=PROJECT_ID,
    )
    create_time_series_dataset_job = CreateDatasetOperator(
        task_id="time_series_dataset",
        dataset=TIME_SERIES_DATASET,
        region=REGION,
        project_id=PROJECT_ID,
    )
    # [END how_to_cloud_vertex_ai_create_dataset_operator]

    # [START how_to_cloud_vertex_ai_delete_dataset_operator]
    delete_dataset_job = DeleteDatasetOperator(
        task_id="delete_dataset",
        dataset_id=create_text_dataset_job.output["dataset_id"],
        region=REGION,
        project_id=PROJECT_ID,
    )
    # [END how_to_cloud_vertex_ai_delete_dataset_operator]

    # [START how_to_cloud_vertex_ai_get_dataset_operator]
    get_dataset = GetDatasetOperator(
        task_id="get_dataset",
        project_id=PROJECT_ID,
        region=REGION,
        dataset_id=create_tabular_dataset_job.output["dataset_id"],
    )
    # [END how_to_cloud_vertex_ai_get_dataset_operator]

    # [START how_to_cloud_vertex_ai_export_data_operator]
    export_data_job = ExportDataOperator(
        task_id="export_data",
        dataset_id=create_image_dataset_job.output["dataset_id"],
        region=REGION,
        project_id=PROJECT_ID,
        export_config=TEST_EXPORT_CONFIG,
    )
    # [END how_to_cloud_vertex_ai_export_data_operator]

    # [START how_to_cloud_vertex_ai_import_data_operator]
    import_data_job = ImportDataOperator(
        task_id="import_data",
        dataset_id=create_image_dataset_job.output["dataset_id"],
        region=REGION,
        project_id=PROJECT_ID,
        import_configs=TEST_IMPORT_CONFIG,
    )
    # [END how_to_cloud_vertex_ai_import_data_operator]

    # [START how_to_cloud_vertex_ai_list_dataset_operator]
    list_dataset_job = ListDatasetsOperator(
        task_id="list_dataset",
        region=REGION,
        project_id=PROJECT_ID,
    )
    # [END how_to_cloud_vertex_ai_list_dataset_operator]

    # [START how_to_cloud_vertex_ai_update_dataset_operator]
    update_dataset_job = UpdateDatasetOperator(
        task_id="update_dataset",
        project_id=PROJECT_ID,
        region=REGION,
        dataset_id=create_video_dataset_job.output["dataset_id"],
        dataset=DATASET_TO_UPDATE,
        update_mask=TEST_UPDATE_MASK,
    )
    # [END how_to_cloud_vertex_ai_update_dataset_operator]

    create_time_series_dataset_job
    create_text_dataset_job >> delete_dataset_job
    create_tabular_dataset_job >> get_dataset
    create_image_dataset_job >> import_data_job >> export_data_job
    create_video_dataset_job >> update_dataset_job
    list_dataset_job
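
# `create_text_dataset_job.output` (and the other `.output` references above) is
# an XComArg: indexing it with ["dataset_id"] defers resolution until run time,
# when the value is pulled from the XCom pushed by the upstream task. In
# Airflow 2 such a reference also creates the upstream dependency implicitly;
# the explicit `>>` chains above simply make the ordering visible at a glance.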

with models.DAG(
    "example_gcp_vertex_ai_auto_ml",
    schedule_interval="@once",
    start_date=datetime(2021, 1, 1),
    catchup=False,
) as auto_ml_dag:
    # [START how_to_cloud_vertex_ai_create_auto_ml_forecasting_training_job_operator]
    create_auto_ml_forecasting_training_job = CreateAutoMLForecastingTrainingJobOperator(
        task_id="auto_ml_forecasting_task",
        display_name=f"auto-ml-forecasting-{DISPLAY_NAME}",
        optimization_objective="minimize-rmse",
        column_specs=COLUMN_SPECS,
        # run params
        dataset_id=DATASET_ID,
        target_column=TEST_TARGET_COLUMN,
        time_column=TEST_TIME_COLUMN,
        time_series_identifier_column=TEST_TIME_SERIES_IDENTIFIER_COLUMN,
        available_at_forecast_columns=[TEST_TIME_COLUMN],
        unavailable_at_forecast_columns=[TEST_TARGET_COLUMN],
        time_series_attribute_columns=["city", "zip_code", "county"],
        forecast_horizon=30,
        context_window=30,
        data_granularity_unit="day",
        data_granularity_count=1,
        weight_column=None,
        budget_milli_node_hours=1000,
        model_display_name=f"auto-ml-forecasting-model-{DISPLAY_NAME}",
        predefined_split_column_name=None,
        region=REGION,
        project_id=PROJECT_ID,
    )
    # [END how_to_cloud_vertex_ai_create_auto_ml_forecasting_training_job_operator]

    # [START how_to_cloud_vertex_ai_create_auto_ml_image_training_job_operator]
    create_auto_ml_image_training_job = CreateAutoMLImageTrainingJobOperator(
        task_id="auto_ml_image_task",
        display_name=f"auto-ml-image-{DISPLAY_NAME}",
        dataset_id=DATASET_ID,
        prediction_type="classification",
        multi_label=False,
        model_type="CLOUD",
        training_fraction_split=0.6,
        validation_fraction_split=0.2,
        test_fraction_split=0.2,
        budget_milli_node_hours=8000,
        model_display_name=f"auto-ml-image-model-{DISPLAY_NAME}",
        disable_early_stopping=False,
        region=REGION,
        project_id=PROJECT_ID,
    )
    # [END how_to_cloud_vertex_ai_create_auto_ml_image_training_job_operator]

    # [START how_to_cloud_vertex_ai_create_auto_ml_tabular_training_job_operator]
    create_auto_ml_tabular_training_job = CreateAutoMLTabularTrainingJobOperator(
        task_id="auto_ml_tabular_task",
        display_name=f"auto-ml-tabular-{DISPLAY_NAME}",
        optimization_prediction_type="classification",
        column_transformations=COLUMN_TRANSFORMATIONS,
        dataset_id=DATASET_ID,
        target_column="Adopted",
        training_fraction_split=0.8,
        validation_fraction_split=0.1,
        test_fraction_split=0.1,
        model_display_name="adopted-prediction-model",
        disable_early_stopping=False,
        region=REGION,
        project_id=PROJECT_ID,
    )
    # [END how_to_cloud_vertex_ai_create_auto_ml_tabular_training_job_operator]

    # [START how_to_cloud_vertex_ai_create_auto_ml_text_training_job_operator]
    create_auto_ml_text_training_job = CreateAutoMLTextTrainingJobOperator(
        task_id="auto_ml_text_task",
        display_name=f"auto-ml-text-{DISPLAY_NAME}",
        prediction_type="classification",
        multi_label=False,
        dataset_id=DATASET_ID,
        model_display_name=f"auto-ml-text-model-{DISPLAY_NAME}",
        training_fraction_split=0.7,
        validation_fraction_split=0.2,
        test_fraction_split=0.1,
        sync=True,
        region=REGION,
        project_id=PROJECT_ID,
    )
    # [END how_to_cloud_vertex_ai_create_auto_ml_text_training_job_operator]

    # [START how_to_cloud_vertex_ai_create_auto_ml_video_training_job_operator]
    create_auto_ml_video_training_job = CreateAutoMLVideoTrainingJobOperator(
        task_id="auto_ml_video_task",
        display_name=f"auto-ml-video-{DISPLAY_NAME}",
        prediction_type="classification",
        model_type="CLOUD",
        dataset_id=DATASET_ID,
        model_display_name=f"auto-ml-video-model-{DISPLAY_NAME}",
        region=REGION,
        project_id=PROJECT_ID,
    )
    # [END how_to_cloud_vertex_ai_create_auto_ml_video_training_job_operator]

    # [START how_to_cloud_vertex_ai_delete_auto_ml_training_job_operator]
    delete_auto_ml_training_job = DeleteAutoMLTrainingJobOperator(
        task_id="delete_auto_ml_training_job",
        training_pipeline_id=TRAINING_PIPELINE_ID,
        region=REGION,
        project_id=PROJECT_ID,
    )
    # [END how_to_cloud_vertex_ai_delete_auto_ml_training_job_operator]

    # [START how_to_cloud_vertex_ai_list_auto_ml_training_job_operator]
    list_auto_ml_training_job = ListAutoMLTrainingJobOperator(
        task_id="list_auto_ml_training_job",
        region=REGION,
        project_id=PROJECT_ID,
    )
    # [END how_to_cloud_vertex_ai_list_auto_ml_training_job_operator]
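
# budget_milli_node_hours is measured in thousandths of a node hour: the
# forecasting job above is capped at 1 node hour (1000) and the image job at
# 8 node hours (8000).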

with models.DAG(
    "example_gcp_vertex_ai_batch_prediction_job",
    schedule_interval="@once",
    start_date=datetime(2021, 1, 1),
    catchup=False,
) as batch_prediction_job_dag:
    # [START how_to_cloud_vertex_ai_create_batch_prediction_job_operator]
    create_batch_prediction_job = CreateBatchPredictionJobOperator(
        task_id="create_batch_prediction_job",
        job_display_name=JOB_DISPLAY_NAME,
        model_name=MODEL_NAME,
        predictions_format="csv",
        bigquery_source=BIGQUERY_SOURCE,
        gcs_destination_prefix=GCS_DESTINATION_PREFIX,
        model_parameters=MODEL_PARAMETERS,
        region=REGION,
        project_id=PROJECT_ID,
    )
    # [END how_to_cloud_vertex_ai_create_batch_prediction_job_operator]

    # [START how_to_cloud_vertex_ai_list_batch_prediction_job_operator]
    list_batch_prediction_job = ListBatchPredictionJobsOperator(
        task_id="list_batch_prediction_jobs",
        region=REGION,
        project_id=PROJECT_ID,
    )
    # [END how_to_cloud_vertex_ai_list_batch_prediction_job_operator]

    # [START how_to_cloud_vertex_ai_delete_batch_prediction_job_operator]
    delete_batch_prediction_job = DeleteBatchPredictionJobOperator(
        task_id="delete_batch_prediction_job",
        batch_prediction_job_id=create_batch_prediction_job.output["batch_prediction_job_id"],
        region=REGION,
        project_id=PROJECT_ID,
    )
    # [END how_to_cloud_vertex_ai_delete_batch_prediction_job_operator]

    create_batch_prediction_job >> delete_batch_prediction_job
    list_batch_prediction_job
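
# The delete task consumes the job ID via XCom, so it can only run after the
# create task; the explicit `>>` above states the same ordering for readers.
# `list_batch_prediction_jobs` has no upstream dependencies and runs on its own.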

with models.DAG(
    "example_gcp_vertex_ai_endpoint",
    schedule_interval="@once",
    start_date=datetime(2021, 1, 1),
    catchup=False,
) as endpoint_dag:
    # [START how_to_cloud_vertex_ai_create_endpoint_operator]
    create_endpoint = CreateEndpointOperator(
        task_id="create_endpoint",
        endpoint=ENDPOINT_CONF,
        region=REGION,
        project_id=PROJECT_ID,
    )
    # [END how_to_cloud_vertex_ai_create_endpoint_operator]

    # [START how_to_cloud_vertex_ai_delete_endpoint_operator]
    delete_endpoint = DeleteEndpointOperator(
        task_id="delete_endpoint",
        endpoint_id=create_endpoint.output["endpoint_id"],
        region=REGION,
        project_id=PROJECT_ID,
    )
    # [END how_to_cloud_vertex_ai_delete_endpoint_operator]

    # [START how_to_cloud_vertex_ai_list_endpoints_operator]
    list_endpoints = ListEndpointsOperator(
        task_id="list_endpoints",
        region=REGION,
        project_id=PROJECT_ID,
    )
    # [END how_to_cloud_vertex_ai_list_endpoints_operator]

    # [START how_to_cloud_vertex_ai_deploy_model_operator]
    deploy_model = DeployModelOperator(
        task_id="deploy_model",
        endpoint_id=create_endpoint.output["endpoint_id"],
        deployed_model=DEPLOYED_MODEL,
        traffic_split={"0": 100},
        region=REGION,
        project_id=PROJECT_ID,
    )
    # [END how_to_cloud_vertex_ai_deploy_model_operator]

    # [START how_to_cloud_vertex_ai_undeploy_model_operator]
    undeploy_model = UndeployModelOperator(
        task_id="undeploy_model",
        endpoint_id=create_endpoint.output["endpoint_id"],
        deployed_model_id=deploy_model.output["deployed_model_id"],
        region=REGION,
        project_id=PROJECT_ID,
    )
    # [END how_to_cloud_vertex_ai_undeploy_model_operator]

    create_endpoint >> deploy_model >> undeploy_model >> delete_endpoint
    list_endpoints
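
# In a Vertex AI DeployModel request the traffic_split key "0" refers to the
# model being deployed by that same request, so {"0": 100} routes 100% of the
# endpoint's traffic to the newly deployed model.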

with models.DAG(
    "example_gcp_vertex_ai_hyperparameter_tuning_job",
    schedule_interval="@once",
    start_date=datetime(2021, 1, 1),
    catchup=False,
) as hyperparameter_tuning_job_dag:
    # [START how_to_cloud_vertex_ai_create_hyperparameter_tuning_job_operator]
    create_hyperparameter_tuning_job = CreateHyperparameterTuningJobOperator(
        task_id="create_hyperparameter_tuning_job",
        staging_bucket=STAGING_BUCKET,
        display_name=f"horses-humans-hypertune-{DISPLAY_NAME}",
        worker_pool_specs=[
            {
                "machine_spec": {
                    "machine_type": MACHINE_TYPE,
                    "accelerator_type": ACCELERATOR_TYPE,
                    "accelerator_count": ACCELERATOR_COUNT,
                },
                "replica_count": REPLICA_COUNT,
                "container_spec": {
                    "image_uri": f"gcr.io/{PROJECT_ID}/horse-human:hypertune",
                },
            }
        ],
        sync=False,
        region=REGION,
        project_id=PROJECT_ID,
        parameter_spec={
            "learning_rate": aiplatform.hyperparameter_tuning.DoubleParameterSpec(
                min=0.01, max=1, scale="log"
            ),
            "momentum": aiplatform.hyperparameter_tuning.DoubleParameterSpec(min=0, max=1, scale="linear"),
            "num_neurons": aiplatform.hyperparameter_tuning.DiscreteParameterSpec(
                values=[64, 128, 512], scale="linear"
            ),
        },
        metric_spec={
            "accuracy": "maximize",
        },
        max_trial_count=15,
        parallel_trial_count=3,
    )
    # [END how_to_cloud_vertex_ai_create_hyperparameter_tuning_job_operator]

    # [START how_to_cloud_vertex_ai_get_hyperparameter_tuning_job_operator]
    get_hyperparameter_tuning_job = GetHyperparameterTuningJobOperator(
        task_id="get_hyperparameter_tuning_job",
        project_id=PROJECT_ID,
        region=REGION,
        hyperparameter_tuning_job_id=create_hyperparameter_tuning_job.output["hyperparameter_tuning_job_id"],
    )
    # [END how_to_cloud_vertex_ai_get_hyperparameter_tuning_job_operator]

    # [START how_to_cloud_vertex_ai_delete_hyperparameter_tuning_job_operator]
    delete_hyperparameter_tuning_job = DeleteHyperparameterTuningJobOperator(
        task_id="delete_hyperparameter_tuning_job",
        project_id=PROJECT_ID,
        region=REGION,
        hyperparameter_tuning_job_id=create_hyperparameter_tuning_job.output["hyperparameter_tuning_job_id"],
    )
    # [END how_to_cloud_vertex_ai_delete_hyperparameter_tuning_job_operator]

    # [START how_to_cloud_vertex_ai_list_hyperparameter_tuning_job_operator]
    list_hyperparameter_tuning_job = ListHyperparameterTuningJobOperator(
        task_id="list_hyperparameter_tuning_job",
        region=REGION,
        project_id=PROJECT_ID,
    )
    # [END how_to_cloud_vertex_ai_list_hyperparameter_tuning_job_operator]

    create_hyperparameter_tuning_job >> get_hyperparameter_tuning_job >> delete_hyperparameter_tuning_job
    list_hyperparameter_tuning_job
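
# With max_trial_count=15 and parallel_trial_count=3, the tuning service runs
# at most 15 trials with at most 3 of them in flight at once, i.e. at least 5
# sequential waves when every slot stays busy.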

with models.DAG(
    "example_gcp_vertex_ai_model_service",
    schedule_interval="@once",
    start_date=datetime(2021, 1, 1),
    catchup=False,
) as model_service_dag:
    # [START how_to_cloud_vertex_ai_upload_model_operator]
    upload_model = UploadModelOperator(
        task_id="upload_model",
        region=REGION,
        project_id=PROJECT_ID,
        model=MODEL_OBJ,
    )
    # [END how_to_cloud_vertex_ai_upload_model_operator]

    # [START how_to_cloud_vertex_ai_export_model_operator]
    export_model = ExportModelOperator(
        task_id="export_model",
        project_id=PROJECT_ID,
        region=REGION,
        model_id=upload_model.output["model_id"],
        output_config=MODEL_OUTPUT_CONFIG,
    )
    # [END how_to_cloud_vertex_ai_export_model_operator]

    # [START how_to_cloud_vertex_ai_delete_model_operator]
    delete_model = DeleteModelOperator(
        task_id="delete_model",
        project_id=PROJECT_ID,
        region=REGION,
        model_id=upload_model.output["model_id"],
    )
    # [END how_to_cloud_vertex_ai_delete_model_operator]

    # [START how_to_cloud_vertex_ai_list_models_operator]
    list_models = ListModelsOperator(
        task_id="list_models",
        region=REGION,
        project_id=PROJECT_ID,
    )
    # [END how_to_cloud_vertex_ai_list_models_operator]

    upload_model >> export_model >> delete_model
    list_models
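
# Each `with models.DAG(...)` block above registers an independent example DAG
# (custom jobs, datasets, AutoML, batch prediction, endpoints, hyperparameter
# tuning, model service), so every group of operators can be triggered and
# tested on its own.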
