Trino to Google Cloud Storage Transfer Operator
Trino is an open source, fast, distributed SQL query engine for running interactive analytic queries against data sources of all sizes ranging from gigabytes to petabytes. Trino allows querying data where it lives, including Hive, Cassandra, relational databases or even proprietary data stores. A single Trino query can combine data from multiple sources, allowing for analytics across your entire organization.
Google Cloud Storage allows world-wide storage and retrieval of any amount of data at any time. You can use it to store backup and archive data as well as a data source for BigQuery.
Data transfer
Transferring data from Trino to Google Cloud Storage is performed with the TrinoToGCSOperator operator.
This operator has 3 required parameters:
sql - The SQL to execute.
bucket - The bucket to upload to.
filename - The filename to use as the object name when uploading to Google Cloud Storage. A {} should be specified in the filename to allow the operator to inject file numbers in cases where the file is split due to size.
All parameters are described in the reference documentation - TrinoToGCSOperator.
An example operator call might look like this:
trino_to_gcs_basic = TrinoToGCSOperator(
    task_id="trino_to_gcs_basic",
    sql=f"select * from {SOURCE_SCHEMA_COLUMNS}",
    bucket=GCS_BUCKET,
    filename=f"{safe_name(SOURCE_SCHEMA_COLUMNS)}.{{}}.json",
)
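The doubled braces in the f-string above are an escape: they leave a literal {} in the resulting filename, which is the placeholder the operator fills with a file number. A minimal sketch of that behaviour in plain Python, assuming the number is injected with str.format and that numbering starts at 0 (the table name is hypothetical):

```python
def chunk_object_names(template: str, count: int) -> list[str]:
    """Return the object names produced by filling ``{}`` with 0..count-1."""
    return [template.format(file_no) for file_no in range(count)]


# Hypothetical table name, used only for illustration.
table = "customer"

# ``{{}}`` inside an f-string is an escaped brace pair, so it survives as ``{}``.
template = f"{table}.{{}}.json"

print(template)
print(chunk_object_names(template, 3))
```

Writing the filename without the doubled braces would make the f-string treat the braces as an (empty, invalid) replacement field, so the escape is needed whenever the filename is built with an f-string.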
Choice of data format
The operator supports two output formats:
json - JSON Lines (default)
csv
You can select the format with the export_format parameter.
If you want a CSV file to be created, your operator call might look like this:
trino_to_gcs_csv = TrinoToGCSOperator(
    task_id="trino_to_gcs_csv",
    sql=f"select * from {SOURCE_SCHEMA_COLUMNS}",
    bucket=GCS_BUCKET,
    filename=f"{safe_name(SOURCE_SCHEMA_COLUMNS)}.{{}}.csv",
    schema_filename=f"{safe_name(SOURCE_SCHEMA_COLUMNS)}-schema.json",
    export_format="csv",
)
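To make the difference between the two formats concrete, the following sketch serializes the same rows as JSON Lines and as CSV using only the standard library. It illustrates the formats in general rather than the operator's exact output; the column names, the rows, and the presence of a CSV header row are assumptions made for the example.

```python
import csv
import io
import json

# Hypothetical rows, standing in for a query result.
columns = ["id", "name"]
rows = [(1, "alice"), (2, "bob")]


def to_json_lines(columns, rows) -> str:
    """One JSON object per line, keyed by column name."""
    return "".join(
        json.dumps(dict(zip(columns, row)), sort_keys=True) + "\n" for row in rows
    )


def to_csv(columns, rows) -> str:
    """A header row followed by one comma-separated line per row."""
    buffer = io.StringIO()
    writer = csv.writer(buffer, lineterminator="\n")
    writer.writerow(columns)
    writer.writerows(rows)
    return buffer.getvalue()


print(to_json_lines(columns, rows))
print(to_csv(columns, rows))
```

JSON Lines repeats the column names in every record, while CSV states them at most once, which is one reason a separate schema file is useful alongside CSV output.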
Generating BigQuery schema
If you set the schema_filename parameter, a .json file containing the BigQuery schema fields for the table will be dumped from the database and uploaded to the bucket.
If you want to create a schema file, then an example operator call might look like this:
trino_to_gcs_multiple_types = TrinoToGCSOperator(
    task_id="trino_to_gcs_multiple_types",
    sql=f"select * from {SOURCE_SCHEMA_COLUMNS}",
    bucket=GCS_BUCKET,
    filename=f"{safe_name(SOURCE_SCHEMA_COLUMNS)}.{{}}.json",
    schema_filename=f"{safe_name(SOURCE_SCHEMA_COLUMNS)}-schema.json",
    gzip=False,
)
For more information about the BigQuery schema, please look at Specifying schema in the BigQuery documentation.
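As a rough picture of what such a schema file contains, the sketch below builds a list of BigQuery schema fields (name, type, mode) from column names and source types and prints it as JSON. The type mapping is a small hypothetical subset written for this example, not the operator's actual mapping, and the fallback to STRING is likewise an assumption.

```python
import json

# Hypothetical, partial mapping from Trino type names to BigQuery type names.
TYPE_MAP = {
    "BIGINT": "INT64",
    "DOUBLE": "FLOAT64",
    "BOOLEAN": "BOOL",
    "VARCHAR": "STRING",
}


def bigquery_schema(columns: list[tuple[str, str]]) -> list[dict[str, str]]:
    """Build BigQuery schema fields; unknown types fall back to STRING."""
    return [
        {
            "name": name,
            "type": TYPE_MAP.get(trino_type.upper(), "STRING"),
            "mode": "NULLABLE",
        }
        for name, trino_type in columns
    ]


schema = bigquery_schema([("column_name", "varchar"), ("ordinal_position", "bigint")])
print(json.dumps(schema, indent=2))
```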
Division of the result into multiple files
This operator can split a large result into multiple files. The approx_max_file_size_bytes parameter allows developers to specify the approximate file size of the splits. By default, a file has no more than 1 900 000 000 bytes (1900 MB).
Check Quotas & limits in Google Cloud Storage to see the maximum allowed file size for a single object.
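If you want to create 10 MB files, your code might look like this (a sketch that reuses the names from the other examples on this page):
trino_to_gcs_many_chunks = TrinoToGCSOperator(
    task_id="trino_to_gcs_many_chunks",
    sql=f"select * from {SOURCE_CUSTOMER_TABLE}",
    bucket=GCS_BUCKET,
    filename=f"{safe_name(SOURCE_CUSTOMER_TABLE)}.{{}}.json",
    approx_max_file_size_bytes=10_000_000,
)
Assuming a BigQuery table of the same name has been defined over the exported chunks, they can then be read with a query job:
read_data_from_gcs_many_chunks = BigQueryInsertJobOperator(
    task_id="read_data_from_gcs_many_chunks",
    configuration={
        "query": {
            "query": f"SELECT COUNT(*) FROM `{GCP_PROJECT_ID}.{DATASET_NAME}."
            f"{safe_name(SOURCE_CUSTOMER_TABLE)}`",
            "useLegacySql": False,
        }
    },
)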
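The word "approx" in the parameter name matters: the limit is a threshold for starting a new file, not a hard cap. The sketch below shows one way such splitting can work, assuming the size is checked after each row is written, so a file may exceed the limit by up to one row. It is an illustration of the idea, not the operator's implementation.

```python
def split_by_size(lines: list[bytes], approx_max_bytes: int) -> list[list[bytes]]:
    """Group lines into chunks, closing a chunk once it reaches the limit."""
    chunks: list[list[bytes]] = []
    current: list[bytes] = []
    current_size = 0
    for line in lines:
        current.append(line)
        current_size += len(line)
        # The limit is approximate: it is checked after a line is written,
        # so a chunk may exceed it by up to one line.
        if current_size >= approx_max_bytes:
            chunks.append(current)
            current, current_size = [], 0
    if current:
        chunks.append(current)
    return chunks


lines = [b"x" * 40 + b"\n" for _ in range(10)]  # ten 41-byte lines
chunks = split_by_size(lines, approx_max_bytes=100)
print([sum(len(line) for line in chunk) for chunk in chunks])
```

With ten 41-byte lines and a 100-byte limit, each full chunk holds three lines (123 bytes), and the last chunk holds the single remaining line.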
Querying data using BigQuery
The data available in Google Cloud Storage can be used by BigQuery. You can load the data into BigQuery or refer to the GCS data directly in queries. For information about loading data into BigQuery, please look at Introduction to loading data from Cloud Storage in the BigQuery documentation. For information about querying GCS data, please look at Querying Cloud Storage data in the BigQuery documentation.
Airflow also has numerous operators that let you work with BigQuery.
For example, if you want to create an external table that allows you to run queries that read data directly from GCS, then you can use BigQueryCreateExternalTableOperator.
Using this operator looks like this:
create_external_table_multiple_types = BigQueryCreateExternalTableOperator(
    task_id="create_external_table_multiple_types",
    bucket=GCS_BUCKET,
    table_resource={
        "tableReference": {
            "projectId": GCP_PROJECT_ID,
            "datasetId": DATASET_NAME,
            "tableId": f"{safe_name(SOURCE_SCHEMA_COLUMNS)}",
        },
        "schema": {
            "fields": [
                {"name": "table_catalog", "type": "STRING"},
                {"name": "table_schema", "type": "STRING"},
                {"name": "table_name", "type": "STRING"},
                {"name": "column_name", "type": "STRING"},
                {"name": "ordinal_position", "type": "INT64"},
                {"name": "column_default", "type": "STRING"},
                {"name": "is_nullable", "type": "STRING"},
                {"name": "data_type", "type": "STRING"},
            ],
        },
        "externalDataConfiguration": {
            "sourceFormat": "NEWLINE_DELIMITED_JSON",
            "compression": "NONE",
            "sourceUris": [f"gs://{GCS_BUCKET}/{safe_name(SOURCE_SCHEMA_COLUMNS)}.*.json"],
        },
    },
    source_objects=[f"{safe_name(SOURCE_SCHEMA_COLUMNS)}.*.json"],
    schema_object=f"{safe_name(SOURCE_SCHEMA_COLUMNS)}-schema.json",
)
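The wildcard in sourceUris is what lets a single external table cover every numbered chunk while leaving the schema file out. The sketch below approximates that matching with fnmatch from the standard library; the object names are hypothetical, and fnmatch only approximates BigQuery's wildcard rules, so treat it as an illustration of which names the pattern is meant to select.

```python
from fnmatch import fnmatchcase

# Hypothetical object names in the bucket.
objects = [
    "columns.0.json",
    "columns.1.json",
    "columns-schema.json",
    "customer.0.json",
]

# Same shape as the pattern used in sourceUris, without the gs://bucket/ prefix.
pattern = "columns.*.json"

matched = [name for name in objects if fnmatchcase(name, pattern)]
print(matched)
```

Because the schema file is named with a hyphen rather than a dot after the table name, it does not match the pattern and is not read as data.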
For more information about the Airflow and BigQuery integration, please look at the Python API Reference - bigquery.