# -*- coding: utf-8 -*-
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
#
from copy import deepcopy
from datetime import date, time
from airflow import AirflowException
from airflow.contrib.hooks.gcp_transfer_hook import (
    GCPTransferServiceHook,
    GcpTransferJobsStatus,
    TRANSFER_OPTIONS,
    OBJECT_CONDITIONS,
    PROJECT_ID,
    BUCKET_NAME,
    GCS_DATA_SINK,
    STATUS,
    DESCRIPTION,
    GCS_DATA_SOURCE,
    HTTP_DATA_SOURCE,
    SECONDS,
    MINUTES,
    HOURS,
    YEAR,
    MONTH,
    DAY,
    START_TIME_OF_DAY,
    SCHEDULE_END_DATE,
    SCHEDULE_START_DATE,
    SCHEDULE,
    SECRET_ACCESS_KEY,
    ACCESS_KEY_ID,
    AWS_ACCESS_KEY,
    AWS_S3_DATA_SOURCE,
    TRANSFER_SPEC,
)
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
try:
    from airflow.contrib.hooks.aws_hook import AwsHook
except ImportError:  # pragma: no cover
    # The AWS hook is optional: fall back to None so this module can still be
    # imported when it is unavailable.
    AwsHook = None  # type: ignore
class TransferJobPreprocessor:
    """
    Prepares a transfer job body, in place, before it is sent to the API.

    Two transformations are applied by :meth:`process_body`:

    * AWS credentials are read from the Airflow connection and injected into
      the ``awsS3DataSource`` section, when such a section is present
    * ``datetime.date`` / ``datetime.time`` values found in the schedule are
      converted to the dictionaries built by :meth:`_convert_date_to_dict` and
      :meth:`_convert_time_to_dict`

    :param body: The transfer job body; it is modified in place.
    :type body: dict
    :param aws_conn_id: The connection ID used to retrieve credentials to
        Amazon Web Service.
    :type aws_conn_id: str
    :param default_schedule: If True and the body has no schedule, a schedule
        starting and ending today is added.
    :type default_schedule: bool
    """

    def __init__(self, body, aws_conn_id='aws_default', default_schedule=False):
        self.body = body
        self.aws_conn_id = aws_conn_id
        self.default_schedule = default_schedule

    def _inject_aws_credentials(self):
        """
        Add the access key stored in the AWS connection to the S3 data source.

        Does nothing when the body has no transfer spec or no S3 data source.

        :raises AirflowException: if an S3 data source is present but the AWS
            hook could not be imported
        """
        if TRANSFER_SPEC not in self.body or AWS_S3_DATA_SOURCE not in self.body[TRANSFER_SPEC]:
            return

        # AwsHook is None when its import failed at module load time; fail with
        # a clear message instead of "'NoneType' object is not callable".
        if AwsHook is None:
            raise AirflowException(
                "The AWS hook could not be imported, so credentials for the awsS3DataSource "
                "cannot be retrieved."
            )

        aws_credentials = AwsHook(self.aws_conn_id).get_credentials()
        self.body[TRANSFER_SPEC][AWS_S3_DATA_SOURCE][AWS_ACCESS_KEY] = {
            ACCESS_KEY_ID: aws_credentials.access_key,
            SECRET_ACCESS_KEY: aws_credentials.secret_key,
        }

    def _reformat_date(self, field_key):
        """
        Replace a ``datetime.date`` stored under ``field_key`` of the schedule
        with its dictionary form; other values are left untouched.
        """
        schedule = self.body[SCHEDULE]
        if field_key in schedule and isinstance(schedule[field_key], date):
            schedule[field_key] = self._convert_date_to_dict(schedule[field_key])

    def _reformat_time(self, field_key):
        """
        Replace a ``datetime.time`` stored under ``field_key`` of the schedule
        with its dictionary form; other values are left untouched.
        """
        schedule = self.body[SCHEDULE]
        if field_key in schedule and isinstance(schedule[field_key], time):
            schedule[field_key] = self._convert_time_to_dict(schedule[field_key])

    def _reformat_schedule(self):
        """
        Convert native date/time objects in the schedule to dictionaries.

        When the body has no schedule, one that starts and ends today is
        created if ``default_schedule`` is set; otherwise nothing is done.
        """
        if SCHEDULE not in self.body:
            if not self.default_schedule:
                return
            today = date.today()
            self.body[SCHEDULE] = {SCHEDULE_START_DATE: today, SCHEDULE_END_DATE: today}

        self._reformat_date(SCHEDULE_START_DATE)
        self._reformat_date(SCHEDULE_END_DATE)
        self._reformat_time(START_TIME_OF_DAY)

    def process_body(self):
        """
        Inject AWS credentials and reformat the schedule.

        :return: the same ``body`` object that was given to the constructor,
            after in-place modification
        :rtype: dict
        """
        self._inject_aws_credentials()
        self._reformat_schedule()
        return self.body

    @staticmethod
    def _convert_date_to_dict(field_date):
        """
        Convert native python ``datetime.date`` object  to a format supported by the API
        """
        return {DAY: field_date.day, MONTH: field_date.month, YEAR: field_date.year}

    @staticmethod
    def _convert_time_to_dict(time):
        """
        Convert native python ``datetime.time`` object  to a format supported by the API
        """
        # The parameter name shadows the imported ``datetime.time`` inside this
        # function only; it is kept so that keyword callers keep working.
        return {HOURS: time.hour, MINUTES: time.minute, SECONDS: time.second}
