airflow.providers.amazon.aws.hooks.batch_client

A client for AWS Batch services

Module Contents

Classes

BatchProtocol

A structured Protocol for boto3.client('batch') -> botocore.client.Batch.

BatchClientHook

Interact with AWS Batch.

class airflow.providers.amazon.aws.hooks.batch_client.BatchProtocol[source]

Bases: airflow.typing_compat.Protocol

A structured Protocol for boto3.client('batch') -> botocore.client.Batch. This is used for type hints on BatchClient.client(); it covers only the subset of client methods required.

describe_jobs(jobs)[source]

Get job descriptions from AWS Batch

Parameters

jobs (list[str]) – a list of JobId to describe

Returns

an API response to describe jobs

Return type

dict

get_waiter(waiterName)[source]

Get an AWS Batch service waiter

Parameters

waiterName (str) – The name of the waiter. The name should match the name (including the casing) of the key name in the waiter model file (typically this is CamelCasing).

Returns

a waiter object for the named AWS Batch service

Return type

botocore.waiter.Waiter

Note

AWS Batch might not have any waiters (until botocore PR-1307 is released).

import boto3

boto3.client("batch").waiter_names == []
submit_job(jobName, jobQueue, jobDefinition, arrayProperties, parameters, containerOverrides, tags)[source]

Submit a Batch job

Parameters
  • jobName (str) – the name for the AWS Batch job

  • jobQueue (str) – the queue name on AWS Batch

  • jobDefinition (str) – the job definition name on AWS Batch

  • arrayProperties (dict) – the same parameter that boto3 will receive

  • parameters (dict) – the same parameter that boto3 will receive

  • containerOverrides (dict) – the same parameter that boto3 will receive

  • tags (dict) – the same parameter that boto3 will receive

Returns

an API response

Return type

dict

terminate_job(jobId, reason)[source]

Terminate a Batch job

Parameters
  • jobId (str) – a job ID to terminate

  • reason (str) – a reason to terminate job ID

Returns

an API response

Return type

dict

class airflow.providers.amazon.aws.hooks.batch_client.BatchClientHook(*args, max_retries=None, status_retries=None, **kwargs)[source]

Bases: airflow.providers.amazon.aws.hooks.base_aws.AwsBaseHook

Interact with AWS Batch. Provide thick wrapper around boto3.client("batch").

Parameters
  • max_retries (int | None) – exponential back-off retries, 4200 = 48 hours; polling is only used when waiters is None

  • status_retries (int | None) – number of HTTP retries to get job status, 10; polling is only used when waiters is None

Note

Several methods use a default random delay to check or poll for job status, i.e. random.uniform(DEFAULT_DELAY_MIN, DEFAULT_DELAY_MAX) Using a random interval helps to avoid AWS API throttle limits when many concurrent tasks request job-descriptions.

To modify the global defaults for the range of jitter allowed when a random delay is used to check Batch job status, modify these defaults, e.g.: .. code-block:

BatchClient.DEFAULT_DELAY_MIN = 0
BatchClient.DEFAULT_DELAY_MAX = 5

