Source code for tests.system.providers.google.cloud.automl.example_automl_translation
## Licensed to the Apache Software Foundation (ASF) under one# or more contributor license agreements. See the NOTICE file# distributed with this work for additional information# regarding copyright ownership. The ASF licenses this file# to you under the Apache License, Version 2.0 (the# "License"); you may not use this file except in compliance# with the License. You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing,# software distributed under the License is distributed on an# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY# KIND, either express or implied. See the License for the# specific language governing permissions and limitations# under the License."""Example Airflow DAG that uses Google AutoML Translation services."""from__future__importannotationsimportosfromdatetimeimportdatetimefromtypingimportcast# The storage module cannot be imported yet https://github.com/googleapis/python-storage/issues/393fromgoogle.cloudimportstorage# type: ignore[attr-defined]fromairflow.decoratorsimporttaskfromairflow.models.dagimportDAGfromairflow.models.xcom_argimportXComArgfromairflow.providers.google.cloud.operators.automlimport(AutoMLCreateDatasetOperator,AutoMLDeleteDatasetOperator,AutoMLDeleteModelOperator,AutoMLImportDataOperator,AutoMLTrainModelOperator,)fromairflow.providers.google.cloud.operators.gcsimportGCSCreateBucketOperator,GCSDeleteBucketOperatorfromairflow.providers.google.cloud.transfers.gcs_to_gcsimportGCSToGCSOperatorfromairflow.utils.trigger_ruleimportTriggerRule
# Example DAG for AutoML Translation
with DAG(
    DAG_ID,
    schedule="@once",
    start_date=datetime(2021, 1, 1),
    catchup=False,
    tags=["example", "automl", "translate"],
) as dag:

    @task
    def upload_csv_file_to_gcs():
        """Stage the dataset CSV in the sample bucket with bucket references rewritten.

        Reads the object at ``GCS_FILE_PATH`` from ``RESOURCE_DATA_BUCKET``, replaces
        every occurrence of the ``template-bucket`` placeholder with
        ``DATA_SAMPLE_GCS_BUCKET_NAME`` and uploads the result to
        ``automl/<CSV_FILE_NAME>`` in the sample bucket.
        """
        # download file into memory
        storage_client = storage.Client()
        bucket = storage_client.bucket(RESOURCE_DATA_BUCKET)
        blob = bucket.blob(GCS_FILE_PATH)
        # ``download_as_string`` is a deprecated alias of ``download_as_bytes``;
        # both return ``bytes``, so the decoded text is unchanged.
        contents = blob.download_as_bytes().decode()

        # update memory content
        updated_contents = contents.replace("template-bucket", DATA_SAMPLE_GCS_BUCKET_NAME)

        # upload updated content to bucket
        destination_bucket = storage_client.bucket(DATA_SAMPLE_GCS_BUCKET_NAME)
        destination_blob = destination_bucket.blob(f"automl/{CSV_FILE_NAME}")
        destination_blob.upload_from_string(updated_contents)

    upload_csv_file_to_gcs_task = upload_csv_file_to_gcs()

    # Copy the TSV data file next to the rewritten CSV in the sample bucket.
    copy_dataset_file = GCSToGCSOperator(
        task_id="copy_dataset_file",
        source_bucket=RESOURCE_DATA_BUCKET,
        source_object=f"automl/datasets/translate/{TSV_FILE_NAME}",
        destination_bucket=DATA_SAMPLE_GCS_BUCKET_NAME,
        destination_object=f"automl/{TSV_FILE_NAME}",
    )

    create_dataset = AutoMLCreateDatasetOperator(
        task_id="create_dataset", dataset=DATASET, location=GCP_AUTOML_LOCATION
    )

    # The dataset id is only known at run time; it is pulled from the
    # "dataset_id" XCom of ``create_dataset`` and cast to ``str`` for type checkers.
    dataset_id = cast(str, XComArg(create_dataset, key="dataset_id"))

    import_dataset = AutoMLImportDataOperator(
        task_id="import_dataset",
        dataset_id=dataset_id,
        location=GCP_AUTOML_LOCATION,
        input_config=IMPORT_INPUT_CONFIG,
    )

    # Point the model definition at the dataset created above before training.
    MODEL["dataset_id"] = dataset_id

    create_model = AutoMLTrainModelOperator(
        task_id="create_model", model=MODEL, location=GCP_AUTOML_LOCATION
    )
    model_id = cast(str, XComArg(create_model, key="model_id"))

    delete_model = AutoMLDeleteModelOperator(
        task_id="delete_model",
        model_id=model_id,
        location=GCP_AUTOML_LOCATION,
        project_id=GCP_PROJECT_ID,
    )

    delete_dataset = AutoMLDeleteDatasetOperator(
        task_id="delete_dataset",
        dataset_id=dataset_id,
        location=GCP_AUTOML_LOCATION,
        project_id=GCP_PROJECT_ID,
    )

    # ALL_DONE makes the bucket clean-up run even when an upstream task failed.
    delete_bucket = GCSDeleteBucketOperator(
        task_id="delete_bucket",
        bucket_name=DATA_SAMPLE_GCS_BUCKET_NAME,
        trigger_rule=TriggerRule.ALL_DONE,
    )

    # NOTE(review): ``create_bucket`` is referenced below but its definition is not
    # visible in this part of the file — confirm it is defined before this chain.
    (
        # TEST SETUP
        [create_bucket >> upload_csv_file_to_gcs_task >> copy_dataset_file]
        # TEST BODY
        >> create_dataset
        >> import_dataset
        >> create_model
        # TEST TEARDOWN
        >> delete_dataset
        >> delete_model
        >> delete_bucket
    )

    from tests.system.utils.watcher import watcher

    # This test needs watcher in order to properly mark success/failure
    # when "tearDown" task with trigger rule is part of the DAG
    list(dag.tasks) >> watcher()

from tests.system.utils import get_test_run  # noqa: E402

# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)