class TransferJobValidator:
    """
    Validates a transfer job body before it is processed.

    :param body: The transfer job body to check.
    :type body: dict
    """

    def __init__(self, body):
        self.body = body

    def _verify_data_source(self):
        """
        Ensure that the transfer spec declares at most one data source.

        :raises AirflowException: if more than one of the GCS, AWS S3 and HTTP
            data sources is present
        """
        transfer_spec = self.body[TRANSFER_SPEC]
        sources_count = sum(
            source in transfer_spec for source in (GCS_DATA_SOURCE, AWS_S3_DATA_SOURCE, HTTP_DATA_SOURCE)
        )
        # Zero sources is tolerated here; only an ambiguous spec is rejected.
        if sources_count > 1:
            raise AirflowException(
                "More than one data source detected. Please choose exactly one data source from: "
                "gcsDataSource, awsS3DataSource and httpDataSource."
            )

    def _restrict_aws_credentials(self):
        """
        Reject bodies that carry an AWS access key inline.

        :raises AirflowException: if the S3 data source contains an access key
        """
        transfer_spec = self.body[TRANSFER_SPEC]
        if AWS_S3_DATA_SOURCE not in transfer_spec:
            return

        if AWS_ACCESS_KEY in transfer_spec[AWS_S3_DATA_SOURCE]:
            raise AirflowException(
                "AWS credentials detected inside the body parameter (awsAccessKey). This is not allowed, "
                "please use Airflow connections to store credentials."
            )

    def _restrict_empty_body(self):
        """
        :raises AirflowException: if the body is empty or None
        """
        if not self.body:
            raise AirflowException("The required parameter 'body' is empty or None")

    def validate_body(self):
        """
        Run all checks on the body.

        The transfer-spec checks are skipped when the body has no transfer
        spec.

        :raises AirflowException: if the body is empty, embeds AWS credentials
            or declares more than one data source
        """
        self._restrict_empty_body()

        if TRANSFER_SPEC not in self.body:
            return

        self._restrict_aws_credentials()
        self._verify_data_source()
