Source code for tests.system.providers.amazon.aws.example_glue
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

from datetime import datetime

import boto3
from botocore.client import BaseClient

from airflow import DAG
from airflow.decorators import task
from airflow.models.baseoperator import chain
from airflow.providers.amazon.aws.operators.glue import GlueJobOperator
from airflow.providers.amazon.aws.operators.glue_crawler import GlueCrawlerOperator
from airflow.providers.amazon.aws.operators.s3 import (
    S3CreateBucketOperator,
    S3CreateObjectOperator,
    S3DeleteBucketOperator,
)
from airflow.providers.amazon.aws.sensors.glue import GlueJobSensor
from airflow.providers.amazon.aws.sensors.glue_crawler import GlueCrawlerSensor
from airflow.utils.trigger_rule import TriggerRule
from tests.system.providers.amazon.aws.utils import ENV_ID_KEY, SystemTestContextBuilder, purge_logs
# Externally fetched variables:
# Role needs S3 putobject/getobject access as well as the glue service role,
# see docs here: https://docs.aws.amazon.com/glue/latest/dg/create-an-iam-role.html
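# The module-level definitions below are referenced later on this page but are
# dropped by the page rendering. What follows is a minimal reconstruction: the
# names (ROLE_ARN_KEY, sys_test_context_task, EXAMPLE_CSV, EXAMPLE_SCRIPT) are
# taken from the code further down, while the variable key, the CSV rows, and
# the Glue script body are illustrative placeholders rather than the exact
# originals.
ROLE_ARN_KEY = "ROLE_ARN"

sys_test_context_task = SystemTestContextBuilder().add_variable(ROLE_ARN_KEY).build()

# Sample csv data used as input for the crawler and the Glue job.
EXAMPLE_CSV = """
apple,0.5
milk,2.5
bread,4.0
"""

# Glue ETL script uploaded to S3 and executed by the Glue job. The {db_name}
# and {bucket_name} placeholders are filled in via str.format() by the
# upload_script task below.
EXAMPLE_SCRIPT = """
from pyspark.context import SparkContext
from awsglue.context import GlueContext

glue_context = GlueContext(SparkContext.getOrCreate())
datasource = glue_context.create_dynamic_frame.from_catalog(database="{db_name}", table_name="input")
print("There are %s items in the table" % datasource.count())

datasource.toDF().write.format("csv").mode("append").save("s3://{bucket_name}/output")
"""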
@task(trigger_rule=TriggerRule.ALL_DONE)  # teardown task: runs even if upstream tasks failed
def delete_logs(job_id: str, crawler_name: str) -> None:
    """
    Glue generates four CloudWatch log groups and multiple log streams and leaves them behind.
    """
    generated_log_groups: list[tuple[str, str | None]] = [
        # Format: ('log group name', 'log stream prefix')
        ("/aws-glue/crawlers", crawler_name),
        ("/aws-glue/jobs/logs-v2", job_id),
        ("/aws-glue/jobs/error", job_id),
        ("/aws-glue/jobs/output", job_id),
    ]

    purge_logs(generated_log_groups)
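# Two more helper tasks are invoked in the chain() call below
# (role_name = get_role_name(role_arn) and glue_cleanup(...)) but are not
# rendered on this page. The sketches below assume the role name is the last
# path segment of the role ARN and that cleanup deletes the crawler, job, and
# database the test created; the boto3 Glue client methods used here
# (delete_crawler, delete_job, delete_database) are real API calls.
@task
def get_role_name(arn: str) -> str:
    # An IAM role ARN looks like arn:aws:iam::123456789012:role/role-name;
    # the role name is the final "/"-separated segment.
    return arn.split("/")[-1]


@task(trigger_rule=TriggerRule.ALL_DONE)
def glue_cleanup(crawler_name: str, job_name: str, db_name: str) -> None:
    client: BaseClient = boto3.client("glue")

    client.delete_crawler(Name=crawler_name)
    client.delete_job(JobName=job_name)
    client.delete_database(Name=db_name)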
# The DAG declaration itself is not rendered on this page; the settings below
# (schedule, start_date, tags, catchup) follow the usual conventions of these
# AWS system tests and should be read as a reconstruction.
with DAG(
    dag_id="example_glue",  # matches this module's name
    schedule="@once",
    start_date=datetime(2021, 1, 1),
    tags=["example"],
    catchup=False,
) as dag:
    test_context = sys_test_context_task()

    env_id = test_context[ENV_ID_KEY]
    role_arn = test_context[ROLE_ARN_KEY]
    glue_crawler_name = f"{env_id}_crawler"
    glue_db_name = f"{env_id}_glue_db"
    glue_job_name = f"{env_id}_glue_job"
    bucket_name = f"{env_id}-bucket"
    role_name = get_role_name(role_arn)

    glue_crawler_config = {
        "Name": glue_crawler_name,
        "Role": role_arn,
        "DatabaseName": glue_db_name,
        "Targets": {"S3Targets": [{"Path": f"{bucket_name}/input"}]},
    }

    create_bucket = S3CreateBucketOperator(
        task_id="create_bucket",
        bucket_name=bucket_name,
    )

    upload_csv = S3CreateObjectOperator(
        task_id="upload_csv",
        s3_bucket=bucket_name,
        s3_key="input/input.csv",
        data=EXAMPLE_CSV,
        replace=True,
    )

    upload_script = S3CreateObjectOperator(
        task_id="upload_script",
        s3_bucket=bucket_name,
        s3_key="etl_script.py",
        data=EXAMPLE_SCRIPT.format(db_name=glue_db_name, bucket_name=bucket_name),
        replace=True,
    )

    # [START howto_operator_glue_crawler]
    crawl_s3 = GlueCrawlerOperator(
        task_id="crawl_s3",
        config=glue_crawler_config,
    )
    # [END howto_operator_glue_crawler]

    # GlueCrawlerOperator waits by default; set wait_for_completion to False
    # so the sensor below is exercised.
    crawl_s3.wait_for_completion = False

    # [START howto_sensor_glue_crawler]
    wait_for_crawl = GlueCrawlerSensor(
        task_id="wait_for_crawl",
        crawler_name=glue_crawler_name,
    )
    # [END howto_sensor_glue_crawler]

    # [START howto_operator_glue]
    submit_glue_job = GlueJobOperator(
        task_id="submit_glue_job",
        job_name=glue_job_name,
        script_location=f"s3://{bucket_name}/etl_script.py",
        s3_bucket=bucket_name,
        iam_role_name=role_name,
        create_job_kwargs={"GlueVersion": "3.0", "NumberOfWorkers": 2, "WorkerType": "G.1X"},
    )
    # [END howto_operator_glue]

    # GlueJobOperator waits by default; set wait_for_completion to False
    # so the sensor below is exercised.
    submit_glue_job.wait_for_completion = False

    # [START howto_sensor_glue]
    wait_for_job = GlueJobSensor(
        task_id="wait_for_job",
        job_name=glue_job_name,
        # Job ID extracted from previous Glue Job Operator task
        run_id=submit_glue_job.output,
    )
    # [END howto_sensor_glue]

    delete_bucket = S3DeleteBucketOperator(
        task_id="delete_bucket",
        trigger_rule=TriggerRule.ALL_DONE,
        bucket_name=bucket_name,
        force_delete=True,
    )

    chain(
        # TEST SETUP
        test_context,
        create_bucket,
        upload_csv,
        upload_script,
        # TEST BODY
        crawl_s3,
        wait_for_crawl,
        submit_glue_job,
        wait_for_job,
        # TEST TEARDOWN
        glue_cleanup(glue_crawler_name, glue_job_name, glue_db_name),
        delete_bucket,
        delete_logs(submit_glue_job.output, glue_crawler_name),
    )

    from tests.system.utils.watcher import watcher

    # This test needs watcher in order to properly mark success/failure
    # when "tearDown" tasks with trigger rules are part of the DAG
    list(dag.tasks) >> watcher()

from tests.system.utils import get_test_run  # noqa: E402

# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
test_run = get_test_run(dag)
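# For reference: system tests like this one are run directly through pytest, as
# described in tests/system/README.md#run_via_pytest. A typical invocation
# (path inferred from this module's name; adjust to your checkout) would be:
#
#   pytest -s tests/system/providers/amazon/aws/example_glue.py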