Amazon Athena Operator¶
Prerequisite Tasks¶
To use these operators, you must do a few things:
Create the necessary resources using the AWS Console or the AWS CLI.
Install API libraries via pip.
pip install 'apache-airflow[amazon]'
Detailed information is available in Installation.
Using Operator¶
Use the AWSAthenaOperator to run a query in Amazon Athena. To get started with Amazon Athena, please visit aws.amazon.com/athena.
In the following example, we create an Athena table and run a query against a CSV file that was created in an S3 bucket and populated with SAMPLE_DATA. The example waits for the query to complete, then drops the created table and deletes the sample CSV file from the S3 bucket.
# Imports needed by this excerpt (the task-decorated helpers and the constants are defined elsewhere in the DAG file)
from airflow.providers.amazon.aws.operators.athena import AWSAthenaOperator
from airflow.providers.amazon.aws.sensors.athena import AthenaSensor

# Using a task-decorated function to create a CSV file in S3
add_sample_data_to_s3 = add_sample_data_to_s3()

# Create the Athena table on top of the CSV file
create_table = AWSAthenaOperator(
    task_id='setup__create_table',
    query=QUERY_CREATE_TABLE,
    database=ATHENA_DATABASE,
    output_location=f's3://{S3_BUCKET}/{S3_KEY}',
    sleep_time=30,
    max_tries=None,
    aws_conn_id=AWS_CONN_ID,
)

# Run the query; the query execution ID is pushed to XCom as the return value
read_table = AWSAthenaOperator(
    task_id='query__read_table',
    query=QUERY_READ_TABLE,
    database=ATHENA_DATABASE,
    output_location=f's3://{S3_BUCKET}/{S3_KEY}',
    sleep_time=30,
    max_tries=None,
    aws_conn_id=AWS_CONN_ID,
)

# Wait for the query started by read_table to complete
get_read_state = AthenaSensor(
    task_id='query__get_read_state',
    query_execution_id="{{ task_instance.xcom_pull('query__read_table', key='return_value') }}",
    max_retries=None,
    sleep_time=10,
    aws_conn_id=AWS_CONN_ID,
)

# Using a task-decorated function to read the results from S3
read_results_from_s3 = read_results_from_s3(
    "{{ task_instance.xcom_pull('query__read_table', key='return_value') }}"
)

# Drop the table created by create_table
drop_table = AWSAthenaOperator(
    task_id='teardown__drop_table',
    query=QUERY_DROP_TABLE,
    database=ATHENA_DATABASE,
    output_location=f's3://{S3_BUCKET}/{S3_KEY}',
    sleep_time=30,
    max_tries=None,
    aws_conn_id=AWS_CONN_ID,
)

# Using a task-decorated function to delete the S3 file we created earlier
remove_sample_data_from_s3 = remove_sample_data_from_s3()

(
    add_sample_data_to_s3
    >> create_table
    >> read_table
    >> get_read_state
    >> read_results_from_s3
    >> drop_table
    >> remove_sample_data_from_s3
)
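The example above relies on several constants (S3_BUCKET, S3_KEY, ATHENA_DATABASE, AWS_CONN_ID, SAMPLE_DATA and the three QUERY_* strings) whose definitions are not shown. The following is a minimal sketch of how they might be defined. Every value here is an assumption made for illustration, as are the names ATHENA_TABLE and SAMPLE_FILENAME and the two-column name/age schema; substitute your own resources.

```python
# Hypothetical values; replace them with your own resources.
S3_BUCKET = "my-athena-example-bucket"  # assumed bucket name
S3_KEY = "athena-demo"                  # assumed key prefix
ATHENA_DATABASE = "default"             # assumed database name
ATHENA_TABLE = "test_table"             # assumed; not named in the example above
AWS_CONN_ID = "aws_default"             # Airflow's default AWS connection ID

# Assumed sample rows in "name,age" form, one record per line.
SAMPLE_DATA = "Alice,20\nBob,25\nCharlie,30\n"
SAMPLE_FILENAME = "sample_data.csv"     # assumed object name for the CSV file

# The table location is a folder; the CSV file is expected to be stored
# under it, e.g. at f"{S3_KEY}/{ATHENA_TABLE}/{SAMPLE_FILENAME}".
QUERY_CREATE_TABLE = f"""
CREATE EXTERNAL TABLE IF NOT EXISTS {ATHENA_DATABASE}.{ATHENA_TABLE} (
    `name` string,
    `age` int
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
LOCATION 's3://{S3_BUCKET}/{S3_KEY}/{ATHENA_TABLE}'
"""

QUERY_READ_TABLE = f"SELECT * FROM {ATHENA_DATABASE}.{ATHENA_TABLE}"

QUERY_DROP_TABLE = f"DROP TABLE IF EXISTS {ATHENA_DATABASE}.{ATHENA_TABLE}"
```

Keeping the table name and the S3 location in constants lets the create, read and drop queries stay in sync when the names change.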
More information¶
For further information, see the documentation of the start_query_execution() method in boto3.
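To illustrate how start_query_execution() relates to the sleep_time and max_tries parameters used above, here is a rough sketch of a start-then-poll loop written directly against the boto3 Athena client API. It is not the operator's actual implementation: the function name run_athena_query and its return convention are hypothetical, and the client argument is assumed to behave like boto3.client("athena").

```python
import time

# Terminal states reported by Athena's get_query_execution API.
TERMINAL_STATES = {"SUCCEEDED", "FAILED", "CANCELLED"}


def run_athena_query(client, query, database, output_location,
                     sleep_time=30, max_tries=None):
    """Start a query and poll until it reaches a terminal state.

    Hypothetical helper for illustration only. Returns a tuple of
    (query_execution_id, final_state); final_state is None when
    max_tries polls pass without the query reaching a terminal state.
    """
    response = client.start_query_execution(
        QueryString=query,
        QueryExecutionContext={"Database": database},
        ResultConfiguration={"OutputLocation": output_location},
    )
    query_execution_id = response["QueryExecutionId"]

    tries = 0
    # max_tries=None means "poll until the query finishes".
    while max_tries is None or tries < max_tries:
        tries += 1
        execution = client.get_query_execution(
            QueryExecutionId=query_execution_id
        )
        state = execution["QueryExecution"]["Status"]["State"]
        if state in TERMINAL_STATES:
            return query_execution_id, state
        time.sleep(sleep_time)
    return query_execution_id, None
```

With real credentials, a client could be created with boto3.client("athena") and passed as the first argument. Accepting the client as a parameter also makes the loop easy to exercise with a stub object in tests.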