# Source code for tests.system.providers.amazon.aws.example_batch
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

import logging
from datetime import datetime

import boto3

from airflow.decorators import task
from airflow.models.baseoperator import chain
from airflow.models.dag import DAG
from airflow.providers.amazon.aws.operators.batch import (
    BatchCreateComputeEnvironmentOperator,
    BatchOperator,
)
from airflow.providers.amazon.aws.sensors.batch import (
    BatchComputeEnvironmentSensor,
    BatchJobQueueSensor,
    BatchSensor,
)
from airflow.utils.trigger_rule import TriggerRule
from tests.system.providers.amazon.aws.utils import (
    ENV_ID_KEY,
    SystemTestContextBuilder,
    prune_logs,
    split_string,
)

# Module-level logger used by describe_job() below; the visible extraction
# imported ``logging`` but dropped this binding, so it is restored here.
log = logging.getLogger(__name__)
# Only describe the job if a previous task failed, to help diagnose
@task(trigger_rule=TriggerRule.ONE_FAILED)
def describe_job(job_id):
    """Log the AWS Batch description of *job_id* for post-mortem debugging.

    Runs only when an upstream task failed (``TriggerRule.ONE_FAILED``), so the
    failing job's full Batch metadata ends up in the task log.

    :param job_id: the AWS Batch job id to describe.
    """
    client = boto3.client("batch")
    response = client.describe_jobs(jobs=[job_id])
    # Lazy %-style args keep formatting off the hot path per logging convention.
    log.info("Describing the job %s for debugging purposes", job_id)
    log.info(response["jobs"])
# NOTE(review): this section presumably sits inside ``with DAG(...) as dag:``
# together with ``test_context``, ``JOB_OVERRIDES`` and the setup/teardown task
# helpers (create_job_definition, create_job_queue, ...) — none of which are
# visible in this extraction; confirm against the full example file.
env_id = test_context[ENV_ID_KEY]
batch_job_name: str = f"{env_id}-test-job"
batch_job_definition_name: str = f"{env_id}-test-job-definition"
batch_job_compute_environment_name: str = f"{env_id}-test-job-compute-environment"
batch_job_queue_name: str = f"{env_id}-test-job-queue"

security_groups = split_string(test_context[SECURITY_GROUPS_KEY])
subnets = split_string(test_context[SUBNETS_KEY])

# [START howto_operator_batch_create_compute_environment]
create_compute_environment = BatchCreateComputeEnvironmentOperator(
    task_id="create_compute_environment",
    compute_environment_name=batch_job_compute_environment_name,
    environment_type="MANAGED",
    state="ENABLED",
    compute_resources={
        "type": "FARGATE",
        "maxvCpus": 10,
        "securityGroupIds": security_groups,
        "subnets": subnets,
    },
)
# [END howto_operator_batch_create_compute_environment]

# [START howto_sensor_batch_compute_environment]
wait_for_compute_environment_valid = BatchComputeEnvironmentSensor(
    task_id="wait_for_compute_environment_valid",
    compute_environment=batch_job_compute_environment_name,
)
# [END howto_sensor_batch_compute_environment]
wait_for_compute_environment_valid.poke_interval = 1

# [START howto_sensor_batch_job_queue]
wait_for_job_queue_valid = BatchJobQueueSensor(
    task_id="wait_for_job_queue_valid",
    job_queue=batch_job_queue_name,
)
# [END howto_sensor_batch_job_queue]
wait_for_job_queue_valid.poke_interval = 1

# [START howto_operator_batch]
submit_batch_job = BatchOperator(
    task_id="submit_batch_job",
    job_name=batch_job_name,
    job_queue=batch_job_queue_name,
    job_definition=batch_job_definition_name,
    overrides=JOB_OVERRIDES,
)
# [END howto_operator_batch]
# BatchOperator waits by default, setting as False to test the Sensor below.
submit_batch_job.wait_for_completion = False

# [START howto_sensor_batch]
wait_for_batch_job = BatchSensor(
    task_id="wait_for_batch_job",
    job_id=submit_batch_job.output,
)
# [END howto_sensor_batch]
wait_for_batch_job.poke_interval = 10

wait_for_compute_environment_disabled = BatchComputeEnvironmentSensor(
    task_id="wait_for_compute_environment_disabled",
    compute_environment=batch_job_compute_environment_name,
    poke_interval=1,
)

wait_for_job_queue_modified = BatchJobQueueSensor(
    task_id="wait_for_job_queue_modified",
    job_queue=batch_job_queue_name,
    poke_interval=1,
)

wait_for_job_queue_deleted = BatchJobQueueSensor(
    task_id="wait_for_job_queue_deleted",
    job_queue=batch_job_queue_name,
    treat_non_existing_as_deleted=True,
    poke_interval=10,
)

log_cleanup = prune_logs(
    [
        # Format: ('log group name', 'log stream prefix')
        ("/aws/batch/job", env_id)
    ],
)

chain(
    # TEST SETUP
    test_context,
    security_groups,
    subnets,
    create_job_definition(test_context[ROLE_ARN_KEY], batch_job_definition_name),
    # TEST BODY
    create_compute_environment,
    wait_for_compute_environment_valid,
    # ``create_job_queue`` is part of test setup but need the compute-environment to be created before
    create_job_queue(batch_job_compute_environment_name, batch_job_queue_name),
    wait_for_job_queue_valid,
    submit_batch_job,
    wait_for_batch_job,
    # TEST TEARDOWN
    describe_job(submit_batch_job.output),
    disable_job_queue(batch_job_queue_name),
    wait_for_job_queue_modified,
    delete_job_queue(batch_job_queue_name),
    wait_for_job_queue_deleted,
    disable_compute_environment(batch_job_compute_environment_name),
    wait_for_compute_environment_disabled,
    delete_compute_environment(batch_job_compute_environment_name),
    delete_job_definition(batch_job_definition_name),
    log_cleanup,
)

from tests.system.utils.watcher import watcher

# This test needs watcher in order to properly mark success/failure
# when "tearDown" task with trigger rule is part of the DAG
list(dag.tasks) >> watcher()

from tests.system.utils import get_test_run  # noqa: E402

# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)