class GcpTransferServiceJobCreateOperator(BaseOperator):
    """
    Creates a transfer job that runs periodically.

    .. warning::

        This operator is NOT idempotent. If you run it many times, many transfer
        jobs will be created in the Google Cloud Platform.

    .. seealso::
        For more information on how to use this operator, take a look at the guide:
        :ref:`howto/operator:GcpTransferServiceJobCreateOperator`

    :param body: (Required) The request body, as described in
        https://cloud.google.com/storage-transfer/docs/reference/rest/v1/transferJobs/create#request-body
        With three additional improvements:

        * dates can be given in the form :class:`datetime.date`
        * times can be given in the form :class:`datetime.time`
        * credentials to Amazon Web Service should be stored in the connection and indicated by the
          aws_conn_id parameter

    :type body: dict
    :param aws_conn_id: The connection ID used to retrieve credentials to
        Amazon Web Service.
    :type aws_conn_id: str
    :param gcp_conn_id: The connection ID used to connect to Google Cloud
        Platform.
    :type gcp_conn_id: str
    :param api_version: API version used (e.g. v1).
    :type api_version: str
    """

    # [START gcp_transfer_job_create_template_fields]
    template_fields = ('body', 'gcp_conn_id', 'aws_conn_id')
    # [END gcp_transfer_job_create_template_fields]

    @apply_defaults
    def __init__(
        self,
        body,
        aws_conn_id='aws_default',
        gcp_conn_id='google_cloud_default',
        api_version='v1',
        *args,
        **kwargs
    ):
        super(GcpTransferServiceJobCreateOperator, self).__init__(*args, **kwargs)
        # Work on a private copy: execute() modifies the body in place.
        self.body = deepcopy(body)
        self.aws_conn_id = aws_conn_id
        self.gcp_conn_id = gcp_conn_id
        self.api_version = api_version
        self._validate_inputs()

    def _validate_inputs(self):
        """
        Check the request body.

        :raises AirflowException: if the body is rejected by
            :class:`TransferJobValidator`
        """
        TransferJobValidator(body=self.body).validate_body()

    def execute(self, context):
        """
        Preprocess the body and create the transfer job.

        :return: the value returned by the hook's ``create_transfer_job``
        """
        TransferJobPreprocessor(body=self.body, aws_conn_id=self.aws_conn_id).process_body()
        hook = GCPTransferServiceHook(api_version=self.api_version, gcp_conn_id=self.gcp_conn_id)
        return hook.create_transfer_job(body=self.body)
class GcpTransferServiceJobUpdateOperator(BaseOperator):
    """
    Updates a transfer job that runs periodically.

    .. seealso::
        For more information on how to use this operator, take a look at the guide:
        :ref:`howto/operator:GcpTransferServiceJobUpdateOperator`

    :param job_name: (Required) Name of the job to be updated
    :type job_name: str
    :param body: (Required) The request body, as described in
        https://cloud.google.com/storage-transfer/docs/reference/rest/v1/transferJobs/patch#request-body
        With three additional improvements:

        * dates can be given in the form :class:`datetime.date`
        * times can be given in the form :class:`datetime.time`
        * credentials to Amazon Web Service should be stored in the connection and indicated by the
          aws_conn_id parameter

    :type body: dict
    :param aws_conn_id: The connection ID used to retrieve credentials to
        Amazon Web Service.
    :type aws_conn_id: str
    :param gcp_conn_id: The connection ID used to connect to Google Cloud
        Platform.
    :type gcp_conn_id: str
    :param api_version: API version used (e.g. v1).
    :type api_version: str
    """

    # [START gcp_transfer_job_update_template_fields]
    template_fields = ('job_name', 'body', 'gcp_conn_id', 'aws_conn_id')
    # [END gcp_transfer_job_update_template_fields]

    @apply_defaults
    def __init__(
        self,
        job_name,
        body,
        aws_conn_id='aws_default',
        gcp_conn_id='google_cloud_default',
        api_version='v1',
        *args,
        **kwargs
    ):
        super(GcpTransferServiceJobUpdateOperator, self).__init__(*args, **kwargs)
        self.job_name = job_name
        # Work on a private copy, as the create operator does: execute()
        # injects credentials into the body and must not leak them into the
        # dictionary owned by the caller.
        self.body = deepcopy(body)
        self.gcp_conn_id = gcp_conn_id
        self.api_version = api_version
        self.aws_conn_id = aws_conn_id
        self._validate_inputs()

    def _validate_inputs(self):
        """
        Check the request body and the job name.

        :raises AirflowException: if the body is rejected by
            :class:`TransferJobValidator` or ``job_name`` is empty or None
        """
        TransferJobValidator(body=self.body).validate_body()
        if not self.job_name:
            raise AirflowException("The required parameter 'job_name' is empty or None")

    def execute(self, context):
        """
        Preprocess the body and update the transfer job.

        :return: the value returned by the hook's ``update_transfer_job``
        """
        TransferJobPreprocessor(body=self.body, aws_conn_id=self.aws_conn_id).process_body()
        hook = GCPTransferServiceHook(api_version=self.api_version, gcp_conn_id=self.gcp_conn_id)
        return hook.update_transfer_job(job_name=self.job_name, body=self.body)
