airflow.providers.amazon.aws.hooks.batch_waiters

AWS Batch service waiters

Module Contents

class airflow.providers.amazon.aws.hooks.batch_waiters.AwsBatchWaitersHook(*args, waiter_config: Optional[Dict] = None, **kwargs)[source]

Bases: airflow.providers.amazon.aws.hooks.batch_client.AwsBatchClientHook

A utility to manage waiters for AWS Batch services.

import random
from airflow.providers.amazon.aws.hooks.batch_waiters import AwsBatchWaitersHook

# to inspect default waiters
waiters = AwsBatchWaitersHook()
config = waiters.default_config  # type: Dict
waiter_names = waiters.list_waiters()  # -> ["JobComplete", "JobExists", "JobRunning"]

# The default_config is a useful stepping stone to creating custom waiters, e.g.
custom_config = waiters.default_config  # this is a deepcopy
# modify custom_config['waiters'] as necessary and get a new instance:
waiters = AwsBatchWaitersHook(waiter_config=custom_config)
waiters.waiter_config  # check the custom configuration (this is a deepcopy)
waiters.list_waiters()  # names of custom waiters

# During the init for AwsBatchWaitersHook, the waiter_config is used to build a waiter_model.
# This only occurs during the class init, to avoid any accidental mutations
# of waiter_config leaking into the waiter_model.
waiters.waiter_model  # -> botocore.waiter.WaiterModel object

# The waiter_model is combined with the waiters.client to get a specific waiter
# and the details of the config on that waiter can be further modified without any
# accidental impact on the generation of new waiters from the defined waiter_model, e.g.
waiters.get_waiter("JobExists").config.delay  # -> 5
waiter = waiters.get_waiter("JobExists")  # -> botocore.waiter.Batch.Waiter.JobExists object
waiter.config.delay = 10
waiters.get_waiter("JobExists").config.delay  # -> 5 as defined by waiter_model

# To use a specific waiter, update the config and call the `wait()` method for jobId, e.g.
waiter = waiters.get_waiter("JobExists")  # -> botocore.waiter.Batch.Waiter.JobExists object
waiter.config.delay = random.uniform(1, 10)  # seconds
waiter.config.max_attempts = 10
waiter.wait(jobs=[jobId])  # jobId is the ID of a previously submitted Batch job

Parameters
  • waiter_config (Optional[Dict]) -- a custom waiter configuration for AWS batch services

  • aws_conn_id (Optional[str]) -- connection id of AWS credentials / region name. If None, the default boto3 credential strategy will be used (http://boto3.readthedocs.io/en/latest/guide/configuration.html).

  • region_name (Optional[str]) -- region name to use in the AWS client. Overrides the AWS region in the connection (if provided).
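
As a rough illustration of what a custom waiter_config might contain, the sketch below builds a dictionary in the botocore waiter-model layout (version 2) and adjusts one waiter before it would be passed to the hook. The dictionary contents, the helper name slow_down and the chosen values are assumptions made for illustration; they are not the hook's actual default_config.

```python
import copy
from typing import Dict

# Hypothetical configuration in the botocore waiter-model layout (version 2).
# The acceptor and timings are illustrative, not the hook's defaults.
EXAMPLE_CONFIG: Dict = {
    "version": 2,
    "waiters": {
        "JobExists": {
            "delay": 2,
            "operation": "DescribeJobs",
            "maxAttempts": 100,
            "acceptors": [
                {
                    "argument": "jobs[].status",
                    "expected": "SUBMITTED",
                    "matcher": "pathAny",
                    "state": "success",
                },
            ],
        },
    },
}


def slow_down(config: Dict, waiter_name: str, delay: int) -> Dict:
    """Return a copy of ``config`` with a new polling delay for one waiter."""
    custom = copy.deepcopy(config)  # leave the caller's dictionary untouched
    custom["waiters"][waiter_name]["delay"] = delay
    return custom


custom_config = slow_down(EXAMPLE_CONFIG, "JobExists", delay=10)
# custom_config could then be passed as AwsBatchWaitersHook(waiter_config=custom_config)
```

Working on a copy keeps the starting configuration reusable for building other variants.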

default_config[source]

An immutable default waiter configuration

Returns

a waiter configuration for AWS batch services

Return type

Dict

waiter_config[source]

An immutable waiter configuration for this instance; this property returns a deepcopy. During the init for AwsBatchWaitersHook, the waiter_config is used to build a waiter_model. This occurs only during the class init, to avoid any accidental mutations of waiter_config leaking into the waiter_model.

Returns

a waiter configuration for AWS batch services

Return type

Dict
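
The copy-on-read behaviour described for waiter_config can be sketched with a small stand-alone class. ConfigHolder is a hypothetical stand-in written for this illustration, not the hook itself; it only assumes that the stored dictionary is deep-copied on the way in and on the way out.

```python
import copy
from typing import Dict, Optional


class ConfigHolder:
    """Hypothetical stand-in for a hook that exposes its config read-only."""

    def __init__(self, waiter_config: Optional[Dict] = None) -> None:
        # Copy on the way in, so later changes by the caller are not seen here.
        self._waiter_config = copy.deepcopy(
            waiter_config or {"version": 2, "waiters": {}}
        )

    @property
    def waiter_config(self) -> Dict:
        # Copy on the way out, so callers cannot mutate the stored config.
        return copy.deepcopy(self._waiter_config)


holder = ConfigHolder({"version": 2, "waiters": {"JobExists": {"delay": 5}}})
snapshot = holder.waiter_config
snapshot["waiters"]["JobExists"]["delay"] = 60  # changes only the copy
```

Reading holder.waiter_config again still reports a delay of 5, because each access hands out a fresh copy.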

waiter_model[source]

A configured waiter model used to generate waiters on AWS batch services.

Returns

a waiter model for AWS batch services

Return type

botocore.waiter.WaiterModel

get_waiter(self, waiter_name: str)[source]

Get an AWS Batch service waiter, using the configured .waiter_model.

The .waiter_model is combined with the .client to get a specific waiter and the properties of that waiter can be modified without any accidental impact on the generation of new waiters from the .waiter_model, e.g.

waiters.get_waiter("JobExists").config.delay  # -> 5
waiter = waiters.get_waiter("JobExists")  # a new waiter object
waiter.config.delay = 10
waiters.get_waiter("JobExists").config.delay  # -> 5 as defined by waiter_model

To use a specific waiter, update the config and call the wait() method for jobId, e.g.

import random

waiter = waiters.get_waiter("JobExists")  # a new waiter object
waiter.config.delay = random.uniform(1, 10)  # seconds
waiter.config.max_attempts = 10
waiter.wait(jobs=[jobId])  # jobId is the ID of a previously submitted Batch job

Parameters

waiter_name (str) -- The name of the waiter. The name should match the name (including the casing) of the key name in the waiter model file (typically this is CamelCasing); see .list_waiters.

Returns

a waiter object for the named AWS batch service

Return type

botocore.waiter.Waiter

list_waiters(self)[source]

List the waiters in a waiter configuration for AWS Batch services.

Returns

waiter names for AWS batch services

Return type

List[str]

wait_for_job(self, job_id: str, delay: Union[int, float, None] = None)[source]

Wait for a Batch job to complete. This assumes that the .waiter_model is configured using some variation of the .default_config so that it can generate waiters with the following names: "JobExists", "JobRunning" and "JobComplete".

Parameters
  • job_id (str) -- a batch job ID

  • delay (Union[int, float, None]) -- a delay, in seconds, before polling for job status

Raises

AirflowException

Note

This method adds a small random jitter to the delay (+/- 2 seconds, with the result kept at 1 second or more). Using a random interval helps to avoid AWS API throttle limits when many concurrent tasks request job descriptions.

It also sets max_attempts to sys.maxsize, so that Airflow, rather than the waiter, manages the timeout on waiting.
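
The two adjustments in this note can be sketched in plain Python. This is a minimal illustration under stated assumptions, not the library's implementation: jittered, FakeWaiterConfig and prepare are hypothetical names, and the jitter is assumed to be drawn uniformly from the +/- 2 second window.

```python
import random
import sys
from typing import List, Union


def jittered(delay: Union[int, float], width: float = 2, minimum: float = 1) -> float:
    """Shift ``delay`` by a random amount in [-width, +width], never below ``minimum``."""
    return max(minimum, random.uniform(delay - width, delay + width))


class FakeWaiterConfig:
    """Hypothetical stand-in for a waiter's ``config`` attribute."""

    def __init__(self, delay: float, max_attempts: int) -> None:
        self.delay = delay
        self.max_attempts = max_attempts


def prepare(config: FakeWaiterConfig) -> FakeWaiterConfig:
    # Randomise the polling interval and leave the timeout to the caller.
    config.delay = jittered(config.delay)
    config.max_attempts = sys.maxsize
    return config


# One config per waiter name, each receiving its own random delay.
configs: List[FakeWaiterConfig] = [
    prepare(FakeWaiterConfig(delay=5, max_attempts=100)) for _ in range(3)
]
```

Because each config draws its own delay, concurrent tasks that start together are unlikely to keep polling at the same instants.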
