Source code for tests.system.google.cloud.vision.example_vision_annotate_image
# Licensed to the Apache Software Foundation (ASF) under one# or more contributor license agreements. See the NOTICE file# distributed with this work for additional information# regarding copyright ownership. The ASF licenses this file# to you under the Apache License, Version 2.0 (the# "License"); you may not use this file except in compliance# with the License. You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing,# software distributed under the License is distributed on an# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY# KIND, either express or implied. See the License for the# specific language governing permissions and limitations# under the License.from__future__importannotationsimportosfromdatetimeimportdatetimefromairflow.models.baseoperatorimportchainfromairflow.models.dagimportDAGfromairflow.providers.google.cloud.operators.gcsimportGCSCreateBucketOperator,GCSDeleteBucketOperatorfromairflow.providers.google.cloud.operators.visionimport(CloudVisionDetectImageLabelsOperator,CloudVisionDetectImageSafeSearchOperator,CloudVisionDetectTextOperator,CloudVisionImageAnnotateOperator,CloudVisionTextDetectOperator,)fromairflow.providers.google.cloud.transfers.gcs_to_gcsimportGCSToGCSOperatorfromairflow.providers.standard.operators.bashimportBashOperatorfromairflow.utils.trigger_ruleimportTriggerRule# [START howto_operator_vision_retry_import]fromgoogle.api_core.retryimportRetry# isort:skip# [END howto_operator_vision_retry_import]# [START howto_operator_vision_enums_import]fromgoogle.cloud.vision_v1importFeature# isort:skipfromproviders.tests.system.googleimportDEFAULT_GCP_SYSTEM_TEST_PROJECT_ID# [END howto_operator_vision_enums_import]
copy_single_file=GCSToGCSOperator(task_id="copy_single_gcs_file",source_bucket=BUCKET_NAME_SRC,source_object=[PATH_SRC],destination_bucket=BUCKET_NAME,destination_object=FILE_NAME,)# [START howto_operator_vision_annotate_image]annotate_image=CloudVisionImageAnnotateOperator(request=annotate_image_request,retry=Retry(maximum=10.0),timeout=5,task_id="annotate_image",)# [END howto_operator_vision_annotate_image]# [START howto_operator_vision_annotate_image_result]annotate_image_result=BashOperator(bash_command="echo {{ task_instance.xcom_pull('annotate_image')""['logoAnnotations'][0]['description'] }}",task_id="annotate_image_result",)# [END howto_operator_vision_annotate_image_result]# [START howto_operator_vision_detect_text]detect_text=CloudVisionDetectTextOperator(image=DETECT_IMAGE,retry=Retry(maximum=10.0),timeout=5,task_id="detect_text",language_hints="en",web_detection_params={"include_geo_results":True},)# [END howto_operator_vision_detect_text]# [START howto_operator_vision_detect_text_result]detect_text_result=BashOperator(bash_command="echo {{ task_instance.xcom_pull('detect_text')['textAnnotations'][0] }}",task_id="detect_text_result",)# [END howto_operator_vision_detect_text_result]# [START howto_operator_vision_document_detect_text]document_detect_text=CloudVisionTextDetectOperator(image=DETECT_IMAGE,retry=Retry(maximum=10.0),timeout=5,task_id="document_detect_text")# [END howto_operator_vision_document_detect_text]# [START howto_operator_vision_document_detect_text_result]document_detect_text_result=BashOperator(bash_command="echo {{ task_instance.xcom_pull('document_detect_text')['textAnnotations'][0] }}",task_id="document_detect_text_result",)# [END howto_operator_vision_document_detect_text_result]# [START howto_operator_vision_detect_labels]detect_labels=CloudVisionDetectImageLabelsOperator(image=DETECT_IMAGE,retry=Retry(maximum=10.0),timeout=5,task_id="detect_labels")# [END howto_operator_vision_detect_labels]# [START howto_operator_vision_detect_labels_result]detect_labels_result=BashOperator(bash_command="echo {{ task_instance.xcom_pull('detect_labels')['labelAnnotations'][0] }}",task_id="detect_labels_result",)# [END howto_operator_vision_detect_labels_result]# [START howto_operator_vision_detect_safe_search]detect_safe_search=CloudVisionDetectImageSafeSearchOperator(image=DETECT_IMAGE,retry=Retry(maximum=10.0),timeout=5,task_id="detect_safe_search")# [END howto_operator_vision_detect_safe_search]# [START howto_operator_vision_detect_safe_search_result]detect_safe_search_result=BashOperator(bash_command=f"echo {detect_safe_search.output}",task_id="detect_safe_search_result",)# [END howto_operator_vision_detect_safe_search_result]delete_bucket=GCSDeleteBucketOperator(task_id="delete_bucket",bucket_name=BUCKET_NAME,trigger_rule=TriggerRule.ALL_DONE)chain(# TEST SETUPcreate_bucket,copy_single_file,# TEST BODYannotate_image,annotate_image_result,detect_text,detect_text_result,document_detect_text,document_detect_text_result,detect_labels,detect_labels_result,detect_safe_search,detect_safe_search_result,# TEST TEARDOWNdelete_bucket,)fromtests_common.test_utils.watcherimportwatcher# This test needs watcher in order to properly mark success/failure# when "teardown" task with trigger rule is part of the DAGlist(dag.tasks)>>watcher()fromtests_common.test_utils.system_testsimportget_test_run# noqa: E402# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)