airflow.providers.amazon.aws.hooks.batch_client

A client for AWS Batch services

Module Contents

class airflow.providers.amazon.aws.hooks.batch_client.AwsBatchProtocol[source]

Bases: airflow.typing_compat.Protocol

A structured Protocol for boto3.client('batch') -> botocore.client.Batch. This is used for type hints on AwsBatchClient.client(); it covers only the subset of client methods required.
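The structural-typing idea can be sketched with a stub: any object that implements the same describe_jobs signature satisfies a Protocol, with no inheritance from boto3 classes required. The names below (BatchProtocol, StubBatchClient, first_status) are illustrative, not part of the hook.

```python
from typing import Dict, List, Protocol


class BatchProtocol(Protocol):
    """Structural type covering one method of the batch client (a sketch
    mirroring the idea of AwsBatchProtocol; names here are illustrative)."""

    def describe_jobs(self, jobs: List[str]) -> Dict:
        ...


class StubBatchClient:
    """Any object with a matching describe_jobs signature satisfies the Protocol."""

    def describe_jobs(self, jobs: List[str]) -> Dict:
        return {"jobs": [{"jobId": job_id, "status": "SUCCEEDED"} for job_id in jobs]}


def first_status(client: BatchProtocol, job_id: str) -> str:
    # Type checkers accept StubBatchClient here because Protocol matching
    # is structural, not nominal -- no inheritance is required.
    return client.describe_jobs(jobs=[job_id])["jobs"][0]["status"]


print(first_status(StubBatchClient(), "abc-123"))  # SUCCEEDED
```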

describe_jobs(self, jobs: List[str])[source]

Get job descriptions from AWS Batch

Parameters

jobs (List[str]) -- a list of JobId strings to describe

Returns

an API response to describe jobs

Return type

Dict

get_waiter(self, waiterName: str)[source]

Get an AWS Batch service waiter

Parameters

waiterName (str) -- The name of the waiter. The name should match the name (including the casing) of the key name in the waiter model file (typically this is CamelCasing).

Returns

a waiter object for the named AWS Batch service

Return type

botocore.waiter.Waiter

Note

AWS Batch might not have any waiters (until botocore PR-1307 is released).

import boto3
boto3.client('batch').waiter_names == []
submit_job(self, jobName: str, jobQueue: str, jobDefinition: str, arrayProperties: Dict, parameters: Dict, containerOverrides: Dict)[source]

Submit a batch job

Parameters
  • jobName (str) -- the name for the AWS batch job

  • jobQueue (str) -- the queue name on AWS Batch

  • jobDefinition (str) -- the job definition name on AWS Batch

  • arrayProperties (Dict) -- the same parameter that boto3 will receive

  • parameters (Dict) -- the same parameter that boto3 will receive

  • containerOverrides (Dict) -- the same parameter that boto3 will receive

Returns

an API response

Return type

Dict
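Since arrayProperties, parameters, and containerOverrides are passed straight through to boto3, the call can be sketched by assembling the keyword arguments locally; the helper name build_submit_job_kwargs is hypothetical and no AWS call is made here.

```python
from typing import Dict, Optional


def build_submit_job_kwargs(
    job_name: str,
    job_queue: str,
    job_definition: str,
    array_properties: Optional[Dict] = None,
    parameters: Optional[Dict] = None,
    container_overrides: Optional[Dict] = None,
) -> Dict:
    """Assemble the keyword arguments that submit_job forwards to
    boto3.client('batch').submit_job (helper name is illustrative)."""
    return {
        "jobName": job_name,
        "jobQueue": job_queue,
        "jobDefinition": job_definition,
        "arrayProperties": array_properties or {},
        "parameters": parameters or {},
        "containerOverrides": container_overrides or {},
    }


payload = build_submit_job_kwargs(
    job_name="example-job",
    job_queue="example-queue",
    job_definition="example-def:1",
    container_overrides={"command": ["echo", "hello"]},
)
print(sorted(payload))
```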

terminate_job(self, jobId: str, reason: str)[source]

Terminate a batch job

Parameters
  • jobId (str) -- a job ID to terminate

  • reason (str) -- a reason to terminate job ID

Returns

an API response

Return type

Dict

class airflow.providers.amazon.aws.hooks.batch_client.AwsBatchClientHook(*args, max_retries: Optional[int] = None, status_retries: Optional[int] = None, **kwargs)[source]

Bases: airflow.providers.amazon.aws.hooks.base_aws.AwsBaseHook

A client for AWS Batch services.

Parameters
  • max_retries (Optional[int]) -- maximum number of exponential back-off retries; default 4200 (roughly 48 hours of polling); polling is only used when waiters is None

  • status_retries (Optional[int]) -- number of HTTP retries to get job status; default 10; polling is only used when waiters is None

Note

Several methods use a default random delay to check or poll for job status, i.e. random.uniform(DEFAULT_DELAY_MIN, DEFAULT_DELAY_MAX). Using a random interval helps to avoid AWS API throttle limits when many concurrent tasks request job descriptions.

To modify the global defaults for the range of jitter allowed when a random delay is used to check batch job status, modify these defaults, e.g.:

AwsBatchClientHook.DEFAULT_DELAY_MIN = 0
AwsBatchClientHook.DEFAULT_DELAY_MAX = 5

When explicit delay values are used, a 1 second random jitter is applied to the delay (e.g. a delay of 0 sec will be a random.uniform(0, 1) delay). It is generally recommended to add random jitter to API requests. A convenience method is provided for this, e.g. to get a random delay of 10 sec +/- 5 sec: delay = AwsBatchClientHook.add_jitter(10, width=5, minima=0)

MAX_RETRIES = 4200[source]
STATUS_RETRIES = 10[source]
DEFAULT_DELAY_MIN = 1[source]
DEFAULT_DELAY_MAX = 10[source]
client[source]

An AWS API client for batch services, like boto3.client('batch')

Returns

a boto3 'batch' client for the .region_name

Return type

Union[AwsBatchProtocol, botocore.client.BaseClient]

terminate_job(self, job_id: str, reason: str)[source]

Terminate a batch job

Parameters
  • job_id (str) -- a job ID to terminate

  • reason (str) -- a reason to terminate job ID

Returns

an API response

Return type

Dict

check_job_success(self, job_id: str)[source]

Check the final status of the batch job; return True if the job 'SUCCEEDED', else raise an AirflowException

Parameters

job_id (str) -- a batch job ID

Return type

bool

Raises

AirflowException
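A minimal sketch of the success check, assuming the job description dict has the shape returned by describe_jobs. The real method fetches the description itself via get_job_description; here it is passed in (and AirflowException is a stand-in) so the example is self-contained.

```python
from typing import Dict


class AirflowException(Exception):
    """Stand-in for airflow.exceptions.AirflowException."""


def check_job_success(job_description: Dict) -> bool:
    """Return True if the job 'SUCCEEDED', else raise AirflowException (sketch)."""
    status = job_description.get("status")
    if status == "SUCCEEDED":
        return True
    if status == "FAILED":
        raise AirflowException(f"AWS Batch job failed: {job_description}")
    raise AirflowException(f"AWS Batch job has unexpected status {status!r}")


print(check_job_success({"jobId": "abc-123", "status": "SUCCEEDED"}))  # True
```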

wait_for_job(self, job_id: str, delay: Union[int, float, None] = None)[source]

Wait for batch job to complete

Parameters
  • job_id (str) -- a batch job ID

  • delay (Optional[Union[int, float]]) -- a delay before polling for job status

Raises

AirflowException

poll_for_job_running(self, job_id: str, delay: Union[int, float, None] = None)[source]

Poll for job running. The statuses that indicate a job is running or already complete are: 'RUNNING'|'SUCCEEDED'|'FAILED'.

So this waits for the status transitions: 'SUBMITTED'>'PENDING'>'RUNNABLE'>'STARTING'>'RUNNING'|'SUCCEEDED'|'FAILED'

The completed statuses are included because a job can move from STARTING through RUNNING to a completed state faster than polling can detect RUNNING (this often indicates a failure).

Parameters
  • job_id (str) -- a batch job ID

  • delay (Optional[Union[int, float]]) -- a delay before polling for job status

Raises

AirflowException

poll_for_job_complete(self, job_id: str, delay: Union[int, float, None] = None)[source]

Poll for job completion. The statuses that indicate job completion are: 'SUCCEEDED'|'FAILED'.

So this waits for the status transitions: 'SUBMITTED'>'PENDING'>'RUNNABLE'>'STARTING'>'RUNNING'>'SUCCEEDED'|'FAILED'

Parameters
  • job_id (str) -- a batch job ID

  • delay (Optional[Union[int, float]]) -- a delay before polling for job status

Raises

AirflowException

poll_job_status(self, job_id: str, match_status: List[str])[source]

Poll for job status using an exponential back-off strategy (with max_retries).

Parameters
  • job_id (str) -- a batch job ID

  • match_status (List[str]) -- a list of job statuses to match; the AWS Batch job statuses are: 'SUBMITTED'|'PENDING'|'RUNNABLE'|'STARTING'|'RUNNING'|'SUCCEEDED'|'FAILED'

Return type

bool

Raises

AirflowException
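The polling loop can be sketched against a stubbed describe call; the helper signature, back-off shape, and error message below are illustrative, not the hook's exact implementation.

```python
import itertools
from typing import List


class AirflowException(Exception):
    """Stand-in for airflow.exceptions.AirflowException."""


def poll_job_status(
    get_description,  # callable returning a job-description dict (stub for the API call)
    match_status: List[str],
    max_retries: int = 10,
    sleep=lambda seconds: None,  # real code sleeps with exponential back-off + jitter
) -> bool:
    """Retry until the job status is in match_status, or raise after max_retries."""
    for tries in itertools.count(start=1):
        job = get_description()
        if job["status"] in match_status:
            return True
        if tries >= max_retries:
            raise AirflowException(f"job status polling exceeded {max_retries} retries")
        # illustrative back-off: same capped-quadratic shape as exponential_delay
        sleep(min(600.0, 1 + (tries * 0.6) ** 2))


# A fake job that becomes RUNNING on the third poll:
statuses = iter(["SUBMITTED", "RUNNABLE", "RUNNING"])
result = poll_job_status(
    lambda: {"status": next(statuses)}, ["RUNNING", "SUCCEEDED", "FAILED"]
)
print(result)  # True
```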

get_job_description(self, job_id: str)[source]

Get job description (using status_retries).

Parameters

job_id (str) -- a batch job ID

Returns

an API response for describe jobs

Return type

Dict

Raises

AirflowException

static parse_job_description(job_id: str, response: Dict)[source]

Parse job description to extract description for job_id

Parameters
  • job_id (str) -- a batch job ID

  • response (Dict) -- an API response for describe jobs

Returns

an API response to describe job_id

Return type

Dict

Raises

AirflowException
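A sketch of the parsing step, assuming the describe_jobs response shape ({"jobs": [...]}); the stand-in exception class and the error message wording are illustrative.

```python
from typing import Dict


class AirflowException(Exception):
    """Stand-in for airflow.exceptions.AirflowException."""


def parse_job_description(job_id: str, response: Dict) -> Dict:
    """Pull the single matching job out of a describe_jobs response (sketch)."""
    matching = [job for job in response.get("jobs", []) if job.get("jobId") == job_id]
    if len(matching) != 1:
        raise AirflowException(f"AWS Batch job ({job_id}) description error: {response}")
    return matching[0]


response = {"jobs": [{"jobId": "a", "status": "RUNNING"}, {"jobId": "b", "status": "FAILED"}]}
print(parse_job_description("b", response)["status"])  # FAILED
```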

static add_jitter(delay: Union[int, float], width: Union[int, float] = 1, minima: Union[int, float] = 0)[source]

Use delay +/- width for random jitter

Adding jitter to status polling can help to avoid AWS Batch API limits when monitoring batch jobs with high concurrency in Airflow tasks.

Parameters
  • delay (Union[int, float]) -- number of seconds to pause; delay is assumed to be a positive number

  • width (Union[int, float]) -- delay +/- width for random jitter; width is assumed to be a positive number

  • minima (Union[int, float]) -- minimum delay allowed; minima is assumed to be a non-negative number

Returns

a non-negative random delay drawn from uniform(delay - width, delay + width), floored at minima

Return type

float
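A sketch of the jitter calculation under the stated assumptions (a uniform draw in delay +/- width, floored at minima); the clamping details are illustrative.

```python
import random


def add_jitter(delay, width=1, minima=0):
    """uniform(delay - width, delay + width), never below minima (sketch)."""
    delay = max(0, float(delay))
    width = max(0, float(width))
    minima = max(0, float(minima))
    lower = max(minima, delay - width)
    upper = max(minima, delay + width)
    return random.uniform(lower, upper)


# e.g. a 10 sec delay with +/- 5 sec of jitter, never below 0:
sample = add_jitter(10, width=5, minima=0)
print(5 <= sample <= 15)  # True
```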

static delay(delay: Union[int, float, None] = None)[source]

Pause execution for delay seconds.

Parameters

delay (Optional[Union[int, float]]) -- a delay to pause execution using time.sleep(delay); a small 1 second jitter is applied to the delay.

Note

This method uses a default random delay, i.e. random.uniform(DEFAULT_DELAY_MIN, DEFAULT_DELAY_MAX); using a random interval helps to avoid AWS API throttle limits when many concurrent tasks request job-descriptions.
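The behavior in the note can be sketched as follows, assuming the ~1 second jitter described above; returning the pause value is an addition for illustration (the real method returns None).

```python
import random
import time

DEFAULT_DELAY_MIN = 1
DEFAULT_DELAY_MAX = 10


def delay(seconds=None):
    """Sleep a default random interval when no delay is given, otherwise
    sleep the given delay with ~1 second of jitter (sketch)."""
    if seconds is None:
        pause = random.uniform(DEFAULT_DELAY_MIN, DEFAULT_DELAY_MAX)
    else:
        pause = max(0.0, random.uniform(seconds - 1, seconds + 1))
    time.sleep(pause)
    return pause  # returned here for inspection only


print(0 <= delay(0) <= 1)  # True
```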

static exponential_delay(tries: int)[source]

An exponential back-off delay, with random jitter. There is a maximum interval of 10 minutes (with random jitter between 3 and 10 minutes). This is used in the poll_for_job_status() method.

Parameters

tries (int) -- Number of tries

Return type

float

Examples of behavior:

def exp(tries):
    max_interval = 600.0  # 10 minutes in seconds
    delay = 1 + pow(tries * 0.6, 2)
    delay = min(max_interval, delay)
    print(f"{delay / 3:5.2f} {delay:5.2f}")

for tries in range(10):
    exp(tries)

#  0.33  1.00
#  0.45  1.36
#  0.81  2.44
#  1.41  4.24
#  2.25  6.76
#  3.33 10.00
#  4.65 13.96
#  6.21 18.64
#  8.01 24.04
# 10.05 30.16