class GcpTransferServiceJobDeleteOperator(BaseOperator):
    """
    Delete a transfer job. This is a soft delete. After a transfer job is
    deleted, the job and all the transfer executions are subject to garbage
    collection. Transfer jobs become eligible for garbage collection
    30 days after soft delete.

    .. seealso::
        For more information on how to use this operator, take a look at the guide:
        :ref:`howto/operator:GcpTransferServiceJobDeleteOperator`

    :param job_name: (Required) Name of the transfer job to be deleted
    :type job_name: str
    :param project_id: (Optional) the ID of the project that owns the Transfer
        Job. If set to None or missing, the default project_id from the GCP
        connection is used.
    :type project_id: str
    :param gcp_conn_id: The connection ID used to connect to Google Cloud
        Platform.
    :type gcp_conn_id: str
    :param api_version: API version used (e.g. v1).
    :type api_version: str
    """

    # [START gcp_transfer_job_delete_template_fields]
    template_fields = ('job_name', 'project_id', 'gcp_conn_id', 'api_version')
    # [END gcp_transfer_job_delete_template_fields]

    @apply_defaults
    def __init__(
        self, job_name, gcp_conn_id='google_cloud_default', api_version='v1', project_id=None, *args, **kwargs
    ):
        super(GcpTransferServiceJobDeleteOperator, self).__init__(*args, **kwargs)
        self.job_name = job_name
        self.project_id = project_id
        self.gcp_conn_id = gcp_conn_id
        self.api_version = api_version
        self._validate_inputs()

    def _validate_inputs(self):
        """
        :raises AirflowException: if ``job_name`` is empty or None
        """
        if not self.job_name:
            raise AirflowException("The required parameter 'job_name' is empty or None")

    def execute(self, context):
        """
        Delete the transfer job through the hook. Returns nothing.
        """
        # job_name is a template field, so its value may differ from the one
        # checked in the constructor; check it again before calling the API.
        self._validate_inputs()
        hook = GCPTransferServiceHook(api_version=self.api_version, gcp_conn_id=self.gcp_conn_id)
        hook.delete_transfer_job(job_name=self.job_name, project_id=self.project_id)
class GcpTransferServiceOperationGetOperator(BaseOperator):
    """
    Gets the latest state of a long-running operation in Google Storage Transfer
    Service.

    .. seealso::
        For more information on how to use this operator, take a look at the guide:
        :ref:`howto/operator:GcpTransferServiceOperationGetOperator`

    :param operation_name: (Required) Name of the transfer operation.
    :type operation_name: str
    :param gcp_conn_id: The connection ID used to connect to Google
        Cloud Platform.
    :type gcp_conn_id: str
    :param api_version: API version used (e.g. v1).
    :type api_version: str
    """

    # [START gcp_transfer_operation_get_template_fields]
    template_fields = ('operation_name', 'gcp_conn_id')
    # [END gcp_transfer_operation_get_template_fields]

    @apply_defaults
    def __init__(self, operation_name, gcp_conn_id='google_cloud_default', api_version='v1', *args, **kwargs):
        super(GcpTransferServiceOperationGetOperator, self).__init__(*args, **kwargs)
        self.operation_name = operation_name
        self.gcp_conn_id = gcp_conn_id
        self.api_version = api_version
        self._validate_inputs()

    def _validate_inputs(self):
        """
        :raises AirflowException: if ``operation_name`` is empty or None
        """
        if not self.operation_name:
            raise AirflowException("The required parameter 'operation_name' is empty or None")

    def execute(self, context):
        """
        Fetch the transfer operation through the hook.

        :return: the value returned by the hook's ``get_transfer_operation``
        """
        hook = GCPTransferServiceHook(api_version=self.api_version, gcp_conn_id=self.gcp_conn_id)
        return hook.get_transfer_operation(operation_name=self.operation_name)