When explicit delay values are used, a 1 second random jitter is applied to the delay (e.g. a delay of 0 sec will be a random.uniform(0, 1) delay. It is generally recommended that random jitter is added to API requests. A convenience method is provided for this, e.g. to get a random delay of 10 sec +/- 5 sec: delay = BatchClient.add_jitter(10, width=5, minima=0)

Additional arguments (such as aws_conn_id) may be specified and are passed down to the underlying AwsBaseHook.

property client: BatchProtocol | botocore.client.BaseClient[source]

An AWS API client for Batch services.

Returns

a boto3 ‘batch’ client for the .region_name

Return type

BatchProtocol | botocore.client.BaseClient

MAX_RETRIES = 4200[source]
STATUS_RETRIES = 10[source]
DEFAULT_DELAY_MIN = 1[source]
DEFAULT_DELAY_MAX = 10[source]
FAILURE_STATE = 'FAILED'[source]
SUCCESS_STATE = 'SUCCEEDED'[source]
RUNNING_STATE = 'RUNNING'[source]
INTERMEDIATE_STATES = ('SUBMITTED', 'PENDING', 'RUNNABLE', 'STARTING')[source]
COMPUTE_ENVIRONMENT_TERMINAL_STATUS = ('VALID', 'DELETED')[source]
COMPUTE_ENVIRONMENT_INTERMEDIATE_STATUS = ('CREATING', 'UPDATING', 'DELETING')[source]
JOB_QUEUE_TERMINAL_STATUS = ('VALID', 'DELETED')[source]
JOB_QUEUE_INTERMEDIATE_STATUS = ('CREATING', 'UPDATING', 'DELETING')[source]
terminate_job(job_id, reason)[source]

Terminate a Batch job

Parameters
  • job_id (str) – a job ID to terminate

  • reason (str) – a reason to terminate job ID

Returns

an API response

Return type

dict

check_job_success(job_id)[source]

Check the final status of the Batch job; return True if the job ‘SUCCEEDED’, else raise an AirflowException

Parameters

job_id (str) – a Batch job ID

Raises

AirflowException

wait_for_job(job_id, delay=None)[source]

Wait for Batch job to complete

Parameters
  • job_id (str) – a Batch job ID

  • delay (int | float | None) – a delay before polling for job status

Raises

AirflowException

poll_for_job_running(job_id, delay=None)[source]

Poll for job running. The status that indicates a job is running or already complete are: ‘RUNNING’|’SUCCEEDED’|’FAILED’.

So the status options that this will wait for are the transitions from: ‘SUBMITTED’>’PENDING’>’RUNNABLE’>’STARTING’>’RUNNING’|’SUCCEEDED’|’FAILED’

The completed status options are included for cases where the status changes too quickly for polling to detect a RUNNING status that moves quickly from STARTING to RUNNING to completed (often a failure).

Parameters
  • job_id (str) – a Batch job ID

  • delay (int | float | None) – a delay before polling for job status

Raises

AirflowException

poll_for_job_complete(job_id, delay=None)[source]

Poll for job completion. The status that indicates job completion are: ‘SUCCEEDED’|’FAILED’.

So the status options that this will wait for are the transitions from: ‘SUBMITTED’>’PENDING’>’RUNNABLE’>’STARTING’>’RUNNING’>’SUCCEEDED’|’FAILED’

Parameters
  • job_id (str) – a Batch job ID

  • delay (int | float | None) – a delay before polling for job status

Raises

AirflowException

poll_job_status(job_id, match_status)[source]

Poll for job status using an exponential back-off strategy (with max_retries).

Parameters
  • job_id (str) – a Batch job ID

  • match_status (list[str]) – a list of job status to match; the Batch job status are: ‘SUBMITTED’|’PENDING’|’RUNNABLE’|’STARTING’|’RUNNING’|’SUCCEEDED’|’FAILED’

Raises

AirflowException

get_job_description(job_id)[source]

Get job description (using status_retries).

Parameters

job_id (str) – a Batch job ID

Returns

an API response for describe jobs

Raises

AirflowException

Return type

dict

static parse_job_description(job_id, response)[source]

Parse job description to extract description for job_id

Parameters
  • job_id (str) – a Batch job ID

  • response (dict) – an API response for describe jobs

Returns

an API response to describe job_id

Raises

AirflowException

Return type

dict

get_job_awslogs_info(job_id)[source]

Parse job description to extract AWS CloudWatch information.

Parameters

job_id (str) – AWS Batch Job ID

static add_jitter(delay, width=1, minima=0)[source]

Use delay +/- width for random jitter

Adding jitter to status polling can help to avoid AWS Batch API limits for monitoring Batch jobs with a high concurrency in Airflow tasks.

Parameters
  • delay (int | float) – number of seconds to pause; delay is assumed to be a positive number

  • width (int | float) – delay +/- width for random jitter; width is assumed to be a positive number

  • minima (int | float) – minimum delay allowed; minima is assumed to be a non-negative number

Returns

uniform(delay - width, delay + width) jitter and it is a non-negative number

Return type

float

static delay(delay=None)[source]

Pause execution for delay seconds.

Parameters

delay (int | float | None) – a delay to pause execution using time.sleep(delay); a small 1 second jitter is applied to the delay.

Note

This method uses a default random delay, i.e. random.uniform(DEFAULT_DELAY_MIN, DEFAULT_DELAY_MAX); using a random interval helps to avoid AWS API throttle limits when many concurrent tasks request job-descriptions.

static exponential_delay(tries)[source]

An exponential back-off delay, with random jitter. There is a maximum interval of 10 minutes (with random jitter between 3 and 10 minutes). This is used in the poll_for_job_status() method.

Parameters

tries (int) – Number of tries

Examples of behavior:

def exp(tries):
    max_interval = 600.0  # 10 minutes in seconds
    delay = 1 + pow(tries * 0.6, 2)
    delay = min(max_interval, delay)
    print(delay / 3, delay)


for tries in range(10):
    exp(tries)

#  0.33  1.0
#  0.45  1.35
#  0.81  2.44
#  1.41  4.23
#  2.25  6.76
#  3.33 10.00
#  4.65 13.95
#  6.21 18.64
#  8.01 24.04
# 10.05 30.15

Was this entry helpful?