Source code for tests.system.providers.amazon.aws.example_glue
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
from datetime import datetime
from typing import List, Optional, Tuple

import boto3
from botocore.client import BaseClient

from airflow import DAG
from airflow.decorators import task
from airflow.models.baseoperator import chain
from airflow.operators.python import get_current_context
from airflow.providers.amazon.aws.operators.glue import GlueJobOperator
from airflow.providers.amazon.aws.operators.glue_crawler import GlueCrawlerOperator
from airflow.providers.amazon.aws.operators.s3 import (
    S3CreateBucketOperator,
    S3CreateObjectOperator,
    S3DeleteBucketOperator,
)
from airflow.providers.amazon.aws.sensors.glue import GlueJobSensor
from airflow.providers.amazon.aws.sensors.glue_crawler import GlueCrawlerSensor
from airflow.utils.trigger_rule import TriggerRule
from tests.system.providers.amazon.aws.utils import ENV_ID_KEY, SystemTestContextBuilder, purge_logs
# Externally fetched variables:
# Role needs S3 putobject/getobject access as well as the glue service role,
# see docs here: https://docs.aws.amazon.com/glue/latest/dg/create-an-iam-role.html
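# Not part of this extracted listing: the DAG body below reads the role ARN from the
# test context and uploads an EXAMPLE_CSV payload, so definitions roughly like the
# following are assumed. The variable key and the CSV rows are illustrative
# placeholders inferred from usage, not the original values.
ROLE_ARN_KEY = 'ROLE_ARN'

sys_test_context_task = SystemTestContextBuilder().add_variable(ROLE_ARN_KEY).build()

# Example CSV data used as input to the Glue job (placeholder rows).
EXAMPLE_CSV = '''
apple,0.5
milk,2.5
bread,4.0
'''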
# Example Spark script to operate on the above sample csv data.
EXAMPLE_SCRIPT = '''
from pyspark.context import SparkContext
from awsglue.context import GlueContext

glueContext = GlueContext(SparkContext.getOrCreate())
datasource = glueContext.create_dynamic_frame.from_catalog(
    database='{db_name}', table_name='input')
print('There are %s items in the table' % datasource.count())

datasource.toDF().write.format('csv').mode("append").save('s3://{bucket_name}/output')
'''
@task(trigger_rule=TriggerRule.ALL_DONE)
def delete_logs(job_id: str, glue_crawler_name: str) -> None:
    """Glue generates four CloudWatch log groups and multiple log streams and leaves them behind."""
    generated_log_groups: List[Tuple[str, Optional[str]]] = [
        # Format: ('log group name', 'log stream prefix')
        ('/aws-glue/crawlers', glue_crawler_name),
        ('/aws-glue/jobs/logs-v2', job_id),
        ('/aws-glue/jobs/error', job_id),
        ('/aws-glue/jobs/output', job_id),
    ]

    purge_logs(generated_log_groups)
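
# Also not part of this extracted listing: the DAG body below calls `set_up` and
# `glue_cleanup` tasks. A minimal sketch of what they might look like follows; the
# naming scheme, XCom keys, and boto3 calls are inferred from how the values are used
# below, not taken from the original implementation.
@task
def set_up(env_id: str, role_arn: str) -> None:
    glue_crawler_name = f'{env_id}_crawler'
    glue_db_name = f'{env_id}_glue_db'
    glue_job_name = f'{env_id}_glue_job'
    bucket_name = f'{env_id}-bucket'
    role_name = role_arn.split('/')[-1]

    # Crawl the bucket's 'input/' prefix; the resulting catalog table ('input') is
    # what EXAMPLE_SCRIPT reads from.
    glue_crawler_config = {
        'Name': glue_crawler_name,
        'Role': role_arn,
        'DatabaseName': glue_db_name,
        'Targets': {'S3Targets': [{'Path': f'{bucket_name}/input'}]},
    }

    # Push each value as its own XCom so the DAG body can reference them as
    # test_setup['bucket_name'], test_setup['glue_crawler_config'], etc.
    ti = get_current_context()['ti']
    ti.xcom_push(key='glue_crawler_name', value=glue_crawler_name)
    ti.xcom_push(key='glue_db_name', value=glue_db_name)
    ti.xcom_push(key='glue_job_name', value=glue_job_name)
    ti.xcom_push(key='bucket_name', value=bucket_name)
    ti.xcom_push(key='role_name', value=role_name)
    ti.xcom_push(key='glue_crawler_config', value=glue_crawler_config)


@task(trigger_rule=TriggerRule.ALL_DONE)
def glue_cleanup(crawler_name: str, job_name: str, db_name: str) -> None:
    client: BaseClient = boto3.client('glue')

    client.delete_crawler(Name=crawler_name)
    client.delete_job(JobName=job_name)
    client.delete_database(Name=db_name)


# The operator instantiations below also assume a DAG context and the test context
# task, roughly along these lines (dag_id, schedule, and start_date are assumptions):
#
# with DAG(dag_id='example_glue', schedule='@once', start_date=datetime(2021, 1, 1),
#          catchup=False, tags=['example']) as dag:
#     test_context = sys_test_context_task()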
test_setup = set_up(
    env_id=test_context[ENV_ID_KEY],
    role_arn=test_context[ROLE_ARN_KEY],
)

create_bucket = S3CreateBucketOperator(
    task_id='create_bucket',
    bucket_name=test_setup['bucket_name'],
)

upload_csv = S3CreateObjectOperator(
    task_id='upload_csv',
    s3_bucket=test_setup['bucket_name'],
    s3_key='input/input.csv',
    data=EXAMPLE_CSV,
    replace=True,
)

upload_script = S3CreateObjectOperator(
    task_id='upload_script',
    s3_bucket=test_setup['bucket_name'],
    s3_key='etl_script.py',
    data=EXAMPLE_SCRIPT.format(
        db_name=test_setup['glue_db_name'], bucket_name=test_setup['bucket_name']
    ),
    replace=True,
)

# [START howto_operator_glue_crawler]
crawl_s3 = GlueCrawlerOperator(
    task_id='crawl_s3',
    config=test_setup['glue_crawler_config'],
    # Waits by default, set False to test the Sensor below
    wait_for_completion=False,
)
# [END howto_operator_glue_crawler]

# [START howto_sensor_glue_crawler]
wait_for_crawl = GlueCrawlerSensor(
    task_id='wait_for_crawl',
    crawler_name=test_setup['glue_crawler_name'],
)
# [END howto_sensor_glue_crawler]

# [START howto_operator_glue]
submit_glue_job = GlueJobOperator(
    task_id='submit_glue_job',
    job_name=test_setup['glue_job_name'],
    script_location=f's3://{test_setup["bucket_name"]}/etl_script.py',
    s3_bucket=test_setup['bucket_name'],
    iam_role_name=test_setup['role_name'],
    create_job_kwargs={'GlueVersion': '3.0', 'NumberOfWorkers': 2, 'WorkerType': 'G.1X'},
    # Waits by default, set False to test the Sensor below
    wait_for_completion=False,
)
# [END howto_operator_glue]

# [START howto_sensor_glue]
wait_for_job = GlueJobSensor(
    task_id='wait_for_job',
    job_name=test_setup['glue_job_name'],
    # Job ID extracted from previous Glue Job Operator task
    run_id=submit_glue_job.output,
)
# [END howto_sensor_glue]

delete_bucket = S3DeleteBucketOperator(
    task_id='delete_bucket',
    trigger_rule=TriggerRule.ALL_DONE,
    bucket_name=test_setup['bucket_name'],
    force_delete=True,
)

clean_up = glue_cleanup(
    test_setup['glue_crawler_name'],
    test_setup['glue_job_name'],
    test_setup['glue_db_name'],
)

chain(
    # TEST SETUP
    test_context,
    test_setup,
    create_bucket,
    upload_csv,
    upload_script,
    # TEST BODY
    crawl_s3,
    wait_for_crawl,
    submit_glue_job,
    wait_for_job,
    # TEST TEARDOWN
    clean_up,
    delete_bucket,
    delete_logs(submit_glue_job.output, test_setup['glue_crawler_name']),
)

from tests.system.utils.watcher import watcher

# This test needs watcher in order to properly mark success/failure
# when "tearDown" task with trigger rule is part of the DAG
list(dag.tasks) >> watcher()

from tests.system.utils import get_test_run  # noqa: E402

# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
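# The pytest hook implied by the comment above is presumably the standard single
# assignment these system tests use (see tests/system/README.md); it is not part of
# this extracted listing.
test_run = get_test_run(dag)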