class GcpTransferServiceOperationsListOperator(BaseOperator):
    """
    Lists long-running operations in Google Storage Transfer
    Service that match the specified filter.

    .. seealso::
        For more information on how to use this operator, take a look at the guide:
        :ref:`howto/operator:GcpTransferServiceOperationsListOperator`

    :param filter: (Required) A request filter, as described in
            https://cloud.google.com/storage-transfer/docs/reference/rest/v1/transferJobs/list#body.QUERY_PARAMETERS.filter
    :type filter: dict
    :param gcp_conn_id: The connection ID used to connect to Google
        Cloud Platform.
    :type gcp_conn_id: str
    :param api_version: API version used (e.g. v1).
    :type api_version: str
    """

    # [START gcp_transfer_operations_list_template_fields]
    template_fields = ('filter', 'gcp_conn_id')
    # [END gcp_transfer_operations_list_template_fields]

    # ``filter`` shadows the builtin, but it is part of the public signature
    # and is therefore kept. The decorator matches every sibling operator.
    @apply_defaults
    def __init__(self, filter, gcp_conn_id='google_cloud_default', api_version='v1', *args, **kwargs):
        super(GcpTransferServiceOperationsListOperator, self).__init__(*args, **kwargs)
        self.filter = filter
        self.gcp_conn_id = gcp_conn_id
        self.api_version = api_version
        self._validate_inputs()

    def _validate_inputs(self):
        """
        :raises AirflowException: if ``filter`` is empty or None
        """
        if not self.filter:
            raise AirflowException("The required parameter 'filter' is empty or None")

    def execute(self, context):
        """
        List the matching transfer operations and log them.

        :return: the value returned by the hook's ``list_transfer_operations``
        """
        hook = GCPTransferServiceHook(api_version=self.api_version, gcp_conn_id=self.gcp_conn_id)
        operations_list = hook.list_transfer_operations(filter=self.filter)
        self.log.info(operations_list)
        return operations_list
class GcpTransferServiceOperationPauseOperator(BaseOperator):
    """
    Pauses a transfer operation in Google Storage Transfer Service.

    .. seealso::
        For more information on how to use this operator, take a look at the guide:
        :ref:`howto/operator:GcpTransferServiceOperationPauseOperator`

    :param operation_name: (Required) Name of the transfer operation.
    :type operation_name: str
    :param gcp_conn_id: The connection ID used to connect to Google Cloud Platform.
    :type gcp_conn_id: str
    :param api_version:  API version used (e.g. v1).
    :type api_version: str
    """

    # [START gcp_transfer_operation_pause_template_fields]
    template_fields = ('operation_name', 'gcp_conn_id', 'api_version')
    # [END gcp_transfer_operation_pause_template_fields]

    @apply_defaults
    def __init__(self, operation_name, gcp_conn_id='google_cloud_default', api_version='v1', *args, **kwargs):
        super(GcpTransferServiceOperationPauseOperator, self).__init__(*args, **kwargs)
        self.operation_name = operation_name
        self.gcp_conn_id = gcp_conn_id
        self.api_version = api_version
        self._validate_inputs()

    def _validate_inputs(self):
        """
        :raises AirflowException: if ``operation_name`` is empty or None
        """
        if not self.operation_name:
            raise AirflowException("The required parameter 'operation_name' is empty or None")

    def execute(self, context):
        """
        Pause the transfer operation through the hook. Returns nothing.
        """
        hook = GCPTransferServiceHook(api_version=self.api_version, gcp_conn_id=self.gcp_conn_id)
        hook.pause_transfer_operation(operation_name=self.operation_name)
class GcpTransferServiceOperationResumeOperator(BaseOperator):
    """
    Resumes a transfer operation in Google Storage Transfer Service.

    .. seealso::
        For more information on how to use this operator, take a look at the guide:
        :ref:`howto/operator:GcpTransferServiceOperationResumeOperator`

    :param operation_name: (Required) Name of the transfer operation.
    :type operation_name: str
    :param gcp_conn_id: The connection ID used to connect to Google Cloud Platform.
    :type gcp_conn_id: str
    :param api_version: API version used (e.g. v1).
    :type api_version: str
    """

    # [START gcp_transfer_operation_resume_template_fields]
    template_fields = ('operation_name', 'gcp_conn_id', 'api_version')
    # [END gcp_transfer_operation_resume_template_fields]

    @apply_defaults
    def __init__(self, operation_name, gcp_conn_id='google_cloud_default', api_version='v1', *args, **kwargs):
        # Initialise the base operator first, as every sibling operator does,
        # so that attribute setup and validation run on a fully constructed
        # operator.
        super(GcpTransferServiceOperationResumeOperator, self).__init__(*args, **kwargs)
        self.operation_name = operation_name
        self.gcp_conn_id = gcp_conn_id
        self.api_version = api_version
        self._validate_inputs()

    def _validate_inputs(self):
        """
        :raises AirflowException: if ``operation_name`` is empty or None
        """
        if not self.operation_name:
            raise AirflowException("The required parameter 'operation_name' is empty or None")

    def execute(self, context):
        """
        Resume the transfer operation through the hook. Returns nothing.
        """
        hook = GCPTransferServiceHook(api_version=self.api_version, gcp_conn_id=self.gcp_conn_id)
        hook.resume_transfer_operation(operation_name=self.operation_name)
