Source code for tests.system.google.cloud.video_intelligence.example_video_intelligence
## Licensed to the Apache Software Foundation (ASF) under one# or more contributor license agreements. See the NOTICE file# distributed with this work for additional information# regarding copyright ownership. The ASF licenses this file# to you under the Apache License, Version 2.0 (the# "License"); you may not use this file except in compliance# with the License. You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing,# software distributed under the License is distributed on an# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY# KIND, either express or implied. See the License for the# specific language governing permissions and limitations# under the License."""Example Airflow DAG that demonstrates operators for the Google Cloud Video Intelligence service in the GoogleCloud Platform.This DAG relies on the following OS environment variables:* BUCKET_NAME - Google Cloud Storage bucket where the file exists."""from__future__importannotationsimportosfromdatetimeimportdatetimefromgoogle.api_core.retryimportRetryfromairflow.models.baseoperatorimportchainfromairflow.models.dagimportDAGfromairflow.providers.google.cloud.operators.gcsimportGCSCreateBucketOperator,GCSDeleteBucketOperatorfromairflow.providers.google.cloud.operators.video_intelligenceimport(CloudVideoIntelligenceDetectVideoExplicitContentOperator,CloudVideoIntelligenceDetectVideoLabelsOperator,CloudVideoIntelligenceDetectVideoShotsOperator,)fromairflow.providers.google.cloud.transfers.gcs_to_gcsimportGCSToGCSOperatorfromairflow.providers.standard.operators.bashimportBashOperatorfromairflow.utils.trigger_ruleimportTriggerRule
copy_single_file=GCSToGCSOperator(task_id="copy_single_gcs_file",source_bucket=BUCKET_NAME_SRC,source_object=PATH_SRC,destination_bucket=BUCKET_NAME_DST,destination_object=FILE_NAME,)# [START howto_operator_video_intelligence_detect_labels]detect_video_label=CloudVideoIntelligenceDetectVideoLabelsOperator(input_uri=INPUT_URI,output_uri=None,video_context=None,timeout=5,task_id="detect_video_label",)# [END howto_operator_video_intelligence_detect_labels]# [START howto_operator_video_intelligence_detect_labels_result]detect_video_label_result=BashOperator(bash_command="echo {{ task_instance.xcom_pull('detect_video_label')""['annotationResults'][0]['shotLabelAnnotations'][0]['entity']}}",task_id="detect_video_label_result",)# [END howto_operator_video_intelligence_detect_labels_result]# [START howto_operator_video_intelligence_detect_explicit_content]detect_video_explicit_content=CloudVideoIntelligenceDetectVideoExplicitContentOperator(input_uri=INPUT_URI,output_uri=None,video_context=None,retry=Retry(maximum=10.0),timeout=5,task_id="detect_video_explicit_content",)# [END howto_operator_video_intelligence_detect_explicit_content]# [START howto_operator_video_intelligence_detect_explicit_content_result]detect_video_explicit_content_result=BashOperator(bash_command="echo {{ task_instance.xcom_pull('detect_video_explicit_content')""['annotationResults'][0]['explicitAnnotation']['frames'][0]}}",task_id="detect_video_explicit_content_result",)# [END howto_operator_video_intelligence_detect_explicit_content_result]# [START howto_operator_video_intelligence_detect_video_shots]detect_video_shots=CloudVideoIntelligenceDetectVideoShotsOperator(input_uri=INPUT_URI,output_uri=None,video_context=None,retry=Retry(maximum=10.0),timeout=5,task_id="detect_video_shots",)# [END howto_operator_video_intelligence_detect_video_shots]# [START howto_operator_video_intelligence_detect_video_shots_result]detect_video_shots_result=BashOperator(bash_command="echo {{ task_instance.xcom_pull('detect_video_shots')""['annotationResults'][0]['shotAnnotations'][0]}}",task_id="detect_video_shots_result",)# [END howto_operator_video_intelligence_detect_video_shots_result]delete_bucket=GCSDeleteBucketOperator(task_id="delete_bucket",bucket_name=BUCKET_NAME_DST,trigger_rule=TriggerRule.ALL_DONE)chain(# TEST SETUPcreate_bucket,copy_single_file,# TEST BODYdetect_video_label,detect_video_label_result,detect_video_explicit_content,detect_video_explicit_content_result,detect_video_shots,detect_video_shots_result,# TEST TEARDOWNdelete_bucket,)fromdev.tests_common.test_utils.watcherimportwatcher# This test needs watcher in order to properly mark success/failure# when "tearDown" task with trigger rule is part of the DAGlist(dag.tasks)>>watcher()fromdev.tests_common.test_utils.system_testsimportget_test_run# noqa: E402# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)