Amazon Athena Operator

Prerequisite Tasks

To use these operators, you must do a few things:

* Create the necessary AWS resources (for example, an S3 bucket and IAM credentials with access to Athena and S3) using the AWS Console or AWS CLI.
* Install the Amazon provider package, e.g. pip install apache-airflow-providers-amazon.
* Set up an Amazon Web Services connection in Airflow and pass its ID to the operators via aws_conn_id.

Using the Operator

Use the AWSAthenaOperator to run a query in Amazon Athena. To get started with Amazon Athena, please visit https://aws.amazon.com/athena/

In the following example, we create an Athena table and run a query against it, based on a CSV file created in an S3 bucket and populated with SAMPLE_DATA. The example waits for the query to complete, then drops the created table and deletes the sample CSV file from the S3 bucket.

airflow/providers/amazon/aws/example_dags/example_athena.py


    # Using a task-decorated function to create a CSV file in S3
    add_sample_data_to_s3 = add_sample_data_to_s3()

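    # Create the Athena table over the sample data; sleep_time is the poll
    # interval in seconds, and max_tries=None means poll until the query
    # reaches a terminal state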
    create_table = AWSAthenaOperator(
        task_id='setup__create_table',
        query=QUERY_CREATE_TABLE,
        database=ATHENA_DATABASE,
        output_location=f's3://{S3_BUCKET}/{S3_KEY}',
        sleep_time=30,
        max_tries=None,
        aws_conn_id=AWS_CONN_ID,
    )

    read_table = AWSAthenaOperator(
        task_id='query__read_table',
        query=QUERY_READ_TABLE,
        database=ATHENA_DATABASE,
        output_location=f's3://{S3_BUCKET}/{S3_KEY}',
        sleep_time=30,
        max_tries=None,
        aws_conn_id=AWS_CONN_ID,
    )

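    # Wait for the read query to reach a terminal state; its query execution
    # ID is pulled from the XCom pushed by the query__read_table task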
    get_read_state = AthenaSensor(
        task_id='query__get_read_state',
        query_execution_id="{{ task_instance.xcom_pull('query__read_table', key='return_value') }}",
        max_retries=None,
        sleep_time=10,
        aws_conn_id=AWS_CONN_ID,
    )

    # Using a task-decorated function to read the results from S3
    read_results_from_s3 = read_results_from_s3(
        "{{ task_instance.xcom_pull('query__read_table', key='return_value') }}"
    )

    drop_table = AWSAthenaOperator(
        task_id='teardown__drop_table',
        query=QUERY_DROP_TABLE,
        database=ATHENA_DATABASE,
        output_location=f's3://{S3_BUCKET}/{S3_KEY}',
        sleep_time=30,
        max_tries=None,
        aws_conn_id=AWS_CONN_ID,
    )

    # Using a task-decorated function to delete the S3 file we created earlier
    remove_sample_data_from_s3 = remove_sample_data_from_s3()

    (
        add_sample_data_to_s3
        >> create_table
        >> read_table
        >> get_read_state
        >> read_results_from_s3
        >> drop_table
        >> remove_sample_data_from_s3
    )
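
The task-decorated helper functions used above are defined earlier in the example file. A minimal sketch of what they might look like, using S3Hook, is shown below; the SAMPLE_DATA value, the ATHENA_TABLE constant, and the object keys are illustrative assumptions rather than the exact contents of the example file.

    from airflow.decorators import task
    from airflow.providers.amazon.aws.hooks.s3 import S3Hook

    # Illustrative sample rows and object name; the real example file defines
    # its own constants
    SAMPLE_DATA = '"Alice",20\n"Bob",25\n"Charlie",30\n'
    SAMPLE_FILENAME = 'athena_sample.csv'

    @task
    def add_sample_data_to_s3():
        # Upload the sample CSV so the Athena table has data to query
        s3_hook = S3Hook(aws_conn_id=AWS_CONN_ID)
        s3_hook.load_string(
            SAMPLE_DATA, f'{S3_KEY}/{ATHENA_TABLE}/{SAMPLE_FILENAME}', bucket_name=S3_BUCKET
        )

    @task
    def read_results_from_s3(query_execution_id):
        # Athena writes query results to the output_location as
        # <query execution id>.csv, so the execution ID locates the file
        s3_hook = S3Hook(aws_conn_id=AWS_CONN_ID)
        file_obj = s3_hook.get_conn().get_object(
            Bucket=S3_BUCKET, Key=f'{S3_KEY}/{query_execution_id}.csv'
        )
        print(file_obj['Body'].read().decode('utf-8'))

    @task
    def remove_sample_data_from_s3():
        # Delete the CSV uploaded at the start of the DAG
        s3_hook = S3Hook(aws_conn_id=AWS_CONN_ID)
        s3_hook.delete_objects(S3_BUCKET, f'{S3_KEY}/{ATHENA_TABLE}/{SAMPLE_FILENAME}')

Because read_results_from_s3 takes the query execution ID as an argument, the templated XCom pull shown in the DAG is rendered before the task runs.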

More information

For further information, see the boto3 documentation for the Athena client's start_query_execution() method.
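
As a rough illustration of what the operator wraps, the underlying boto3 calls look something like the sketch below; the query, database, and bucket names are placeholders, not values from the example above.

    import boto3

    client = boto3.client('athena')

    # Submit the query; Athena runs it asynchronously and returns an execution ID
    response = client.start_query_execution(
        QueryString='SELECT * FROM my_table LIMIT 10;',
        QueryExecutionContext={'Database': 'my_database'},
        ResultConfiguration={'OutputLocation': 's3://my-bucket/athena-results/'},
    )
    query_execution_id = response['QueryExecutionId']

    # The operator then polls get_query_execution() until the query finishes,
    # which is what sleep_time and max_tries control
    status = client.get_query_execution(QueryExecutionId=query_execution_id)
    state = status['QueryExecution']['Status']['State']  # e.g. QUEUED, RUNNING, SUCCEEDED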