class GcpTransferServiceOperationCancelOperator(BaseOperator):
    """
    Cancels a transfer operation in Google Storage Transfer Service.

    .. seealso::
        For more information on how to use this operator, take a look at the guide:
        :ref:`howto/operator:GcpTransferServiceOperationCancelOperator`

    :param operation_name: (Required) Name of the transfer operation.
    :type operation_name: str
    :param api_version: API version used (e.g. v1).
    :type api_version: str
    :param gcp_conn_id: The connection ID used to connect to Google
        Cloud Platform.
    :type gcp_conn_id: str
    """

    # [START gcp_transfer_operation_cancel_template_fields]
    template_fields = ('operation_name', 'gcp_conn_id', 'api_version')
    # [END gcp_transfer_operation_cancel_template_fields]

    # Note the parameter order: ``api_version`` precedes ``gcp_conn_id`` here,
    # unlike the pause/resume operators. It is kept for positional callers.
    @apply_defaults
    def __init__(self, operation_name, api_version='v1', gcp_conn_id='google_cloud_default', *args, **kwargs):
        super(GcpTransferServiceOperationCancelOperator, self).__init__(*args, **kwargs)
        self.operation_name = operation_name
        self.api_version = api_version
        self.gcp_conn_id = gcp_conn_id
        self._validate_inputs()

    def _validate_inputs(self):
        """
        :raises AirflowException: if ``operation_name`` is empty or None
        """
        if not self.operation_name:
            raise AirflowException("The required parameter 'operation_name' is empty or None")

    def execute(self, context):
        """
        Cancel the transfer operation through the hook. Returns nothing.
        """
        hook = GCPTransferServiceHook(api_version=self.api_version, gcp_conn_id=self.gcp_conn_id)
        hook.cancel_transfer_operation(operation_name=self.operation_name)
