Source code for airflow.providers.amazon.aws.example_dags.example_datasync
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import json
import re
from datetime import datetime
from os import getenv

from airflow import models
from airflow.models.baseoperator import chain
from airflow.providers.amazon.aws.operators.datasync import DataSyncOperator

TASK_ARN = getenv("TASK_ARN", "my_aws_datasync_task_arn")
SOURCE_LOCATION_URI = getenv("SOURCE_LOCATION_URI", "smb://hostname/directory/")
DESTINATION_LOCATION_URI = getenv("DESTINATION_LOCATION_URI", "s3://mybucket/prefix")
CREATE_TASK_KWARGS = json.loads(getenv("CREATE_TASK_KWARGS", '{"Name": "Created by Airflow"}'))
CREATE_SOURCE_LOCATION_KWARGS = json.loads(getenv("CREATE_SOURCE_LOCATION_KWARGS", '{}'))
default_destination_location_kwargs = """\
{"S3BucketArn": "arn:aws:s3:::mybucket",
"S3Config": {"BucketAccessRoleArn":
"arn:aws:iam::11112223344:role/r-11112223344-my-bucket-access-role"}
}"""
CREATE_DESTINATION_LOCATION_KWARGS = json.loads(
    getenv("CREATE_DESTINATION_LOCATION_KWARGS", re.sub(r"\s+", '', default_destination_location_kwargs))
)
UPDATE_TASK_KWARGS = json.loads(getenv("UPDATE_TASK_KWARGS", '{"Name": "Updated by Airflow"}'))
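
# The *_KWARGS values above are forwarded by DataSyncOperator to the
# corresponding AWS DataSync API calls when the operator has to create the
# locations, create the task, or update the task itself.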

with models.DAG(
    "example_datasync",
    schedule_interval=None,  # Override to match your needs
    start_date=datetime(2021, 1, 1),
    catchup=False,
    tags=['example'],
) as dag:
    # [START howto_operator_datasync_specific_task]
    # Execute a specific task
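    # TASK_ARN identifies a DataSync task that already exists; the operator
    # starts an execution of that task and waits for it to finish.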
    datasync_specific_task = DataSyncOperator(task_id="datasync_specific_task", task_arn=TASK_ARN)
    # [END howto_operator_datasync_specific_task]

    # [START howto_operator_datasync_search_task]
    # Search and execute a task
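    # Without a task_arn, the operator searches existing DataSync tasks whose
    # source and destination locations match the URIs below and executes the
    # task it finds.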
    datasync_search_task = DataSyncOperator(
        task_id="datasync_search_task",
        source_location_uri=SOURCE_LOCATION_URI,
        destination_location_uri=DESTINATION_LOCATION_URI,
    )
    # [END howto_operator_datasync_search_task]

    # [START howto_operator_datasync_create_task]
    # Create and execute a task (no matching task exists yet)
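    # The operator creates the source and destination locations and the task
    # from the *_KWARGS defined above, runs the task, and then deletes it
    # afterwards because delete_task_after_execution=True.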
    datasync_create_task = DataSyncOperator(
        task_id="datasync_create_task",
        source_location_uri=SOURCE_LOCATION_URI,
        destination_location_uri=DESTINATION_LOCATION_URI,
        create_task_kwargs=CREATE_TASK_KWARGS,
        create_source_location_kwargs=CREATE_SOURCE_LOCATION_KWARGS,
        create_destination_location_kwargs=CREATE_DESTINATION_LOCATION_KWARGS,
        update_task_kwargs=UPDATE_TASK_KWARGS,
        delete_task_after_execution=True,
    )
    # [END howto_operator_datasync_create_task]
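
    # Run the three examples sequentially; chain() here is equivalent to
    # datasync_specific_task >> datasync_search_task >> datasync_create_task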
    chain(
        datasync_specific_task,
        datasync_search_task,
        datasync_create_task,
    )