airflow.providers.amazon.aws.transfers.mongo_to_s3

Module Contents

Classes

MongoToS3Operator

Operator meant to move data from mongo via pymongo to s3 via boto.

class airflow.providers.amazon.aws.transfers.mongo_to_s3.MongoToS3Operator(*, s3_conn_id: Optional[str] = None, mongo_conn_id: str = 'mongo_default', aws_conn_id: str = 'aws_default', mongo_collection: str, mongo_query: Union[list, dict], s3_bucket: str, s3_key: str, mongo_db: Optional[str] = None, mongo_projection: Optional[Union[list, dict]] = None, replace: bool = False, allow_disk_use: bool = False, compression: Optional[str] = None, **kwargs)[source]

Bases: airflow.models.BaseOperator

Operator meant to move data from mongo via pymongo to s3 via boto.

Parameters
  • mongo_conn_id (str) -- reference to a specific mongo connection

  • aws_conn_id (str) -- reference to a specific S3 connection

  • mongo_collection (str) -- reference to a specific collection in your mongo db

  • mongo_query (Union[list, dict]) -- query to execute. A list including a dict of the query

  • mongo_projection (Union[list, dict]) -- optional parameter to filter the returned fields by the query. It can be a list of fields names to include or a dictionary for excluding fields (e.g projection={"_id": 0} )

  • s3_bucket (str) -- reference to a specific S3 bucket to store the data

  • s3_key (str) -- in which S3 key the file will be stored

  • mongo_db (str) -- reference to a specific mongo database

  • replace (bool) -- whether or not to replace the file in S3 if it previously existed

  • allow_disk_use (bool) -- enables writing to temporary files in the case you are handling large dataset. This only takes effect when mongo_query is a list - running an aggregate pipeline

  • compression (str) -- type of compression to use for output file in S3. Currently only gzip is supported.

template_fields :Sequence[str] = ['s3_bucket', 's3_key', 'mongo_query', 'mongo_collection'][source]
ui_color = #589636[source]
template_fields_renderers[source]
execute(self, context: airflow.utils.context.Context)[source]

Is written to depend on transform method

static transform(docs: Any) Any[source]

This method is meant to be extended by child classes to perform transformations unique to those operators needs. Processes pyMongo cursor and returns an iterable with each element being a JSON serializable dictionary

Base transform() assumes no processing is needed ie. docs is a pyMongo cursor of documents and cursor just needs to be passed through

Override this method for custom transformations

Was this entry helpful?