class S3ToGoogleCloudStorageTransferOperator(BaseOperator):
    """
    Synchronizes an S3 bucket with a Google Cloud Storage bucket using the
    GCP Storage Transfer Service.

    .. warning::

        This operator is NOT idempotent. If you run it many times, many transfer
        jobs will be created in the Google Cloud Platform.

    **Example**:

    .. code-block:: python

       s3_to_gcs_transfer_op = S3ToGoogleCloudStorageTransferOperator(
            task_id='s3_to_gcs_transfer_example',
            s3_bucket='my-s3-bucket',
            project_id='my-gcp-project',
            gcs_bucket='my-gcs-bucket',
            dag=my_dag)

    :param s3_bucket: The S3 bucket where to find the objects. (templated)
    :type s3_bucket: str
    :param gcs_bucket: The destination Google Cloud Storage bucket
        where you want to store the files. (templated)
    :type gcs_bucket: str
    :param project_id: Optional ID of the Google Cloud Platform Console project that
        owns the job
    :type project_id: str
    :param aws_conn_id: The source S3 connection
    :type aws_conn_id: str
    :param gcp_conn_id: The destination connection ID to use
        when connecting to Google Cloud Storage.
    :type gcp_conn_id: str
    :param delegate_to: The account to impersonate, if any.
        For this to work, the service account making the request must have
        domain-wide delegation enabled.
    :type delegate_to: str
    :param description: Optional transfer service job description
    :type description: str
    :param schedule: Optional transfer service schedule;
        If not set, run transfer job once as soon as the operator runs
        The format is described
        https://cloud.google.com/storage-transfer/docs/reference/rest/v1/transferJobs.
        With two additional improvements:

        * dates can be passed as :class:`datetime.date`
        * times can be passed as :class:`datetime.time`

    :type schedule: dict
    :param object_conditions: Optional transfer service object conditions; see
        https://cloud.google.com/storage-transfer/docs/reference/rest/v1/TransferSpec
    :type object_conditions: dict
    :param transfer_options: Optional transfer service transfer options; see
        https://cloud.google.com/storage-transfer/docs/reference/rest/v1/TransferSpec
    :type transfer_options: dict
    :param wait: Wait for transfer to finish
    :type wait: bool
    :param timeout: Time to wait for the operation to end in seconds
    :type timeout: int
    """

    template_fields = ('gcp_conn_id', 's3_bucket', 'gcs_bucket', 'description', 'object_conditions')

    @apply_defaults
    def __init__(
        self,
        s3_bucket,
        gcs_bucket,
        project_id=None,
        aws_conn_id='aws_default',
        gcp_conn_id='google_cloud_default',
        delegate_to=None,
        description=None,
        schedule=None,
        object_conditions=None,
        transfer_options=None,
        wait=True,
        timeout=None,
        *args,
        **kwargs
    ):
        super(S3ToGoogleCloudStorageTransferOperator, self).__init__(*args, **kwargs)
        self.s3_bucket = s3_bucket
        self.gcs_bucket = gcs_bucket
        self.project_id = project_id
        self.aws_conn_id = aws_conn_id
        self.gcp_conn_id = gcp_conn_id
        self.delegate_to = delegate_to
        self.description = description
        self.schedule = schedule
        self.object_conditions = object_conditions
        self.transfer_options = transfer_options
        self.wait = wait
        self.timeout = timeout

    def execute(self, context):
        """
        Build the job body, create the transfer job and optionally wait for it.

        Returns nothing.
        """
        hook = GCPTransferServiceHook(gcp_conn_id=self.gcp_conn_id, delegate_to=self.delegate_to)
        body = self._create_body()

        # The preprocessor modifies ``body`` in place.
        TransferJobPreprocessor(body=body, aws_conn_id=self.aws_conn_id, default_schedule=True).process_body()

        job = hook.create_transfer_job(body=body)

        if self.wait:
            # NOTE(review): ``timeout`` is forwarded even when it is None;
            # confirm that the hook accepts None.
            hook.wait_for_transfer_job(job, timeout=self.timeout)

    def _create_body(self):
        """
        Assemble the transfer job body from the operator's parameters.

        Optional sections (project ID, schedule, object conditions and
        transfer options) are only added when the matching parameter is not
        None.

        :return: a new transfer job body
        :rtype: dict
        """
        body = {
            DESCRIPTION: self.description,
            STATUS: GcpTransferJobsStatus.ENABLED,
            TRANSFER_SPEC: {
                AWS_S3_DATA_SOURCE: {BUCKET_NAME: self.s3_bucket},
                GCS_DATA_SINK: {BUCKET_NAME: self.gcs_bucket},
            },
        }

        if self.project_id is not None:
            body[PROJECT_ID] = self.project_id

        if self.schedule is not None:
            body[SCHEDULE] = self.schedule

        if self.object_conditions is not None:
            body[TRANSFER_SPEC][OBJECT_CONDITIONS] = self.object_conditions

        if self.transfer_options is not None:
            body[TRANSFER_SPEC][TRANSFER_OPTIONS] = self.transfer_options

        return body
