SFTP to Google Cloud Storage Transfer Operator

Google Cloud Storage is a Google service for storing large volumes of data from various applications. SFTP (SSH File Transfer Protocol) is a secure file transfer protocol that runs over SSH and supports the full security and authentication functionality of SSH.

Operator

Files are transferred from an SFTP server to Google Cloud Storage with the SFTPToGCSOperator.

Use Jinja templating with source_path, destination_path, destination_bucket, and impersonation_chain to define values dynamically.
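As a sketch of what templating can look like, the task definition fragment below builds both paths from the {{ ds }} macro, which Airflow renders as the run's logical date in YYYY-MM-DD form. The task id, bucket name, and paths are hypothetical, and the import path is assumed to match the installed Google provider package.

```python
# Assumed import path for the Google provider package.
from airflow.providers.google.cloud.transfers.sftp_to_gcs import SFTPToGCSOperator

# Hypothetical task: all names and paths below are placeholders.
copy_daily_export = SFTPToGCSOperator(
    task_id="copy-daily-export",
    # Rendered at run time, e.g. /exports/2021-01-01/report.csv
    source_path="/exports/{{ ds }}/report.csv",
    destination_bucket="example-bucket",
    destination_path="reports/{{ ds }}/report.csv",
)
```

The templates are rendered when the task runs, so each run reads from and writes to the paths for its own date.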

Copying a single file

The following operator copies a single file.

airflow/providers/google/cloud/example_dags/example_sftp_to_gcs.py

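import os

from airflow.providers.google.cloud.transfers.sftp_to_gcs import SFTPToGCSOperator

# TMP_PATH, DIR, OBJECT_SRC_1 and BUCKET_SRC are constants defined in the example DAG.
copy_file_from_sftp_to_gcs = SFTPToGCSOperator(
    task_id="file-copy-sftp-to-gcs",
    source_path=os.path.join(TMP_PATH, DIR, OBJECT_SRC_1),
    destination_bucket=BUCKET_SRC,
)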

Moving a single file

To move a file, set the move_object parameter to True. Once the file has been copied to Google Cloud Storage, the original file is deleted from the SFTP server. The destination_path parameter defines the full path of the file in the bucket.

airflow/providers/google/cloud/example_dags/example_sftp_to_gcs.py

move_file_from_sftp_to_gcs_destination = SFTPToGCSOperator(
    task_id="file-move-sftp-to-gcs-destination",
    source_path=os.path.join(TMP_PATH, DIR, OBJECT_SRC_2),
    destination_bucket=BUCKET_SRC,
    destination_path="destination_dir/destination_filename.bin",
    move_object=True,
)
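The copy and move semantics described in this section can be modelled with a minimal in-memory sketch. This is not the operator's implementation: transfer_file and the two dicts standing in for the SFTP server and the bucket are hypothetical, and naming the object after the source file when destination_path is omitted is an assumption about the default.

```python
import posixpath


def transfer_file(sftp_files, bucket, source_path, destination_path=None, move_object=False):
    """Copy one file from a simulated SFTP server into a simulated bucket.

    sftp_files and bucket are plain dicts mapping path -> bytes (hypothetical stand-ins).
    """
    # destination_path, when given, is the full object name in the bucket.
    # Assumption: without it, the object is named after the source file.
    object_name = destination_path or posixpath.basename(source_path)
    bucket[object_name] = sftp_files[source_path]
    # A move is a copy followed by deleting the original from the SFTP side.
    if move_object:
        del sftp_files[source_path]
    return object_name


sftp_files = {"/tmp/dir/a.bin": b"a", "/tmp/dir/b.bin": b"b"}
bucket = {}

# Plain copy: the source file stays on the SFTP server.
copied = transfer_file(sftp_files, bucket, "/tmp/dir/a.bin")

# Move with an explicit destination: the source file is removed afterwards.
moved = transfer_file(
    sftp_files,
    bucket,
    "/tmp/dir/b.bin",
    destination_path="destination_dir/destination_filename.bin",
    move_object=True,
)
```

After both calls, a.bin is still present in sftp_files while b.bin exists only in the bucket, under destination_dir/destination_filename.bin.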

Copying a directory

Use a wildcard in the source_path parameter to copy a directory.

airflow/providers/google/cloud/example_dags/example_sftp_to_gcs.py

copy_directory_from_sftp_to_gcs = SFTPToGCSOperator(
    task_id="dir-copy-sftp-to-gcs",
    source_path=os.path.join(TMP_PATH, DIR, SUBDIR, "*"),
    destination_bucket=BUCKET_SRC,
)

Moving specific files

Use a wildcard in the source_path parameter to move specific files. The path may contain only one wildcard. The destination_path parameter defines a path that is prefixed to all copied files. For example, tests_sftp_hook_dir/subdir/parent-1.bin is copied to specific_files/parent-1.bin and tests_sftp_hook_dir/subdir/parent-2.bin is copied to specific_files/parent-2.bin, while tests_sftp_hook_dir/subdir/parent-3.txt is skipped because it does not match the *.bin pattern.

airflow/providers/google/cloud/example_dags/example_sftp_to_gcs.py

# Only files matching *.bin are moved; they land under specific_files/ in the bucket.
move_specific_files_from_sftp_to_gcs = SFTPToGCSOperator(
    task_id="dir-move-specific-files-sftp-to-gcs",
    source_path=os.path.join(TMP_PATH, DIR, SUBDIR, "*.bin"),
    destination_bucket=BUCKET_SRC,
    destination_path="specific_files/",
    move_object=True,
)
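The wildcard and prefix behaviour described in this section can be sketched in plain Python. The helper below is a hypothetical model, not the operator's code: plan_wildcard_transfer is an illustrative name, and matching on the text before and after the wildcard is an assumption about how the pattern is applied.

```python
import posixpath


def plan_wildcard_transfer(remote_files, source_path, destination_path):
    """Map each remote file matching a single-wildcard source_path to an object name."""
    if source_path.count("*") != 1:
        raise ValueError("source_path must contain exactly one wildcard")
    # Text before and after the wildcard, e.g. "dir/subdir/" and ".bin".
    prefix, suffix = source_path.split("*", 1)
    # Directory part of the pattern; it is swapped for destination_path.
    base_dir = posixpath.dirname(prefix)
    plan = {}
    for remote_file in remote_files:
        long_enough = len(remote_file) >= len(prefix) + len(suffix)
        if long_enough and remote_file.startswith(prefix) and remote_file.endswith(suffix):
            relative = remote_file[len(base_dir):].lstrip("/")
            plan[remote_file] = posixpath.join(destination_path, relative)
    return plan


remote_files = [
    "tests_sftp_hook_dir/subdir/parent-1.bin",
    "tests_sftp_hook_dir/subdir/parent-2.bin",
    "tests_sftp_hook_dir/subdir/parent-3.txt",
]
plan = plan_wildcard_transfer(
    remote_files,
    source_path="tests_sftp_hook_dir/subdir/*.bin",
    destination_path="specific_files/",
)
```

With the file names from the example, plan maps the two .bin files to specific_files/parent-1.bin and specific_files/parent-2.bin and leaves parent-3.txt out. A path with two wildcards raises ValueError in this sketch.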
