# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations
from typing import TYPE_CHECKING
from airflow.providers.google.cloud.links.base import BaseGoogleLink
if TYPE_CHECKING :
from airflow.utils.context import Context
[docs] VERTEX_AI_BASE_LINK = "/vertex-ai"
[docs] VERTEX_AI_MODEL_LINK = (
VERTEX_AI_BASE_LINK + "/locations/ {region} /models/ {model_id} /deploy?project= {project_id} "
)
[docs] VERTEX_AI_MODEL_LIST_LINK = VERTEX_AI_BASE_LINK + "/models?project= {project_id} "
[docs] VERTEX_AI_MODEL_EXPORT_LINK = "/storage/browser/ {bucket_name} /model- {model_id} ?project= {project_id} "
[docs] VERTEX_AI_TRAINING_LINK = (
VERTEX_AI_BASE_LINK + "/locations/ {region} /training/ {training_id} /cpu?project= {project_id} "
)
[docs] VERTEX_AI_TRAINING_PIPELINES_LINK = VERTEX_AI_BASE_LINK + "/training/training-pipelines?project= {project_id} "
[docs] VERTEX_AI_DATASET_LINK = (
VERTEX_AI_BASE_LINK + "/locations/ {region} /datasets/ {dataset_id} /analyze?project= {project_id} "
)
[docs] VERTEX_AI_DATASET_LIST_LINK = VERTEX_AI_BASE_LINK + "/datasets?project= {project_id} "
[docs] VERTEX_AI_HYPERPARAMETER_TUNING_JOB_LIST_LINK = (
VERTEX_AI_BASE_LINK + "/training/hyperparameter-tuning-jobs?project= {project_id} "
)
[docs] VERTEX_AI_BATCH_PREDICTION_JOB_LINK = (
VERTEX_AI_BASE_LINK
+ "/locations/ {region} /batch-predictions/ {batch_prediction_job_id} ?project= {project_id} "
)
[docs] VERTEX_AI_BATCH_PREDICTION_JOB_LIST_LINK = VERTEX_AI_BASE_LINK + "/batch-predictions?project= {project_id} "
[docs] VERTEX_AI_ENDPOINT_LINK = (
VERTEX_AI_BASE_LINK + "/locations/ {region} /endpoints/ {endpoint_id} ?project= {project_id} "
)
[docs] VERTEX_AI_ENDPOINT_LIST_LINK = VERTEX_AI_BASE_LINK + "/endpoints?project= {project_id} "
[docs] VERTEX_AI_PIPELINE_JOB_LINK = (
VERTEX_AI_BASE_LINK + "/locations/ {region} /pipelines/runs/ {pipeline_id} ?project= {project_id} "
)
[docs] VERTEX_AI_PIPELINE_JOB_LIST_LINK = VERTEX_AI_BASE_LINK + "/pipelines/runs?project= {project_id} "
[docs] class VertexAIModelLink ( BaseGoogleLink ):
"""Helper class for constructing Vertex AI Model link."""
[docs] name = "Vertex AI Model"
@staticmethod
[docs] def persist (
context : Context ,
task_instance ,
model_id : str ,
):
task_instance . xcom_push (
context = context ,
key = VertexAIModelLink . key ,
value = {
"model_id" : model_id ,
"region" : task_instance . region ,
"project_id" : task_instance . project_id ,
},
)
[docs] class VertexAIModelListLink ( BaseGoogleLink ):
"""Helper class for constructing Vertex AI Models Link."""
@staticmethod
[docs] def persist (
context : Context ,
task_instance ,
):
task_instance . xcom_push (
context = context ,
key = VertexAIModelListLink . key ,
value = {
"project_id" : task_instance . project_id ,
},
)
[docs] class VertexAIModelExportLink ( BaseGoogleLink ):
"""Helper class for constructing Vertex AI Model Export Link."""
@staticmethod
@staticmethod
[docs] def persist (
context : Context ,
task_instance ,
):
task_instance . xcom_push (
context = context ,
key = VertexAIModelExportLink . key ,
value = {
"project_id" : task_instance . project_id ,
"model_id" : task_instance . model_id ,
"bucket_name" : VertexAIModelExportLink . extract_bucket_name ( task_instance . output_config ),
},
)
[docs] class VertexAITrainingLink ( BaseGoogleLink ):
"""Helper class for constructing Vertex AI Training link."""
[docs] name = "Vertex AI Training"
@staticmethod
[docs] def persist (
context : Context ,
task_instance ,
training_id : str ,
):
task_instance . xcom_push (
context = context ,
key = VertexAITrainingLink . key ,
value = {
"training_id" : training_id ,
"region" : task_instance . region ,
"project_id" : task_instance . project_id ,
},
)
[docs] class VertexAITrainingPipelinesLink ( BaseGoogleLink ):
"""Helper class for constructing Vertex AI Training Pipelines link."""
[docs] name = "Vertex AI Training Pipelines"
@staticmethod
[docs] def persist (
context : Context ,
task_instance ,
):
task_instance . xcom_push (
context = context ,
key = VertexAITrainingPipelinesLink . key ,
value = {
"project_id" : task_instance . project_id ,
},
)
[docs] class VertexAIDatasetLink ( BaseGoogleLink ):
"""Helper class for constructing Vertex AI Dataset link."""
@staticmethod
[docs] def persist ( context : Context , task_instance , dataset_id : str ):
task_instance . xcom_push (
context = context ,
key = VertexAIDatasetLink . key ,
value = {
"dataset_id" : dataset_id ,
"region" : task_instance . region ,
"project_id" : task_instance . project_id ,
},
)
[docs] class VertexAIDatasetListLink ( BaseGoogleLink ):
"""Helper class for constructing Vertex AI Datasets Link."""
@staticmethod
[docs] def persist (
context : Context ,
task_instance ,
):
task_instance . xcom_push (
context = context ,
key = VertexAIDatasetListLink . key ,
value = {
"project_id" : task_instance . project_id ,
},
)
[docs] class VertexAIHyperparameterTuningJobListLink ( BaseGoogleLink ):
"""Helper class for constructing Vertex AI HyperparameterTuningJobs Link."""
[docs] name = "Hyperparameter Tuning Job List"
[docs] key = "hyperparameter_tuning_jobs_conf"
@staticmethod
[docs] def persist (
context : Context ,
task_instance ,
):
task_instance . xcom_push (
context = context ,
key = VertexAIHyperparameterTuningJobListLink . key ,
value = {
"project_id" : task_instance . project_id ,
},
)
[docs] class VertexAIBatchPredictionJobLink ( BaseGoogleLink ):
"""Helper class for constructing Vertex AI BatchPredictionJob link."""
[docs] name = "Batch Prediction Job"
[docs] key = "batch_prediction_job_conf"
@staticmethod
[docs] def persist (
context : Context ,
task_instance ,
batch_prediction_job_id : str ,
):
task_instance . xcom_push (
context = context ,
key = VertexAIBatchPredictionJobLink . key ,
value = {
"batch_prediction_job_id" : batch_prediction_job_id ,
"region" : task_instance . region ,
"project_id" : task_instance . project_id ,
},
)
[docs] class VertexAIBatchPredictionJobListLink ( BaseGoogleLink ):
"""Helper class for constructing Vertex AI BatchPredictionJobList link."""
[docs] name = "Batch Prediction Job List"
[docs] key = "batch_prediction_jobs_conf"
@staticmethod
[docs] def persist (
context : Context ,
task_instance ,
):
task_instance . xcom_push (
context = context ,
key = VertexAIBatchPredictionJobListLink . key ,
value = {
"project_id" : task_instance . project_id ,
},
)
[docs] class VertexAIEndpointLink ( BaseGoogleLink ):
"""Helper class for constructing Vertex AI Endpoint link."""
@staticmethod
[docs] def persist (
context : Context ,
task_instance ,
endpoint_id : str ,
):
task_instance . xcom_push (
context = context ,
key = VertexAIEndpointLink . key ,
value = {
"endpoint_id" : endpoint_id ,
"region" : task_instance . region ,
"project_id" : task_instance . project_id ,
},
)
[docs] class VertexAIEndpointListLink ( BaseGoogleLink ):
"""Helper class for constructing Vertex AI EndpointList link."""
@staticmethod
[docs] def persist (
context : Context ,
task_instance ,
):
task_instance . xcom_push (
context = context ,
key = VertexAIEndpointListLink . key ,
value = {
"project_id" : task_instance . project_id ,
},
)
[docs] class VertexAIPipelineJobLink ( BaseGoogleLink ):
"""Helper class for constructing Vertex AI PipelineJob link."""
[docs] key = "pipeline_job_conf"
@staticmethod
[docs] def persist (
context : Context ,
task_instance ,
pipeline_id : str ,
):
task_instance . xcom_push (
context = context ,
key = VertexAIPipelineJobLink . key ,
value = {
"pipeline_id" : pipeline_id ,
"region" : task_instance . region ,
"project_id" : task_instance . project_id ,
},
)
[docs] class VertexAIPipelineJobListLink ( BaseGoogleLink ):
"""Helper class for constructing Vertex AI PipelineJobList link."""
[docs] name = "Pipeline Job List"
[docs] key = "pipeline_job_list_conf"
@staticmethod
[docs] def persist (
context : Context ,
task_instance ,
):
task_instance . xcom_push (
context = context ,
key = VertexAIPipelineJobListLink . key ,
value = {
"project_id" : task_instance . project_id ,
},
)
Copy to clipboard