airflow.providers.amazon.aws.operators.s3

This module contains AWS S3 operators.

Module Contents

Classes

S3CreateBucketOperator

This operator creates an S3 bucket.

S3DeleteBucketOperator

This operator deletes an S3 bucket.

S3GetBucketTaggingOperator

This operator gets tagging from an S3 bucket.

S3PutBucketTaggingOperator

This operator puts tagging for an S3 bucket.

S3DeleteBucketTaggingOperator

This operator deletes tagging from an S3 bucket.

S3CopyObjectOperator

Creates a copy of an object that is already stored in S3.

S3CreateObjectOperator

Creates a new object from data as string or bytes.

S3DeleteObjectsOperator

Deletes a single object or multiple objects from a bucket using a single HTTP request.

S3FileTransformOperator

Copies data from a source S3 location to a temporary location on the local filesystem.

S3ListOperator

List all objects in the bucket whose names begin with the given string prefix.

S3ListPrefixesOperator

List all subfolders in the bucket whose names begin with the given string prefix.

Attributes

BUCKET_DOES_NOT_EXIST_MSG

airflow.providers.amazon.aws.operators.s3.BUCKET_DOES_NOT_EXIST_MSG = "Bucket with name: %s doesn't exist"

class airflow.providers.amazon.aws.operators.s3.S3CreateBucketOperator(*, bucket_name, aws_conn_id='aws_default', region_name=None, **kwargs)

Bases: airflow.models.BaseOperator

This operator creates an S3 bucket.

See also

For more information on how to use this operator, take a look at the guide: Create an Amazon S3 bucket

Parameters
  • bucket_name (str) – The name of the bucket to create.

  • aws_conn_id (str | None) – The Airflow connection used for AWS credentials. If this is None or empty, the default boto3 behaviour is used. If Airflow runs in a distributed manner and aws_conn_id is None or empty, the default boto3 configuration is used and must be maintained on each worker node.

  • region_name (str | None) – The AWS region name. If not specified, it is fetched from the connection.

template_fields: Sequence[str] = ('bucket_name',)
execute(context)

This is the main method to derive when creating an operator.

Context is the same dictionary used when rendering Jinja templates.

Refer to get_template_context for more context.
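S3 treats us-east-1 as its default location and rejects it as an explicit LocationConstraint, while other regions need one. The sketch below shows how keyword arguments for boto3's `create_bucket` could be assembled from `bucket_name` and an already-resolved `region_name`. The helper `build_create_bucket_kwargs` is hypothetical and is not the operator's own code.

```python
from __future__ import annotations


# Hypothetical helper, for illustration only: not part of the Amazon provider package.
def build_create_bucket_kwargs(bucket_name: str, region_name: str | None) -> dict:
    """Return keyword arguments suitable for a boto3 S3 client's create_bucket()."""
    kwargs: dict = {"Bucket": bucket_name}
    # us-east-1 (or no region at all) means no CreateBucketConfiguration;
    # any other explicit region is passed as the LocationConstraint.
    if region_name and region_name != "us-east-1":
        kwargs["CreateBucketConfiguration"] = {"LocationConstraint": region_name}
    return kwargs


print(build_create_bucket_kwargs("my-bucket", "eu-west-1"))
# {'Bucket': 'my-bucket', 'CreateBucketConfiguration': {'LocationConstraint': 'eu-west-1'}}
```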

class airflow.providers.amazon.aws.operators.s3.S3DeleteBucketOperator(bucket_name, force_delete=False, aws_conn_id='aws_default', **kwargs)

Bases: airflow.models.BaseOperator

This operator deletes an S3 bucket.

See also

For more information on how to use this operator, take a look at the guide: Delete an Amazon S3 bucket

Parameters
  • bucket_name (str) – The name of the bucket to delete.

  • force_delete (bool) – Forcibly delete all objects in the bucket before deleting the bucket.

  • aws_conn_id (str | None) – The Airflow connection used for AWS credentials. If this is None or empty, the default boto3 behaviour is used. If Airflow runs in a distributed manner and aws_conn_id is None or empty, the default boto3 configuration is used and must be maintained on each worker node.

template_fields: Sequence[str] = ('bucket_name',)
execute(context)

This is the main method to derive when creating an operator.

Context is the same dictionary used when rendering Jinja templates.

Refer to get_template_context for more context.
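S3 refuses to delete a bucket that still contains objects, which is why force_delete empties the bucket first. The in-memory sketch below models only that ordering, with a plain dict standing in for S3; `delete_bucket` and `BucketNotEmptyError` are hypothetical names, not provider APIs.

```python
from __future__ import annotations


class BucketNotEmptyError(Exception):
    """Stands in for the error S3 returns when deleting a non-empty bucket."""


def delete_bucket(store: dict[str, list[str]], bucket_name: str, force_delete: bool = False) -> None:
    """Delete bucket_name from an in-memory store mapping bucket names to object keys."""
    objects = store[bucket_name]  # KeyError for an unknown bucket in this sketch
    if objects and not force_delete:
        raise BucketNotEmptyError(f"{bucket_name} still contains {len(objects)} object(s)")
    # force_delete (or an already empty bucket): remove the objects, then the bucket.
    objects.clear()
    del store[bucket_name]


store = {"logs": ["a.txt", "b.txt"]}
try:
    delete_bucket(store, "logs")
except BucketNotEmptyError as err:
    print(f"refused: {err}")  # refused: logs still contains 2 object(s)
delete_bucket(store, "logs", force_delete=True)
print(store)  # {}
```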

class airflow.providers.amazon.aws.operators.s3.S3GetBucketTaggingOperator(bucket_name, aws_conn_id='aws_default', **kwargs)

Bases: airflow.models.BaseOperator

This operator gets tagging from an S3 bucket.

See also

For more information on how to use this operator, take a look at the guide: Get the tag of an Amazon S3 bucket

Parameters
  • bucket_name (str) – The name of the bucket to get tags from.

  • aws_conn_id (str | None) – The Airflow connection used for AWS credentials. If this is None or empty, the default boto3 behaviour is used. If Airflow runs in a distributed manner and aws_conn_id is None or empty, the default boto3 configuration is used and must be maintained on each worker node.

template_fields: Sequence[str] = ('bucket_name',)
execute(context)

This is the main method to derive when creating an operator.

Context is the same dictionary used when rendering Jinja templates.

Refer to get_template_context for more context.
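Assuming the value the operator returns is the bucket's TagSet, a list of {"Key": ..., "Value": ...} dicts as in S3's GetBucketTagging response, a downstream task may find a flat mapping easier to work with. `tag_set_to_dict` is a hypothetical helper shown as a sketch.

```python
from __future__ import annotations


def tag_set_to_dict(tag_set: list[dict[str, str]]) -> dict[str, str]:
    """Flatten an S3-style TagSet list into a plain {key: value} mapping."""
    return {tag["Key"]: tag["Value"] for tag in tag_set}


tags = tag_set_to_dict([{"Key": "team", "Value": "data"}, {"Key": "env", "Value": "prod"}])
print(tags)  # {'team': 'data', 'env': 'prod'}
```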

class airflow.providers.amazon.aws.operators.s3.S3PutBucketTaggingOperator(bucket_name, key=None, value=None, tag_set=None, aws_conn_id='aws_default', **kwargs)

Bases: airflow.models.BaseOperator

This operator puts tagging for an S3 bucket.

See also

For more information on how to use this operator, take a look at the guide: Set the tags for an Amazon S3 bucket

Parameters
  • bucket_name (str) – The name of the bucket to add tags to.

  • key (str | None) – The key portion of the key/value pair for a tag to be added. If a key is provided, a value must be provided as well.

  • value (str | None) – The value portion of the key/value pair for a tag to be added. If a value is provided, a key must be provided as well.

  • tag_set (dict | list[dict[str, str]] | None) – A dictionary containing the tags, or a list of key/value pairs.

  • aws_conn_id (str | None) – The Airflow connection used for AWS credentials. If this is None or empty, the default boto3 behaviour is used. If Airflow runs in a distributed manner and aws_conn_id is None or empty, the default boto3 configuration is used and must be maintained on each worker node.

template_fields: Sequence[str] = ('bucket_name',)
template_fields_renderers
execute(context)

This is the main method to derive when creating an operator.

Context is the same dictionary used when rendering Jinja templates.

Refer to get_template_context for more context.
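The tags can be given either as a single key/value pair or as tag_set, which may be a dict or a list of key/value pairs. The sketch below normalizes all of these into the TagSet shape used by S3's PutBucketTagging API. `normalize_tags` is hypothetical, and rejecting a mix of key/value and tag_set is this sketch's own assumption.

```python
from __future__ import annotations


def normalize_tags(
    key: str | None = None,
    value: str | None = None,
    tag_set: dict[str, str] | list[dict[str, str]] | None = None,
) -> list[dict[str, str]]:
    """Return tags as a TagSet list of {"Key": ..., "Value": ...} dicts."""
    if (key is None) != (value is None):
        raise ValueError("key and value must be provided together")
    if key is not None and tag_set:
        # Assumption of this sketch: the two input forms are not combined.
        raise ValueError("provide either key/value or tag_set, not both")
    if key is not None:
        return [{"Key": key, "Value": value}]
    if isinstance(tag_set, dict):
        return [{"Key": k, "Value": v} for k, v in tag_set.items()]
    return list(tag_set or [])


print(normalize_tags(tag_set={"team": "data"}))  # [{'Key': 'team', 'Value': 'data'}]
```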

class airflow.providers.amazon.aws.operators.s3.S3DeleteBucketTaggingOperator(bucket_name, aws_conn_id='aws_default', **kwargs)

Bases: airflow.models.BaseOperator

This operator deletes tagging from an S3 bucket.

See also

For more information on how to use this operator, take a look at the guide: Delete the tags of an Amazon S3 bucket

Parameters
  • bucket_name (str) – The name of the bucket to delete tags from.

  • aws_conn_id (str | None) – The Airflow connection used for AWS credentials. If this is None or empty, the default boto3 behaviour is used. If Airflow runs in a distributed manner and aws_conn_id is None or empty, the default boto3 configuration is used and must be maintained on each worker node.

template_fields: Sequence[str] = ('bucket_name',)
execute(context)

This is the main method to derive when creating an operator.

Context is the same dictionary used when rendering Jinja templates.

Refer to get_template_context for more context.

class airflow.providers.amazon.aws.operators.s3.S3CopyObjectOperator(*, source_bucket_key, dest_bucket_key, source_bucket_name=None, dest_bucket_name=None, source_version_id=None, aws_conn_id='aws_default', verify=None, acl_policy=None, **kwargs)

Bases: airflow.models.BaseOperator

Creates a copy of an object that is already stored in S3.

Note: the S3 connection used here needs to have access to both source and destination bucket/key.

See also

For more information on how to use this operator, take a look at the guide: Copy an Amazon S3 object

Parameters
  • source_bucket_key (str) –

    The key of the source object. (templated)

    It can be either a full s3:// style url or a relative path from the root level.

    When it’s specified as a full s3:// url, please omit source_bucket_name.

  • dest_bucket_key (str) –

    The key of the object to copy to. (templated)

    The convention to specify dest_bucket_key is the same as source_bucket_key.

  • source_bucket_name (str | None) –

    Name of the S3 bucket that contains the source object. (templated)

    It should be omitted when source_bucket_key is provided as a full s3:// url.

  • dest_bucket_name (str | None) –

    Name of the S3 bucket to which the object is copied. (templated)

    It should be omitted when dest_bucket_key is provided as a full s3:// url.

  • source_version_id (str | None) – Version ID of the source object (optional).

  • aws_conn_id (str | None) – Connection id of the S3 connection to use.

  • verify (str | bool | None) –

    Whether or not to verify SSL certificates for the S3 connection. By default, SSL certificates are verified.

    You can provide the following values:

    • False: do not validate SSL certificates. SSL will still be used, but SSL certificates will not be verified.

    • path/to/cert/bundle.pem: A filename of the CA cert bundle to use. You can specify this argument if you want to use a different CA cert bundle than the one used by botocore.

  • acl_policy (str | None) – String specifying the canned ACL policy for the file being uploaded to the S3 bucket.

template_fields: Sequence[str] = ('source_bucket_key', 'dest_bucket_key', 'source_bucket_name', 'dest_bucket_name')
execute(context)

This is the main method to derive when creating an operator.

Context is the same dictionary used when rendering Jinja templates.

Refer to get_template_context for more context.
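Both source_bucket_key and dest_bucket_key accept either a full s3:// URL or a relative key paired with a separate bucket name. A minimal sketch of that rule using only the standard library; `resolve_bucket_key` is a hypothetical helper, and raising when a bucket name accompanies a full URL is this sketch's choice.

```python
from __future__ import annotations

from urllib.parse import urlsplit


def resolve_bucket_key(key: str, bucket_name: str | None = None) -> tuple[str, str]:
    """Return (bucket, key) from either s3://bucket/path or a relative path plus a bucket name."""
    parts = urlsplit(key)
    if parts.scheme == "s3":
        if bucket_name is not None:
            raise ValueError("omit the bucket name when the key is a full s3:// URL")
        # netloc holds the bucket; the path keeps a leading "/" that S3 keys do not use.
        return parts.netloc, parts.path.lstrip("/")
    if bucket_name is None:
        raise ValueError("a bucket name is required for a relative key")
    return bucket_name, key


print(resolve_bucket_key("s3://src-bucket/raw/2018/04/data.csv"))  # ('src-bucket', 'raw/2018/04/data.csv')
print(resolve_bucket_key("raw/2018/04/data.csv", "src-bucket"))  # ('src-bucket', 'raw/2018/04/data.csv')
```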

get_openlineage_facets_on_start()

class airflow.providers.amazon.aws.operators.s3.S3CreateObjectOperator(*, s3_bucket=None, s3_key, data, replace=False, encrypt=False, acl_policy=None, encoding=None, compression=None, aws_conn_id='aws_default', verify=None, **kwargs)

Bases: airflow.models.BaseOperator

Creates a new object from data as string or bytes.

See also

For more information on how to use this operator, take a look at the guide: Create an Amazon S3 object

Parameters
  • s3_bucket (str | None) – Name of the S3 bucket in which to save the object. (templated) It should be omitted when s3_key is provided as a full s3:// url.

  • s3_key (str) – The key of the object to be created. (templated) It can be either a full s3:// style url or a relative path from the root level. When it’s specified as a full s3:// url, please omit s3_bucket.

  • data (str | bytes) – The string or bytes to save as the object's content.

  • replace (bool) – If True, overwrite the key if it already exists.

  • encrypt (bool) – If True, the file will be encrypted on the server side by S3 and will be stored in an encrypted form while at rest in S3.

  • acl_policy (str | None) – String specifying the canned ACL policy for the file being uploaded to the S3 bucket.

  • encoding (str | None) – The encoding used to convert a string to bytes. It should be specified only when data is provided as a string.

  • compression (str | None) – Type of compression to use; currently only gzip is supported. It can be specified only when data is provided as a string.

  • aws_conn_id (str | None) – Connection id of the S3 connection to use.

  • verify (str | bool | None) –

    Whether or not to verify SSL certificates for the S3 connection. By default, SSL certificates are verified.

    You can provide the following values:

    • False: do not validate SSL certificates. SSL will still be used, but SSL certificates will not be verified.

    • path/to/cert/bundle.pem: A filename of the CA cert bundle to use. You can specify this argument if you want to use a different CA cert bundle than the one used by botocore.

template_fields: Sequence[str] = ('s3_bucket', 's3_key', 'data')
execute(context)

This is the main method to derive when creating an operator.

Context is the same dictionary used when rendering Jinja templates.

Refer to get_template_context for more context.
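The data, encoding, and compression parameters interact: encoding and gzip compression apply only to string data. The sketch below turns a data argument into the bytes that would be uploaded. `prepare_payload` is hypothetical; defaulting to UTF-8 and raising on misuse are assumptions of this sketch, not documented operator behaviour.

```python
from __future__ import annotations

import gzip


def prepare_payload(data: str | bytes, encoding: str | None = None, compression: str | None = None) -> bytes:
    """Convert operator-style data into the bytes that would be written to S3."""
    if isinstance(data, bytes):
        # Assumption: reject encoding/compression for bytes, since they apply only to strings.
        if encoding or compression:
            raise ValueError("encoding and compression apply only to string data")
        return data
    payload = data.encode(encoding or "utf-8")  # UTF-8 default is an assumption
    if compression is None:
        return payload
    if compression != "gzip":
        raise NotImplementedError(f"unsupported compression: {compression}")
    return gzip.compress(payload)


compressed = prepare_payload("id,name\n1,ada\n", compression="gzip")
print(gzip.decompress(compressed))  # b'id,name\n1,ada\n'
```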

get_openlineage_facets_on_start()

class airflow.providers.amazon.aws.operators.s3.S3DeleteObjectsOperator(*, bucket, keys=None, prefix=None, from_datetime=None, to_datetime=None, aws_conn_id='aws_default', verify=None, **kwargs)

Bases: airflow.models.BaseOperator

Deletes a single object or multiple objects from a bucket using a single HTTP request.

See also

For more information on how to use this operator, take a look at the guide: Delete Amazon S3 objects

Parameters
  • bucket (str) – Name of the bucket in which you are going to delete object(s). (templated)

  • keys (str | list | None) –

    The key(s) to delete from S3 bucket. (templated)

    When keys is a string, it is the key name of the single object to delete.

    When keys is a list, it is the list of keys to delete.

  • prefix (str | None) – Prefix of objects to delete. (templated) All objects matching this prefix in the bucket will be deleted.

  • from_datetime (datetime.datetime | str | None) – Lower bound on the LastModified date of objects to delete. (templated) All objects in the bucket whose LastModified date is greater than this datetime will be deleted.

  • to_datetime (datetime.datetime | str | None) – Upper bound on the LastModified date of objects to delete. (templated) All objects in the bucket whose LastModified date is less than this datetime will be deleted.

  • aws_conn_id (str | None) – Connection id of the S3 connection to use.

  • verify (str | bool | None) –

    Whether or not to verify SSL certificates for the S3 connection. By default, SSL certificates are verified.

    You can provide the following values:

    • False: do not validate SSL certificates. SSL will still be used, but SSL certificates will not be verified.

    • path/to/cert/bundle.pem: A filename of the CA cert bundle to use. You can specify this argument if you want to use a different CA cert bundle than the one used by botocore.

template_fields: Sequence[str] = ('keys', 'bucket', 'prefix', 'from_datetime', 'to_datetime')
execute(context)

This is the main method to derive when creating an operator.

Context is the same dictionary used when rendering Jinja templates.

Refer to get_template_context for more context.
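When keys is not given, objects are selected by prefix and, optionally, by their LastModified timestamp. The sketch below filters an in-memory listing shaped like S3 ListObjectsV2 Contents entries (each with Key and LastModified). Both bounds are treated as strict, following the parameter wording; `select_keys_to_delete` is hypothetical and is not the operator's implementation.

```python
from __future__ import annotations

from datetime import datetime, timezone


def select_keys_to_delete(
    listing: list[dict],
    prefix: str = "",
    from_datetime: datetime | None = None,
    to_datetime: datetime | None = None,
) -> list[str]:
    """Return keys under prefix whose LastModified lies strictly between the given bounds."""
    selected = []
    for obj in listing:
        if not obj["Key"].startswith(prefix):
            continue
        modified = obj["LastModified"]
        if from_datetime is not None and modified <= from_datetime:
            continue
        if to_datetime is not None and modified >= to_datetime:
            continue
        selected.append(obj["Key"])
    return selected


def utc(year: int, month: int, day: int) -> datetime:
    return datetime(year, month, day, tzinfo=timezone.utc)


listing = [
    {"Key": "tmp/a.csv", "LastModified": utc(2024, 1, 1)},
    {"Key": "tmp/b.csv", "LastModified": utc(2024, 3, 1)},
    {"Key": "keep/c.csv", "LastModified": utc(2024, 3, 1)},
]
print(select_keys_to_delete(listing, prefix="tmp/", from_datetime=utc(2024, 2, 1)))  # ['tmp/b.csv']
```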

get_openlineage_facets_on_complete(task_instance)

Implement _on_complete because object keys are resolved in execute().

class airflow.providers.amazon.aws.operators.s3.S3FileTransformOperator(*, source_s3_key, dest_s3_key, transform_script=None, select_expression=None, select_expr_serialization_config=None, script_args=None, source_aws_conn_id='aws_default', source_verify=None, dest_aws_conn_id='aws_default', dest_verify=None, replace=False, **kwargs)

Bases: airflow.models.BaseOperator

Copies data from a source S3 location to a temporary location on the local filesystem.

Runs a transformation on this file as specified by the transformation script and uploads the output to a destination S3 location.

The locations of the source and destination files in the local filesystem are passed as the first and second arguments to the transformation script. The transformation script is expected to read the data from the source, transform it, and write the output to the local destination file. The operator then takes over control and uploads the local destination file to S3.

S3 Select is also available to filter the source contents. Users can omit the transformation script if an S3 Select expression is specified.

See also

For more information on how to use this operator, take a look at the guide: Transform an Amazon S3 object

Parameters
  • source_s3_key (str) – The key to be retrieved from S3. (templated)

  • dest_s3_key (str) – The key to be written to S3. (templated)

  • transform_script (str | None) – Location of the executable transformation script.

  • select_expression – S3 Select expression.

  • select_expr_serialization_config (dict[str, dict[str, dict]] | None) – A dictionary that contains input and output serialization configurations for S3 Select.

  • script_args (Sequence[str] | None) – Arguments for the transformation script. (templated)

  • source_aws_conn_id (str | None) – Source S3 connection.

  • source_verify (bool | str | None) –

    Whether or not to verify SSL certificates for the S3 connection. By default, SSL certificates are verified. You can provide the following values:

    • False: do not validate SSL certificates. SSL will still be used (unless use_ssl is False), but SSL certificates will not be verified.

    • path/to/cert/bundle.pem: A filename of the CA cert bundle to use. You can specify this argument if you want to use a different CA cert bundle than the one used by botocore.

    This is also applicable to dest_verify.

  • dest_aws_conn_id (str | None) – Destination S3 connection.

  • dest_verify (bool | str | None) – Whether or not to verify SSL certificates for the S3 connection. See source_verify.

  • replace (bool) – Replace the destination S3 key if it already exists.

template_fields: Sequence[str] = ('source_s3_key', 'dest_s3_key', 'script_args')
template_ext: Sequence[str] = ()
ui_color = '#f9c915'
execute(context)

This is the main method to derive when creating an operator.

Context is the same dictionary used when rendering Jinja templates.

Refer to get_template_context for more context.
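The transformation script receives the local source path and the local destination path as its first two arguments, followed by any script_args. Below is a hypothetical transformation script written in Python that upper-cases every line; the --skip-header flag is an invented example of a value passed through script_args. As an executable, the file would also need execute permission (for example, chmod +x).

```python
#!/usr/bin/env python3
"""Hypothetical transform script: upper-cases each line of the source file."""
from __future__ import annotations

import sys


def main(argv: list[str]) -> int:
    # argv[0] is the local source file, argv[1] the local destination file;
    # anything after that comes from script_args.
    source_path, dest_path, *extra_args = argv
    skip_header = "--skip-header" in extra_args  # invented flag, for illustration only
    with open(source_path, encoding="utf-8") as src, open(dest_path, "w", encoding="utf-8") as dst:
        for index, line in enumerate(src):
            if skip_header and index == 0:
                continue
            dst.write(line.upper())
    return 0


# Run only when invoked with at least the source and destination paths.
if __name__ == "__main__" and len(sys.argv) >= 3:
    sys.exit(main(sys.argv[1:]))
```

With transform_script pointing at this file and script_args=["--skip-header"], the uploaded object would contain the upper-cased rows without the header line.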

get_openlineage_facets_on_start()

class airflow.providers.amazon.aws.operators.s3.S3ListOperator(*, bucket, prefix='', delimiter='', aws_conn_id='aws_default', verify=None, apply_wildcard=False, **kwargs)

Bases: airflow.models.BaseOperator

List all objects in the bucket whose names begin with the given string prefix.

This operator returns a Python list of object names, which downstream tasks can retrieve through XCom.

See also

For more information on how to use this operator, take a look at the guide: List Amazon S3 objects

Parameters
  • bucket (str) – The S3 bucket where to find the objects. (templated)

  • prefix (str) – Prefix string that filters the objects whose names begin with that prefix. (templated)

  • delimiter (str) – The delimiter that marks the key hierarchy. (templated)

  • aws_conn_id (str | None) – The connection ID to use when connecting to S3 storage.

  • verify (str | bool | None) –

    Whether or not to verify SSL certificates for the S3 connection. By default, SSL certificates are verified. You can provide the following values:

    • False: do not validate SSL certificates. SSL will still be used (unless use_ssl is False), but SSL certificates will not be verified.

    • path/to/cert/bundle.pem: A filename of the CA cert bundle to use. You can specify this argument if you want to use a different CA cert bundle than the one used by botocore.

  • apply_wildcard (bool) – Whether to treat ‘*’ as a wildcard or as a plain character in the prefix.

Example:

The following operator would list all the files (excluding subfolders) under the customers/2018/04/ prefix in the data bucket.

from airflow.providers.amazon.aws.operators.s3 import S3ListOperator

s3_file = S3ListOperator(
    task_id="list_s3_files",
    bucket="data",
    prefix="customers/2018/04/",
    delimiter="/",
    aws_conn_id="aws_customers_conn",
)

template_fields: Sequence[str] = ('bucket', 'prefix', 'delimiter')
ui_color = '#ffd700'
execute(context)

This is the main method to derive when creating an operator.

Context is the same dictionary used when rendering Jinja templates.

Refer to get_template_context for more context.
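To see what prefix and delimiter select in the customers/2018/04/ example, here is a pure-Python emulation over an in-memory list of keys. It mimics S3's listing semantics for Prefix and Delimiter and is not the operator's code, which delegates to the S3 API; `list_keys` is hypothetical and apply_wildcard is not modelled.

```python
from __future__ import annotations


def list_keys(keys: list[str], prefix: str = "", delimiter: str = "") -> list[str]:
    """Emulate an S3 listing: keys under prefix, minus those rolled up by the delimiter."""
    result = []
    for key in keys:
        if not key.startswith(prefix):
            continue
        remainder = key[len(prefix):]
        # With a delimiter, anything nested deeper is grouped under a common prefix
        # (a "subfolder") instead of being returned as an object.
        if delimiter and delimiter in remainder:
            continue
        result.append(key)
    return result


keys = [
    "customers/2018/04/a.csv",
    "customers/2018/04/b.csv",
    "customers/2018/04/archive/old.csv",
    "customers/2018/05/c.csv",
]
print(list_keys(keys, prefix="customers/2018/04/", delimiter="/"))
# ['customers/2018/04/a.csv', 'customers/2018/04/b.csv']
```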

class airflow.providers.amazon.aws.operators.s3.S3ListPrefixesOperator(*, bucket, prefix, delimiter, aws_conn_id='aws_default', verify=None, **kwargs)

Bases: airflow.models.BaseOperator

List all subfolders in the bucket whose names begin with the given string prefix.

This operator returns a Python list of subfolder names, which downstream tasks can retrieve through XCom.

See also

For more information on how to use this operator, take a look at the guide: List Amazon S3 prefixes

Parameters
  • bucket (str) – The S3 bucket where to find the subfolders. (templated)

  • prefix (str) – Prefix string that filters the subfolders whose names begin with that prefix. (templated)

  • delimiter (str) – The delimiter that marks the subfolder hierarchy. (templated)

  • aws_conn_id (str | None) – The connection ID to use when connecting to S3 storage.

  • verify (str | bool | None) –

    Whether or not to verify SSL certificates for the S3 connection. By default, SSL certificates are verified. You can provide the following values:

    • False: do not validate SSL certificates. SSL will still be used (unless use_ssl is False), but SSL certificates will not be verified.

    • path/to/cert/bundle.pem: A filename of the CA cert bundle to use. You can specify this argument if you want to use a different CA cert bundle than the one used by botocore.

Example:

The following operator would list all the subfolders under the customers/2018/04/ prefix in the data bucket.

from airflow.providers.amazon.aws.operators.s3 import S3ListPrefixesOperator

s3_prefixes = S3ListPrefixesOperator(
    task_id="list_s3_prefixes",
    bucket="data",
    prefix="customers/2018/04/",
    delimiter="/",
    aws_conn_id="aws_customers_conn",
)

template_fields: Sequence[str] = ('bucket', 'prefix', 'delimiter')
ui_color = '#ffd700'
execute(context)

This is the main method to derive when creating an operator.

Context is the same dictionary used when rendering Jinja templates.

Refer to get_template_context for more context.
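A subfolder here corresponds to an S3 common prefix: everything up to and including the first delimiter after the requested prefix. The pure-Python emulation below shows which values the customers/2018/04/ example would return for a few sample keys; `list_prefixes` is hypothetical and only mirrors S3's CommonPrefixes semantics, not the operator's code.

```python
from __future__ import annotations


def list_prefixes(keys: list[str], prefix: str, delimiter: str) -> list[str]:
    """Emulate S3 CommonPrefixes: distinct 'subfolders' directly under prefix."""
    if not delimiter:
        raise ValueError("a non-empty delimiter is required")
    prefixes: list[str] = []
    for key in keys:
        if not key.startswith(prefix):
            continue
        remainder = key[len(prefix):]
        if delimiter in remainder:
            # Keep everything up to and including the first delimiter after the prefix.
            common = prefix + remainder.split(delimiter, 1)[0] + delimiter
            if common not in prefixes:
                prefixes.append(common)
    return prefixes


keys = [
    "customers/2018/04/archive/old.csv",
    "customers/2018/04/raw/x.csv",
    "customers/2018/04/raw/y.csv",
    "customers/2018/04/top.csv",
]
print(list_prefixes(keys, "customers/2018/04/", "/"))
# ['customers/2018/04/archive/', 'customers/2018/04/raw/']
```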
