Amazon Athena Operator
Prerequisite Tasks
To use these operators, you must do a few things:
- Create necessary resources using the AWS Console or the AWS CLI.
- Install the API libraries via pip:

    pip install 'apache-airflow[amazon]'

Detailed information is available in the Installation documentation.
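For example, the S3 bucket used by the sample DAG below can be created ahead of time. A minimal sketch using boto3 (the bucket name and region are placeholder assumptions, not values from this guide):

    import boto3

    # Create the S3 bucket that Athena will read sample data from and write
    # query results to; the bucket name and region are placeholders.
    s3 = boto3.client('s3', region_name='us-east-1')
    s3.create_bucket(Bucket='my-athena-example-bucket')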
Using Operator
Use the AWSAthenaOperator to run a query in Amazon Athena. To get started with Amazon Athena, please visit aws.amazon.com/athena.
In the following example, we create an Athena table and run a query based on a CSV file created in an S3 bucket and populated with SAMPLE_DATA. The example waits for the query to complete and then drops the created table and deletes the sample CSV file from the S3 bucket. The snippet below sits inside the example's DAG definition; the constants it references (ATHENA_DATABASE, S3_BUCKET, S3_KEY, AWS_CONN_ID, and the QUERY_* strings) and the task-decorated helper functions are defined elsewhere in that file.
    # Using a task-decorated function to create a CSV file in S3
    add_sample_data_to_s3 = add_sample_data_to_s3()
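    # Create an Athena table over the sample CSV data in S3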
    create_table = AWSAthenaOperator(
        task_id='setup__create_table',
        query=QUERY_CREATE_TABLE,
        database=ATHENA_DATABASE,
        output_location=f's3://{S3_BUCKET}/{S3_KEY}',
        sleep_time=30,
        max_tries=None,
        aws_conn_id=AWS_CONN_ID,
    )
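    # Run a SELECT query against the newly created table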
    read_table = AWSAthenaOperator(
        task_id='query__read_table',
        query=QUERY_READ_TABLE,
        database=ATHENA_DATABASE,
        output_location=f's3://{S3_BUCKET}/{S3_KEY}',
        sleep_time=30,
        max_tries=None,
        aws_conn_id=AWS_CONN_ID,
    )
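    # Poll Athena until the read query reaches a terminal state;
    # the query execution ID is pulled from XCom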
    get_read_state = AthenaSensor(
        task_id='query__get_read_state',
        query_execution_id="{{ task_instance.xcom_pull('query__read_table', key='return_value') }}",
        max_retries=None,
        sleep_time=10,
        aws_conn_id=AWS_CONN_ID,
    )
    # Using a task-decorated function to read the results from S3
    read_results_from_s3 = read_results_from_s3(
        "{{ task_instance.xcom_pull('query__read_table', key='return_value') }}"
    )
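    # Drop the Athena table now that the results have been read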
    drop_table = AWSAthenaOperator(
        task_id='teardown__drop_table',
        query=QUERY_DROP_TABLE,
        database=ATHENA_DATABASE,
        output_location=f's3://{S3_BUCKET}/{S3_KEY}',
        sleep_time=30,
        max_tries=None,
        aws_conn_id=AWS_CONN_ID,
    )
    # Using a task-decorated function to delete the S3 file we created earlier
    remove_sample_data_from_s3 = remove_sample_data_from_s3()
    (
        add_sample_data_to_s3
        >> create_table
        >> read_table
        >> get_read_state
        >> read_results_from_s3
        >> drop_table
        >> remove_sample_data_from_s3
    )
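The task-decorated helper functions used above are defined elsewhere in the example DAG. As a rough illustration only, here is a minimal sketch of what they might look like using S3Hook; the function bodies, the ATHENA_TABLE constant, and the S3 key layout are assumptions, not the example's actual definitions:

    from airflow.decorators import task
    from airflow.providers.amazon.aws.hooks.s3 import S3Hook

    @task
    def add_sample_data_to_s3():
        # Upload the sample CSV rows so the Athena table has data to query.
        # ATHENA_TABLE and the key layout are assumptions for this sketch.
        S3Hook(aws_conn_id=AWS_CONN_ID).load_string(
            SAMPLE_DATA,
            f'{S3_KEY}/{ATHENA_TABLE}/data.csv',
            bucket_name=S3_BUCKET,
            replace=True,
        )

    @task
    def read_results_from_s3(query_execution_id):
        # Athena writes query results to the output location as a CSV file
        # named after the query execution ID.
        hook = S3Hook(aws_conn_id=AWS_CONN_ID)
        print(hook.read_key(f'{S3_KEY}/{query_execution_id}.csv', bucket_name=S3_BUCKET))

    @task
    def remove_sample_data_from_s3():
        # Clean up the sample CSV uploaded at the start of the run.
        S3Hook(aws_conn_id=AWS_CONN_ID).delete_objects(
            S3_BUCKET, f'{S3_KEY}/{ATHENA_TABLE}/data.csv'
        )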
More information
For further information, see the documentation for the start_query_execution() method in boto3.
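Under the hood, the operator submits the query through this boto3 API. A minimal sketch of calling Athena directly, assuming placeholder values for the database, query string, and output bucket:

    import boto3

    client = boto3.client('athena')

    # Submit a query; the database, query string, and output location below
    # are placeholder assumptions.
    response = client.start_query_execution(
        QueryString='SELECT * FROM my_table LIMIT 10',
        QueryExecutionContext={'Database': 'my_database'},
        ResultConfiguration={'OutputLocation': 's3://my-bucket/athena-results/'},
    )

    # The returned QueryExecutionId is what AWSAthenaOperator pushes to XCom;
    # get_query_execution reports the query's current state, which is what
    # AthenaSensor polls.
    execution_id = response['QueryExecutionId']
    status = client.get_query_execution(QueryExecutionId=execution_id)
    print(execution_id, status['QueryExecution']['Status']['State'])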