Trino to Google Cloud Storage Transfer Operator

Trino is an open source, fast, distributed SQL query engine for running interactive analytic queries against data sources of all sizes ranging from gigabytes to petabytes. Trino allows querying data where it lives, including Hive, Cassandra, relational databases or even proprietary data stores. A single Trino query can combine data from multiple sources, allowing for analytics across your entire organization.

Google Cloud Storage allows world-wide storage and retrieval of any amount of data at any time. You can use it to store backup and archive data, as well as to serve as a data source for BigQuery.

Data transfer

Transferring files between Trino and Google Cloud Storage is performed with the TrinoToGCSOperator operator.

This operator has 3 required parameters:

  • sql - The SQL to execute.

  • bucket - The bucket to upload to.

  • filename - The filename to use as the object name when uploading to Google Cloud Storage. A {} should be specified in the filename to allow the operator to inject file numbers in cases where the file is split due to size.

All parameters are described in the reference documentation - TrinoToGCSOperator.

An example operator call might look like this:

tests/system/providers/google/cloud/gcs/example_trino_to_gcs.py[source]

trino_to_gcs_basic = TrinoToGCSOperator(
    task_id="trino_to_gcs_basic",
    sql=f"select * from {SOURCE_SCHEMA_COLUMNS}",
    bucket=GCS_BUCKET,
    filename=f"{safe_name(SOURCE_SCHEMA_COLUMNS)}.{{}}.json",
)
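
The snippets on this page come from a system test DAG, so they omit imports and DAG setup. For orientation, a minimal self-contained DAG might look roughly like the sketch below. It is not part of the provider's examples: the DAG id, query, bucket name, and schedule are placeholders, and it assumes the default trino_default and google_cloud_default connections are configured.

import pendulum

from airflow import DAG
from airflow.providers.trino.transfers.trino_to_gcs import TrinoToGCSOperator

with DAG(
    dag_id="example_trino_export",  # hypothetical DAG id
    start_date=pendulum.datetime(2024, 1, 1, tz="UTC"),
    schedule=None,
    catchup=False,
) as dag:
    export_orders = TrinoToGCSOperator(
        task_id="export_orders",
        sql="SELECT * FROM tpch.sf1.orders",  # hypothetical source query
        bucket="my-export-bucket",  # hypothetical bucket name
        filename="orders/export.{}.json",  # {} is replaced with the chunk number
        # trino_conn_id and gcp_conn_id default to "trino_default" and
        # "google_cloud_default"; override them here if you use other connections.
    )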

Choice of data format

The operator supports two output formats:

  • json - JSON Lines (default)

  • csv

You can select the format with the export_format parameter.

If you want a CSV file to be created, your operator call might look like this:

tests/system/providers/google/cloud/gcs/example_trino_to_gcs.py[source]

trino_to_gcs_csv = TrinoToGCSOperator(
    task_id="trino_to_gcs_csv",
    sql=f"select * from {SOURCE_SCHEMA_COLUMNS}",
    bucket=GCS_BUCKET,
    filename=f"{safe_name(SOURCE_SCHEMA_COLUMNS)}.{{}}.csv",
    schema_filename=f"{safe_name(SOURCE_SCHEMA_COLUMNS)}-schema.json",
    export_format="csv",
)

Generating BigQuery schema

If you set the schema_filename parameter, a .json file containing the BigQuery schema fields for the table will be dumped from the database and uploaded to the bucket.

If you want to create a schema file, then an example operator call might look like this:

tests/system/providers/google/cloud/gcs/example_trino_to_gcs.py[source]

trino_to_gcs_multiple_types = TrinoToGCSOperator(
    task_id="trino_to_gcs_multiple_types",
    sql=f"select * from {SOURCE_SCHEMA_COLUMNS}",
    bucket=GCS_BUCKET,
    filename=f"{safe_name(SOURCE_SCHEMA_COLUMNS)}.{{}}.json",
    schema_filename=f"{safe_name(SOURCE_SCHEMA_COLUMNS)}-schema.json",
    gzip=False,
)
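
The generated schema file contains a JSON list of BigQuery field definitions derived from the columns of the query result. For the information_schema.columns query above, its contents would look roughly like this (illustrative only; the exact names and types depend on the queried table):

[
    {"name": "table_catalog", "type": "STRING"},
    {"name": "table_schema", "type": "STRING"},
    {"name": "table_name", "type": "STRING"},
    {"name": "column_name", "type": "STRING"},
    {"name": "ordinal_position", "type": "INT64"},
    {"name": "column_default", "type": "STRING"},
    {"name": "is_nullable", "type": "STRING"},
    {"name": "data_type", "type": "STRING"}
]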

For more information about the BigQuery schema, please look at Specifying schema in the BigQuery documentation.

Division of the result into multiple files

This operator supports the ability to split a large result into multiple files. The approx_max_file_size_bytes parameter allows developers to specify the maximum file size of the splits. By default, each file has no more than 1,900,000,000 bytes (1900 MB).

Check Quotas & limits in Google Cloud Storage to see the maximum allowed file size for a single object.

If you want to create 10 MB files, your code might look like this:

tests/system/providers/google/cloud/gcs/example_trino_to_gcs.py[source]

trino_to_gcs_many_chunks = TrinoToGCSOperator(
    task_id="trino_to_gcs_many_chunks",
    sql=f"select * from {SOURCE_CUSTOMER_TABLE}",
    bucket=GCS_BUCKET,
    filename=f"{safe_name(SOURCE_CUSTOMER_TABLE)}.{{}}.json",
    schema_filename=f"{safe_name(SOURCE_CUSTOMER_TABLE)}-schema.json",
    approx_max_file_size_bytes=10_000_000,
    gzip=False,
)

Querying data using BigQuery

The data available in Google Cloud Storage can be used by BigQuery. You can load data into BigQuery or refer directly to GCS data in your queries. For information about loading data into BigQuery, please look at Introduction to loading data from Cloud Storage in the BigQuery documentation. For information about querying GCS data, please look at Querying Cloud Storage data in the BigQuery documentation.
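
As an illustration of the loading path, the JSON Lines files and the schema file produced in the examples above could be loaded into a native BigQuery table with the GCSToBigQueryOperator from the Google provider. This is a sketch rather than part of the provider's examples; the destination table reuses the variables from the snippets on this page, and the write disposition is only one possible choice.

from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator

load_exported_data = GCSToBigQueryOperator(
    task_id="load_exported_data",
    bucket=GCS_BUCKET,
    source_objects=[f"{safe_name(SOURCE_SCHEMA_COLUMNS)}.*.json"],
    schema_object=f"{safe_name(SOURCE_SCHEMA_COLUMNS)}-schema.json",
    source_format="NEWLINE_DELIMITED_JSON",
    destination_project_dataset_table=f"{DATASET_NAME}.{safe_name(SOURCE_SCHEMA_COLUMNS)}",
    write_disposition="WRITE_TRUNCATE",  # overwrite the table on each run
)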

Airflow also has numerous operators that allow you to work with BigQuery. For example, if you want to create an external table that lets you run queries that read data directly from GCS, you can use BigQueryCreateExternalTableOperator. Using this operator looks like this:

tests/system/providers/google/cloud/gcs/example_trino_to_gcs.py[source]

create_external_table_multiple_types = BigQueryCreateExternalTableOperator(
    task_id="create_external_table_multiple_types",
    bucket=GCS_BUCKET,
    table_resource={
        "tableReference": {
            "projectId": GCP_PROJECT_ID,
            "datasetId": DATASET_NAME,
            "tableId": f"{safe_name(SOURCE_SCHEMA_COLUMNS)}",
        },
        "schema": {
            "fields": [
                {"name": "table_catalog", "type": "STRING"},
                {"name": "table_schema", "type": "STRING"},
                {"name": "table_name", "type": "STRING"},
                {"name": "column_name", "type": "STRING"},
                {"name": "ordinal_position", "type": "INT64"},
                {"name": "column_default", "type": "STRING"},
                {"name": "is_nullable", "type": "STRING"},
                {"name": "data_type", "type": "STRING"},
            ],
        },
        "externalDataConfiguration": {
            "sourceFormat": "NEWLINE_DELIMITED_JSON",
            "compression": "NONE",
            "sourceUris": [f"gs://{GCS_BUCKET}/{safe_name(SOURCE_SCHEMA_COLUMNS)}.*.json"],
        },
    },
    source_objects=[f"{safe_name(SOURCE_SCHEMA_COLUMNS)}.*.json"],
    schema_object=f"{safe_name(SOURCE_SCHEMA_COLUMNS)}-schema.json",
)
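
Once the external table exists, you can query it like any other BigQuery table, for example with the BigQueryInsertJobOperator. The sketch below adapts the count query from the same system test file to the external table created above:

read_data_from_external_table = BigQueryInsertJobOperator(
    task_id="read_data_from_external_table",
    configuration={
        "query": {
            "query": f"SELECT COUNT(*) FROM `{GCP_PROJECT_ID}.{DATASET_NAME}."
            f"{safe_name(SOURCE_SCHEMA_COLUMNS)}`",
            "useLegacySql": False,
        }
    },
)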

For more information about the Airflow and BigQuery integration, please look at the Python API Reference - bigquery.

Reference

For further information, look at:
