Source code for airflow.providers.common.io.operators.file_transfer
## Licensed to the Apache Software Foundation (ASF) under one# or more contributor license agreements. See the NOTICE file# distributed with this work for additional information# regarding copyright ownership. The ASF licenses this file# to you under the Apache License, Version 2.0 (the# "License"); you may not use this file except in compliance# with the License. You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing,# software distributed under the License is distributed on an# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY# KIND, either express or implied. See the License for the# specific language governing permissions and limitations# under the License.from__future__importannotationsfromcollections.abcimportSequencefromtypingimportTYPE_CHECKINGfromairflow.providers.common.io.version_compatimportAIRFLOW_V_3_0_PLUSifTYPE_CHECKING:fromairflow.providers.openlineage.extractorsimportOperatorLineagefromairflow.sdkimportContextifAIRFLOW_V_3_0_PLUS:fromairflow.sdkimportObjectStoragePathfromairflow.sdk.bases.operatorimportBaseOperatorelse:fromairflow.io.pathimportObjectStoragePath# type: ignore[no-redef]fromairflow.modelsimportBaseOperator# type: ignore[no-redef]
class FileTransferOperator(BaseOperator):
    """
    Copy a single file from a source location to a destination location.

    When required, the file is streamed from source to destination, so it does
    not need to fit into memory.

    :param src: The source file path or ObjectStoragePath object.
    :param dst: The destination file path or ObjectStoragePath object.
    :param source_conn_id: The optional source connection id.
    :param dest_conn_id: The optional destination connection id.

    .. seealso::
        For more information on how to use this operator, take a look at the guide:
        :ref:`howto/operator:FileTransferOperator`
    """

    def get_openlineage_facets_on_start(self) -> OperatorLineage:
        """
        Describe this transfer as OpenLineage metadata before it runs.

        ``self.src`` / ``self.dst`` are resolved through ``self._get_path`` together
        with their connection ids. The source becomes the single input dataset and
        the destination becomes the single output dataset.

        :return: An ``OperatorLineage`` with exactly one input and one output dataset.
        """
        # Runtime imports happen here: at module level ``OperatorLineage`` is only
        # imported under ``TYPE_CHECKING``.
        from airflow.providers.common.compat.openlineage.facet import Dataset
        from airflow.providers.openlineage.extractors import OperatorLineage

        def _to_dataset(storage_path: ObjectStoragePath) -> Dataset:
            """Map an ObjectStoragePath to an OpenLineage ``Dataset``."""
            # ``namespace`` exists on ObjectStoragePath since Airflow 2.9.0 (#36410).
            if hasattr(storage_path, "namespace"):
                return Dataset(namespace=storage_path.namespace, name=storage_path.key)

            # Older ObjectStoragePath: rebuild the namespace by hand, using
            # "<protocol>://<bucket>" when a bucket is present, else just the protocol.
            if storage_path.bucket:
                namespace = f"{storage_path.protocol}://{storage_path.bucket}"
            else:
                namespace = storage_path.protocol
            return Dataset(
                namespace=namespace,
                name=storage_path.key.lstrip(storage_path.sep),
            )

        source_path: ObjectStoragePath = self._get_path(self.src, self.source_conn_id)
        dest_path: ObjectStoragePath = self._get_path(self.dst, self.dst_conn_id)

        return OperatorLineage(
            inputs=[_to_dataset(source_path)],
            outputs=[_to_dataset(dest_path)],
        )