class GoogleCloudStorageToGoogleCloudStorageTransferOperator(BaseOperator):
    """
    Copies objects from a bucket to another using the GCP Storage Transfer
    Service.

    .. warning::

        This operator is NOT idempotent. If you run it many times, many transfer
        jobs will be created in the Google Cloud Platform.

    **Example**:

    .. code-block:: python

       gcs_to_gcs_transfer_op = GoogleCloudStorageToGoogleCloudStorageTransferOperator(
            task_id='gcs_to_gcs_transfer_example',
            source_bucket='my-source-bucket',
            destination_bucket='my-destination-bucket',
            project_id='my-gcp-project',
            dag=my_dag)

    :param source_bucket: The source Google cloud storage bucket where the
         object is. (templated)
    :type source_bucket: str
    :param destination_bucket: The destination Google cloud storage bucket
        where the object should be. (templated)
    :type destination_bucket: str
    :param project_id: The ID of the Google Cloud Platform Console project that
        owns the job
    :type project_id: str
    :param gcp_conn_id: Optional connection ID to use when connecting to Google Cloud
        Storage.
    :type gcp_conn_id: str
    :param delegate_to: The account to impersonate, if any.
        For this to work, the service account making the request must have
        domain-wide delegation enabled.
    :type delegate_to: str
    :param description: Optional transfer service job description
    :type description: str
    :param schedule: Optional transfer service schedule;
        If not set, run transfer job once as soon as the operator runs
        See:
        https://cloud.google.com/storage-transfer/docs/reference/rest/v1/transferJobs.
        With two additional improvements:

        * dates can be passed as :class:`datetime.date`
        * times can be passed as :class:`datetime.time`

    :type schedule: dict
    :param object_conditions: Optional transfer service object conditions; see
        https://cloud.google.com/storage-transfer/docs/reference/rest/v1/TransferSpec#ObjectConditions
    :type object_conditions: dict
    :param transfer_options: Optional transfer service transfer options; see
        https://cloud.google.com/storage-transfer/docs/reference/rest/v1/TransferSpec#TransferOptions
    :type transfer_options: dict
    :param wait: Wait for transfer to finish; defaults to `True`
    :type wait: bool
    :param timeout: Time to wait for the operation to end in seconds
    :type timeout: int
    """

    template_fields = (
        'gcp_conn_id',
        'source_bucket',
        'destination_bucket',
        'description',
        'object_conditions',
    )

    @apply_defaults
    def __init__(
        self,
        source_bucket,
        destination_bucket,
        project_id=None,
        gcp_conn_id='google_cloud_default',
        delegate_to=None,
        description=None,
        schedule=None,
        object_conditions=None,
        transfer_options=None,
        wait=True,
        timeout=None,
        *args,
        **kwargs
    ):
        super(GoogleCloudStorageToGoogleCloudStorageTransferOperator, self).__init__(*args, **kwargs)
        self.source_bucket = source_bucket
        self.destination_bucket = destination_bucket
        self.project_id = project_id
        self.gcp_conn_id = gcp_conn_id
        self.delegate_to = delegate_to
        self.description = description
        self.schedule = schedule
        self.object_conditions = object_conditions
        self.transfer_options = transfer_options
        self.wait = wait
        self.timeout = timeout

    def execute(self, context):
        """
        Build the job body, create the transfer job and optionally wait for it.

        Returns nothing.
        """
        hook = GCPTransferServiceHook(gcp_conn_id=self.gcp_conn_id, delegate_to=self.delegate_to)
        body = self._create_body()

        # The preprocessor modifies ``body`` in place. No AWS connection is
        # passed because the body built here has no S3 data source.
        TransferJobPreprocessor(body=body, default_schedule=True).process_body()

        job = hook.create_transfer_job(body=body)

        if self.wait:
            # NOTE(review): ``timeout`` is forwarded even when it is None;
            # confirm that the hook accepts None.
            hook.wait_for_transfer_job(job, timeout=self.timeout)

    def _create_body(self):
        """
        Assemble the transfer job body from the operator's parameters.

        Optional sections (project ID, schedule, object conditions and
        transfer options) are only added when the matching parameter is not
        None.

        :return: a new transfer job body
        :rtype: dict
        """
        body = {
            DESCRIPTION: self.description,
            STATUS: GcpTransferJobsStatus.ENABLED,
            TRANSFER_SPEC: {
                GCS_DATA_SOURCE: {BUCKET_NAME: self.source_bucket},
                GCS_DATA_SINK: {BUCKET_NAME: self.destination_bucket},
            },
        }

        if self.project_id is not None:
            body[PROJECT_ID] = self.project_id

        if self.schedule is not None:
            body[SCHEDULE] = self.schedule

        if self.object_conditions is not None:
            body[TRANSFER_SPEC][OBJECT_CONDITIONS] = self.object_conditions

        if self.transfer_options is not None:
            body[TRANSFER_SPEC][TRANSFER_OPTIONS] = self.transfer_options

        return body