Google API To S3 Transfer

Overview

The GoogleApiToS3Operator can make requests to any Google API which supports discovery and save the response in S3.
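
Below is a minimal sketch of the general usage pattern (not taken from the example DAGs). It assumes the default AWS connection and Google credentials are configured; the DAG id, sheet id, range, bucket and key are placeholders.

from datetime import datetime

from airflow import DAG
from airflow.providers.amazon.aws.transfers.google_api_to_s3 import GoogleApiToS3Operator

with DAG(
    dag_id='example_any_google_api_to_s3',
    start_date=datetime(2021, 1, 1),
    schedule_interval=None,
    catchup=False,
) as dag:
    # Any discovery-based Google API is addressed by service name, service version
    # and a dotted endpoint path; the JSON response is written to the given S3 key.
    any_google_api_to_s3 = GoogleApiToS3Operator(
        task_id='any_google_api_to_s3',
        google_api_service_name='sheets',
        google_api_service_version='v4',
        google_api_endpoint_path='sheets.spreadsheets.values.get',
        google_api_endpoint_params={'spreadsheetId': '<your-sheet-id>', 'range': 'Sheet1!A1:B2'},
        s3_destination_key='s3://your-bucket/your-key.json',
    )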

Two example DAGs are provided which showcase the GoogleApiToS3Operator in action.

  • example_google_api_to_s3_transfer_basic.py

  • example_google_api_to_s3_transfer_advanced.py

example_google_api_to_s3_transfer_basic.py

Purpose

This is a basic example DAG for using GoogleApiToS3Operator to retrieve Google Sheets data.

Environment variables

This example relies on the following variables, which can be passed via OS environment variables.

airflow/providers/amazon/aws/example_dags/example_google_api_to_s3_transfer_basic.py

GOOGLE_SHEET_ID = getenv("GOOGLE_SHEET_ID")
GOOGLE_SHEET_RANGE = getenv("GOOGLE_SHEET_RANGE")
S3_DESTINATION_KEY = getenv("S3_DESTINATION_KEY", "s3://bucket/key.json")

All of them are required.

Get Google Sheets Sheet Values

In the following code we are requesting a Google Sheet via the sheets.spreadsheets.values.get endpoint.

airflow/providers/amazon/aws/example_dags/example_google_api_to_s3_transfer_basic.py

    task_google_sheets_values_to_s3 = GoogleApiToS3Operator(
        google_api_service_name='sheets',
        google_api_service_version='v4',
        google_api_endpoint_path='sheets.spreadsheets.values.get',
        google_api_endpoint_params={'spreadsheetId': GOOGLE_SHEET_ID, 'range': GOOGLE_SHEET_RANGE},
        s3_destination_key=S3_DESTINATION_KEY,
        task_id='google_sheets_values_to_s3',
        dag=dag,
    )

You can find more information about the API endpoint used in the Google Sheets API reference.
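
Under the hood, the operator builds a Google API discovery client and resolves the dotted endpoint path into chained resource and method calls. The following simplified sketch (not the operator's actual implementation, and assuming Google application default credentials are available) shows roughly what the request above amounts to:

from googleapiclient.discovery import build

# google_api_service_name and google_api_service_version
service = build('sheets', 'v4')

# 'sheets.spreadsheets.values.get' resolves to service.spreadsheets().values().get(...)
response = (
    service.spreadsheets()
    .values()
    .get(spreadsheetId='<GOOGLE_SHEET_ID>', range='<GOOGLE_SHEET_RANGE>')
    .execute()
)

# The operator serializes this JSON response and uploads it to s3_destination_key.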

example_google_api_to_s3_transfer_advanced.py

Purpose

This is a more advanced example DAG for using GoogleApiToS3Operator, which uses XCom to pass data between tasks in order to retrieve specific information about YouTube videos.

Environment variables

This example relies on the following variables, which can be passed via OS environment variables.

airflow/providers/amazon/aws/example_dags/example_google_api_to_s3_transfer_advanced.py

YOUTUBE_CONN_ID = getenv("YOUTUBE_CONN_ID", "google_cloud_default")
YOUTUBE_CHANNEL_ID = getenv("YOUTUBE_CHANNEL_ID", "UCSXwxpWZQ7XZ1WL3wqevChA")  # "Apache Airflow"
YOUTUBE_VIDEO_PUBLISHED_AFTER = getenv("YOUTUBE_VIDEO_PUBLISHED_AFTER", "2019-09-25T00:00:00Z")
YOUTUBE_VIDEO_PUBLISHED_BEFORE = getenv("YOUTUBE_VIDEO_PUBLISHED_BEFORE", "2019-10-18T00:00:00Z")
S3_DESTINATION_KEY = getenv("S3_DESTINATION_KEY", "s3://bucket/key.json")
YOUTUBE_VIDEO_PARTS = getenv("YOUTUBE_VIDEO_PARTS", "snippet")
YOUTUBE_VIDEO_FIELDS = getenv("YOUTUBE_VIDEO_FIELDS", "items(id,snippet(description,publishedAt,tags,title))")

S3_DESTINATION_KEY is required.

Get YouTube Videos

First, it searches for up to 50 videos (the API returns at most 50 results per page, and only the first page is requested) published in a given time range (YOUTUBE_VIDEO_PUBLISHED_AFTER, YOUTUBE_VIDEO_PUBLISHED_BEFORE) on a YouTube channel (YOUTUBE_CHANNEL_ID), saves the response in S3 and also pushes the data to XCom.

airflow/providers/amazon/aws/example_dags/example_google_api_to_s3_transfer_advanced.py

    task_video_ids_to_s3 = GoogleApiToS3Operator(
        gcp_conn_id=YOUTUBE_CONN_ID,
        google_api_service_name='youtube',
        google_api_service_version='v3',
        google_api_endpoint_path='youtube.search.list',
        google_api_endpoint_params={
            'part': 'snippet',
            'channelId': YOUTUBE_CHANNEL_ID,
            'maxResults': 50,
            'publishedAfter': YOUTUBE_VIDEO_PUBLISHED_AFTER,
            'publishedBefore': YOUTUBE_VIDEO_PUBLISHED_BEFORE,
            'type': 'video',
            'fields': 'items/id/videoId',
        },
        google_api_response_via_xcom='video_ids_response',
        s3_destination_key=f'{s3_directory}/youtube_search_{s3_file_name}.json',
        task_id='video_ids_to_s3',
    )
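
Because the request limits fields to items/id/videoId, the response pushed to XCom (under the key video_ids_response) is trimmed down to roughly the following shape; the video IDs here are made up for illustration:

{
    'items': [
        {'id': {'videoId': 'VIDEO_ID_1'}},
        {'id': {'videoId': 'VIDEO_ID_2'}},
    ]
}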

From there, a BranchPythonOperator extracts the XCom data, transforms the video IDs into the format the next request expects, and also decides whether any videos need to be requested at all.

airflow/providers/amazon/aws/example_dags/example_google_api_to_s3_transfer_advanced.py

def _check_and_transform_video_ids(task_output):
    video_ids_response = task_output
    video_ids = [item['id']['videoId'] for item in video_ids_response['items']]

    if video_ids:
        context = get_current_context()
        context["task_instance"].xcom_push(key='video_ids', value={'id': ','.join(video_ids)})
        return 'video_data_to_s3'
    return 'no_video_ids'


airflow/providers/amazon/aws/example_dags/example_google_api_to_s3_transfer_advanced.py

    task_check_and_transform_video_ids = BranchPythonOperator(
        python_callable=_check_and_transform_video_ids,
        op_args=[task_video_ids_to_s3.output[task_video_ids_to_s3.google_api_response_via_xcom]],
        task_id='check_and_transform_video_ids',
    )

If YouTube video IDs are available, they are passed on to the next request, which then fetches the requested information (YOUTUBE_VIDEO_FIELDS) for those videos and saves it in S3 (S3_DESTINATION_KEY).

airflow/providers/amazon/aws/example_dags/example_google_api_to_s3_transfer_advanced.py

    task_video_data_to_s3 = GoogleApiToS3Operator(
        gcp_conn_id=YOUTUBE_CONN_ID,
        google_api_service_name='youtube',
        google_api_service_version='v3',
        google_api_endpoint_path='youtube.videos.list',
        google_api_endpoint_params={
            'part': YOUTUBE_VIDEO_PARTS,
            'maxResults': 50,
            'fields': YOUTUBE_VIDEO_FIELDS,
        },
        google_api_endpoint_params_via_xcom='video_ids',
        s3_destination_key=f'{s3_directory}/youtube_videos_{s3_file_name}.json',
        task_id='video_data_to_s3',
    )
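
The static parameters from google_api_endpoint_params are merged with the 'id' entry pulled from XCom (key video_ids), so the resulting request is roughly equivalent to the following direct call with the Google API client; the video IDs are placeholders and the part/fields values are the defaults of YOUTUBE_VIDEO_PARTS and YOUTUBE_VIDEO_FIELDS:

from googleapiclient.discovery import build

youtube = build('youtube', 'v3')
# Static endpoint params plus the comma-separated video IDs pushed to XCom.
response = youtube.videos().list(
    part='snippet',
    maxResults=50,
    fields='items(id,snippet(description,publishedAt,tags,title))',
    id='VIDEO_ID_1,VIDEO_ID_2',
).execute()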

If no video IDs were found, nothing is requested; an empty task records that this branch was taken.

airflow/providers/amazon/aws/example_dags/example_google_api_to_s3_transfer_advanced.py

    task_no_video_ids = DummyOperator(task_id='no_video_ids')
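
Finally, the branch task chooses between the two downstream tasks. The wiring below is only a sketch of how the pieces fit together; the search task is already upstream of the branch because its .output is passed via op_args:

    task_check_and_transform_video_ids >> [task_video_data_to_s3, task_no_video_ids]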
