Google Cloud Spanner Operators

Prerequisite Tasks

SpannerDeployInstanceOperator

Creates a new Cloud Spanner instance or, if an instance with the same instance id already exists in the specified project, updates that instance.

For parameter definition, take a look at SpannerDeployInstanceOperator.

Using the operator

You can create the operator with or without a project id. If the project id is missing, it is retrieved from the Google Cloud connection used. Both variants are shown:

airflow/providers/google/cloud/example_dags/example_spanner.py

from airflow.providers.google.cloud.operators.spanner import SpannerDeployInstanceOperator

# Variant with an explicit project_id.
spanner_instance_create_task = SpannerDeployInstanceOperator(
    project_id=GCP_PROJECT_ID,
    instance_id=GCP_SPANNER_INSTANCE_ID,
    configuration_name=GCP_SPANNER_CONFIG_NAME,
    node_count=int(GCP_SPANNER_NODE_COUNT),
    display_name=GCP_SPANNER_DISPLAY_NAME,
    task_id='spanner_instance_create_task',
)
# Variant without project_id: reuses the same instance_id, so an existing instance is updated.
spanner_instance_update_task = SpannerDeployInstanceOperator(
    instance_id=GCP_SPANNER_INSTANCE_ID,
    configuration_name=GCP_SPANNER_CONFIG_NAME,
    node_count=int(GCP_SPANNER_NODE_COUNT) + 1,
    display_name=GCP_SPANNER_DISPLAY_NAME + '_updated',
    task_id='spanner_instance_update_task',
)
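
The project id fallback described above can be pictured with a small stand-alone sketch. The function name resolve_project_id and its arguments are hypothetical and are not part of the Airflow API; the sketch only illustrates the rule that an explicit value wins and the connection default is used otherwise. Raising ValueError when neither is available is an assumption made for the example.

```python
from typing import Optional


def resolve_project_id(explicit: Optional[str], connection_default: Optional[str]) -> str:
    """Return the explicit project id, or fall back to the connection's default.

    Hypothetical helper for illustration only; not part of the Airflow API.
    """
    if explicit:
        return explicit
    if connection_default:
        return connection_default
    # Assumed behaviour for this sketch: fail loudly when nothing is configured.
    raise ValueError("No project id was passed and the connection defines no default.")


# An explicit value wins over the connection default.
print(resolve_project_id("my-project", "default-project"))
# A missing value falls back to the connection default.
print(resolve_project_id(None, "default-project"))
```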

Templating

template_fields: Sequence[str] = (
    'project_id',
    'instance_id',
    'configuration_name',
    'display_name',
    'gcp_conn_id',
    'impersonation_chain',
)
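
Only the attributes named in template_fields are rendered; task_id and node_count, for example, are not in the tuple above. The following toy sketch illustrates that rule under stated assumptions: FakeOperator and render_fields are hypothetical names, and the regular expression is a tiny stand-in for Jinja that only understands "{{ name }}" placeholders.

```python
import re
from typing import Any, Dict, Sequence

# Matches placeholders such as "{{ env }}" (a deliberately tiny subset of Jinja syntax).
PLACEHOLDER = re.compile(r"\{\{\s*(\w+)\s*\}\}")


class FakeOperator:
    """Hypothetical stand-in for an operator; not an Airflow class."""

    # Only these attributes are rendered.
    template_fields: Sequence[str] = ("project_id", "instance_id", "display_name")

    def __init__(self, project_id: str, instance_id: str, display_name: str, task_id: str) -> None:
        self.project_id = project_id
        self.instance_id = instance_id
        self.display_name = display_name
        self.task_id = task_id


def render_fields(operator: FakeOperator, context: Dict[str, Any]) -> None:
    """Substitute placeholders in every attribute named in template_fields."""
    for name in operator.template_fields:
        value = getattr(operator, name)
        if isinstance(value, str):
            rendered = PLACEHOLDER.sub(lambda match: str(context[match.group(1)]), value)
            setattr(operator, name, rendered)


op = FakeOperator(
    project_id="my-project",
    instance_id="spanner-{{ env }}",
    display_name="Instance {{ env }}",
    task_id="deploy-{{ env }}",
)
render_fields(op, {"env": "dev"})
# instance_id and display_name are rendered; task_id keeps its placeholder.
print(op.instance_id, op.display_name, op.task_id)
```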

More information

See Google Cloud Spanner API documentation to create a new instance.

SpannerDeleteDatabaseInstanceOperator

Deletes a database from the specified Cloud Spanner instance. If the database does not exist, no action is taken, and the operator succeeds.

For parameter definition, take a look at SpannerDeleteDatabaseInstanceOperator.

Using the operator

You can create the operator with or without a project id. If the project id is missing, it is retrieved from the Google Cloud connection used. Both variants are shown:

airflow/providers/google/cloud/example_dags/example_spanner.py

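from airflow.providers.google.cloud.operators.spanner import SpannerDeleteDatabaseInstanceOperator

# Variant with an explicit project_id.
spanner_database_delete_task = SpannerDeleteDatabaseInstanceOperator(
    project_id=GCP_PROJECT_ID,
    instance_id=GCP_SPANNER_INSTANCE_ID,
    database_id=GCP_SPANNER_DATABASE_ID,
    task_id='spanner_database_delete_task',
)
# Variant without project_id; it is taken from the connection.
spanner_database_delete_task2 = SpannerDeleteDatabaseInstanceOperator(
    instance_id=GCP_SPANNER_INSTANCE_ID,
    database_id=GCP_SPANNER_DATABASE_ID,
    task_id='spanner_database_delete_task2',
)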

Templating

template_fields: Sequence[str] = (
    'project_id',
    'instance_id',
    'gcp_conn_id',
    'impersonation_chain',
)

SpannerDeployDatabaseInstanceOperator

Creates a new Cloud Spanner database in the specified instance or, if the desired database already exists, assumes success and applies no changes to the database configuration. The structure of the database is not verified; it is enough that a database with the same name exists.
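
A consequence of this behaviour is that ddl_statements passed for an already existing database are not applied. The toy model below makes this visible. It is an in-memory sketch only: the databases dictionary and the deploy_database function are hypothetical and do not call Cloud Spanner or Airflow.

```python
from typing import Dict, List

# Toy in-memory "instance": database id -> DDL statements used at creation time.
databases: Dict[str, List[str]] = {}


def deploy_database(database_id: str, ddl_statements: List[str]) -> bool:
    """Create the database if it is missing; return True only when it was created."""
    if database_id in databases:
        # Existing database: nothing is compared and nothing is changed.
        return False
    databases[database_id] = list(ddl_statements)
    return True


first = deploy_database("testdb", ["CREATE TABLE my_table1 (id INT64) PRIMARY KEY (id)"])
second = deploy_database("testdb", ["CREATE TABLE my_table9 (id INT64) PRIMARY KEY (id)"])
# The second call succeeds without applying its DDL; only my_table1 was ever created.
print(first, second, databases["testdb"])
```

To change the structure of a database that already exists, use SpannerUpdateDatabaseInstanceOperator instead.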

For parameter definition, take a look at SpannerDeployDatabaseInstanceOperator.

Using the operator

You can create the operator with or without a project id. If the project id is missing, it is retrieved from the Google Cloud connection used. Both variants are shown:

airflow/providers/google/cloud/example_dags/example_spanner.py

from airflow.providers.google.cloud.operators.spanner import SpannerDeployDatabaseInstanceOperator

# Variant with an explicit project_id.
spanner_database_deploy_task = SpannerDeployDatabaseInstanceOperator(
    project_id=GCP_PROJECT_ID,
    instance_id=GCP_SPANNER_INSTANCE_ID,
    database_id=GCP_SPANNER_DATABASE_ID,
    ddl_statements=[
        "CREATE TABLE my_table1 (id INT64, name STRING(MAX)) PRIMARY KEY (id)",
        "CREATE TABLE my_table2 (id INT64, name STRING(MAX)) PRIMARY KEY (id)",
    ],
    task_id='spanner_database_deploy_task',
)
# Variant without project_id; the database already exists by now, so no changes are applied.
spanner_database_deploy_task2 = SpannerDeployDatabaseInstanceOperator(
    instance_id=GCP_SPANNER_INSTANCE_ID,
    database_id=GCP_SPANNER_DATABASE_ID,
    ddl_statements=[
        "CREATE TABLE my_table1 (id INT64, name STRING(MAX)) PRIMARY KEY (id)",
        "CREATE TABLE my_table2 (id INT64, name STRING(MAX)) PRIMARY KEY (id)",
    ],
    task_id='spanner_database_deploy_task2',
)

Templating

template_fields: Sequence[str] = (
    'project_id',
    'instance_id',
    'database_id',
    'ddl_statements',
    'gcp_conn_id',
    'impersonation_chain',
)
template_ext: Sequence[str] = ('.sql',)
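
The template_ext entry means that a templated value ending in .sql is treated as the name of a template file whose contents replace the value, assuming the usual Airflow meaning of template_ext. The sketch below imitates only the file-loading step with the standard library. The names load_if_template_file and resolve_field are hypothetical, the lookup directory is passed in explicitly, and no Jinja rendering is performed.

```python
import tempfile
from pathlib import Path
from typing import List, Sequence, Union

TEMPLATE_EXT: Sequence[str] = (".sql",)


def load_if_template_file(value: str, search_dir: Path) -> str:
    """Return the file contents when value ends with a template extension, else value."""
    if value.endswith(tuple(TEMPLATE_EXT)):
        return (search_dir / value).read_text()
    return value


def resolve_field(value: Union[str, List[str]], search_dir: Path) -> Union[str, List[str]]:
    """Apply load_if_template_file to a single string or to every item of a list."""
    if isinstance(value, str):
        return load_if_template_file(value, search_dir)
    return [load_if_template_file(item, search_dir) for item in value]


with tempfile.TemporaryDirectory() as tmp:
    folder = Path(tmp)
    (folder / "create_table.sql").write_text(
        "CREATE TABLE my_table1 (id INT64, name STRING(MAX)) PRIMARY KEY (id)"
    )
    ddl_statements = [
        "create_table.sql",  # file name: replaced by the file contents
        "CREATE TABLE my_table2 (id INT64, name STRING(MAX)) PRIMARY KEY (id)",  # kept as is
    ]
    print(resolve_field(ddl_statements, folder))
```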

More information

See Google Cloud Spanner API documentation to create a new database.

SpannerUpdateDatabaseInstanceOperator

Runs a DDL query in a Cloud Spanner database and allows you to modify the structure of an existing database.

You can optionally specify an operation_id parameter, which makes it easier to determine whether the statements were already executed if the update_database call is replayed (idempotency check). The operation_id should be unique within the database and must be a valid identifier: [a-z][a-z0-9_]*. More information can be found in the documentation of the updateDdl API.
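
The identifier rule for operation_id can be checked before a DAG is deployed. The helper below is a minimal sketch that applies the pattern quoted above; the name is_valid_operation_id is hypothetical and not part of Airflow or the Spanner client.

```python
import re

# Pattern quoted in the text: a lowercase letter followed by lowercase letters, digits or underscores.
OPERATION_ID_PATTERN = re.compile(r"[a-z][a-z0-9_]*")


def is_valid_operation_id(operation_id: str) -> bool:
    """Return True when the whole string matches the identifier pattern."""
    return OPERATION_ID_PATTERN.fullmatch(operation_id) is not None


for candidate in ("add_my_table_unique", "3_tables", "Add_Table", "add-table"):
    print(candidate, is_valid_operation_id(candidate))
```

fullmatch is used rather than match so that trailing invalid characters, such as the hyphen in the last candidate, are rejected.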

For parameter definition, take a look at SpannerUpdateDatabaseInstanceOperator.

Using the operator

You can create the operator with or without a project id. If the project id is missing, it is retrieved from the Google Cloud connection used. Both variants are shown:

airflow/providers/google/cloud/example_dags/example_spanner.py

from airflow.providers.google.cloud.operators.spanner import SpannerUpdateDatabaseInstanceOperator

# Update without an operation_id.
spanner_database_update_task = SpannerUpdateDatabaseInstanceOperator(
    project_id=GCP_PROJECT_ID,
    instance_id=GCP_SPANNER_INSTANCE_ID,
    database_id=GCP_SPANNER_DATABASE_ID,
    ddl_statements=[
        "CREATE TABLE my_table3 (id INT64, name STRING(MAX)) PRIMARY KEY (id)",
    ],
    task_id='spanner_database_update_task',
)

airflow/providers/google/cloud/example_dags/example_spanner.py

# Update with an operation_id and an explicit project_id.
spanner_database_update_idempotent1_task = SpannerUpdateDatabaseInstanceOperator(
    project_id=GCP_PROJECT_ID,
    instance_id=GCP_SPANNER_INSTANCE_ID,
    database_id=GCP_SPANNER_DATABASE_ID,
    operation_id=OPERATION_ID,
    ddl_statements=[
        "CREATE TABLE my_table_unique (id INT64, name STRING(MAX)) PRIMARY KEY (id)",
    ],
    task_id='spanner_database_update_idempotent1_task',
)
# Replays the same statements with the same operation_id (project_id omitted).
spanner_database_update_idempotent2_task = SpannerUpdateDatabaseInstanceOperator(
    instance_id=GCP_SPANNER_INSTANCE_ID,
    database_id=GCP_SPANNER_DATABASE_ID,
    operation_id=OPERATION_ID,
    ddl_statements=[
        "CREATE TABLE my_table_unique (id INT64, name STRING(MAX)) PRIMARY KEY (id)",
    ],
    task_id='spanner_database_update_idempotent2_task',
)

Templating

template_fields: Sequence[str] = (
    'project_id',
    'instance_id',
    'database_id',
    'ddl_statements',
    'gcp_conn_id',
    'impersonation_chain',
)
template_ext: Sequence[str] = ('.sql',)

More information

See Google Cloud Spanner API documentation for database update_ddl.

SpannerQueryDatabaseInstanceOperator

Executes an arbitrary DML query (INSERT, UPDATE, DELETE).
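
Because this operator is meant for DML, it can be useful to check statements before passing them in the query argument. The helper below is a rough sketch: is_dml is a hypothetical name, and it only inspects the first keyword of a statement rather than parsing SQL, so statements that start with a comment are rejected.

```python
DML_KEYWORDS = ("INSERT", "UPDATE", "DELETE")


def is_dml(statement: str) -> bool:
    """Return True when the first word of the statement is INSERT, UPDATE or DELETE."""
    words = statement.lstrip().split(None, 1)
    return bool(words) and words[0].upper() in DML_KEYWORDS


queries = [
    "DELETE FROM my_table2 WHERE true",
    "CREATE TABLE my_table3 (id INT64) PRIMARY KEY (id)",
]
for query in queries:
    print(is_dml(query), query)
```

DDL statements such as CREATE TABLE belong in SpannerUpdateDatabaseInstanceOperator instead.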

For parameter definition, take a look at SpannerQueryDatabaseInstanceOperator.

Using the operator

You can create the operator with or without a project id. If the project id is missing, it is retrieved from the Google Cloud connection used. Both variants are shown:

airflow/providers/google/cloud/example_dags/example_spanner.py

from airflow.providers.google.cloud.operators.spanner import SpannerQueryDatabaseInstanceOperator

# Variant with an explicit project_id.
spanner_instance_query_task = SpannerQueryDatabaseInstanceOperator(
    project_id=GCP_PROJECT_ID,
    instance_id=GCP_SPANNER_INSTANCE_ID,
    database_id=GCP_SPANNER_DATABASE_ID,
    query=["DELETE FROM my_table2 WHERE true"],
    task_id='spanner_instance_query_task',
)
# Variant without project_id; it is taken from the connection.
spanner_instance_query_task2 = SpannerQueryDatabaseInstanceOperator(
    instance_id=GCP_SPANNER_INSTANCE_ID,
    database_id=GCP_SPANNER_DATABASE_ID,
    query=["DELETE FROM my_table2 WHERE true"],
    task_id='spanner_instance_query_task2',
)

Templating

template_fields: Sequence[str] = (
    'project_id',
    'instance_id',
    'database_id',
    'query',
    'gcp_conn_id',
    'impersonation_chain',
)
template_ext: Sequence[str] = ('.sql',)

More information

See Google Cloud Spanner API documentation for more information about DML syntax.

SpannerDeleteInstanceOperator

Deletes a Cloud Spanner instance. If the instance does not exist, no action is taken, and the operator succeeds.

For parameter definition, take a look at SpannerDeleteInstanceOperator.

Using the operator

You can create the operator with or without a project id. If the project id is missing, it is retrieved from the Google Cloud connection used. Both variants are shown:

airflow/providers/google/cloud/example_dags/example_spanner.py

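from airflow.providers.google.cloud.operators.spanner import SpannerDeleteInstanceOperator

# Variant with an explicit project_id.
spanner_instance_delete_task = SpannerDeleteInstanceOperator(
    project_id=GCP_PROJECT_ID, instance_id=GCP_SPANNER_INSTANCE_ID, task_id='spanner_instance_delete_task'
)
# Variant without project_id; it is taken from the connection.
spanner_instance_delete_task2 = SpannerDeleteInstanceOperator(
    instance_id=GCP_SPANNER_INSTANCE_ID, task_id='spanner_instance_delete_task2'
)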

Templating

template_fields: Sequence[str] = (
    'project_id',
    'instance_id',
    'gcp_conn_id',
    'impersonation_chain',
)

More information

See Google Cloud Spanner API documentation to delete an instance.

Reference

For further information, look at:
