Source code for tests.system.providers.amazon.aws.example_batch
# Licensed to the Apache Software Foundation (ASF) under one# or more contributor license agreements. See the NOTICE file# distributed with this work for additional information# regarding copyright ownership. The ASF licenses this file# to you under the Apache License, Version 2.0 (the# "License"); you may not use this file except in compliance# with the License. You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing,# software distributed under the License is distributed on an# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY# KIND, either express or implied. See the License for the# specific language governing permissions and limitations# under the License.fromdatetimeimportdatetimeimportboto3fromairflowimportDAGfromairflow.decoratorsimporttaskfromairflow.models.baseoperatorimportchainfromairflow.providers.amazon.aws.operators.batchimportBatchCreateComputeEnvironmentOperator,BatchOperatorfromairflow.providers.amazon.aws.sensors.batchimport(BatchComputeEnvironmentSensor,BatchJobQueueSensor,BatchSensor,)fromairflow.utils.trigger_ruleimportTriggerRulefromtests.system.providers.amazon.aws.utilsimportENV_ID_KEY,SystemTestContextBuilder,split_string
# Wiring of the AWS Batch example/system-test DAG: derive resource names from the test
# context, declare the Batch operators and sensors, then chain setup -> body -> teardown.
#
# NOTE(review): ``dag``, ``test_context``, ``JOB_OVERRIDES``, ``SECURITY_GROUPS_KEY``,
# ``SUBNETS_KEY``, ``ROLE_ARN_KEY`` and the ``create_*``/``disable_*``/``delete_*`` helpers are
# not defined in the visible part of this file. Presumably everything down to the watcher line
# belongs inside a ``with DAG(...) as dag:`` block -- confirm and indent accordingly.

# All resource names share the environment id from the test context as a prefix.
batch_job_name: str = f'{test_context[ENV_ID_KEY]}-test-job'
batch_job_definition_name: str = f'{test_context[ENV_ID_KEY]}-test-job-definition'
batch_job_compute_environment_name: str = f'{test_context[ENV_ID_KEY]}-test-job-compute-environment'
batch_job_queue_name: str = f'{test_context[ENV_ID_KEY]}-test-job-queue'

security_groups = split_string(test_context[SECURITY_GROUPS_KEY])
subnets = split_string(test_context[SUBNETS_KEY])

# [START howto_operator_batch_create_compute_environment]
create_compute_environment = BatchCreateComputeEnvironmentOperator(
    task_id='create_compute_environment',
    compute_environment_name=batch_job_compute_environment_name,
    environment_type='MANAGED',
    state='ENABLED',
    compute_resources={
        'type': 'FARGATE',
        'maxvCpus': 10,
        'securityGroupIds': security_groups,
        'subnets': subnets,
    },
)
# [END howto_operator_batch_create_compute_environment]

# [START howto_sensor_batch_compute_environment]
wait_for_compute_environment_valid = BatchComputeEnvironmentSensor(
    task_id='wait_for_compute_environment_valid',
    compute_environment=batch_job_compute_environment_name,
)
# [END howto_sensor_batch_compute_environment]

# [START howto_sensor_batch_job_queue]
wait_for_job_queue_valid = BatchJobQueueSensor(
    task_id='wait_for_job_queue_valid',
    job_queue=batch_job_queue_name,
)
# [END howto_sensor_batch_job_queue]

# [START howto_operator_batch]
submit_batch_job = BatchOperator(
    task_id='submit_batch_job',
    job_name=batch_job_name,
    job_queue=batch_job_queue_name,
    job_definition=batch_job_definition_name,
    overrides=JOB_OVERRIDES,
    # Set this flag to False, so we can test the sensor below
    wait_for_completion=False,
)
# [END howto_operator_batch]

# [START howto_sensor_batch]
wait_for_batch_job = BatchSensor(
    task_id='wait_for_batch_job',
    job_id=submit_batch_job.output,
)
# [END howto_sensor_batch]

# The sensors below are used only by the teardown part of the chain.
wait_for_compute_environment_disabled = BatchComputeEnvironmentSensor(
    task_id='wait_for_compute_environment_disabled',
    compute_environment=batch_job_compute_environment_name,
)

wait_for_job_queue_modified = BatchJobQueueSensor(
    task_id='wait_for_job_queue_modified',
    job_queue=batch_job_queue_name,
)

# Runs right after ``delete_job_queue`` in the chain; going by the flag name, a queue that no
# longer exists is accepted as "deleted" -- confirm against the sensor documentation.
wait_for_job_queue_deleted = BatchJobQueueSensor(
    task_id='wait_for_job_queue_deleted',
    job_queue=batch_job_queue_name,
    treat_non_existing_as_deleted=True,
)

chain(
    # TEST SETUP
    test_context,
    security_groups,
    subnets,
    create_job_definition(test_context[ROLE_ARN_KEY], batch_job_definition_name),
    # TEST BODY
    create_compute_environment,
    wait_for_compute_environment_valid,
    # ``create_job_queue`` is part of test setup but needs the compute environment to be created first
    create_job_queue(batch_job_compute_environment_name, batch_job_queue_name),
    wait_for_job_queue_valid,
    submit_batch_job,
    wait_for_batch_job,
    # TEST TEARDOWN
    disable_job_queue(batch_job_queue_name),
    wait_for_job_queue_modified,
    delete_job_queue(batch_job_queue_name),
    wait_for_job_queue_deleted,
    disable_compute_environment(batch_job_compute_environment_name),
    wait_for_compute_environment_disabled,
    delete_compute_environment(batch_job_compute_environment_name),
    delete_job_definition(batch_job_definition_name),
)

from tests.system.utils.watcher import watcher

# This test needs watcher in order to properly mark success/failure
# when "tearDown" task with trigger rule is part of the DAG
list(dag.tasks) >> watcher()

from tests.system.utils import get_test_run  # noqa: E402

# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
# NOTE(review): the statement this comment introduces is not visible here; presumably
# ``test_run = get_test_run(dag)`` follows -- confirm it is present in the full file.