Google API To S3 Transfer

Overview

The GoogleApiToS3Operator can send requests to any Google API that supports discovery and save the response on S3.

Two example DAGs are provided which showcase the GoogleApiToS3Operator in action.

  • example_google_api_to_s3_transfer_basic.py

  • example_google_api_to_s3_transfer_advanced.py

example_google_api_to_s3_transfer_basic.py

Purpose

This is a basic example DAG that uses GoogleApiToS3Operator to retrieve Google Sheets data.

Environment variables

This example relies on the following variables, which can be passed via OS environment variables.

airflow/providers/amazon/aws/example_dags/example_google_api_to_s3_transfer_basic.py

GOOGLE_SHEET_ID = getenv("GOOGLE_SHEET_ID")
GOOGLE_SHEET_RANGE = getenv("GOOGLE_SHEET_RANGE")
S3_DESTINATION_KEY = getenv("S3_DESTINATION_KEY", "s3://bucket/key.json")

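All of them are required. GOOGLE_SHEET_ID and GOOGLE_SHEET_RANGE have no default, and the default for S3_DESTINATION_KEY is only a placeholder.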
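Since some of these variables have no default, a missing value only shows up later as a None parameter. The following is a minimal sketch of reading them with an explicit check. The helper name require_env and the sample values are hypothetical and are not part of the example DAG.

```python
import os


def require_env(name, environ=os.environ):
    """Return the value of an environment variable or fail with a clear message."""
    value = environ.get(name)
    if not value:
        raise ValueError(f"Environment variable {name} must be set")
    return value


# Usage with a stand-in mapping instead of the real process environment.
fake_environ = {"GOOGLE_SHEET_ID": "my-sheet-id", "GOOGLE_SHEET_RANGE": "Sheet1!A1:C10"}
sheet_id = require_env("GOOGLE_SHEET_ID", fake_environ)
sheet_range = require_env("GOOGLE_SHEET_RANGE", fake_environ)
```

Passing the mapping as an argument keeps the helper easy to exercise without touching the real environment.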

Get Google Sheets Sheet Values

The following code requests the values of a Google Sheet via the sheets.spreadsheets.values.get endpoint.

airflow/providers/amazon/aws/example_dags/example_google_api_to_s3_transfer_basic.py

    task_google_sheets_values_to_s3 = GoogleApiToS3Operator(
        google_api_service_name='sheets',
        google_api_service_version='v4',
        google_api_endpoint_path='sheets.spreadsheets.values.get',
        google_api_endpoint_params={'spreadsheetId': GOOGLE_SHEET_ID, 'range': GOOGLE_SHEET_RANGE},
        s3_destination_key=S3_DESTINATION_KEY,
        task_id='google_sheets_values_to_s3',
    )

You can find more information about the API endpoint used in the Google Sheets API reference.
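With the Google API Python client, a dotted endpoint path such as sheets.spreadsheets.values.get corresponds to a chain of calls of the form service.spreadsheets().values().get(...). The sketch below illustrates that mapping with stand-in classes so it runs without credentials. FakeSheetsService, the other Fake classes, and build_request are hypothetical names, and this is not the provider's actual implementation.

```python
class FakeRequest:
    """Stand-in for the request object a discovery client would return."""

    def __init__(self, params):
        self.params = params


class FakeValues:
    def get(self, **params):
        return FakeRequest(params)


class FakeSpreadsheets:
    def values(self):
        return FakeValues()


class FakeSheetsService:
    def spreadsheets(self):
        return FakeSpreadsheets()


def build_request(service, endpoint_path, endpoint_params):
    """Walk a dotted endpoint path on a service object and build the request."""
    # The first element names the service itself, so it is skipped.
    *resources, method = endpoint_path.split(".")[1:]
    target = service
    for resource in resources:
        # Each intermediate element is a resource accessor called without arguments.
        target = getattr(target, resource)()
    # The last element is the API method, called with the endpoint parameters.
    return getattr(target, method)(**endpoint_params)


request = build_request(
    FakeSheetsService(),
    "sheets.spreadsheets.values.get",
    {"spreadsheetId": "my-sheet-id", "range": "Sheet1!A1:C10"},
)
```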

example_google_api_to_s3_transfer_advanced.py

Purpose

This is a more advanced example DAG for GoogleApiToS3Operator. It uses XCom to pass data between tasks in order to retrieve specific information about YouTube videos.

Environment variables

This example relies on the following variables, which can be passed via OS environment variables.

airflow/providers/amazon/aws/example_dags/example_google_api_to_s3_transfer_advanced.py

YOUTUBE_CONN_ID = getenv("YOUTUBE_CONN_ID", "google_cloud_default")
YOUTUBE_CHANNEL_ID = getenv("YOUTUBE_CHANNEL_ID", "UCSXwxpWZQ7XZ1WL3wqevChA")  # "Apache Airflow"
YOUTUBE_VIDEO_PUBLISHED_AFTER = getenv("YOUTUBE_VIDEO_PUBLISHED_AFTER", "2019-09-25T00:00:00Z")
YOUTUBE_VIDEO_PUBLISHED_BEFORE = getenv("YOUTUBE_VIDEO_PUBLISHED_BEFORE", "2019-10-18T00:00:00Z")
S3_DESTINATION_KEY = getenv("S3_DESTINATION_KEY", "s3://bucket/key.json")
YOUTUBE_VIDEO_PARTS = getenv("YOUTUBE_VIDEO_PARTS", "snippet")
YOUTUBE_VIDEO_FIELDS = getenv("YOUTUBE_VIDEO_FIELDS", "items(id,snippet(description,publishedAt,tags,title))")

S3_DESTINATION_KEY is required.

Get YouTube Videos

First, it searches for up to 50 videos (a single page of results) published in a given time range (YOUTUBE_VIDEO_PUBLISHED_AFTER, YOUTUBE_VIDEO_PUBLISHED_BEFORE) on a YouTube channel (YOUTUBE_CHANNEL_ID), saves the response in S3, and also pushes the data to XCom.

airflow/providers/amazon/aws/example_dags/example_google_api_to_s3_transfer_advanced.py

    task_video_ids_to_s3 = GoogleApiToS3Operator(
        gcp_conn_id=YOUTUBE_CONN_ID,
        google_api_service_name='youtube',
        google_api_service_version='v3',
        google_api_endpoint_path='youtube.search.list',
        google_api_endpoint_params={
            'part': 'snippet',
            'channelId': YOUTUBE_CHANNEL_ID,
            'maxResults': 50,
            'publishedAfter': YOUTUBE_VIDEO_PUBLISHED_AFTER,
            'publishedBefore': YOUTUBE_VIDEO_PUBLISHED_BEFORE,
            'type': 'video',
            'fields': 'items/id/videoId',
        },
        google_api_response_via_xcom='video_ids_response',
        s3_destination_key=f'{s3_directory}/youtube_search_{s3_file_name}.json',
        task_id='video_ids_to_s3',
    )
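The task above refers to s3_directory and s3_file_name, which are not shown in the excerpt. One plausible way to derive them from S3_DESTINATION_KEY is sketched below. The exact derivation used by the example DAG is an assumption here.

```python
S3_DESTINATION_KEY = "s3://bucket/key.json"

# Split off the last path segment, then drop its extension.
s3_directory, s3_file = S3_DESTINATION_KEY.rsplit("/", 1)
s3_file_name = s3_file.rsplit(".", 1)[0]

# Build the key used for the search response, mirroring the f-string in the task.
search_key = f"{s3_directory}/youtube_search_{s3_file_name}.json"
```

With the placeholder default, both output files would land next to each other under the same prefix, distinguished only by the youtube_search_ and youtube_videos_ parts of the name.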

From there, a BranchPythonOperator extracts the XCom data, converts the IDs into the format the next request needs, and decides whether any videos need to be requested at all.

airflow/providers/amazon/aws/example_dags/example_google_api_to_s3_transfer_advanced.py

def _check_and_transform_video_ids(task_output, task_instance):
    video_ids_response = task_output
    video_ids = [item['id']['videoId'] for item in video_ids_response['items']]

    if video_ids:
        task_instance.xcom_push(key='video_ids', value={'id': ','.join(video_ids)})
        return 'video_data_to_s3'
    return 'no_video_ids'


airflow/providers/amazon/aws/example_dags/example_google_api_to_s3_transfer_advanced.py

    task_check_and_transform_video_ids = BranchPythonOperator(
        python_callable=_check_and_transform_video_ids,
        op_args=[task_video_ids_to_s3.output[task_video_ids_to_s3.google_api_response_via_xcom]],
        task_id='check_and_transform_video_ids',
    )
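The branching logic can be tried outside Airflow by separating the transformation from the XCom push. The sketch below assumes a search response shaped like the one produced by the 'items/id/videoId' field filter. The function names and the sample IDs are hypothetical, and the .get() call is an added defensive assumption for responses that carry no 'items' key.

```python
def extract_video_ids(search_response):
    """Return the video IDs from a youtube.search.list style response."""
    # .get() guards against a response without an 'items' key (a defensive assumption).
    return [item["id"]["videoId"] for item in search_response.get("items", [])]


def choose_branch(search_response):
    """Return the task_id to follow and the parameters for the next request."""
    video_ids = extract_video_ids(search_response)
    if video_ids:
        # The next request expects the IDs as one comma-separated string.
        return "video_data_to_s3", {"id": ",".join(video_ids)}
    return "no_video_ids", None


sample_response = {"items": [{"id": {"videoId": "abc123"}}, {"id": {"videoId": "def456"}}]}
branch, params = choose_branch(sample_response)
empty_branch, empty_params = choose_branch({})
```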

If YouTube video IDs are available, it passes them to the next request, which retrieves the information (YOUTUBE_VIDEO_FIELDS) for the requested videos and saves it in S3 (S3_DESTINATION_KEY).

airflow/providers/amazon/aws/example_dags/example_google_api_to_s3_transfer_advanced.py

    task_video_data_to_s3 = GoogleApiToS3Operator(
        gcp_conn_id=YOUTUBE_CONN_ID,
        google_api_service_name='youtube',
        google_api_service_version='v3',
        google_api_endpoint_path='youtube.videos.list',
        google_api_endpoint_params={
            'part': YOUTUBE_VIDEO_PARTS,
            'maxResults': 50,
            'fields': YOUTUBE_VIDEO_FIELDS,
        },
        google_api_endpoint_params_via_xcom='video_ids',
        s3_destination_key=f'{s3_directory}/youtube_videos_{s3_file_name}.json',
        task_id='video_data_to_s3',
    )
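As a rough sketch of the effect of google_api_endpoint_params_via_xcom, assume the dictionary pushed under the video_ids key is merged into the static endpoint parameters before the request is made. The helper merge_endpoint_params and the sample values are hypothetical, and the operator's real merge behaviour may differ in detail.

```python
def merge_endpoint_params(static_params, xcom_params):
    """Combine static endpoint parameters with parameters received via XCom."""
    merged = dict(static_params)  # copy so the caller's dict is not mutated
    merged.update(xcom_params or {})  # XCom values win on key collisions
    return merged


static_params = {"part": "snippet", "maxResults": 50, "fields": "items(id)"}
xcom_params = {"id": "abc123,def456"}
final_params = merge_endpoint_params(static_params, xcom_params)
```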

If not, it does nothing and tracks that outcome with a placeholder task.

airflow/providers/amazon/aws/example_dags/example_google_api_to_s3_transfer_advanced.py

    task_no_video_ids = DummyOperator(task_id='no_video_ids')
