Google Cloud VertexAI Operators
Google Cloud Vertex AI brings AutoML and AI Platform together into a unified API, client library, and user interface. AutoML lets you train models on image, tabular, text, and video datasets without writing code, while training in AI Platform lets you run custom training code. With Vertex AI, both AutoML training and custom training are available. Whichever option you choose, you can save models, deploy models, and request predictions with Vertex AI.
Creating Datasets
To create a Google Vertex AI dataset, you can use CreateDatasetOperator. The operator returns the dataset ID in XCom under the dataset_id key.
from airflow.providers.google.cloud.operators.vertex_ai.dataset import (
    CreateDatasetOperator,
    DeleteDatasetOperator,
    ExportDataOperator,
    GetDatasetOperator,
    ImportDataOperator,
    ListDatasetsOperator,
    UpdateDatasetOperator,
)
create_image_dataset_job = CreateDatasetOperator(
    task_id="image_dataset",
    dataset=IMAGE_DATASET,
    region=REGION,
    project_id=PROJECT_ID,
)
create_tabular_dataset_job = CreateDatasetOperator(
    task_id="tabular_dataset",
    dataset=TABULAR_DATASET,
    region=REGION,
    project_id=PROJECT_ID,
)
create_text_dataset_job = CreateDatasetOperator(
    task_id="text_dataset",
    dataset=TEXT_DATASET,
    region=REGION,
    project_id=PROJECT_ID,
)
create_video_dataset_job = CreateDatasetOperator(
    task_id="video_dataset",
    dataset=VIDEO_DATASET,
    region=REGION,
    project_id=PROJECT_ID,
)
create_time_series_dataset_job = CreateDatasetOperator(
    task_id="time_series_dataset",
    dataset=TIME_SERIES_DATASET,
    region=REGION,
    project_id=PROJECT_ID,
)
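Vertex AI identifies a dataset by a full resource name of the form projects/PROJECT/locations/REGION/datasets/DATASET_ID, whereas the dataset_id values passed between the operators in this section are only the last segment of that name. The standard-library sketch below illustrates that relationship; the helper split_dataset_name and the sample values are hypothetical and not part of the Airflow provider.

```python
import re
from typing import NamedTuple


class DatasetName(NamedTuple):
    """Components of a Vertex AI dataset resource name."""

    project: str
    region: str
    dataset_id: str


# Hypothetical helper: assumes the
# projects/{project}/locations/{region}/datasets/{dataset_id} layout.
_DATASET_NAME = re.compile(
    r"^projects/(?P<project>[^/]+)/locations/(?P<region>[^/]+)"
    r"/datasets/(?P<dataset_id>[^/]+)$"
)


def split_dataset_name(name: str) -> DatasetName:
    """Split a full dataset resource name into project, region and dataset ID."""
    match = _DATASET_NAME.match(name)
    if match is None:
        raise ValueError(f"not a dataset resource name: {name!r}")
    return DatasetName(**match.groupdict())


parts = split_dataset_name("projects/my-project/locations/us-central1/datasets/1234567890")
print(parts.dataset_id)
```

Only the trailing ID is what downstream operators such as ImportDataOperator expect in their dataset_id argument.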
After creating a dataset, you can use ImportDataOperator to import data into it.
import_data_job = ImportDataOperator(
    task_id="import_data",
    dataset_id=create_image_dataset_job.output["dataset_id"],
    region=REGION,
    project_id=PROJECT_ID,
    import_configs=TEST_IMPORT_CONFIG,
)
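The expression create_image_dataset_job.output["dataset_id"] is a reference that Airflow resolves at run time by pulling the value the upstream task pushed to XCom under that key. The toy model below is not Airflow code: it uses a plain dictionary keyed by (task_id, key) only to illustrate the push/pull contract, and the class and method names are hypothetical.

```python
from typing import Any, Dict, Tuple


class ToyXCom:
    """Hypothetical in-memory stand-in for XCom: values keyed by (task_id, key)."""

    def __init__(self) -> None:
        self._store: Dict[Tuple[str, str], Any] = {}

    def push(self, task_id: str, key: str, value: Any) -> None:
        self._store[(task_id, key)] = value

    def pull(self, task_id: str, key: str) -> Any:
        # Raises KeyError if the upstream task never pushed this key.
        return self._store[(task_id, key)]


xcom = ToyXCom()
# Conceptually, what the "image_dataset" task records when it succeeds:
xcom.push("image_dataset", "dataset_id", "1234567890")
# Conceptually, what ImportDataOperator receives for its dataset_id argument:
dataset_id = xcom.pull("image_dataset", "dataset_id")
print(dataset_id)
```

In real Airflow, referencing .output also makes the consuming task depend on the producing task, so no explicit >> is needed for that edge.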
To export a dataset, you can use ExportDataOperator.
export_data_job = ExportDataOperator(
    task_id="export_data",
    dataset_id=create_image_dataset_job.output["dataset_id"],
    region=REGION,
    project_id=PROJECT_ID,
    export_config=TEST_EXPORT_CONFIG,
)
To delete a dataset, you can use DeleteDatasetOperator.
delete_dataset_job = DeleteDatasetOperator(
    task_id="delete_dataset",
    dataset_id=create_text_dataset_job.output["dataset_id"],
    region=REGION,
    project_id=PROJECT_ID,
)
To get a dataset, you can use GetDatasetOperator.
get_dataset = GetDatasetOperator(
    task_id="get_dataset",
    project_id=PROJECT_ID,
    region=REGION,
    dataset_id=create_tabular_dataset_job.output["dataset_id"],
)
To get a list of datasets, you can use ListDatasetsOperator.
list_dataset_job = ListDatasetsOperator(
    task_id="list_dataset",
    region=REGION,
    project_id=PROJECT_ID,
)
To update a dataset, you can use UpdateDatasetOperator.
update_dataset_job = UpdateDatasetOperator(
    task_id="update_dataset",
    project_id=PROJECT_ID,
    region=REGION,
    dataset_id=create_video_dataset_job.output["dataset_id"],
    dataset=DATASET_TO_UPDATE,
    update_mask=TEST_UPDATE_MASK,
)
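The update_mask argument names which fields of DATASET_TO_UPDATE should actually be written; fields outside the mask keep their current values on the server. The sketch below mimics that rule on plain dictionaries. It is a simplified illustration that supports only top-level field names, not how Vertex AI applies a google.protobuf.FieldMask internally; the helper apply_update_mask and the sample values are hypothetical.

```python
import copy
from typing import Any, Dict, Iterable


def apply_update_mask(
    current: Dict[str, Any], new_values: Dict[str, Any], paths: Iterable[str]
) -> Dict[str, Any]:
    """Return a copy of `current` in which only the top-level fields listed in
    `paths` are replaced by the corresponding entries of `new_values`."""
    updated = copy.deepcopy(current)
    for path in paths:
        # Fields outside the mask are never touched, even if present in new_values.
        updated[path] = copy.deepcopy(new_values[path])
    return updated


current = {"display_name": "video-dataset", "description": "original", "labels": {"team": "ml"}}
new_values = {"display_name": "video-dataset-v2", "description": "not applied"}
result = apply_update_mask(current, new_values, ["display_name"])
print(result)
```

Here description stays "original" because it is not listed in the mask, even though new_values contains a different value for it.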
Creating Training Jobs
To create Google Vertex AI training jobs, you have three operators: CreateCustomContainerTrainingJobOperator, CreateCustomPythonPackageTrainingJobOperator, and CreateCustomTrainingJobOperator. Each of them waits for the operation to complete, and the result of each operator is the model trained by that job.
Preparation step
For each operator, you must first prepare and create a dataset, then pass its ID to the operator's dataset_id parameter.
How to run a Container Training Job
CreateCustomContainerTrainingJobOperator
Before running this job, create a Docker image with your training script inside. Documentation on how to create the image is available at https://cloud.google.com/vertex-ai/docs/training/create-custom-container
Then put the link to the image in the container_uri parameter. You can also set the command to execute in the container created from this image with the command parameter.
from airflow.providers.google.cloud.operators.vertex_ai.custom_job import (
    CreateCustomContainerTrainingJobOperator,
    CreateCustomPythonPackageTrainingJobOperator,
    CreateCustomTrainingJobOperator,
    DeleteCustomTrainingJobOperator,
    ListCustomTrainingJobOperator,
)
create_custom_container_training_job = CreateCustomContainerTrainingJobOperator(
    task_id="custom_container_task",
    staging_bucket=f"gs://{CUSTOM_CONTAINER_GCS_BUCKET_NAME}",
    display_name=CONTAINER_DISPLAY_NAME,
    container_uri=CUSTOM_CONTAINER_URI,
    model_serving_container_image_uri=MODEL_SERVING_CONTAINER_URI,
    # run params
    dataset_id=tabular_dataset_id,
    command=["python3", "task.py"],
    model_display_name=MODEL_DISPLAY_NAME,
    replica_count=REPLICA_COUNT,
    machine_type=MACHINE_TYPE,
    accelerator_type=ACCELERATOR_TYPE,
    accelerator_count=ACCELERATOR_COUNT,
    training_fraction_split=TRAINING_FRACTION_SPLIT,
    validation_fraction_split=VALIDATION_FRACTION_SPLIT,
    test_fraction_split=TEST_FRACTION_SPLIT,
    region=REGION,
    project_id=PROJECT_ID,
)
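The training_fraction_split, validation_fraction_split and test_fraction_split arguments assign rows of the dataset to the three sets. As the Vertex AI FractionSplit reference describes it, the fractions may add up to at most 1, and any remainder is assigned by the service. The hypothetical pre-flight check below, which is not part of the operator, can catch a mistyped TRAINING_FRACTION_SPLIT or similar constant before a job is submitted.

```python
import math


def check_fraction_split(training: float, validation: float, test: float) -> float:
    """Validate a fraction split and return the unassigned remainder.

    Hypothetical check: each fraction must lie in [0, 1] and their sum must
    not exceed 1 (a small tolerance absorbs floating-point rounding).
    """
    fractions = {"training": training, "validation": validation, "test": test}
    for name, value in fractions.items():
        if not 0.0 <= value <= 1.0:
            raise ValueError(f"{name} fraction {value} is outside [0, 1]")
    total = math.fsum(fractions.values())
    if total > 1.0 + 1e-9:
        raise ValueError(f"fractions sum to {total}, which exceeds 1")
    return max(0.0, 1.0 - total)


# The AutoML image example in this document uses 0.6 / 0.2 / 0.2.
print(check_fraction_split(0.6, 0.2, 0.2))
```

math.fsum is used instead of sum so that splits such as 0.6 / 0.2 / 0.2 do not trip the check through accumulated rounding error.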
How to run a Python Package Training Job
CreateCustomPythonPackageTrainingJobOperator
Before running this job, create a Python package with your training script inside. Documentation on how to create it is available at https://cloud.google.com/vertex-ai/docs/training/create-python-pre-built-container
Next, put the link to the package in the python_package_gcs_uri parameter. The python_module_name parameter should contain the name of the module that runs your training task.
create_custom_python_package_training_job = CreateCustomPythonPackageTrainingJobOperator(
    task_id="python_package_task",
    staging_bucket=f"gs://{CUSTOM_PYTHON_GCS_BUCKET_NAME}",
    display_name=PACKAGE_DISPLAY_NAME,
    python_package_gcs_uri=PYTHON_PACKAGE_GCS_URI,
    python_module_name=PYTHON_MODULE_NAME,
    container_uri=CONTAINER_URI,
    model_serving_container_image_uri=MODEL_SERVING_CONTAINER_URI,
    # run params
    dataset_id=tabular_dataset_id,
    model_display_name=MODEL_DISPLAY_NAME,
    replica_count=REPLICA_COUNT,
    machine_type=MACHINE_TYPE,
    accelerator_type=ACCELERATOR_TYPE,
    accelerator_count=ACCELERATOR_COUNT,
    training_fraction_split=TRAINING_FRACTION_SPLIT,
    validation_fraction_split=VALIDATION_FRACTION_SPLIT,
    test_fraction_split=TEST_FRACTION_SPLIT,
    region=REGION,
    project_id=PROJECT_ID,
)
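Both staging_bucket and python_package_gcs_uri are Cloud Storage URIs of the form gs://BUCKET/OBJECT. The standard-library sketch below splits such a URI so that a value like PYTHON_PACKAGE_GCS_URI can be sanity-checked in the DAG file; the helper name split_gcs_uri and the sample paths are hypothetical, and the function does not check that the object actually exists.

```python
from typing import Tuple
from urllib.parse import urlparse


def split_gcs_uri(uri: str) -> Tuple[str, str]:
    """Split gs://bucket/path/to/object into ("bucket", "path/to/object")."""
    parsed = urlparse(uri)
    if parsed.scheme != "gs" or not parsed.netloc:
        raise ValueError(f"expected a gs://BUCKET/... URI, got {uri!r}")
    return parsed.netloc, parsed.path.lstrip("/")


bucket, blob = split_gcs_uri("gs://my-bucket/trainer/trainer-0.1.tar.gz")
print(bucket, blob)
```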
How to run a Training Job
CreateCustomTrainingJobOperator
For this job, put the path to your local training script in the script_path parameter.
create_custom_training_job = CreateCustomTrainingJobOperator(
    task_id="custom_task",
    staging_bucket=f"gs://{CUSTOM_GCS_BUCKET_NAME}",
    display_name=CUSTOM_DISPLAY_NAME,
    script_path=LOCAL_TRAINING_SCRIPT_PATH,
    container_uri=CONTAINER_URI,
    requirements=["gcsfs==0.7.1"],
    model_serving_container_image_uri=MODEL_SERVING_CONTAINER_URI,
    # run params
    dataset_id=tabular_dataset_id,
    replica_count=REPLICA_COUNT,
    model_display_name=MODEL_DISPLAY_NAME,
    sync=False,
    region=REGION,
    project_id=PROJECT_ID,
)
model_id_v1 = create_custom_training_job.output["model_id"]
Additionally, instead of creating a new Model, you can create a new version of an existing one. In this case, the training job produces a new version of the existing Model in Model Registry rather than a new Model. To do this, specify the parent_model parameter when running the training job.
create_custom_training_job_v2 = CreateCustomTrainingJobOperator(
    task_id="custom_task_v2",
    staging_bucket=f"gs://{CUSTOM_GCS_BUCKET_NAME}",
    display_name=CUSTOM_DISPLAY_NAME,
    script_path=LOCAL_TRAINING_SCRIPT_PATH,
    container_uri=CONTAINER_URI,
    requirements=["gcsfs==0.7.1"],
    model_serving_container_image_uri=MODEL_SERVING_CONTAINER_URI,
    parent_model=model_id_v1,
    # run params
    dataset_id=tabular_dataset_id,
    replica_count=REPLICA_COUNT,
    model_display_name=MODEL_DISPLAY_NAME,
    sync=False,
    region=REGION,
    project_id=PROJECT_ID,
)
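When parent_model is set, the Model ID stays the same and only the version changes. Vertex AI Model Registry can address a specific version by appending @VERSION_ID, or @ALIAS, to a model ID or resource name. The sketch below parses that notation; the helper parse_model_ref is hypothetical and handles only the string format, assuming the MODEL_ID@VERSION convention described in the Model Registry documentation.

```python
from typing import NamedTuple, Optional


class ModelRef(NamedTuple):
    model_id: str
    version: Optional[str]  # a version ID such as "2", an alias such as "default", or None


def parse_model_ref(ref: str) -> ModelRef:
    """Parse "MODEL_ID", "MODEL_ID@VERSION", or a full resource name ending in either."""
    last_segment = ref.rsplit("/", 1)[-1]
    model_id, sep, version = last_segment.partition("@")
    if not model_id or (sep and not version):
        raise ValueError(f"malformed model reference: {ref!r}")
    return ModelRef(model_id, version if sep else None)


print(parse_model_ref("projects/my-project/locations/us-central1/models/123@2"))
```

A reference without @ resolves to the model's default version, which is what SetDefaultVersionOnModelOperator changes.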
You can get a list of custom training jobs using ListCustomTrainingJobOperator.
list_custom_training_job = ListCustomTrainingJobOperator(
    task_id="list_custom_training_job",
    region=REGION,
    project_id=PROJECT_ID,
)
If you wish to delete a custom training job, you can use DeleteCustomTrainingJobOperator.
from airflow.utils.trigger_rule import TriggerRule
delete_custom_training_job = DeleteCustomTrainingJobOperator(
    task_id="delete_custom_training_job",
    training_pipeline_id="{{ task_instance.xcom_pull(task_ids='custom_task', key='training_id') }}",
    custom_job_id="{{ task_instance.xcom_pull(task_ids='custom_task', key='custom_job_id') }}",
    region=REGION,
    project_id=PROJECT_ID,
    trigger_rule=TriggerRule.ALL_DONE,
)
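The cleanup task uses trigger_rule=TriggerRule.ALL_DONE, so it runs once every upstream task has finished, whether it succeeded, failed, or was skipped; under the default ALL_SUCCESS rule, a failed training task would leave the job undeleted. The toy function below restates just those two rules over a list of upstream state strings. It is a simplified illustration, not Airflow's scheduler logic, and the state set is a hypothetical subset.

```python
from typing import Iterable

# Hypothetical subset of Airflow task-instance states that count as "finished".
FINISHED_STATES = {"success", "failed", "skipped", "upstream_failed"}


def should_run(trigger_rule: str, upstream_states: Iterable[str]) -> bool:
    """Decide whether a task may start, for two trigger rules only (toy model)."""
    states = list(upstream_states)
    if trigger_rule == "all_success":
        return all(state == "success" for state in states)
    if trigger_rule == "all_done":
        return all(state in FINISHED_STATES for state in states)
    raise ValueError(f"rule not modelled here: {trigger_rule}")


# A failed training task still lets an ALL_DONE cleanup task run.
print(should_run("all_done", ["failed"]), should_run("all_success", ["failed"]))
```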
Creating AutoML Training Jobs
To create Google Vertex AI AutoML training jobs, you have five operators:
CreateAutoMLForecastingTrainingJobOperator
CreateAutoMLImageTrainingJobOperator
CreateAutoMLTabularTrainingJobOperator
CreateAutoMLTextTrainingJobOperator
CreateAutoMLVideoTrainingJobOperator
Each of them waits for the operation to complete, and the result of each operator is the model trained by that job.
How to run an AutoML Forecasting Training Job
CreateAutoMLForecastingTrainingJobOperator
Before running this job, you must prepare and create a TimeSeries dataset, then pass its ID to the operator's dataset_id parameter.
from airflow.providers.google.cloud.operators.vertex_ai.auto_ml import (
    CreateAutoMLForecastingTrainingJobOperator,
    CreateAutoMLImageTrainingJobOperator,
    CreateAutoMLTabularTrainingJobOperator,
    CreateAutoMLTextTrainingJobOperator,
    CreateAutoMLVideoTrainingJobOperator,
    DeleteAutoMLTrainingJobOperator,
    ListAutoMLTrainingJobOperator,
)
create_auto_ml_forecasting_training_job = CreateAutoMLForecastingTrainingJobOperator(
    task_id="auto_ml_forecasting_task",
    display_name=FORECASTING_DISPLAY_NAME,
    optimization_objective="minimize-rmse",
    column_specs=COLUMN_SPECS,
    # run params
    dataset_id=forecast_dataset_id,
    target_column=TEST_TARGET_COLUMN,
    time_column=TEST_TIME_COLUMN,
    time_series_identifier_column=TEST_TIME_SERIES_IDENTIFIER_COLUMN,
    available_at_forecast_columns=[TEST_TIME_COLUMN],
    unavailable_at_forecast_columns=[TEST_TARGET_COLUMN],
    time_series_attribute_columns=["city", "zip_code", "county"],
    forecast_horizon=30,
    context_window=30,
    data_granularity_unit="day",
    data_granularity_count=1,
    weight_column=None,
    budget_milli_node_hours=1000,
    model_display_name=MODEL_DISPLAY_NAME,
    predefined_split_column_name=None,
    region=REGION,
    project_id=PROJECT_ID,
)
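With data_granularity_unit="day" and data_granularity_count=1, each time step is one day, so forecast_horizon=30 asks for 30 daily predictions after the last observed timestamp, and context_window=30 lets the model look back over 30 daily steps. The sketch below only performs that calendar arithmetic with datetime; the function name and the sample date are hypothetical, and it does not model other granularity units.

```python
from datetime import date, timedelta
from typing import List, Tuple


def forecast_windows(
    last_observed: date, horizon: int, context_window: int, granularity_days: int = 1
) -> Tuple[List[date], List[date]]:
    """Return (context dates, forecast dates) for a day-based granularity.

    The context window ends at `last_observed`; the forecast starts one step later.
    """
    step = timedelta(days=granularity_days)
    context = [last_observed - step * i for i in range(context_window - 1, -1, -1)]
    forecast = [last_observed + step * i for i in range(1, horizon + 1)]
    return context, forecast


context, forecast = forecast_windows(date(2023, 1, 31), horizon=30, context_window=30)
print(context[0], context[-1])    # 2023-01-02 2023-01-31
print(forecast[0], forecast[-1])  # 2023-02-01 2023-03-02
```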
How to run an AutoML Image Training Job
CreateAutoMLImageTrainingJobOperator
Before running this job, you must prepare and create an Image dataset, then pass its ID to the operator's dataset_id parameter.
create_auto_ml_image_training_job = CreateAutoMLImageTrainingJobOperator(
    task_id="auto_ml_image_task",
    display_name=IMAGE_DISPLAY_NAME,
    dataset_id=image_dataset_id,
    prediction_type="classification",
    multi_label=False,
    model_type="CLOUD",
    training_fraction_split=0.6,
    validation_fraction_split=0.2,
    test_fraction_split=0.2,
    budget_milli_node_hours=8000,
    model_display_name=MODEL_DISPLAY_NAME,
    disable_early_stopping=False,
    region=REGION,
    project_id=PROJECT_ID,
)
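budget_milli_node_hours is expressed in thousandths of a node hour, so budget_milli_node_hours=8000 in the image example corresponds to 8 node hours, and 1000 in the forecasting example to 1 node hour. The small conversion helpers below are hypothetical conveniences, not provider functions; they reject values that do not map to a whole number of milli node hours.

```python
from fractions import Fraction

MILLI_PER_NODE_HOUR = 1000


def to_milli_node_hours(node_hours: float) -> int:
    """Convert node hours to the integer milli-node-hour value the operators take."""
    # Going through str() avoids binary floating-point artifacts such as 0.1 * 1000.
    milli = Fraction(str(node_hours)) * MILLI_PER_NODE_HOUR
    if milli.denominator != 1:
        raise ValueError(f"{node_hours} node hours is not a whole number of milli node hours")
    return int(milli)


def to_node_hours(milli_node_hours: int) -> float:
    return milli_node_hours / MILLI_PER_NODE_HOUR


print(to_milli_node_hours(8), to_node_hours(1000))
```

The service may enforce its own minimum budget per model type; this sketch does not check that.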
How to run an AutoML Tabular Training Job
CreateAutoMLTabularTrainingJobOperator
Before running this job, you must prepare and create a Tabular dataset, then pass its ID to the operator's dataset_id parameter.
create_auto_ml_tabular_training_job = CreateAutoMLTabularTrainingJobOperator(
    task_id="auto_ml_tabular_task",
    display_name=TABULAR_DISPLAY_NAME,
    optimization_prediction_type="classification",
    column_transformations=COLUMN_TRANSFORMATIONS,
    dataset_id=tabular_dataset_id,
    target_column="Adopted",
    training_fraction_split=0.8,
    validation_fraction_split=0.1,
    test_fraction_split=0.1,
    model_display_name=MODEL_DISPLAY_NAME,
    disable_early_stopping=False,
    region=REGION,
    project_id=PROJECT_ID,
)
How to run an AutoML Text Training Job
CreateAutoMLTextTrainingJobOperator
Before running this job, you must prepare and create a Text dataset, then pass its ID to the operator's dataset_id parameter.
create_auto_ml_text_training_job = CreateAutoMLTextTrainingJobOperator(
    task_id="auto_ml_text_task",
    display_name=TEXT_DISPLAY_NAME,
    prediction_type="classification",
    multi_label=False,
    dataset_id=text_dataset_id,
    model_display_name=MODEL_DISPLAY_NAME,
    training_fraction_split=0.7,
    validation_fraction_split=0.2,
    test_fraction_split=0.1,
    sync=True,
    region=REGION,
    project_id=PROJECT_ID,
)
How to run an AutoML Video Training Job
CreateAutoMLVideoTrainingJobOperator
Before running this job, you must prepare and create a Video dataset, then pass its ID to the operator's dataset_id parameter.
create_auto_ml_video_training_job = CreateAutoMLVideoTrainingJobOperator(
    task_id="auto_ml_video_task",
    display_name=VIDEO_DISPLAY_NAME,
    prediction_type="classification",
    model_type="CLOUD",
    dataset_id=video_dataset_id,
    model_display_name=MODEL_DISPLAY_NAME,
    region=REGION,
    project_id=PROJECT_ID,
)
model_id_v1 = create_auto_ml_video_training_job.output["model_id"]
Additionally, an AutoML video training job can create a new version of an existing Model. In this case, the result is a new version of the existing Model rather than a new Model in Model Registry. To do this, specify the parent_model parameter when running the AutoML video training job.
create_auto_ml_video_training_job_v2 = CreateAutoMLVideoTrainingJobOperator(
    task_id="auto_ml_video_v2_task",
    display_name=VIDEO_DISPLAY_NAME,
    prediction_type="classification",
    model_type="CLOUD",
    dataset_id=video_dataset_id,
    model_display_name=MODEL_DISPLAY_NAME,
    parent_model=model_id_v1,
    region=REGION,
    project_id=PROJECT_ID,
)
You can get a list of AutoML training jobs using ListAutoMLTrainingJobOperator.
list_auto_ml_training_job = ListAutoMLTrainingJobOperator(
    task_id="list_auto_ml_training_job",
    region=REGION,
    project_id=PROJECT_ID,
)
If you wish to delete an AutoML training job, you can use DeleteAutoMLTrainingJobOperator.
delete_auto_ml_forecasting_training_job = DeleteAutoMLTrainingJobOperator(
    task_id="delete_auto_ml_forecasting_training_job",
    training_pipeline_id="{{ task_instance.xcom_pull(task_ids='auto_ml_forecasting_task', "
    "key='training_id') }}",
    region=REGION,
    project_id=PROJECT_ID,
)
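The training_pipeline_id argument above is written as two adjacent string literals, which Python joins into one Jinja template at compile time; Airflow then renders that template when the task runs, substituting the value the auto_ml_forecasting_task task pushed under training_id. The hypothetical helper below builds the same template string, which can keep long xcom_pull expressions readable; it only produces the string and does not render it.

```python
def xcom_pull_template(task_id: str, key: str) -> str:
    """Build a Jinja expression that pulls `key` from the XCom of `task_id`."""
    # Doubled braces in an f-string produce literal "{{" and "}}".
    return f"{{{{ task_instance.xcom_pull(task_ids='{task_id}', key='{key}') }}}}"


# Adjacent literals, written exactly as in the operator call above, form one string.
literal = (
    "{{ task_instance.xcom_pull(task_ids='auto_ml_forecasting_task', "
    "key='training_id') }}"
)
print(xcom_pull_template("auto_ml_forecasting_task", "training_id") == literal)
```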
Creating Batch Prediction Jobs
To create a Google Vertex AI batch prediction job, you can use CreateBatchPredictionJobOperator. The operator returns the batch prediction job ID in XCom under the batch_prediction_job_id key.
from airflow.providers.google.cloud.operators.vertex_ai.batch_prediction_job import (
    CreateBatchPredictionJobOperator,
    DeleteBatchPredictionJobOperator,
    ListBatchPredictionJobsOperator,
)
create_batch_prediction_job = CreateBatchPredictionJobOperator(
    task_id="create_batch_prediction_job",
    job_display_name=JOB_DISPLAY_NAME,
    model_name="{{ti.xcom_pull('auto_ml_forecasting_task')['name']}}",
    predictions_format="csv",
    bigquery_source=BIGQUERY_SOURCE,
    gcs_destination_prefix=GCS_DESTINATION_PREFIX,
    model_parameters=MODEL_PARAMETERS,
    region=REGION,
    project_id=PROJECT_ID,
)
To delete a batch prediction job, you can use DeleteBatchPredictionJobOperator.
from airflow.utils.trigger_rule import TriggerRule
delete_batch_prediction_job = DeleteBatchPredictionJobOperator(
    task_id="delete_batch_prediction_job",
    batch_prediction_job_id=create_batch_prediction_job.output["batch_prediction_job_id"],
    region=REGION,
    project_id=PROJECT_ID,
    trigger_rule=TriggerRule.ALL_DONE,
)
To get a list of batch prediction jobs, you can use ListBatchPredictionJobsOperator.
list_batch_prediction_job = ListBatchPredictionJobsOperator(
    task_id="list_batch_prediction_jobs",
    region=REGION,
    project_id=PROJECT_ID,
)
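Creating an Endpoint Service
To create a Google Vertex AI endpoint, you can use CreateEndpointOperator. The operator returns the endpoint ID in XCom under the endpoint_id key.
from airflow.providers.google.cloud.operators.vertex_ai.endpoint_service import (
    CreateEndpointOperator, DeleteEndpointOperator, DeployModelOperator,
    ListEndpointsOperator, UndeployModelOperator,
)
create_endpoint = CreateEndpointOperator(
    task_id="create_endpoint",
    endpoint=ENDPOINT_CONF,
    region=REGION,
    project_id=PROJECT_ID,
)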
After creating an endpoint, you can use DeployModelOperator to deploy a model to it.
deploy_model = DeployModelOperator(
    task_id="deploy_model",
    endpoint_id=create_endpoint.output["endpoint_id"],
    deployed_model=DEPLOYED_MODEL,
    traffic_split={"0": 100},
    region=REGION,
    project_id=PROJECT_ID,
)
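traffic_split maps deployed-model IDs to the percentage of the endpoint's traffic each one receives. According to the Vertex AI DeployModel API reference, the key "0" stands for the model being deployed by this request, and the percentages must add up to 100. The sketch below checks a split before it is handed to DeployModelOperator; the helper validate_traffic_split and the existing model ID are hypothetical, and the check mirrors the documented rules rather than calling the service.

```python
from typing import Dict


def validate_traffic_split(split: Dict[str, int]) -> None:
    """Raise ValueError unless `split` is a valid traffic split.

    Keys are deployed-model IDs ("0" = the model deployed by this request);
    values are integer percentages that must total exactly 100.
    """
    for model_id, percent in split.items():
        if not isinstance(percent, int) or not 0 <= percent <= 100:
            raise ValueError(f"invalid percentage {percent!r} for {model_id!r}")
    total = sum(split.values())
    if total != 100:
        raise ValueError(f"traffic split totals {total}, expected 100")


validate_traffic_split({"0": 100})  # the split used above
# Canary-style split between an already deployed model (hypothetical ID) and the new one:
validate_traffic_split({"1234567890": 90, "0": 10})
```

A canary split like the second call keeps most traffic on the existing deployment while the new model receives a small share.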
To undeploy a model, you can use UndeployModelOperator.
undeploy_model = UndeployModelOperator(
    task_id="undeploy_model",
    endpoint_id=create_endpoint.output["endpoint_id"],
    deployed_model_id=deploy_model.output["deployed_model_id"],
    region=REGION,
    project_id=PROJECT_ID,
)
To delete an endpoint, you can use DeleteEndpointOperator.
delete_endpoint = DeleteEndpointOperator(
    task_id="delete_endpoint",
    endpoint_id=create_endpoint.output["endpoint_id"],
    region=REGION,
    project_id=PROJECT_ID,
)
To get a list of endpoints, you can use ListEndpointsOperator.
list_endpoints = ListEndpointsOperator(
    task_id="list_endpoints",
    region=REGION,
    project_id=PROJECT_ID,
)
Creating Hyperparameter Tuning Jobs
To create a Google Vertex AI hyperparameter tuning job, you can use CreateHyperparameterTuningJobOperator. The operator returns the hyperparameter tuning job ID in XCom under the hyperparameter_tuning_job_id key.
from airflow.providers.google.cloud.operators.vertex_ai.hyperparameter_tuning_job import (
    CreateHyperparameterTuningJobOperator,
    DeleteHyperparameterTuningJobOperator,
    GetHyperparameterTuningJobOperator,
    ListHyperparameterTuningJobOperator,
)
create_hyperparameter_tuning_job = CreateHyperparameterTuningJobOperator(
    task_id="create_hyperparameter_tuning_job",
    staging_bucket=STAGING_BUCKET,
    display_name=DISPLAY_NAME,
    worker_pool_specs=WORKER_POOL_SPECS,
    sync=False,
    region=REGION,
    project_id=PROJECT_ID,
    parameter_spec=PARAM_SPECS,
    metric_spec=METRIC_SPEC,
    max_trial_count=15,
    parallel_trial_count=3,
)
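max_trial_count caps the total number of trials and parallel_trial_count caps how many run at the same time, so if all 15 trials run with at most 3 in parallel, the study needs at least five consecutive rounds. The arithmetic below is only a lower bound that assumes trials take roughly equal time; the helper name is hypothetical, and Vertex AI may schedule trials unevenly or finish with fewer trials.

```python
def min_trial_rounds(max_trial_count: int, parallel_trial_count: int) -> int:
    """Lower bound on sequential rounds needed to run every trial."""
    if max_trial_count < 1 or parallel_trial_count < 1:
        raise ValueError("trial counts must be positive")
    # Integer ceiling division, avoiding floating point.
    return (max_trial_count + parallel_trial_count - 1) // parallel_trial_count


print(min_trial_rounds(15, 3))
```

Raising parallel_trial_count shortens wall-clock time, but each round then has fewer completed trials to learn from when suggesting the next parameters.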
CreateHyperparameterTuningJobOperator also supports deferrable mode, enabled with deferrable=True:
create_hyperparameter_tuning_job_def = CreateHyperparameterTuningJobOperator(
    task_id="create_hyperparameter_tuning_job_def",
    staging_bucket=STAGING_BUCKET,
    display_name=DISPLAY_NAME,
    worker_pool_specs=WORKER_POOL_SPECS,
    sync=False,
    region=REGION,
    project_id=PROJECT_ID,
    parameter_spec=PARAM_SPECS,
    metric_spec=METRIC_SPEC,
    max_trial_count=15,
    parallel_trial_count=3,
    deferrable=True,
)
To delete a hyperparameter tuning job, you can use DeleteHyperparameterTuningJobOperator.
from airflow.utils.trigger_rule import TriggerRule
delete_hyperparameter_tuning_job = DeleteHyperparameterTuningJobOperator(
    task_id="delete_hyperparameter_tuning_job",
    project_id=PROJECT_ID,
    region=REGION,
    hyperparameter_tuning_job_id="{{ task_instance.xcom_pull("
    "task_ids='create_hyperparameter_tuning_job', key='hyperparameter_tuning_job_id') }}",
    trigger_rule=TriggerRule.ALL_DONE,
)
To get a hyperparameter tuning job, you can use GetHyperparameterTuningJobOperator.
get_hyperparameter_tuning_job = GetHyperparameterTuningJobOperator(
    task_id="get_hyperparameter_tuning_job",
    project_id=PROJECT_ID,
    region=REGION,
    hyperparameter_tuning_job_id="{{ task_instance.xcom_pull("
    "task_ids='create_hyperparameter_tuning_job', key='hyperparameter_tuning_job_id') }}",
)
To get a list of hyperparameter tuning jobs, you can use ListHyperparameterTuningJobOperator.
list_hyperparameter_tuning_job = ListHyperparameterTuningJobOperator(
    task_id="list_hyperparameter_tuning_job",
    region=REGION,
    project_id=PROJECT_ID,
)
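Creating a Model Service
To upload a Google Vertex AI model, you can use UploadModelOperator. The operator returns the model ID in XCom under the model_id key.
from airflow.providers.google.cloud.operators.vertex_ai.model_service import (
    AddVersionAliasesOnModelOperator, DeleteModelOperator, DeleteModelVersionOperator,
    DeleteVersionAliasesOnModelOperator, ExportModelOperator, GetModelOperator,
    ListModelsOperator, ListModelVersionsOperator, SetDefaultVersionOnModelOperator,
    UploadModelOperator,
)
upload_model = UploadModelOperator(
    task_id="upload_model",
    region=REGION,
    project_id=PROJECT_ID,
    model=MODEL_OBJ,
)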
To export a model, you can use ExportModelOperator.
export_model = ExportModelOperator(
    task_id="export_model",
    project_id=PROJECT_ID,
    region=REGION,
    model_id=upload_model.output["model_id"],
    output_config=MODEL_OUTPUT_CONFIG,
)
To delete a model, you can use DeleteModelOperator.
from airflow.utils.trigger_rule import TriggerRule
delete_model = DeleteModelOperator(
    task_id="delete_model",
    project_id=PROJECT_ID,
    region=REGION,
    model_id=upload_model.output["model_id"],
    trigger_rule=TriggerRule.ALL_DONE,
)
To get a list of models, you can use ListModelsOperator.
list_models = ListModelsOperator(
    task_id="list_models",
    region=REGION,
    project_id=PROJECT_ID,
)
To retrieve a model by its ID, you can use GetModelOperator.
get_model = GetModelOperator(
    task_id="get_model", region=REGION, project_id=PROJECT_ID, model_id=model_id_v1
)
To list all versions of a model, you can use ListModelVersionsOperator.
list_model_versions = ListModelVersionsOperator(
    task_id="list_model_versions", region=REGION, project_id=PROJECT_ID, model_id=model_id_v1
)
To set a specific version of a model as the default one, you can use SetDefaultVersionOnModelOperator.
set_default_version = SetDefaultVersionOnModelOperator(
    task_id="set_default_version",
    project_id=PROJECT_ID,
    region=REGION,
    model_id=model_id_v2,
)
To add aliases to a specific version of a model, you can use AddVersionAliasesOnModelOperator.
add_version_alias = AddVersionAliasesOnModelOperator(
    task_id="add_version_alias",
    project_id=PROJECT_ID,
    region=REGION,
    version_aliases=["new-version", "beta"],
    model_id=model_id_v2,
)
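Version aliases such as "new-version" and "beta" are free-form labels, but Vertex AI restricts their format so that they cannot be confused with numeric version IDs. The pattern below is an assumption transcribed from the description of Model.version_aliases in the Vertex AI API reference, [a-z][a-zA-Z0-9-]{0,126}[a-z-0-9], that is, 2 to 128 characters starting with a lowercase letter; confirm it against the current reference before relying on it. The helper invalid_aliases is hypothetical.

```python
import re
from typing import Iterable, List

# Assumed alias format (see lead-in): a lowercase first letter, then letters,
# digits or hyphens, ending in a lowercase letter, digit or hyphen; 2-128 chars.
VERSION_ALIAS_PATTERN = re.compile(r"[a-z][a-zA-Z0-9-]{0,126}[a-z0-9-]")


def invalid_aliases(aliases: Iterable[str]) -> List[str]:
    """Return the aliases that do not match the assumed format."""
    return [alias for alias in aliases if not VERSION_ALIAS_PATTERN.fullmatch(alias)]


print(invalid_aliases(["new-version", "beta"]))
print(invalid_aliases(["Beta", "v", "2"]))
```

Running such a check in the DAG file surfaces a bad alias at parse time instead of when AddVersionAliasesOnModelOperator calls the API.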
To delete aliases from a specific version of a model, you can use DeleteVersionAliasesOnModelOperator.
delete_version_alias = DeleteVersionAliasesOnModelOperator(
    task_id="delete_version_alias",
    project_id=PROJECT_ID,
    region=REGION,
    version_aliases=["new-version"],
    model_id=model_id_v2,
)
To delete a specific version of a model, you can use DeleteModelVersionOperator.
delete_model_version = DeleteModelVersionOperator(
    task_id="delete_model_version",
    project_id=PROJECT_ID,
    region=REGION,
    model_id=model_id_v1,
    trigger_rule=TriggerRule.ALL_DONE,
)
Running Pipeline Jobs
To run a Google Vertex AI pipeline job, you can use RunPipelineJobOperator. The operator returns the pipeline job ID in XCom under the pipeline_job_id key.
from airflow.providers.google.cloud.operators.vertex_ai.pipeline_job import (
    DeletePipelineJobOperator,
    GetPipelineJobOperator,
    ListPipelineJobOperator,
    RunPipelineJobOperator,
)
run_pipeline_job = RunPipelineJobOperator(
    task_id="run_pipeline_job",
    display_name=DISPLAY_NAME,
    template_path=TEMPLATE_PATH,
    parameter_values=PARAMETER_VALUES,
    region=REGION,
    project_id=PROJECT_ID,
)
To delete a pipeline job, you can use DeletePipelineJobOperator.
from airflow.utils.trigger_rule import TriggerRule
delete_pipeline_job = DeletePipelineJobOperator(
    task_id="delete_pipeline_job",
    project_id=PROJECT_ID,
    region=REGION,
    pipeline_job_id="{{ task_instance.xcom_pull("
    "task_ids='run_pipeline_job', key='pipeline_job_id') }}",
    trigger_rule=TriggerRule.ALL_DONE,
)
To get a pipeline job, you can use GetPipelineJobOperator.
get_pipeline_job = GetPipelineJobOperator(
    task_id="get_pipeline_job",
    project_id=PROJECT_ID,
    region=REGION,
    pipeline_job_id="{{ task_instance.xcom_pull("
    "task_ids='run_pipeline_job', key='pipeline_job_id') }}",
)
To get a list of pipeline jobs, you can use ListPipelineJobOperator.
list_pipeline_job = ListPipelineJobOperator(
    task_id="list_pipeline_job",
    region=REGION,
    project_id=PROJECT_ID,
)