Source code for airflow.providers.amazon.aws.example_dags.example_glue
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

from datetime import datetime
from os import getenv

from airflow import DAG
from airflow.decorators import task
from airflow.models.baseoperator import chain
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.providers.amazon.aws.operators.glue import GlueJobOperator
from airflow.providers.amazon.aws.operators.glue_crawler import GlueCrawlerOperator
from airflow.providers.amazon.aws.sensors.glue import GlueJobSensor
from airflow.providers.amazon.aws.sensors.glue_crawler import GlueCrawlerSensor
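# NOTE: the constant definitions that the code below references (GLUE_DATABASE_NAME,
# GLUE_EXAMPLE_S3_BUCKET) were dropped from this rendering. A minimal reconstruction,
# assuming the getenv-with-default pattern used by these example DAGs; the default
# values here are placeholders, not values confirmed by the source:
GLUE_DATABASE_NAME = getenv('GLUE_DATABASE_NAME', 'glue_database_name')
GLUE_EXAMPLE_S3_BUCKET = getenv('GLUE_EXAMPLE_S3_BUCKET', 'glue_example_s3_bucket')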
# The role needs PutObject/GetObject access to the bucket above, as well as the Glue
# service role; see the docs here: https://docs.aws.amazon.com/glue/latest/dg/create-an-iam-role.html
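# The crawler role/config and the sample CSV input referenced below are also missing
# from this rendering. A minimal sketch: the crawler name, S3 target path, and the
# CSV rows are assumptions for illustration, while the config keys follow the Glue
# create_crawler API that GlueCrawlerOperator passes through:
GLUE_CRAWLER_ROLE = getenv('GLUE_CRAWLER_ROLE', 'glue_crawler_role')
GLUE_CRAWLER_NAME = 'example_crawler'
GLUE_CRAWLER_CONFIG = {
    'Name': GLUE_CRAWLER_NAME,
    'Role': GLUE_CRAWLER_ROLE,
    'DatabaseName': GLUE_DATABASE_NAME,
    'Targets': {
        'S3Targets': [
            {
                'Path': f'{GLUE_EXAMPLE_S3_BUCKET}/input',
            }
        ]
    },
}

# Example CSV data used as input to the example AWS Glue Job (placeholder rows):
EXAMPLE_CSV = '''
apple,0.5
milk,2.5
bread,4.0
'''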
# Example Spark script to operate on the sample CSV data above.
EXAMPLE_SCRIPT = f'''
from pyspark.context import SparkContext
from awsglue.context import GlueContext

glueContext = GlueContext(SparkContext.getOrCreate())
datasource = glueContext.create_dynamic_frame.from_catalog(
    database='{GLUE_DATABASE_NAME}', table_name='input'
)
print('There are %s items in the table' % datasource.count())

datasource.toDF().write.format('csv').mode("append").save('s3://{GLUE_EXAMPLE_S3_BUCKET}/output')
'''
@task(task_id='setup__upload_artifacts_to_s3')
def upload_artifacts_to_s3():
    '''Upload example CSV input data and an example Spark script to be used by the Glue Job'''
    s3_hook = S3Hook()
    s3_load_kwargs = {"replace": True, "bucket_name": GLUE_EXAMPLE_S3_BUCKET}
    s3_hook.load_string(string_data=EXAMPLE_CSV, key='input/input.csv', **s3_load_kwargs)
    s3_hook.load_string(string_data=EXAMPLE_SCRIPT, key='etl_script.py', **s3_load_kwargs)
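
# The DAG definition that wires these pieces together was truncated in this rendering.
# A minimal sketch consistent with the imports at the top of the file: upload the
# artifacts, crawl the input into the Glue Data Catalog, run the Spark script as a
# Glue Job, and wait on each step. The dag_id, task_ids, job name, and schedule are
# assumptions, not values confirmed by the source:
with DAG(
    dag_id='example_glue',
    schedule_interval=None,
    start_date=datetime(2021, 1, 1),
    tags=['example'],
    catchup=False,
) as glue_dag:
    setup_upload_artifacts_to_s3 = upload_artifacts_to_s3()

    # Crawl the uploaded CSV so it is registered as a table in the Glue Data Catalog.
    crawl_s3 = GlueCrawlerOperator(
        task_id='crawl_s3',
        config=GLUE_CRAWLER_CONFIG,
        wait_for_completion=False,
    )
    wait_for_crawl = GlueCrawlerSensor(task_id='wait_for_crawl', crawler_name=GLUE_CRAWLER_NAME)

    # Submit the Spark script uploaded above as a Glue Job, then wait for the run
    # to finish. GlueJobOperator expects a role *name*, so strip any ARN prefix.
    submit_glue_job = GlueJobOperator(
        task_id='submit_glue_job',
        job_name='example_glue_job',
        wait_for_completion=False,
        script_location=f's3://{GLUE_EXAMPLE_S3_BUCKET}/etl_script.py',
        s3_bucket=GLUE_EXAMPLE_S3_BUCKET,
        iam_role_name=GLUE_CRAWLER_ROLE.split('/')[-1],
    )
    wait_for_job = GlueJobSensor(
        task_id='wait_for_job',
        job_name='example_glue_job',
        run_id=submit_glue_job.output,
    )

    chain(setup_upload_artifacts_to_s3, crawl_s3, wait_for_crawl, submit_glue_job, wait_for_job)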