Google API To S3 Transfer
Overview
The GoogleApiToS3Operator (formerly named GoogleApiToS3Transfer) can send requests to any Google API that supports discovery and save the response to S3.
Two example DAGs are provided that show the GoogleApiToS3Operator in action:
example_google_api_to_s3_transfer_basic.py
example_google_api_to_s3_transfer_advanced.py
example_google_api_to_s3_transfer_basic.py
Purpose
This is a basic example DAG that uses GoogleApiToS3Operator to retrieve data from Google Sheets.
Environment variables
This example relies on the following variables, which can be passed as OS environment variables:
from os import getenv

GOOGLE_SHEET_ID = getenv("GOOGLE_SHEET_ID")
GOOGLE_SHEET_RANGE = getenv("GOOGLE_SHEET_RANGE")
S3_DESTINATION_KEY = getenv("S3_DESTINATION_KEY", "s3://bucket/key.json")
All of them are required.
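The `getenv` calls above return `None` for a missing variable, and the resulting error only shows up when the API call fails. The sketch below is one way to fail fast at parse time. `load_required_env` and `REQUIRED_VARS` are hypothetical helpers and are not part of the example DAG. The optional `env` mapping is there so the check can be exercised without touching the real environment.

```python
import os
from typing import Dict, Iterable, Mapping, Optional

# Hypothetical: the variables this basic example needs.
REQUIRED_VARS = ("GOOGLE_SHEET_ID", "GOOGLE_SHEET_RANGE", "S3_DESTINATION_KEY")


def load_required_env(
    names: Iterable[str], env: Optional[Mapping[str, str]] = None
) -> Dict[str, str]:
    """Return the requested variables, raising if any is missing or empty."""
    source = os.environ if env is None else env
    values = {name: source.get(name, "") for name in names}
    missing = [name for name, value in values.items() if not value]
    if missing:
        raise RuntimeError(
            f"Missing required environment variables: {', '.join(missing)}"
        )
    return values


config = load_required_env(
    REQUIRED_VARS,
    {
        "GOOGLE_SHEET_ID": "example-sheet-id",
        "GOOGLE_SHEET_RANGE": "Sheet1!A1:B2",
        "S3_DESTINATION_KEY": "s3://bucket/key.json",
    },
)
```

Note that this check cannot catch the placeholder default `s3://bucket/key.json`, because the original code supplies it whenever the variable is unset.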
Get Google Sheets Sheet Values
The following code requests the values of a Google Sheet range via the sheets.spreadsheets.values.get endpoint.
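from airflow.providers.amazon.aws.transfers.google_api_to_s3 import GoogleApiToS3Operator

# Fetch the sheet range and write the JSON response to S3_DESTINATION_KEY.
task_google_sheets_values_to_s3 = GoogleApiToS3Operator(
    google_api_service_name='sheets',
    google_api_service_version='v4',
    google_api_endpoint_path='sheets.spreadsheets.values.get',
    google_api_endpoint_params={'spreadsheetId': GOOGLE_SHEET_ID, 'range': GOOGLE_SHEET_RANGE},
    s3_destination_key=S3_DESTINATION_KEY,
    task_id='google_sheets_values_to_s3',
    dag=dag,  # the DAG object defined elsewhere in the example file
)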
More information about this endpoint is available in the Google Sheets API v4 reference for spreadsheets.values.get.
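With google-api-python-client, a discovery-based client calls this endpoint as `service.spreadsheets().values().get(spreadsheetId=..., range=...)`. The sketch below shows how a dotted path such as `sheets.spreadsheets.values.get` could be resolved into that call chain. It is an illustration, not the operator's actual code. `build_request` is a hypothetical helper, and the stub classes stand in for a real discovery client so the example runs offline.

```python
from typing import Any, Dict


def build_request(client: Any, endpoint_path: str, params: Dict[str, Any]) -> Any:
    """Resolve a dotted discovery path against a client object.

    The first segment names the API (already chosen when the client was built),
    the middle segments are resource collections (each is a method call on the
    client), and the last segment is the method invoked with the parameters.
    """
    parts = endpoint_path.split(".")
    resource = client
    for collection in parts[1:-1]:
        resource = getattr(resource, collection)()
    return getattr(resource, parts[-1])(**params)


# Stubs mimicking the shape of a discovery client for the Sheets API.
class _Values:
    def get(self, **params: Any) -> Any:
        return ("values.get", params)


class _Spreadsheets:
    def values(self) -> _Values:
        return _Values()


class _FakeSheetsClient:
    def spreadsheets(self) -> _Spreadsheets:
        return _Spreadsheets()


request = build_request(
    _FakeSheetsClient(),
    "sheets.spreadsheets.values.get",
    {"spreadsheetId": "example-sheet-id", "range": "Sheet1!A1:B2"},
)
```

With a real client, the returned object would be a request that still has to be executed.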
example_google_api_to_s3_transfer_advanced.py
Purpose
This is a more advanced example DAG that uses GoogleApiToS3Operator together with XCom to pass data between tasks and retrieve specific information about YouTube videos.
Environment variables
This example relies on the following variables, which can be passed via OS environment variables.
from os import getenv

YOUTUBE_CONN_ID = getenv("YOUTUBE_CONN_ID", "google_cloud_default")
YOUTUBE_CHANNEL_ID = getenv("YOUTUBE_CHANNEL_ID", "UCSXwxpWZQ7XZ1WL3wqevChA") # "Apache Airflow"
YOUTUBE_VIDEO_PUBLISHED_AFTER = getenv("YOUTUBE_VIDEO_PUBLISHED_AFTER", "2019-09-25T00:00:00Z")
YOUTUBE_VIDEO_PUBLISHED_BEFORE = getenv("YOUTUBE_VIDEO_PUBLISHED_BEFORE", "2019-10-18T00:00:00Z")
S3_DESTINATION_KEY = getenv("S3_DESTINATION_KEY", "s3://bucket/key.json")
YOUTUBE_VIDEO_PARTS = getenv("YOUTUBE_VIDEO_PARTS", "snippet")
YOUTUBE_VIDEO_FIELDS = getenv("YOUTUBE_VIDEO_FIELDS", "items(id,snippet(description,publishedAt,tags,title))")
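S3_DESTINATION_KEY is required, because its default is only a placeholder. The other variables fall back to the defaults shown above.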
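The tasks below build their S3 keys from `s3_directory` and `s3_file_name`, which are not defined in the snippets shown here. The sketch below assumes both names are derived by splitting S3_DESTINATION_KEY into its directory and its file name without the extension. `split_s3_key` is a hypothetical helper, and it expects the key to end in a file name with an extension.

```python
from typing import Tuple


def split_s3_key(s3_destination_key: str) -> Tuple[str, str]:
    """Split 's3://bucket/dir/name.json' into ('s3://bucket/dir', 'name')."""
    s3_directory, s3_file = s3_destination_key.rsplit("/", 1)
    s3_file_name, _extension = s3_file.rsplit(".", 1)
    return s3_directory, s3_file_name


s3_directory, s3_file_name = split_s3_key("s3://bucket/key.json")
search_key = f"{s3_directory}/youtube_search_{s3_file_name}.json"
videos_key = f"{s3_directory}/youtube_videos_{s3_file_name}.json"
```

Under this assumption, the default placeholder produces `s3://bucket/youtube_search_key.json` and `s3://bucket/youtube_videos_key.json`, so the two tasks write to separate objects.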
Get YouTube Videos
First, the DAG searches a YouTube channel (YOUTUBE_CHANNEL_ID) for videos published within a given time range (YOUTUBE_VIDEO_PUBLISHED_AFTER, YOUTUBE_VIDEO_PUBLISHED_BEFORE). It retrieves at most 50 videos, because the API paginates its results and maxResults allows no more than 50 per page. The task saves the response to S3 and also pushes it to XCom.
task_video_ids_to_s3 = GoogleApiToS3Operator(
    gcp_conn_id=YOUTUBE_CONN_ID,
    google_api_service_name='youtube',
    google_api_service_version='v3',
    google_api_endpoint_path='youtube.search.list',
    google_api_endpoint_params={
        'part': 'snippet',
        'channelId': YOUTUBE_CHANNEL_ID,
        'maxResults': 50,
        'publishedAfter': YOUTUBE_VIDEO_PUBLISHED_AFTER,
        'publishedBefore': YOUTUBE_VIDEO_PUBLISHED_BEFORE,
        'type': 'video',
        'fields': 'items/id/videoId',
    },
    google_api_response_via_xcom='video_ids_response',
    s3_destination_key=f'{s3_directory}/youtube_search_{s3_file_name}.json',
    task_id='video_ids_to_s3',
)
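The YouTube Data API expects `publishedAfter` and `publishedBefore` as RFC 3339 date-time values, as the defaults above show. Instead of hard-coding the window, the values could be computed from Python datetimes, as in the sketch below. `rfc3339_utc` and `search_window` are hypothetical helpers, and both require timezone-aware datetimes.

```python
from datetime import datetime, timedelta, timezone
from typing import Tuple


def rfc3339_utc(moment: datetime) -> str:
    """Format an aware datetime as an RFC 3339 UTC timestamp ending in 'Z'."""
    return moment.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")


def search_window(end: datetime, days: int) -> Tuple[str, str]:
    """Return (publishedAfter, publishedBefore) covering the `days` before `end`."""
    return rfc3339_utc(end - timedelta(days=days)), rfc3339_utc(end)


published_after, published_before = search_window(
    datetime(2019, 10, 18, tzinfo=timezone.utc), days=23
)
```

This call reproduces the example's default window, from 2019-09-25T00:00:00Z to 2019-10-18T00:00:00Z.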
Next, a BranchPythonOperator extracts the XCom data, converts the video IDs into the format the next request expects, and decides whether any videos need to be requested at all.
from airflow.operators.python import BranchPythonOperator, get_current_context


def _check_and_transform_video_ids(task_output):
    # task_output is the search response that video_ids_to_s3 pushed to XCom
    video_ids_response = task_output
    video_ids = [item['id']['videoId'] for item in video_ids_response['items']]
    if video_ids:
        # youtube.videos.list expects the IDs as one comma-separated 'id' parameter
        context = get_current_context()
        context["task_instance"].xcom_push(key='video_ids', value={'id': ','.join(video_ids)})
        return 'video_data_to_s3'
    return 'no_video_ids'


task_check_and_transform_video_ids = BranchPythonOperator(
    python_callable=_check_and_transform_video_ids,
    op_args=[task_video_ids_to_s3.output[task_video_ids_to_s3.google_api_response_via_xcom]],
    task_id='check_and_transform_video_ids',
)
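The branch logic can be separated from Airflow and tested on its own. The sketch below is a pure function that returns the branch to follow and the parameters that would be pushed to XCom. It does not call `xcom_push` itself. `check_and_transform_video_ids` is a hypothetical name, and the sample response uses made-up video IDs shaped like the `items/id/videoId` partial response requested above.

```python
from typing import Any, Dict, List, Optional, Tuple


def check_and_transform_video_ids(
    search_response: Dict[str, Any],
) -> Tuple[str, Optional[Dict[str, str]]]:
    """Return (branch task_id, params for youtube.videos.list or None)."""
    # .get() guards against a response that omits 'items' entirely.
    video_ids: List[str] = [
        item["id"]["videoId"] for item in search_response.get("items", [])
    ]
    if video_ids:
        return "video_data_to_s3", {"id": ",".join(video_ids)}
    return "no_video_ids", None


# Made-up sample shaped like the filtered search response.
sample_response = {"items": [{"id": {"videoId": "abc123"}}, {"id": {"videoId": "def456"}}]}
branch, params = check_and_transform_video_ids(sample_response)
```

Keeping the transformation pure means the Airflow callable only has to push `params` and return `branch`.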
If video IDs are available, they are passed to the next request. That request retrieves the selected fields (YOUTUBE_VIDEO_FIELDS) for those videos and saves the result to S3 (S3_DESTINATION_KEY).
task_video_data_to_s3 = GoogleApiToS3Operator(
    gcp_conn_id=YOUTUBE_CONN_ID,
    google_api_service_name='youtube',
    google_api_service_version='v3',
    google_api_endpoint_path='youtube.videos.list',
    google_api_endpoint_params={
        'part': YOUTUBE_VIDEO_PARTS,
        'maxResults': 50,
        'fields': YOUTUBE_VIDEO_FIELDS,
    },
    # the {'id': ...} dict pushed by the branch task under the 'video_ids' XCom key
    google_api_endpoint_params_via_xcom='video_ids',
    s3_destination_key=f'{s3_directory}/youtube_videos_{s3_file_name}.json',
    task_id='video_data_to_s3',
)
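The request to youtube.videos.list needs both the fixed parameters above and the `id` value produced by the branch task. The sketch below assumes that the operator merges the dictionary pulled from XCom into `google_api_endpoint_params`, with XCom values winning on conflicting keys. Check the operator source for the exact behavior. `merge_endpoint_params` is a hypothetical helper.

```python
from typing import Any, Dict, Optional


def merge_endpoint_params(
    static_params: Dict[str, Any], xcom_params: Optional[Dict[str, Any]]
) -> Dict[str, Any]:
    """Combine fixed endpoint params with params pulled from XCom."""
    merged = dict(static_params)  # copy so the operator's defaults stay untouched
    if xcom_params:
        merged.update(xcom_params)
    return merged


request_params = merge_endpoint_params(
    {
        "part": "snippet",
        "maxResults": 50,
        "fields": "items(id,snippet(description,publishedAt,tags,title))",
    },
    {"id": "abc123,def456"},  # made-up IDs, as pushed under the 'video_ids' key
)
```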
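If no video IDs were found, the DAG requests nothing more, but it still follows the no_video_ids branch so that this outcome is recorded in the run.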
from airflow.operators.dummy import DummyOperator

task_no_video_ids = DummyOperator(task_id='no_video_ids')
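In Airflow, a BranchPythonOperator runs the directly downstream task whose task_id its callable returns and marks the other directly downstream tasks as skipped. The no-op `no_video_ids` task therefore shows up as a successful task instance, which is how the "nothing to fetch" case stays visible. The toy model below is not Airflow code. It only illustrates that run/skip outcome, assuming both tasks are set directly downstream of the branch task.

```python
from typing import Dict, Iterable


def resolve_branch(chosen_task_id: str, downstream_task_ids: Iterable[str]) -> Dict[str, str]:
    """Toy model: the chosen downstream task runs, the others are skipped."""
    return {
        task_id: ("run" if task_id == chosen_task_id else "skipped")
        for task_id in downstream_task_ids
    }


outcome = resolve_branch("no_video_ids", ["video_data_to_s3", "no_video_ids"])
```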