SFTP to Google Cloud Storage Transfer Operator

Google Cloud Storage (GCS) is a Google service for storing large amounts of data from a wide range of applications. SFTP (SSH File Transfer Protocol) is a secure file transfer protocol that runs over SSH and supports the full security and authentication functionality of SSH.

Prerequisite Tasks

Operator

Transferring files between an SFTP server and Google Cloud Storage is performed with the SFTPToGCSOperator.

Use Jinja templating with source_path, destination_path, destination_bucket, and impersonation_chain to define these values dynamically.
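
For example, destination_path can reference Airflow macros so that each DAG run writes to a different object. The following is a minimal sketch, assuming a hypothetical bucket name and source file; {{ ds }} is an Airflow macro rendered at runtime to the logical date of the run:

from airflow.providers.google.cloud.transfers.sftp_to_gcs import SFTPToGCSOperator

# Hypothetical bucket and paths; "{{ ds }}" is rendered when the task runs.
templated_copy_from_sftp_to_gcs = SFTPToGCSOperator(
    task_id="templated-copy-sftp-to-gcs",
    source_path="/tmp/exports/daily_report.csv",
    destination_bucket="example-destination-bucket",
    destination_path="reports/{{ ds }}/daily_report.csv",
)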

Copying single files

The following Operator copies a single file.

airflow/providers/google/cloud/example_dags/example_sftp_to_gcs.py[source]

copy_file_from_sftp_to_gcs = SFTPToGCSOperator(
    task_id="file-copy-sftp-to-gcs",
    source_path=os.path.join(TMP_PATH, DIR, OBJECT_SRC_1),
    destination_bucket=BUCKET_SRC,
)
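
The excerpt above comes from the example DAG and relies on constants defined in that file. Below is a self-contained sketch of the same copy with hypothetical stand-in values for those constants, assuming Airflow 2.x and the operator's default SFTP and Google Cloud connections:

import os
from datetime import datetime

from airflow import DAG
from airflow.providers.google.cloud.transfers.sftp_to_gcs import SFTPToGCSOperator

# Hypothetical stand-ins for the constants used in the example DAG.
TMP_PATH = "/tmp"
DIR = "tests_sftp_hook_dir"
OBJECT_SRC_1 = "parent-1.bin"
BUCKET_SRC = "example-source-bucket"

with DAG(
    dag_id="example_sftp_to_gcs_sketch",
    start_date=datetime(2024, 1, 1),
    schedule_interval=None,
    catchup=False,
) as dag:
    # Copy a single file from the SFTP server to the bucket; without
    # destination_path the operator chooses the object name for you.
    copy_file_from_sftp_to_gcs = SFTPToGCSOperator(
        task_id="file-copy-sftp-to-gcs",
        source_path=os.path.join(TMP_PATH, DIR, OBJECT_SRC_1),
        destination_bucket=BUCKET_SRC,
    )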

Moving a single file

To move a file instead of copying it, use the move_object parameter. Once the file is copied to Google Cloud Storage, the original file is deleted from the SFTP server. The destination_path parameter defines the full path of the file in the bucket.

airflow/providers/google/cloud/example_dags/example_sftp_to_gcs.py[source]

move_file_from_sftp_to_gcs_destination = SFTPToGCSOperator(
    task_id="file-move-sftp-to-gcs-destination",
    source_path=os.path.join(TMP_PATH, DIR, OBJECT_SRC_2),
    destination_bucket=BUCKET_SRC,
    destination_path="destination_dir/destination_filename.bin",
    move_object=True,
)
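
The examples on this page rely on the operator's default Airflow connections for both SFTP and Google Cloud. Custom connections can be passed explicitly through sftp_conn_id and gcp_conn_id; the following is a minimal sketch with hypothetical connection IDs, paths, and bucket name:

from airflow.providers.google.cloud.transfers.sftp_to_gcs import SFTPToGCSOperator

# Hypothetical connection IDs; both parameters are optional and the operator
# falls back to its default connections when they are omitted.
move_file_with_explicit_connections = SFTPToGCSOperator(
    task_id="file-move-with-explicit-connections",
    source_path="/tmp/incoming/report.bin",
    destination_bucket="example-destination-bucket",
    destination_path="archive/report.bin",
    move_object=True,
    sftp_conn_id="my_sftp_connection",
    gcp_conn_id="my_gcp_connection",
)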

Copying directory

Use a wildcard in the source_path parameter to copy an entire directory.

airflow/providers/google/cloud/example_dags/example_sftp_to_gcs.py[source]

copy_directory_from_sftp_to_gcs = SFTPToGCSOperator(
    task_id="dir-copy-sftp-to-gcs",
    source_path=os.path.join(TMP_PATH, DIR, SUBDIR, "*"),
    destination_bucket=BUCKET_SRC,
)

Moving specific files

Use a wildcard in the source_path parameter to move specific files. Only one wildcard can be used within the path. The destination_path defines the prefix that is prepended to all copied files, e.g. tests_sftp_hook_dir/subdir/parent-1.bin is copied to specific_files/parent-1.bin and tests_sftp_hook_dir/subdir/parent-2.bin is copied to specific_files/parent-2.bin, while tests_sftp_hook_dir/subdir/parent-3.txt is skipped because it does not match the *.bin pattern.

airflow/providers/google/cloud/example_dags/example_sftp_to_gcs.py[source]

move_specific_files_from_sftp_to_gcs = SFTPToGCSOperator(
    task_id="dir-move-specific-files-sftp-to-gcs",
    source_path=os.path.join(TMP_PATH, DIR, SUBDIR, "*.bin"),
    destination_bucket=BUCKET_SRC,
    destination_path="specific_files/",
    move_object=True,
)

Reference

For more information, see the Google Cloud Storage documentation.