airflow.providers.amazon.aws.hooks.batch_client
A client for AWS batch services
Module Contents
-
class airflow.providers.amazon.aws.hooks.batch_client.AwsBatchProtocol
Bases: airflow.typing_compat.Protocol
A structured Protocol for boto3.client('batch') -> botocore.client.Batch. This is used for type hints on AwsBatchClientHook.client(); it covers only the subset of client methods required.
-
describe_jobs(self, jobs: List[str])
Get job descriptions from AWS Batch.
- Parameters
jobs (List[str]) – a list of JobId to describe
- Returns
an API response to describe jobs
- Return type
Dict
-
get_waiter(self, waiterName: str)
Get an AWS Batch service waiter.
- Parameters
waiterName (str) – The name of the waiter. The name should match the name (including the casing) of the key name in the waiter model file (typically this is CamelCasing).
- Returns
a waiter object for the named AWS batch service
- Return type
botocore.waiter.Waiter
Note
AWS batch might not have any waiters (until botocore PR-1307 is released).
import boto3
boto3.client("batch").waiter_names == []
-
submit_job(self, jobName: str, jobQueue: str, jobDefinition: str, arrayProperties: Dict, parameters: Dict, containerOverrides: Dict, tags: Dict)
Submit a batch job.
- Parameters
jobName (str) – the name for the AWS batch job
jobQueue (str) – the queue name on AWS Batch
jobDefinition (str) – the job definition name on AWS Batch
arrayProperties (Dict) – the same parameter that boto3 will receive
parameters (Dict) – the same parameter that boto3 will receive
containerOverrides (Dict) – the same parameter that boto3 will receive
tags (Dict) – the same parameter that boto3 will receive
- Returns
an API response
- Return type
Dict
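The structural typing described for AwsBatchProtocol can be illustrated with a minimal sketch. It assumes the standard-library typing.Protocol (Python 3.8+) rather than airflow.typing_compat.Protocol, and the names BatchLike, FakeBatchClient, and first_status are hypothetical, introduced only for this example.

```python
from typing import Dict, List, Protocol


class BatchLike(Protocol):
    """Hypothetical stand-in for AwsBatchProtocol (one method only)."""

    def describe_jobs(self, jobs: List[str]) -> Dict:
        ...


class FakeBatchClient:
    """Hypothetical in-memory client; it does not inherit from BatchLike."""

    def describe_jobs(self, jobs: List[str]) -> Dict:
        # Mimic the general shape of a describe-jobs response.
        return {"jobs": [{"jobId": job_id, "status": "SUCCEEDED"} for job_id in jobs]}


def first_status(client: BatchLike, job_id: str) -> str:
    # Any object with a matching describe_jobs method satisfies the type hint.
    response = client.describe_jobs(jobs=[job_id])
    return response["jobs"][0]["status"]


status = first_status(FakeBatchClient(), "job-123")
```

Because the protocol is structural, a type checker accepts FakeBatchClient here without any inheritance, which is also what makes such a client easy to replace in tests.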
-
-
class airflow.providers.amazon.aws.hooks.batch_client.AwsBatchClientHook(*args, max_retries: Optional[int] = None, status_retries: Optional[int] = None, **kwargs)
Bases: airflow.providers.amazon.aws.hooks.base_aws.AwsBaseHook
A client for AWS batch services.
- Parameters
Note
Several methods use a default random delay to check or poll for job status, i.e. random.uniform(DEFAULT_DELAY_MIN, DEFAULT_DELAY_MAX). Using a random interval helps to avoid AWS API throttle limits when many concurrent tasks request job descriptions.
To modify the global defaults for the range of jitter allowed when a random delay is used to check batch job status, change these class attributes, e.g.:
AwsBatchClientHook.DEFAULT_DELAY_MIN = 0
AwsBatchClientHook.DEFAULT_DELAY_MAX = 5
When explicit delay values are used, a 1 second random jitter is applied to the delay (e.g. a delay of 0 sec becomes a random.uniform(0, 1) delay). It is generally recommended that random jitter is added to API requests. A convenience method is provided for this, e.g. to get a random delay of 10 sec +/- 5 sec:
delay = AwsBatchClientHook.add_jitter(10, width=5, minima=0)
-
client
An AWS API client for batch services.
- Returns
a boto3 'batch' client for the .region_name
- Return type
Union[AwsBatchProtocol, botocore.client.BaseClient]
-
check_job_success(self, job_id: str)
Check the final status of the batch job; return True if the job 'SUCCEEDED', else raise an AirflowException.
-
wait_for_job(self, job_id: str, delay: Union[int, float, None] = None)
Wait for batch job to complete.
-
poll_for_job_running(self, job_id: str, delay: Union[int, float, None] = None)
Poll until the job is running. The statuses that indicate a job is running or already complete are: 'RUNNING'|'SUCCEEDED'|'FAILED'.
So this method waits through the transitions: 'SUBMITTED'>'PENDING'>'RUNNABLE'>'STARTING'>'RUNNING'|'SUCCEEDED'|'FAILED'
The completed statuses are included because a job can move from STARTING through RUNNING to a completed status (often a failure) too quickly for polling to observe RUNNING.
-
poll_for_job_complete(self, job_id: str, delay: Union[int, float, None] = None)
Poll for job completion. The statuses that indicate job completion are: 'SUCCEEDED'|'FAILED'.
So this method waits through the transitions: 'SUBMITTED'>'PENDING'>'RUNNABLE'>'STARTING'>'RUNNING'>'SUCCEEDED'|'FAILED'
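The two sets of target statuses used by poll_for_job_running and poll_for_job_complete can be sketched as plain lists. This is only an illustration of the status logic described above; the names STATUS_ORDER, RUNNING_OR_DONE, COMPLETE, and should_keep_polling are hypothetical and are not part of the hook.

```python
from typing import List

# Job statuses in the order given by the transitions above.
STATUS_ORDER: List[str] = [
    "SUBMITTED", "PENDING", "RUNNABLE", "STARTING", "RUNNING", "SUCCEEDED", "FAILED",
]
RUNNING_OR_DONE = ["RUNNING", "SUCCEEDED", "FAILED"]  # targets when waiting for a running job
COMPLETE = ["SUCCEEDED", "FAILED"]  # targets when waiting for completion


def should_keep_polling(status: str, match_status: List[str]) -> bool:
    """Return True while the job has not reached one of the target statuses."""
    if status not in STATUS_ORDER:
        raise ValueError(f"unknown job status: {status}")
    return status not in match_status
```

With these lists, a job that fails before RUNNING is ever observed still ends the wait for a running job, since FAILED is one of the targets.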
-
poll_job_status(self, job_id: str, match_status: List[str])
Poll for job status using an exponential back-off strategy (with max_retries).
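A polling loop of this kind might look like the sketch below. It is not the hook's implementation: poll_status, get_status, and the injected sleep callable are hypothetical, RuntimeError stands in for whatever exception the hook raises, and the back-off reuses the formula from the exponential_delay example in this document without jitter.

```python
import time
from typing import Callable, List


def poll_status(
    get_status: Callable[[], str],
    match_status: List[str],
    max_retries: int = 5,
    sleep: Callable[[float], None] = time.sleep,
) -> bool:
    """Poll until get_status() returns a value in match_status."""
    for tries in range(max_retries):
        if get_status() in match_status:
            return True
        # The wait grows with each attempt and is capped at 10 minutes.
        sleep(min(600.0, 1 + pow(tries * 0.6, 2)))
    raise RuntimeError(f"status not in {match_status} after {max_retries} tries")


# Usage with a canned status sequence and a no-op sleep.
statuses = iter(["SUBMITTED", "RUNNABLE", "RUNNING"])
reached = poll_status(
    lambda: next(statuses),
    ["RUNNING", "SUCCEEDED", "FAILED"],
    sleep=lambda _: None,
)
```

Passing the sleep function as a parameter keeps the sketch quick to exercise; a real caller would leave the default time.sleep in place.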
-
get_job_description(self, job_id: str)
Get job description (using status_retries).
- Parameters
job_id (str) – a batch job ID
- Returns
an API response for describe jobs
- Return type
Dict
- Raises
AirflowException
-
static parse_job_description(job_id: str, response: Dict)
Parse job description to extract description for job_id.
- Parameters
job_id (str) – a batch job ID
response (Dict) – an API response for describe jobs
- Returns
an API response to describe job_id
- Return type
Dict
- Raises
AirflowException
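Extracting one job from a describe-jobs response could be sketched as follows. This assumes the response carries a "jobs" list whose entries have a "jobId" key, as the AWS Batch DescribeJobs response does; parse_description is a hypothetical name, and ValueError stands in for the AirflowException that the documented method raises.

```python
from typing import Dict


def parse_description(job_id: str, response: Dict) -> Dict:
    """Return the entry for job_id from a describe-jobs style response."""
    matching = [job for job in response.get("jobs", []) if job.get("jobId") == job_id]
    if len(matching) != 1:
        raise ValueError(f"expected one description for job {job_id}, got {len(matching)}")
    return matching[0]


response = {
    "jobs": [
        {"jobId": "a1", "status": "RUNNING"},
        {"jobId": "b2", "status": "FAILED"},
    ]
}
job = parse_description("b2", response)
```

Raising when the job is missing (or listed more than once) keeps a malformed response from being mistaken for a valid description.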
-
static add_jitter(delay: Union[int, float], width: Union[int, float] = 1, minima: Union[int, float] = 0)
Use delay +/- width for random jitter.
Adding jitter to status polling can help to avoid AWS Batch API limits when many concurrent Airflow tasks monitor batch jobs.
- Parameters
- Returns
a non-negative jittered delay drawn from uniform(delay - width, delay + width)
- Return type
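One way to obtain "delay +/- width" jitter that never drops below minima is sketched below. It is an assumption about how such a helper could work, not a copy of add_jitter; the name jittered is hypothetical.

```python
import random
from typing import Union

Number = Union[int, float]


def jittered(delay: Number, width: Number = 1, minima: Number = 0) -> float:
    """Return a random value in [max(minima, delay - width), delay + width]."""
    lower = max(minima, delay - width)  # clamp the lower bound at minima
    upper = delay + width
    return random.uniform(lower, upper)


# A delay of 10 sec +/- 5 sec, as in the class-level note.
samples = [jittered(10, width=5, minima=0) for _ in range(1000)]
```

Clamping the lower bound is what keeps the result non-negative when width is larger than delay, e.g. a delay of 0 with the default width of 1 yields a value between 0 and 1.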
-
static delay(delay: Union[int, float, None] = None)
Pause execution for delay seconds.
- Parameters
delay (Optional[Union[int, float]]) – a delay to pause execution using time.sleep(delay); a small 1 second jitter is applied to the delay.
Note
When no delay is given, this method uses a default random delay, i.e. random.uniform(DEFAULT_DELAY_MIN, DEFAULT_DELAY_MAX); using a random interval helps to avoid AWS API throttle limits when many concurrent tasks request job descriptions.
-
static exponential_delay(tries: int)
An exponential back-off delay, with random jitter. There is a maximum interval of 10 minutes (with random jitter between 3 and 10 minutes). This is used in the poll_job_status() method.
Examples of behavior:
def exp(tries):
    max_interval = 600.0  # 10 minutes in seconds
    delay = 1 + pow(tries * 0.6, 2)
    delay = min(max_interval, delay)
    print(delay / 3, delay)

for tries in range(10):
    exp(tries)

# 0.33 1.0
# 0.45 1.35
# 0.81 2.44
# 1.41 4.23
# 2.25 6.76
# 3.33 10.00
# 4.65 13.95
# 6.21 18.64
# 8.01 24.04
# 10.05 30.15
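The example above prints the bounds of the jitter range rather than a delay. A minimal sketch that draws an actual delay from that range follows; it assumes the jitter is uniform between one third of the capped delay and the full delay, which is inferred from the printed pairs, and backoff_delay is a hypothetical name.

```python
import random


def backoff_delay(tries: int, max_interval: float = 600.0) -> float:
    """Return a jittered back-off delay in seconds for the given attempt count."""
    delay = min(max_interval, 1 + pow(tries * 0.6, 2))
    # Assumption: jitter is drawn between delay / 3 and delay.
    return random.uniform(delay / 3, delay)


delays = [backoff_delay(tries) for tries in range(50)]
```

Once the cap is reached, every delay falls between 200 and 600 seconds, which matches the stated range of roughly 3 to 10 minutes.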