Using Operators¶
An operator represents a single, ideally idempotent, task. Operators determine what actually executes when your DAG runs.
See the Operators Concepts documentation and the Operators API Reference for more information.
- BashOperator
- PythonOperator
- Google Cloud Storage Operators
- Google Compute Engine Operators
- Google Cloud Bigtable Operators
- Google Cloud Functions Operators
- Google Cloud Spanner Operators
- Google Cloud SQL Operators
- Google Cloud Storage Operators
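The snippets in the sections below are excerpts from example DAGs, so they assume that a dag object (and in some places a default_args dictionary) has already been defined. A minimal sketch of such a setup, with a purely illustrative dag_id, might look like this:
from airflow import DAG
from airflow.utils import dates

# Hypothetical minimal DAG used by the snippets below; the dag_id and
# schedule_interval are illustrative only.
default_args = {
    'start_date': dates.days_ago(1)
}
dag = DAG(
    dag_id='example_operators',
    default_args=default_args,
    schedule_interval=None
)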
BashOperator¶
Use the BashOperator
to execute
commands in a Bash shell.
run_this = BashOperator(
task_id='run_after_loop',
bash_command='echo 1',
dag=dag,
)
Templating¶
You can use Jinja templates to parameterize the
bash_command
argument.
also_run_this = BashOperator(
task_id='also_run_this',
bash_command='echo "run_id={{ run_id }} | dag_run={{ dag_run }}"',
dag=dag,
)
Troubleshooting¶
Jinja template not found¶
Add a space after the script name when directly calling a Bash script with the bash_command argument. Without the trailing space, Airflow treats a value ending in .sh as the path to a Jinja template file and tries to render that file, which fails with a Jinja template not found error when the file cannot be located.
t2 = BashOperator(
task_id='bash_example',
# This fails with `Jinja template not found` error
# bash_command="/home/batcher/test.sh",
# This works (has a space after)
bash_command="/home/batcher/test.sh ",
dag=dag)
PythonOperator¶
Use the PythonOperator
to execute
Python callables.
def print_context(ds, **kwargs):
    pprint(kwargs)
    print(ds)
    return 'Whatever you return gets printed in the logs'
run_this = PythonOperator(
task_id='print_the_context',
provide_context=True,
python_callable=print_context,
dag=dag,
)
Passing in arguments¶
Use the op_args
and op_kwargs
arguments to pass additional arguments
to the Python callable.
def my_sleeping_function(random_base):
    """This is a function that will run within the DAG execution"""
    time.sleep(random_base)

# Generate 5 sleeping tasks, sleeping from 0.0 to 0.4 seconds respectively
for i in range(5):
    task = PythonOperator(
        task_id='sleep_for_' + str(i),
        python_callable=my_sleeping_function,
        op_kwargs={'random_base': float(i) / 10},
        dag=dag,
    )

    run_this >> task
Templating¶
When you set the provide_context argument to True, Airflow passes in an additional set of keyword arguments: one for each of the Jinja template variables and a templates_dict argument.
The templates_dict argument is templated, so each value in the dictionary is evaluated as a Jinja template.
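For example, in the sketch below (the task id, callable name, and key name are illustrative only), the value passed via templates_dict is rendered by Jinja before the callable runs:
def show_templated_value(**kwargs):
    # At runtime this prints the rendered execution date string,
    # not the literal '{{ ds }}' template.
    print(kwargs['templates_dict']['execution_date'])

templated_task = PythonOperator(
    task_id='show_templated_value',
    provide_context=True,
    python_callable=show_templated_value,
    templates_dict={'execution_date': '{{ ds }}'},
    dag=dag,
)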
Google Cloud Storage Operators¶
GoogleCloudStorageToBigQueryOperator¶
Use the
GoogleCloudStorageToBigQueryOperator
to execute a BigQuery load job.
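A minimal sketch of such a load job is shown below. The bucket, object, table, and schema are hypothetical; the operator is assumed to be imported from airflow.contrib.operators.gcs_to_bq:
from airflow.contrib.operators.gcs_to_bq import GoogleCloudStorageToBigQueryOperator

# Load a CSV object from GCS into a BigQuery table (all names are illustrative).
load_csv = GoogleCloudStorageToBigQueryOperator(
    task_id='gcs_to_bq_example',
    bucket='example-bucket',
    source_objects=['data/example.csv'],
    destination_project_dataset_table='example_dataset.example_table',
    schema_fields=[
        {'name': 'id', 'type': 'INTEGER', 'mode': 'REQUIRED'},
        {'name': 'name', 'type': 'STRING', 'mode': 'NULLABLE'},
    ],
    write_disposition='WRITE_TRUNCATE',
    dag=dag,
)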
Google Compute Engine Operators¶
GceInstanceStartOperator¶
Use the
GceInstanceStartOperator
to start an existing Google Compute Engine instance.
Arguments¶
The following examples show OS environment variables used to pass arguments to the operator:
GCP_PROJECT_ID = os.environ.get('GCP_PROJECT_ID', 'example-project')
GCE_ZONE = os.environ.get('GCE_ZONE', 'europe-west1-b')
GCE_INSTANCE = os.environ.get('GCE_INSTANCE', 'testinstance')
Using the operator¶
The code to create the operator:
gce_instance_start = GceInstanceStartOperator(
project_id=GCP_PROJECT_ID,
zone=GCE_ZONE,
resource_id=GCE_INSTANCE,
task_id='gcp_compute_start_task'
)
You can also create the operator without the project id; in that case, the project id will be retrieved from the GCP connection used:
gce_instance_start2 = GceInstanceStartOperator(
zone=GCE_ZONE,
resource_id=GCE_INSTANCE,
task_id='gcp_compute_start_task2'
)
Templating¶
template_fields = ('project_id', 'zone', 'resource_id', 'gcp_conn_id', 'api_version')
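Because these fields are templated, you can pass Jinja expressions for them. For instance, a hypothetical variant of the task above could read the instance name from an Airflow Variable (assumed to exist) at runtime:
gce_instance_start_templated = GceInstanceStartOperator(
    project_id=GCP_PROJECT_ID,
    zone=GCE_ZONE,
    # 'resource_id' is a templated field, so it is rendered by Jinja at runtime.
    resource_id="{{ var.value.gce_instance }}",
    task_id='gcp_compute_start_templated_task'
)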
GceInstanceStopOperator¶
Use the operator to stop a Google Compute Engine instance.
For parameter definition, take a look at GceInstanceStopOperator.
Arguments¶
The following examples show OS environment variables used to pass arguments to the operator:
GCP_PROJECT_ID = os.environ.get('GCP_PROJECT_ID', 'example-project')
GCE_ZONE = os.environ.get('GCE_ZONE', 'europe-west1-b')
GCE_INSTANCE = os.environ.get('GCE_INSTANCE', 'testinstance')
Using the operator¶
The code to create the operator:
gce_instance_stop = GceInstanceStopOperator(
project_id=GCP_PROJECT_ID,
zone=GCE_ZONE,
resource_id=GCE_INSTANCE,
task_id='gcp_compute_stop_task'
)
You can also create the operator without the project id; in that case, the project id will be retrieved from the GCP connection used:
gce_instance_stop2 = GceInstanceStopOperator(
zone=GCE_ZONE,
resource_id=GCE_INSTANCE,
task_id='gcp_compute_stop_task2'
)
Templating¶
template_fields = ('project_id', 'zone', 'resource_id', 'gcp_conn_id', 'api_version')
GceSetMachineTypeOperator¶
Use the operator to change the machine type of a Google Compute Engine instance.
For parameter definition, take a look at GceSetMachineTypeOperator.
Arguments¶
The following examples show OS environment variables used to pass arguments to the operator:
GCP_PROJECT_ID = os.environ.get('GCP_PROJECT_ID', 'example-project')
GCE_ZONE = os.environ.get('GCE_ZONE', 'europe-west1-b')
GCE_INSTANCE = os.environ.get('GCE_INSTANCE', 'testinstance')
GCE_SHORT_MACHINE_TYPE_NAME = os.environ.get('GCE_SHORT_MACHINE_TYPE_NAME', 'n1-standard-1')
SET_MACHINE_TYPE_BODY = {
'machineType': 'zones/{}/machineTypes/{}'.format(GCE_ZONE, GCE_SHORT_MACHINE_TYPE_NAME)
}
Using the operator¶
The code to create the operator:
gce_set_machine_type = GceSetMachineTypeOperator(
project_id=GCP_PROJECT_ID,
zone=GCE_ZONE,
resource_id=GCE_INSTANCE,
body=SET_MACHINE_TYPE_BODY,
task_id='gcp_compute_set_machine_type'
)
You can also create the operator without the project id; in that case, the project id will be retrieved from the GCP connection used:
gce_set_machine_type2 = GceSetMachineTypeOperator(
zone=GCE_ZONE,
resource_id=GCE_INSTANCE,
body=SET_MACHINE_TYPE_BODY,
task_id='gcp_compute_set_machine_type2'
)
Templating¶
template_fields = ('project_id', 'zone', 'resource_id', 'gcp_conn_id', 'api_version')
GceInstanceTemplateCopyOperator¶
Use the operator to copy an existing Google Compute Engine instance template, applying a patch to it.
For parameter definition, take a look at GceInstanceTemplateCopyOperator.
Arguments¶
The following examples show OS environment variables used to pass arguments to the operator:
GCP_PROJECT_ID = os.environ.get('GCP_PROJECT_ID', 'example-project')
GCE_ZONE = os.environ.get('GCE_ZONE', 'europe-west1-b')
GCE_TEMPLATE_NAME = os.environ.get('GCE_TEMPLATE_NAME', 'instance-template-test')
GCE_NEW_TEMPLATE_NAME = os.environ.get('GCE_NEW_TEMPLATE_NAME',
'instance-template-test-new')
GCE_NEW_DESCRIPTION = os.environ.get('GCE_NEW_DESCRIPTION', 'Test new description')
GCE_INSTANCE_TEMPLATE_BODY_UPDATE = {
"name": GCE_NEW_TEMPLATE_NAME,
"description": GCE_NEW_DESCRIPTION,
"properties": {
"machineType": "n1-standard-2"
}
}
Using the operator¶
The code to create the operator:
gce_instance_template_copy = GceInstanceTemplateCopyOperator(
project_id=GCP_PROJECT_ID,
resource_id=GCE_TEMPLATE_NAME,
body_patch=GCE_INSTANCE_TEMPLATE_BODY_UPDATE,
task_id='gcp_compute_igm_copy_template_task'
)
You can also create the operator without the project id; in that case, the project id will be retrieved from the GCP connection used:
gce_instance_template_copy2 = GceInstanceTemplateCopyOperator(
resource_id=GCE_TEMPLATE_NAME,
body_patch=GCE_INSTANCE_TEMPLATE_BODY_UPDATE,
task_id='gcp_compute_igm_copy_template_task_2'
)
Templating¶
template_fields = ('project_id', 'resource_id', 'request_id',
'gcp_conn_id', 'api_version')
GceInstanceGroupManagerUpdateTemplateOperator¶
Use the operator to update a template in a Google Compute Engine Instance Group Manager.
For parameter definition, take a look at GceInstanceGroupManagerUpdateTemplateOperator.
Arguments¶
The following examples show OS environment variables used to pass arguments to the operator:
GCP_PROJECT_ID = os.environ.get('GCP_PROJECT_ID', 'example-project')
GCE_ZONE = os.environ.get('GCE_ZONE', 'europe-west1-b')
GCE_INSTANCE_GROUP_MANAGER_NAME = os.environ.get('GCE_INSTANCE_GROUP_MANAGER_NAME',
'instance-group-test')
SOURCE_TEMPLATE_URL = os.environ.get(
'SOURCE_TEMPLATE_URL',
"https://www.googleapis.com/compute/beta/projects/" + GCP_PROJECT_ID +
"/global/instanceTemplates/instance-template-test")
DESTINATION_TEMPLATE_URL = os.environ.get(
'DESTINATION_TEMPLATE_URL',
"https://www.googleapis.com/compute/beta/projects/" + GCP_PROJECT_ID +
"/global/instanceTemplates/" + GCE_NEW_TEMPLATE_NAME)
UPDATE_POLICY = {
"type": "OPPORTUNISTIC",
"minimalAction": "RESTART",
"maxSurge": {
"fixed": 1
},
"minReadySec": 1800
}
Using the operator¶
The code to create the operator:
gce_instance_group_manager_update_template = \
GceInstanceGroupManagerUpdateTemplateOperator(
project_id=GCP_PROJECT_ID,
resource_id=GCE_INSTANCE_GROUP_MANAGER_NAME,
zone=GCE_ZONE,
source_template=SOURCE_TEMPLATE_URL,
destination_template=DESTINATION_TEMPLATE_URL,
update_policy=UPDATE_POLICY,
task_id='gcp_compute_igm_group_manager_update_template'
)
You can also create the operator without the project id; in that case, the project id will be retrieved from the GCP connection used:
gce_instance_group_manager_update_template2 = \
GceInstanceGroupManagerUpdateTemplateOperator(
resource_id=GCE_INSTANCE_GROUP_MANAGER_NAME,
zone=GCE_ZONE,
source_template=SOURCE_TEMPLATE_URL,
destination_template=DESTINATION_TEMPLATE_URL,
task_id='gcp_compute_igm_group_manager_update_template_2'
)
Templating¶
template_fields = ('project_id', 'resource_id', 'zone', 'request_id',
'source_template', 'destination_template',
'gcp_conn_id', 'api_version')
Troubleshooting¶
You might find that your GceInstanceGroupManagerUpdateTemplateOperator fails with missing permissions. To execute the operation, the service account requires the permissions that the Service Account User role provides (assigned via Google Cloud IAM).
Google Cloud Bigtable Operators¶
All examples below rely on the following variables, which can be passed via environment variables.
GCP_PROJECT_ID = getenv('GCP_PROJECT_ID', 'example-project')
CBT_INSTANCE_ID = getenv('CBT_INSTANCE_ID', 'some-instance-id')
CBT_INSTANCE_DISPLAY_NAME = getenv('CBT_INSTANCE_DISPLAY_NAME', 'Human-readable name')
CBT_INSTANCE_TYPE = getenv('CBT_INSTANCE_TYPE', '2')
CBT_INSTANCE_LABELS = getenv('CBT_INSTANCE_LABELS', '{}')
CBT_CLUSTER_ID = getenv('CBT_CLUSTER_ID', 'some-cluster-id')
CBT_CLUSTER_ZONE = getenv('CBT_CLUSTER_ZONE', 'europe-west1-b')
CBT_CLUSTER_NODES = getenv('CBT_CLUSTER_NODES', '3')
CBT_CLUSTER_NODES_UPDATED = getenv('CBT_CLUSTER_NODES_UPDATED', '5')
CBT_CLUSTER_STORAGE_TYPE = getenv('CBT_CLUSTER_STORAGE_TYPE', '2')
CBT_TABLE_ID = getenv('CBT_TABLE_ID', 'some-table-id')
CBT_POKE_INTERVAL = getenv('CBT_POKE_INTERVAL', '60')
BigtableInstanceCreateOperator¶
Use the BigtableInstanceCreateOperator
to create a Google Cloud Bigtable instance.
If the Cloud Bigtable instance with the given ID exists, the operator does not compare its configuration and immediately succeeds. No changes are made to the existing instance.
Using the operator¶
You can create the operator with or without project id. If project id is missing it will be retrieved from the GCP connection used. Both variants are shown:
create_instance_task = BigtableInstanceCreateOperator(
project_id=GCP_PROJECT_ID,
instance_id=CBT_INSTANCE_ID,
main_cluster_id=CBT_CLUSTER_ID,
main_cluster_zone=CBT_CLUSTER_ZONE,
instance_display_name=CBT_INSTANCE_DISPLAY_NAME,
instance_type=int(CBT_INSTANCE_TYPE),
instance_labels=json.loads(CBT_INSTANCE_LABELS),
cluster_nodes=int(CBT_CLUSTER_NODES),
cluster_storage_type=int(CBT_CLUSTER_STORAGE_TYPE),
task_id='create_instance_task',
)
create_instance_task2 = BigtableInstanceCreateOperator(
instance_id=CBT_INSTANCE_ID,
main_cluster_id=CBT_CLUSTER_ID,
main_cluster_zone=CBT_CLUSTER_ZONE,
instance_display_name=CBT_INSTANCE_DISPLAY_NAME,
instance_type=int(CBT_INSTANCE_TYPE),
instance_labels=json.loads(CBT_INSTANCE_LABELS),
cluster_nodes=int(CBT_CLUSTER_NODES),
cluster_storage_type=int(CBT_CLUSTER_STORAGE_TYPE),
task_id='create_instance_task2',
)
create_instance_task >> create_instance_task2
BigtableInstanceDeleteOperator¶
Use the BigtableInstanceDeleteOperator
to delete a Google Cloud Bigtable instance.
Using the operator¶
You can create the operator with or without project id. If project id is missing it will be retrieved from the GCP connection used. Both variants are shown:
delete_instance_task = BigtableInstanceDeleteOperator(
project_id=GCP_PROJECT_ID,
instance_id=CBT_INSTANCE_ID,
task_id='delete_instance_task',
)
delete_instance_task2 = BigtableInstanceDeleteOperator(
instance_id=CBT_INSTANCE_ID,
task_id='delete_instance_task2',
)
BigtableClusterUpdateOperator¶
Use the BigtableClusterUpdateOperator
to modify number of nodes in a Cloud Bigtable cluster.
Using the operator¶
You can create the operator with or without project id. If project id is missing it will be retrieved from the GCP connection used. Both variants are shown:
cluster_update_task = BigtableClusterUpdateOperator(
project_id=GCP_PROJECT_ID,
instance_id=CBT_INSTANCE_ID,
cluster_id=CBT_CLUSTER_ID,
nodes=int(CBT_CLUSTER_NODES_UPDATED),
task_id='update_cluster_task',
)
cluster_update_task2 = BigtableClusterUpdateOperator(
instance_id=CBT_INSTANCE_ID,
cluster_id=CBT_CLUSTER_ID,
nodes=int(CBT_CLUSTER_NODES_UPDATED),
task_id='update_cluster_task2',
)
cluster_update_task >> cluster_update_task2
BigtableTableCreateOperator¶
Creates a table in a Cloud Bigtable instance.
If a table with the given ID exists in the Cloud Bigtable instance, the operator compares its Column Families. If the Column Families are identical, the operator succeeds. Otherwise, the operator fails with an appropriate error message.
Using the operator¶
You can create the operator with or without project id. If project id is missing it will be retrieved from the GCP connection used. Both variants are shown:
create_table_task = BigtableTableCreateOperator(
project_id=GCP_PROJECT_ID,
instance_id=CBT_INSTANCE_ID,
table_id=CBT_TABLE_ID,
task_id='create_table',
)
create_table_task2 = BigtableTableCreateOperator(
instance_id=CBT_INSTANCE_ID,
table_id=CBT_TABLE_ID,
task_id='create_table_task2',
)
create_table_task >> create_table_task2
Advanced¶
When creating a table, you can specify the optional initial_split_keys and column_families arguments.
Please refer to the Python Client for Google Cloud Bigtable documentation for Table and for Column Families.
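A hedged sketch of passing these arguments is shown below; it assumes that column_families accepts a mapping from family name to a garbage-collection rule object from the google.cloud.bigtable client library, and the split key and family name are illustrative only:
from google.cloud.bigtable import column_family

# Illustrative only: create a table with one initial split key and a single
# column family that keeps at most two versions of each cell.
create_table_with_families_task = BigtableTableCreateOperator(
    instance_id=CBT_INSTANCE_ID,
    table_id=CBT_TABLE_ID,
    initial_split_keys=[b'row-0500'],
    column_families={
        'cf1': column_family.MaxVersionsGCRule(2),
    },
    task_id='create_table_with_families_task',
)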
BigtableTableDeleteOperator¶
Use the BigtableTableDeleteOperator
to delete a table in Google Cloud Bigtable.
Using the operator¶
You can create the operator with or without project id. If project id is missing it will be retrieved from the GCP connection used. Both variants are shown:
delete_table_task = BigtableTableDeleteOperator(
project_id=GCP_PROJECT_ID,
instance_id=CBT_INSTANCE_ID,
table_id=CBT_TABLE_ID,
task_id='delete_table_task',
)
delete_table_task2 = BigtableTableDeleteOperator(
instance_id=CBT_INSTANCE_ID,
table_id=CBT_TABLE_ID,
task_id='delete_table_task2',
)
BigtableTableWaitForReplicationSensor¶
Use the BigtableTableWaitForReplicationSensor
to wait for the table to replicate fully.
The same arguments apply to this sensor as to the BigtableTableCreateOperator.
Note: If the table or the Cloud Bigtable instance does not exist, this sensor waits until the timeout is reached and does not raise any exception.
Using the operator¶
You can create the operator with or without project id. If project id is missing it will be retrieved from the GCP connection used. Both variants are shown:
wait_for_table_replication_task = BigtableTableWaitForReplicationSensor(
project_id=GCP_PROJECT_ID,
instance_id=CBT_INSTANCE_ID,
table_id=CBT_TABLE_ID,
poke_interval=int(CBT_POKE_INTERVAL),
timeout=180,
task_id='wait_for_table_replication_task',
)
wait_for_table_replication_task2 = BigtableTableWaitForReplicationSensor(
instance_id=CBT_INSTANCE_ID,
table_id=CBT_TABLE_ID,
poke_interval=int(CBT_POKE_INTERVAL),
timeout=180,
task_id='wait_for_table_replication_task2',
)
Google Cloud Functions Operators¶
GcfFunctionDeleteOperator¶
Use the operator to delete a function from Google Cloud Functions.
For parameter definition, take a look at
GcfFunctionDeleteOperator
.
Arguments¶
The following examples of OS environment variables show how you can build the function name to use in the operator:
GCP_PROJECT_ID = os.environ.get('GCP_PROJECT_ID', 'example-project')
GCP_LOCATION = os.environ.get('GCP_LOCATION', 'europe-west1')
GCF_SHORT_FUNCTION_NAME = os.environ.get('GCF_SHORT_FUNCTION_NAME', 'hello').\
replace("-", "_") # make sure there are no dashes in function name (!)
FUNCTION_NAME = 'projects/{}/locations/{}/functions/{}'.format(GCP_PROJECT_ID,
GCP_LOCATION,
GCF_SHORT_FUNCTION_NAME)
Using the operator¶
delete_task = GcfFunctionDeleteOperator(
task_id="gcf_delete_task",
name=FUNCTION_NAME
)
Templating¶
template_fields = ('name', 'gcp_conn_id', 'api_version')
GcfFunctionDeployOperator¶
Use the operator to deploy a function to Google Cloud Functions. If a function with this name already exists, it will be updated.
For parameter definition, take a look at
GcfFunctionDeployOperator
.
Arguments¶
In the example DAG the following environment variables are used to parameterize the operator’s definition:
GCP_PROJECT_ID = os.environ.get('GCP_PROJECT_ID', 'example-project')
GCP_LOCATION = os.environ.get('GCP_LOCATION', 'europe-west1')
GCF_SHORT_FUNCTION_NAME = os.environ.get('GCF_SHORT_FUNCTION_NAME', 'hello').\
replace("-", "_") # make sure there are no dashes in function name (!)
FUNCTION_NAME = 'projects/{}/locations/{}/functions/{}'.format(GCP_PROJECT_ID,
GCP_LOCATION,
GCF_SHORT_FUNCTION_NAME)
GCF_SOURCE_ARCHIVE_URL = os.environ.get('GCF_SOURCE_ARCHIVE_URL', '')
GCF_SOURCE_UPLOAD_URL = os.environ.get('GCF_SOURCE_UPLOAD_URL', '')
GCF_SOURCE_REPOSITORY = os.environ.get(
'GCF_SOURCE_REPOSITORY',
'https://source.developers.google.com/'
'projects/{}/repos/hello-world/moveable-aliases/master'.format(GCP_PROJECT_ID))
GCF_ZIP_PATH = os.environ.get('GCF_ZIP_PATH', '')
GCF_ENTRYPOINT = os.environ.get('GCF_ENTRYPOINT', 'helloWorld')
GCF_RUNTIME = 'nodejs6'
GCP_VALIDATE_BODY = os.environ.get('GCP_VALIDATE_BODY', True)
Some of those variables are used to create the request’s body:
body = {
"name": FUNCTION_NAME,
"entryPoint": GCF_ENTRYPOINT,
"runtime": GCF_RUNTIME,
"httpsTrigger": {}
}
When the DAG is created, the default_args dictionary can be used to pass arguments that are shared with other tasks:
default_args = {
'start_date': dates.days_ago(1)
}
Note that neither the body nor the default args are complete in the above examples.
Depending on the variables set, the source code can be passed in different ways. Currently, you can pass sourceArchiveUrl, sourceRepository, or sourceUploadUrl, as described in the Cloud Functions API specification.
Additionally, default_args or the direct operator arguments may contain the zip_path parameter, which runs the extra step of uploading the source code before deploying it. In this case, you also need to provide an empty sourceUploadUrl parameter in the body.
Using the operator¶
Depending on the combination of parameters, the Function’s source code can be obtained from different sources:
if GCF_SOURCE_ARCHIVE_URL:
    body['sourceArchiveUrl'] = GCF_SOURCE_ARCHIVE_URL
elif GCF_SOURCE_REPOSITORY:
    body['sourceRepository'] = {
        'url': GCF_SOURCE_REPOSITORY
    }
elif GCF_ZIP_PATH:
    body['sourceUploadUrl'] = ''
    default_args['zip_path'] = GCF_ZIP_PATH
elif GCF_SOURCE_UPLOAD_URL:
    body['sourceUploadUrl'] = GCF_SOURCE_UPLOAD_URL
else:
    raise Exception("Please provide one of the source_code parameters")
The code to create the operator:
deploy_task = GcfFunctionDeployOperator(
task_id="gcf_deploy_task",
project_id=GCP_PROJECT_ID,
location=GCP_LOCATION,
body=body,
validate_body=GCP_VALIDATE_BODY
)
You can also create the operator without the project id; in that case, the project id will be retrieved from the GCP connection used:
deploy2_task = GcfFunctionDeployOperator(
task_id="gcf_deploy2_task",
location=GCP_LOCATION,
body=body,
validate_body=GCP_VALIDATE_BODY
)
Templating¶
template_fields = ('project_id', 'location', 'gcp_conn_id', 'api_version')
Troubleshooting¶
If during the deploy you see an error similar to:
“HttpError 403: Missing necessary permission iam.serviceAccounts.actAs for on resource project-name@appspot.gserviceaccount.com. Please grant the roles/iam.serviceAccountUser role.”
it means that your service account does not have the correct Cloud IAM permissions.
- Assign your Service Account the Cloud Functions Developer role.
- Grant the user the Cloud IAM Service Account User role on the Cloud Functions runtime service account.
The typical way of assigning Cloud IAM permissions with gcloud is shown below. Just replace PROJECT_ID with the ID of your Google Cloud Platform project and SERVICE_ACCOUNT_EMAIL with the email address of your service account.
gcloud iam service-accounts add-iam-policy-binding \
PROJECT_ID@appspot.gserviceaccount.com \
--member="serviceAccount:[SERVICE_ACCOUNT_EMAIL]" \
--role="roles/iam.serviceAccountUser"
You can also do that via the GCP Web console.
See Adding the IAM service agent user role to the runtime service for details.
If the source code for your function is in Google Source Repository, make sure that your service account has the Source Repository Viewer role so that the source code can be downloaded if necessary.
Google Cloud Spanner Operators¶
CloudSpannerInstanceDatabaseDeleteOperator¶
Deletes a database from the specified Cloud Spanner instance. If the database does not exist, no action is taken, and the operator succeeds.
For parameter definition, take a look at
CloudSpannerInstanceDatabaseDeleteOperator
.
Arguments¶
Some arguments in the example DAG are taken from environment variables.
GCP_PROJECT_ID = os.environ.get('GCP_PROJECT_ID', 'example-project')
GCP_SPANNER_INSTANCE_ID = os.environ.get('GCP_SPANNER_INSTANCE_ID', 'testinstance')
GCP_SPANNER_DATABASE_ID = os.environ.get('GCP_SPANNER_DATABASE_ID', 'testdatabase')
GCP_SPANNER_CONFIG_NAME = os.environ.get('GCP_SPANNER_CONFIG_NAME',
'projects/example-project/instanceConfigs/eur3')
GCP_SPANNER_NODE_COUNT = os.environ.get('GCP_SPANNER_NODE_COUNT', '1')
GCP_SPANNER_DISPLAY_NAME = os.environ.get('GCP_SPANNER_DISPLAY_NAME', 'Test Instance')
# OPERATION_ID should be unique per operation
OPERATION_ID = 'unique_operation_id'
Using the operator¶
You can create the operator with or without project id. If project id is missing it will be retrieved from the GCP connection used. Both variants are shown:
spanner_database_delete_task = CloudSpannerInstanceDatabaseDeleteOperator(
project_id=GCP_PROJECT_ID,
instance_id=GCP_SPANNER_INSTANCE_ID,
database_id=GCP_SPANNER_DATABASE_ID,
task_id='spanner_database_delete_task'
)
spanner_database_delete_task2 = CloudSpannerInstanceDatabaseDeleteOperator(
instance_id=GCP_SPANNER_INSTANCE_ID,
database_id=GCP_SPANNER_DATABASE_ID,
task_id='spanner_database_delete_task2'
)
Templating¶
template_fields = ('project_id', 'instance_id', 'gcp_conn_id')
CloudSpannerInstanceDatabaseDeployOperator¶
Creates a new Cloud Spanner database in the specified instance, or, if the database already exists, assumes success with no changes applied to the database configuration. The structure of the database is not verified; it is enough that a database with the same name exists.
For parameter definition, take a look at
CloudSpannerInstanceDatabaseDeployOperator
.
Arguments¶
Some arguments in the example DAG are taken from environment variables.
GCP_PROJECT_ID = os.environ.get('GCP_PROJECT_ID', 'example-project')
GCP_SPANNER_INSTANCE_ID = os.environ.get('GCP_SPANNER_INSTANCE_ID', 'testinstance')
GCP_SPANNER_DATABASE_ID = os.environ.get('GCP_SPANNER_DATABASE_ID', 'testdatabase')
GCP_SPANNER_CONFIG_NAME = os.environ.get('GCP_SPANNER_CONFIG_NAME',
'projects/example-project/instanceConfigs/eur3')
GCP_SPANNER_NODE_COUNT = os.environ.get('GCP_SPANNER_NODE_COUNT', '1')
GCP_SPANNER_DISPLAY_NAME = os.environ.get('GCP_SPANNER_DISPLAY_NAME', 'Test Instance')
# OPERATION_ID should be unique per operation
OPERATION_ID = 'unique_operation_id'
Using the operator¶
You can create the operator with or without project id. If project id is missing it will be retrieved from the GCP connection used. Both variants are shown:
spanner_database_deploy_task = CloudSpannerInstanceDatabaseDeployOperator(
project_id=GCP_PROJECT_ID,
instance_id=GCP_SPANNER_INSTANCE_ID,
database_id=GCP_SPANNER_DATABASE_ID,
ddl_statements=[
"CREATE TABLE my_table1 (id INT64, name STRING(MAX)) PRIMARY KEY (id)",
"CREATE TABLE my_table2 (id INT64, name STRING(MAX)) PRIMARY KEY (id)",
],
task_id='spanner_database_deploy_task'
)
spanner_database_deploy_task2 = CloudSpannerInstanceDatabaseDeployOperator(
instance_id=GCP_SPANNER_INSTANCE_ID,
database_id=GCP_SPANNER_DATABASE_ID,
ddl_statements=[
"CREATE TABLE my_table1 (id INT64, name STRING(MAX)) PRIMARY KEY (id)",
"CREATE TABLE my_table2 (id INT64, name STRING(MAX)) PRIMARY KEY (id)",
],
task_id='spanner_database_deploy_task2'
)
Templating¶
template_fields = ('project_id', 'instance_id', 'database_id', 'ddl_statements',
'gcp_conn_id')
template_ext = ('.sql', )
More information¶
See Google Cloud Spanner API documentation for database create
CloudSpannerInstanceDatabaseUpdateOperator¶
Runs a DDL query in a Cloud Spanner database and allows you to modify the structure of an existing database.
You can optionally specify an operation_id parameter, which simplifies determining whether the statements were executed in case the update_database call is replayed (idempotency check). The operation_id should be unique within the database and must be a valid identifier: [a-z][a-z0-9_]*. More information can be found in the documentation of the updateDdl API.
For parameter definition take a look at
CloudSpannerInstanceDatabaseUpdateOperator
.
Arguments¶
Some arguments in the example DAG are taken from environment variables.
GCP_PROJECT_ID = os.environ.get('GCP_PROJECT_ID', 'example-project')
GCP_SPANNER_INSTANCE_ID = os.environ.get('GCP_SPANNER_INSTANCE_ID', 'testinstance')
GCP_SPANNER_DATABASE_ID = os.environ.get('GCP_SPANNER_DATABASE_ID', 'testdatabase')
GCP_SPANNER_CONFIG_NAME = os.environ.get('GCP_SPANNER_CONFIG_NAME',
'projects/example-project/instanceConfigs/eur3')
GCP_SPANNER_NODE_COUNT = os.environ.get('GCP_SPANNER_NODE_COUNT', '1')
GCP_SPANNER_DISPLAY_NAME = os.environ.get('GCP_SPANNER_DISPLAY_NAME', 'Test Instance')
# OPERATION_ID should be unique per operation
OPERATION_ID = 'unique_operation_id'
Using the operator¶
You can create the operator with or without project id. If project id is missing it will be retrieved from the GCP connection used. Both variants are shown:
spanner_database_update_task = CloudSpannerInstanceDatabaseUpdateOperator(
project_id=GCP_PROJECT_ID,
instance_id=GCP_SPANNER_INSTANCE_ID,
database_id=GCP_SPANNER_DATABASE_ID,
ddl_statements=[
"CREATE TABLE my_table3 (id INT64, name STRING(MAX)) PRIMARY KEY (id)",
],
task_id='spanner_database_update_task'
)
spanner_database_update_idempotent1_task = CloudSpannerInstanceDatabaseUpdateOperator(
project_id=GCP_PROJECT_ID,
instance_id=GCP_SPANNER_INSTANCE_ID,
database_id=GCP_SPANNER_DATABASE_ID,
operation_id=OPERATION_ID,
ddl_statements=[
"CREATE TABLE my_table_unique (id INT64, name STRING(MAX)) PRIMARY KEY (id)",
],
task_id='spanner_database_update_idempotent1_task'
)
spanner_database_update_idempotent2_task = CloudSpannerInstanceDatabaseUpdateOperator(
instance_id=GCP_SPANNER_INSTANCE_ID,
database_id=GCP_SPANNER_DATABASE_ID,
operation_id=OPERATION_ID,
ddl_statements=[
"CREATE TABLE my_table_unique (id INT64, name STRING(MAX)) PRIMARY KEY (id)",
],
task_id='spanner_database_update_idempotent2_task'
)
Templating¶
template_fields = ('project_id', 'instance_id', 'database_id', 'ddl_statements',
'gcp_conn_id')
template_ext = ('.sql', )
More information¶
See Google Cloud Spanner API documentation for database update_ddl.
CloudSpannerInstanceDatabaseQueryOperator¶
Executes an arbitrary DML query (INSERT, UPDATE, DELETE).
For parameter definition take a look at
CloudSpannerInstanceDatabaseQueryOperator
.
Arguments¶
Some arguments in the example DAG are taken from environment variables.
GCP_PROJECT_ID = os.environ.get('GCP_PROJECT_ID', 'example-project')
GCP_SPANNER_INSTANCE_ID = os.environ.get('GCP_SPANNER_INSTANCE_ID', 'testinstance')
GCP_SPANNER_DATABASE_ID = os.environ.get('GCP_SPANNER_DATABASE_ID', 'testdatabase')
GCP_SPANNER_CONFIG_NAME = os.environ.get('GCP_SPANNER_CONFIG_NAME',
'projects/example-project/instanceConfigs/eur3')
GCP_SPANNER_NODE_COUNT = os.environ.get('GCP_SPANNER_NODE_COUNT', '1')
GCP_SPANNER_DISPLAY_NAME = os.environ.get('GCP_SPANNER_DISPLAY_NAME', 'Test Instance')
# OPERATION_ID should be unique per operation
OPERATION_ID = 'unique_operation_id'
Using the operator¶
You can create the operator with or without project id. If project id is missing it will be retrieved from the GCP connection used. Both variants are shown:
spanner_instance_query_task = CloudSpannerInstanceDatabaseQueryOperator(
project_id=GCP_PROJECT_ID,
instance_id=GCP_SPANNER_INSTANCE_ID,
database_id=GCP_SPANNER_DATABASE_ID,
query=["DELETE FROM my_table2 WHERE true"],
task_id='spanner_instance_query_task'
)
spanner_instance_query_task2 = CloudSpannerInstanceDatabaseQueryOperator(
instance_id=GCP_SPANNER_INSTANCE_ID,
database_id=GCP_SPANNER_DATABASE_ID,
query=["DELETE FROM my_table2 WHERE true"],
task_id='spanner_instance_query_task2'
)
Templating¶
template_fields = ('project_id', 'instance_id', 'database_id', 'query', 'gcp_conn_id')
template_ext = ('.sql',)
More information¶
See Google Cloud Spanner API documentation for the DML syntax.
CloudSpannerInstanceDeleteOperator¶
Deletes a Cloud Spanner instance. If an instance does not exist, no action is taken, and the operator succeeds.
For parameter definition take a look at
CloudSpannerInstanceDeleteOperator
.
Arguments¶
Some arguments in the example DAG are taken from environment variables:
GCP_PROJECT_ID = os.environ.get('GCP_PROJECT_ID', 'example-project')
GCP_SPANNER_INSTANCE_ID = os.environ.get('GCP_SPANNER_INSTANCE_ID', 'testinstance')
GCP_SPANNER_DATABASE_ID = os.environ.get('GCP_SPANNER_DATABASE_ID', 'testdatabase')
GCP_SPANNER_CONFIG_NAME = os.environ.get('GCP_SPANNER_CONFIG_NAME',
'projects/example-project/instanceConfigs/eur3')
GCP_SPANNER_NODE_COUNT = os.environ.get('GCP_SPANNER_NODE_COUNT', '1')
GCP_SPANNER_DISPLAY_NAME = os.environ.get('GCP_SPANNER_DISPLAY_NAME', 'Test Instance')
# OPERATION_ID should be unique per operation
OPERATION_ID = 'unique_operation_id'
Using the operator¶
You can create the operator with or without project id. If project id is missing it will be retrieved from the GCP connection used. Both variants are shown:
spanner_instance_delete_task = CloudSpannerInstanceDeleteOperator(
project_id=GCP_PROJECT_ID,
instance_id=GCP_SPANNER_INSTANCE_ID,
task_id='spanner_instance_delete_task'
)
spanner_instance_delete_task2 = CloudSpannerInstanceDeleteOperator(
instance_id=GCP_SPANNER_INSTANCE_ID,
task_id='spanner_instance_delete_task2'
)
Templating¶
template_fields = ('project_id', 'instance_id', 'gcp_conn_id')
Google Cloud SQL Operators¶
CloudSqlInstanceDatabaseCreateOperator¶
Creates a new database inside a Cloud SQL instance.
For parameter definition, take a look at
CloudSqlInstanceDatabaseCreateOperator
.
Arguments¶
Some arguments in the example DAG are taken from environment variables:
GCP_PROJECT_ID = os.environ.get('GCP_PROJECT_ID', 'example-project')
INSTANCE_NAME = os.environ.get('GCSQL_MYSQL_INSTANCE_NAME', 'test-mysql')
INSTANCE_NAME2 = os.environ.get('GCSQL_MYSQL_INSTANCE_NAME2', 'test-mysql2')
DB_NAME = os.environ.get('GCSQL_MYSQL_DATABASE_NAME', 'testdb')
Using the operator¶
You can create the operator with or without project id. If project id is missing it will be retrieved from the GCP connection used. Both variants are shown:
sql_db_create_task = CloudSqlInstanceDatabaseCreateOperator(
project_id=GCP_PROJECT_ID,
body=db_create_body,
instance=INSTANCE_NAME,
task_id='sql_db_create_task'
)
sql_db_create_task2 = CloudSqlInstanceDatabaseCreateOperator(
body=db_create_body,
instance=INSTANCE_NAME,
task_id='sql_db_create_task2'
)
Example request body:
db_create_body = {
"instance": INSTANCE_NAME,
"name": DB_NAME,
"project": GCP_PROJECT_ID
}
Templating¶
template_fields = ('project_id', 'instance', 'gcp_conn_id', 'api_version')
CloudSqlInstanceDatabaseDeleteOperator¶
Deletes a database from a Cloud SQL instance.
For parameter definition, take a look at
CloudSqlInstanceDatabaseDeleteOperator
.
Arguments¶
Some arguments in the example DAG are taken from environment variables:
GCP_PROJECT_ID = os.environ.get('GCP_PROJECT_ID', 'example-project')
INSTANCE_NAME = os.environ.get('GCSQL_MYSQL_INSTANCE_NAME', 'test-mysql')
INSTANCE_NAME2 = os.environ.get('GCSQL_MYSQL_INSTANCE_NAME2', 'test-mysql2')
DB_NAME = os.environ.get('GCSQL_MYSQL_DATABASE_NAME', 'testdb')
Using the operator¶
You can create the operator with or without project id. If project id is missing it will be retrieved from the GCP connection used. Both variants are shown:
sql_db_delete_task = CloudSqlInstanceDatabaseDeleteOperator(
project_id=GCP_PROJECT_ID,
instance=INSTANCE_NAME,
database=DB_NAME,
task_id='sql_db_delete_task'
)
sql_db_delete_task2 = CloudSqlInstanceDatabaseDeleteOperator(
instance=INSTANCE_NAME,
database=DB_NAME,
task_id='sql_db_delete_task2'
)
Templating¶
template_fields = ('project_id', 'instance', 'database', 'gcp_conn_id',
'api_version')
CloudSqlInstanceDatabasePatchOperator¶
Updates a resource containing information about a database inside a Cloud SQL instance using patch semantics. See: https://cloud.google.com/sql/docs/mysql/admin-api/how-tos/performance#patch
For parameter definition, take a look at
CloudSqlInstanceDatabasePatchOperator
.
Arguments¶
Some arguments in the example DAG are taken from environment variables:
GCP_PROJECT_ID = os.environ.get('GCP_PROJECT_ID', 'example-project')
INSTANCE_NAME = os.environ.get('GCSQL_MYSQL_INSTANCE_NAME', 'test-mysql')
INSTANCE_NAME2 = os.environ.get('GCSQL_MYSQL_INSTANCE_NAME2', 'test-mysql2')
DB_NAME = os.environ.get('GCSQL_MYSQL_DATABASE_NAME', 'testdb')
Using the operator¶
You can create the operator with or without project id. If project id is missing it will be retrieved from the GCP connection used. Both variants are shown:
sql_db_patch_task = CloudSqlInstanceDatabasePatchOperator(
project_id=GCP_PROJECT_ID,
body=db_patch_body,
instance=INSTANCE_NAME,
database=DB_NAME,
task_id='sql_db_patch_task'
)
sql_db_patch_task2 = CloudSqlInstanceDatabasePatchOperator(
body=db_patch_body,
instance=INSTANCE_NAME,
database=DB_NAME,
task_id='sql_db_patch_task2'
)
Example request body:
db_patch_body = {
"charset": "utf16",
"collation": "utf16_general_ci"
}
Templating¶
template_fields = ('project_id', 'instance', 'database', 'gcp_conn_id',
'api_version')
CloudSqlInstanceDeleteOperator¶
Deletes a Cloud SQL instance in Google Cloud Platform.
For parameter definition, take a look at
CloudSqlInstanceDeleteOperator
.
Arguments¶
Some arguments in the example DAG are taken from OS environment variables:
GCP_PROJECT_ID = os.environ.get('GCP_PROJECT_ID', 'example-project')
INSTANCE_NAME = os.environ.get('GCSQL_MYSQL_INSTANCE_NAME', 'test-mysql')
INSTANCE_NAME2 = os.environ.get('GCSQL_MYSQL_INSTANCE_NAME2', 'test-mysql2')
DB_NAME = os.environ.get('GCSQL_MYSQL_DATABASE_NAME', 'testdb')
Using the operator¶
You can create the operator with or without project id. If project id is missing it will be retrieved from the GCP connection used. Both variants are shown:
sql_instance_delete_task = CloudSqlInstanceDeleteOperator(
project_id=GCP_PROJECT_ID,
instance=INSTANCE_NAME,
task_id='sql_instance_delete_task'
)
sql_instance_delete_task2 = CloudSqlInstanceDeleteOperator(
instance=INSTANCE_NAME2,
task_id='sql_instance_delete_task2'
)
Templating¶
template_fields = ('project_id', 'instance', 'gcp_conn_id', 'api_version')
CloudSqlInstanceExportOperator¶
Exports data from a Cloud SQL instance to a Cloud Storage bucket as a SQL dump or CSV file.
Note: This operator is idempotent. If executed multiple times with the same export file URI, the export file in GCS will simply be overwritten.
For parameter definition take a look at
CloudSqlInstanceExportOperator
.
Arguments¶
Some arguments in the example DAG are taken from OS environment variables:
GCP_PROJECT_ID = os.environ.get('GCP_PROJECT_ID', 'example-project')
INSTANCE_NAME = os.environ.get('GCSQL_MYSQL_INSTANCE_NAME', 'test-mysql')
INSTANCE_NAME2 = os.environ.get('GCSQL_MYSQL_INSTANCE_NAME2', 'test-mysql2')
DB_NAME = os.environ.get('GCSQL_MYSQL_DATABASE_NAME', 'testdb')
EXPORT_URI = os.environ.get('GCSQL_MYSQL_EXPORT_URI', 'gs://bucketName/fileName')
IMPORT_URI = os.environ.get('GCSQL_MYSQL_IMPORT_URI', 'gs://bucketName/fileName')
Example body defining the export operation:
export_body = {
"exportContext": {
"fileType": "sql",
"uri": EXPORT_URI,
"sqlExportOptions": {
"schemaOnly": False
}
}
}
Using the operator¶
You can create the operator with or without project id. If project id is missing it will be retrieved from the GCP connection used. Both variants are shown:
sql_export_task = CloudSqlInstanceExportOperator(
project_id=GCP_PROJECT_ID,
body=export_body,
instance=INSTANCE_NAME,
task_id='sql_export_task'
)
sql_export_task2 = CloudSqlInstanceExportOperator(
body=export_body,
instance=INSTANCE_NAME,
task_id='sql_export_task2'
)
Templating¶
template_fields = ('project_id', 'instance', 'gcp_conn_id', 'api_version')
Troubleshooting¶
If you receive an “Unauthorized” error in GCP, make sure that the service account of the Cloud SQL instance is authorized to write to the selected GCS bucket.
It is not the service account configured in Airflow that communicates with GCS, but rather the service account of the particular Cloud SQL instance.
To grant the service account the appropriate WRITE permissions for the GCS bucket, you can use the GoogleCloudStorageBucketCreateAclEntryOperator, as shown in the example:
sql_gcp_add_bucket_permission_task = GoogleCloudStorageBucketCreateAclEntryOperator(
entity="user-{{ task_instance.xcom_pull("
"'sql_instance_create_task', key='service_account_email') "
"}}",
role="WRITER",
bucket=export_url_split[1], # netloc (bucket)
task_id='sql_gcp_add_bucket_permission_task'
)
CloudSqlInstanceImportOperator¶
Imports data into a Cloud SQL instance from a SQL dump or CSV file in Cloud Storage.
CSV import:¶
This operator is NOT idempotent for a CSV import. If the same file is imported multiple times, the imported data will be duplicated in the database. Moreover, if there are any unique constraints, the duplicate import may result in an error.
SQL import:¶
This operator is idempotent for a SQL import if the dump file was exported by Cloud SQL, because the exported SQL contains ‘DROP TABLE IF EXISTS’ statements for all tables to be imported.
If the import file was generated in a different way, idempotence is not guaranteed; it has to be ensured at the level of the SQL file itself.
For parameter definition take a look at
CloudSqlInstanceImportOperator
.
Arguments¶
Some arguments in the example DAG are taken from OS environment variables:
GCP_PROJECT_ID = os.environ.get('GCP_PROJECT_ID', 'example-project')
INSTANCE_NAME = os.environ.get('GCSQL_MYSQL_INSTANCE_NAME', 'test-mysql')
INSTANCE_NAME2 = os.environ.get('GCSQL_MYSQL_INSTANCE_NAME2', 'test-mysql2')
DB_NAME = os.environ.get('GCSQL_MYSQL_DATABASE_NAME', 'testdb')
EXPORT_URI = os.environ.get('GCSQL_MYSQL_EXPORT_URI', 'gs://bucketName/fileName')
IMPORT_URI = os.environ.get('GCSQL_MYSQL_IMPORT_URI', 'gs://bucketName/fileName')
Example body defining the import operation:
import_body = {
"importContext": {
"fileType": "sql",
"uri": IMPORT_URI
}
}
Using the operator¶
You can create the operator with or without project id. If project id is missing it will be retrieved from the GCP connection used. Both variants are shown:
sql_import_task = CloudSqlInstanceImportOperator(
project_id=GCP_PROJECT_ID,
body=import_body,
instance=INSTANCE_NAME2,
task_id='sql_import_task'
)
sql_import_task2 = CloudSqlInstanceImportOperator(
body=import_body,
instance=INSTANCE_NAME2,
task_id='sql_import_task2'
)
Templating¶
template_fields = ('project_id', 'instance', 'gcp_conn_id', 'api_version')
Troubleshooting¶
If you receive an “Unauthorized” error in GCP, make sure that the service account of the Cloud SQL instance is authorized to read from the selected GCS object.
It is not the service account configured in Airflow that communicates with GCS, but rather the service account of the particular Cloud SQL instance.
To grant the service account the appropriate READ permissions for the GCS object, you can use the GoogleCloudStorageObjectCreateAclEntryOperator, as shown in the example:
sql_gcp_add_object_permission_task = GoogleCloudStorageObjectCreateAclEntryOperator(
entity="user-{{ task_instance.xcom_pull("
"'sql_instance_create_task2', key='service_account_email')"
" }}",
role="READER",
bucket=import_url_split[1], # netloc (bucket)
object_name=import_url_split[2][1:], # path (strip first '/')
task_id='sql_gcp_add_object_permission_task',
)
prev_task = next_dep(sql_gcp_add_object_permission_task, prev_task)
# For the import to work we also need to grant the Cloud SQL instance's Service Account
# write access to the whole bucket.
sql_gcp_add_bucket_permission_2_task = GoogleCloudStorageBucketCreateAclEntryOperator(
entity="user-{{ task_instance.xcom_pull("
"'sql_instance_create_task2', key='service_account_email') "
"}}",
role="WRITER",
bucket=import_url_split[1], # netloc
task_id='sql_gcp_add_bucket_permission_2_task',
)
CloudSqlInstanceCreateOperator¶
Creates a new Cloud SQL instance in Google Cloud Platform.
For parameter definition, take a look at
CloudSqlInstanceCreateOperator
.
If an instance with the same name exists, no action will be taken and the operator will succeed.
Arguments¶
Some arguments in the example DAG are taken from OS environment variables:
GCP_PROJECT_ID = os.environ.get('GCP_PROJECT_ID', 'example-project')
INSTANCE_NAME = os.environ.get('GCSQL_MYSQL_INSTANCE_NAME', 'test-mysql')
INSTANCE_NAME2 = os.environ.get('GCSQL_MYSQL_INSTANCE_NAME2', 'test-mysql2')
DB_NAME = os.environ.get('GCSQL_MYSQL_DATABASE_NAME', 'testdb')
Example body defining the instance:
body = {
"name": INSTANCE_NAME,
"settings": {
"tier": "db-n1-standard-1",
"backupConfiguration": {
"binaryLogEnabled": True,
"enabled": True,
"startTime": "05:00"
},
"activationPolicy": "ALWAYS",
"dataDiskSizeGb": 30,
"dataDiskType": "PD_SSD",
"databaseFlags": [],
"ipConfiguration": {
"ipv4Enabled": True,
"requireSsl": True,
},
"locationPreference": {
"zone": "europe-west4-a"
},
"maintenanceWindow": {
"hour": 5,
"day": 7,
"updateTrack": "canary"
},
"pricingPlan": "PER_USE",
"replicationType": "ASYNCHRONOUS",
"storageAutoResize": False,
"storageAutoResizeLimit": 0,
"userLabels": {
"my-key": "my-value"
}
},
"databaseVersion": "MYSQL_5_7",
"region": "europe-west4",
}
Using the operator¶
You can create the operator with or without project id. If project id is missing it will be retrieved from the GCP connection used:
sql_instance_create_task = CloudSqlInstanceCreateOperator(
project_id=GCP_PROJECT_ID,
body=body,
instance=INSTANCE_NAME,
task_id='sql_instance_create_task'
)
Templating¶
template_fields = ('project_id', 'instance', 'gcp_conn_id', 'api_version')
CloudSqlInstancePatchOperator¶
Updates settings of a Cloud SQL instance in Google Cloud Platform (partial update).
For parameter definition, take a look at
CloudSqlInstancePatchOperator
.
This is a partial update, so only values for the settings specified in the body will be set / updated. The rest of the existing instance’s configuration will remain unchanged.
Arguments¶
Some arguments in the example DAG are taken from OS environment variables:
GCP_PROJECT_ID = os.environ.get('GCP_PROJECT_ID', 'example-project')
INSTANCE_NAME = os.environ.get('GCSQL_MYSQL_INSTANCE_NAME', 'test-mysql')
INSTANCE_NAME2 = os.environ.get('GCSQL_MYSQL_INSTANCE_NAME2', 'test-mysql2')
DB_NAME = os.environ.get('GCSQL_MYSQL_DATABASE_NAME', 'testdb')
Example body defining the patch:
patch_body = {
"name": INSTANCE_NAME,
"settings": {
"dataDiskSizeGb": 35,
"maintenanceWindow": {
"hour": 3,
"day": 6,
"updateTrack": "canary"
},
"userLabels": {
"my-key-patch": "my-value-patch"
}
}
}
Using the operator¶
You can create the operator with or without project id. If project id is missing it will be retrieved from the GCP connection used. Both variants are shown:
sql_instance_patch_task = CloudSqlInstancePatchOperator(
project_id=GCP_PROJECT_ID,
body=patch_body,
instance=INSTANCE_NAME,
task_id='sql_instance_patch_task'
)
sql_instance_patch_task2 = CloudSqlInstancePatchOperator(
body=patch_body,
instance=INSTANCE_NAME,
task_id='sql_instance_patch_task2'
)
Templating¶
template_fields = ('project_id', 'instance', 'gcp_conn_id', 'api_version')
CloudSqlQueryOperator¶
Performs DDL or DML SQL queries in a Google Cloud SQL instance. DQL (retrieving data from Google Cloud SQL) is not supported: you can run SELECT queries, but their results are discarded.
You can specify various connectivity methods to connect to the running instance: a plain connection via public IP, a connection via public IP with SSL, or a TCP or UNIX socket connection via the Cloud SQL Proxy. The proxy is downloaded and started/stopped dynamically by the operator as needed.
There is a gcpcloudsql:// connection type that you should use to define what kind of connectivity you want the operator to use. The connection is a “meta” type of connection: it is not used to establish connectivity on its own, but it determines whether the Cloud SQL Proxy should be started by CloudSqlDatabaseHook and what kind of database connection (Postgres or MySQL) should be created dynamically to connect to Cloud SQL, either via public IP address or via the proxy.
The CloudSqlDatabaseHook uses CloudSqlProxyRunner to manage the Cloud SQL Proxy lifecycle (each task has its own Cloud SQL Proxy).
When you build a connection, you should use the connection parameters described in CloudSqlDatabaseHook. You can see examples of connections below for all the possible types of connectivity. Such a connection can be reused between different tasks (instances of CloudSqlQueryOperator). Each task will get its own proxy started, if needed, with its own TCP or UNIX socket.
For parameter definition, take a look at
CloudSqlQueryOperator
.
Since the query operator can run an arbitrary query, it cannot be guaranteed to be idempotent. It is up to the SQL query designer to make the queries idempotent. For example, both Postgres and MySQL support CREATE TABLE IF NOT EXISTS statements that can be used to create tables in an idempotent way.
Arguments¶
If you define the connection via an AIRFLOW_CONN_* URL in an environment variable, make sure the URL components are URL-encoded. See the examples below for details.
Note that for SSL connections you need a mechanism to make the certificate/key files available in predefined locations on all the workers on which the operator can run. This can be achieved, for example, by mounting NFS-like volumes in the same path on all the workers.
Some arguments in the example DAG are taken from the OS environment variables:
GCP_PROJECT_ID = os.environ.get('GCP_PROJECT_ID', 'example-project')
GCP_REGION = os.environ.get('GCP_REGION', 'europe-west-1b')
GCSQL_POSTGRES_INSTANCE_NAME_QUERY = os.environ.get(
'GCSQL_POSTGRES_INSTANCE_NAME_QUERY',
'testpostgres')
GCSQL_POSTGRES_DATABASE_NAME = os.environ.get('GCSQL_POSTGRES_DATABASE_NAME',
'postgresdb')
GCSQL_POSTGRES_USER = os.environ.get('GCSQL_POSTGRES_USER', 'postgres_user')
GCSQL_POSTGRES_PASSWORD = os.environ.get('GCSQL_POSTGRES_PASSWORD', 'password')
GCSQL_POSTGRES_PUBLIC_IP = os.environ.get('GCSQL_POSTGRES_PUBLIC_IP', '0.0.0.0')
GCSQL_POSTGRES_PUBLIC_PORT = os.environ.get('GCSQL_POSTGRES_PUBLIC_PORT', 5432)
GCSQL_POSTGRES_CLIENT_CERT_FILE = os.environ.get('GCSQL_POSTGRES_CLIENT_CERT_FILE',
".key/postgres-client-cert.pem")
GCSQL_POSTGRES_CLIENT_KEY_FILE = os.environ.get('GCSQL_POSTGRES_CLIENT_KEY_FILE',
".key/postgres-client-key.pem")
GCSQL_POSTGRES_SERVER_CA_FILE = os.environ.get('GCSQL_POSTGRES_SERVER_CA_FILE',
".key/postgres-server-ca.pem")
GCSQL_MYSQL_INSTANCE_NAME_QUERY = os.environ.get('GCSQL_MYSQL_INSTANCE_NAME_QUERY',
'testmysql')
GCSQL_MYSQL_DATABASE_NAME = os.environ.get('GCSQL_MYSQL_DATABASE_NAME', 'mysqldb')
GCSQL_MYSQL_USER = os.environ.get('GCSQL_MYSQL_USER', 'mysql_user')
GCSQL_MYSQL_PASSWORD = os.environ.get('GCSQL_MYSQL_PASSWORD', 'password')
GCSQL_MYSQL_PUBLIC_IP = os.environ.get('GCSQL_MYSQL_PUBLIC_IP', '0.0.0.0')
GCSQL_MYSQL_PUBLIC_PORT = os.environ.get('GCSQL_MYSQL_PUBLIC_PORT', 3306)
GCSQL_MYSQL_CLIENT_CERT_FILE = os.environ.get('GCSQL_MYSQL_CLIENT_CERT_FILE',
".key/mysql-client-cert.pem")
GCSQL_MYSQL_CLIENT_KEY_FILE = os.environ.get('GCSQL_MYSQL_CLIENT_KEY_FILE',
".key/mysql-client-key.pem")
GCSQL_MYSQL_SERVER_CA_FILE = os.environ.get('GCSQL_MYSQL_SERVER_CA_FILE',
".key/mysql-server-ca.pem")
SQL = [
'CREATE TABLE IF NOT EXISTS TABLE_TEST (I INTEGER)',
'CREATE TABLE IF NOT EXISTS TABLE_TEST (I INTEGER)', # shows warnings logged
'INSERT INTO TABLE_TEST VALUES (0)',
'CREATE TABLE IF NOT EXISTS TABLE_TEST2 (I INTEGER)',
'DROP TABLE TABLE_TEST',
'DROP TABLE TABLE_TEST2',
]
Example connection definitions for all connectivity cases. Note that all the components of the connection URI should be URL-encoded:
HOME_DIR = expanduser("~")
def get_absolute_path(path):
    if path.startswith("/"):
        return path
    else:
        return os.path.join(HOME_DIR, path)
postgres_kwargs = dict(
user=quote_plus(GCSQL_POSTGRES_USER),
password=quote_plus(GCSQL_POSTGRES_PASSWORD),
public_port=GCSQL_POSTGRES_PUBLIC_PORT,
public_ip=quote_plus(GCSQL_POSTGRES_PUBLIC_IP),
project_id=quote_plus(GCP_PROJECT_ID),
location=quote_plus(GCP_REGION),
instance=quote_plus(GCSQL_POSTGRES_INSTANCE_NAME_QUERY),
database=quote_plus(GCSQL_POSTGRES_DATABASE_NAME),
client_cert_file=quote_plus(get_absolute_path(GCSQL_POSTGRES_CLIENT_CERT_FILE)),
client_key_file=quote_plus(get_absolute_path(GCSQL_POSTGRES_CLIENT_KEY_FILE)),
server_ca_file=quote_plus(get_absolute_path(GCSQL_POSTGRES_SERVER_CA_FILE))
)
# The connections below are created using one of the standard approaches - via environment
# variables named AIRFLOW_CONN_*. The connections can also be created in the Airflow
# database (using the command line or the UI).
# Postgres: connect via proxy over TCP
os.environ['AIRFLOW_CONN_PROXY_POSTGRES_TCP'] = \
"gcpcloudsql://{user}:{password}@{public_ip}:{public_port}/{database}?" \
"database_type=postgres&" \
"project_id={project_id}&" \
"location={location}&" \
"instance={instance}&" \
"use_proxy=True&" \
"sql_proxy_use_tcp=True".format(**postgres_kwargs)
# Postgres: connect via proxy over UNIX socket (specific proxy version)
os.environ['AIRFLOW_CONN_PROXY_POSTGRES_SOCKET'] = \
"gcpcloudsql://{user}:{password}@{public_ip}:{public_port}/{database}?" \
"database_type=postgres&" \
"project_id={project_id}&" \
"location={location}&" \
"instance={instance}&" \
"use_proxy=True&" \
"sql_proxy_version=v1.13&" \
"sql_proxy_use_tcp=False".format(**postgres_kwargs)
# Postgres: connect directly via TCP (non-SSL)
os.environ['AIRFLOW_CONN_PUBLIC_POSTGRES_TCP'] = \
"gcpcloudsql://{user}:{password}@{public_ip}:{public_port}/{database}?" \
"database_type=postgres&" \
"project_id={project_id}&" \
"location={location}&" \
"instance={instance}&" \
"use_proxy=False&" \
"use_ssl=False".format(**postgres_kwargs)
# Postgres: connect directly via TCP (SSL)
os.environ['AIRFLOW_CONN_PUBLIC_POSTGRES_TCP_SSL'] = \
"gcpcloudsql://{user}:{password}@{public_ip}:{public_port}/{database}?" \
"database_type=postgres&" \
"project_id={project_id}&" \
"location={location}&" \
"instance={instance}&" \
"use_proxy=False&" \
"use_ssl=True&" \
"sslcert={client_cert_file}&" \
"sslkey={client_key_file}&" \
"sslrootcert={server_ca_file}"\
.format(**postgres_kwargs)
mysql_kwargs = dict(
user=quote_plus(GCSQL_MYSQL_USER),
password=quote_plus(GCSQL_MYSQL_PASSWORD),
public_port=GCSQL_MYSQL_PUBLIC_PORT,
public_ip=quote_plus(GCSQL_MYSQL_PUBLIC_IP),
project_id=quote_plus(GCP_PROJECT_ID),
location=quote_plus(GCP_REGION),
instance=quote_plus(GCSQL_MYSQL_INSTANCE_NAME_QUERY),
database=quote_plus(GCSQL_MYSQL_DATABASE_NAME),
client_cert_file=quote_plus(get_absolute_path(GCSQL_MYSQL_CLIENT_CERT_FILE)),
client_key_file=quote_plus(get_absolute_path(GCSQL_MYSQL_CLIENT_KEY_FILE)),
server_ca_file=quote_plus(get_absolute_path(GCSQL_MYSQL_SERVER_CA_FILE))
)
# MySQL: connect via proxy over TCP (specific proxy version)
os.environ['AIRFLOW_CONN_PROXY_MYSQL_TCP'] = \
"gcpcloudsql://{user}:{password}@{public_ip}:{public_port}/{database}?" \
"database_type=mysql&" \
"project_id={project_id}&" \
"location={location}&" \
"instance={instance}&" \
"use_proxy=True&" \
"sql_proxy_version=v1.13&" \
"sql_proxy_use_tcp=True".format(**mysql_kwargs)
# MySQL: connect via proxy over UNIX socket using pre-downloaded Cloud Sql Proxy binary
try:
    sql_proxy_binary_path = subprocess.check_output(
        ['which', 'cloud_sql_proxy']).decode('utf-8').rstrip()
except subprocess.CalledProcessError:
    sql_proxy_binary_path = "/tmp/anyhow_download_cloud_sql_proxy"
os.environ['AIRFLOW_CONN_PROXY_MYSQL_SOCKET'] = \
"gcpcloudsql://{user}:{password}@{public_ip}:{public_port}/{database}?" \
"database_type=mysql&" \
"project_id={project_id}&" \
"location={location}&" \
"instance={instance}&" \
"use_proxy=True&" \
"sql_proxy_binary_path={sql_proxy_binary_path}&" \
"sql_proxy_use_tcp=False".format(
sql_proxy_binary_path=quote_plus(sql_proxy_binary_path), **mysql_kwargs)
# MySQL: connect directly via TCP (non-SSL)
os.environ['AIRFLOW_CONN_PUBLIC_MYSQL_TCP'] = \
"gcpcloudsql://{user}:{password}@{public_ip}:{public_port}/{database}?" \
"database_type=mysql&" \
"project_id={project_id}&" \
"location={location}&" \
"instance={instance}&" \
"use_proxy=False&" \
"use_ssl=False".format(**mysql_kwargs)
# MySQL: connect directly via TCP (SSL) and with fixed Cloud Sql Proxy binary path
os.environ['AIRFLOW_CONN_PUBLIC_MYSQL_TCP_SSL'] = \
"gcpcloudsql://{user}:{password}@{public_ip}:{public_port}/{database}?" \
"database_type=mysql&" \
"project_id={project_id}&" \
"location={location}&" \
"instance={instance}&" \
"use_proxy=False&" \
"use_ssl=True&" \
"sslcert={client_cert_file}&" \
"sslkey={client_key_file}&" \
"sslrootcert={server_ca_file}".format(**mysql_kwargs)
# Special case: MySQL: connect directly via TCP (SSL) and with fixed Cloud Sql
# Proxy binary path AND with missing project_id
os.environ['AIRFLOW_CONN_PUBLIC_MYSQL_TCP_SSL_NO_PROJECT_ID'] = \
"gcpcloudsql://{user}:{password}@{public_ip}:{public_port}/{database}?" \
"database_type=mysql&" \
"location={location}&" \
"instance={instance}&" \
"use_proxy=False&" \
"use_ssl=True&" \
"sslcert={client_cert_file}&" \
"sslkey={client_key_file}&" \
"sslrootcert={server_ca_file}".format(**mysql_kwargs)
Using the operator¶
The example operators below use all connectivity options. Note that the connection id used by each operator matches the AIRFLOW_CONN_* suffix, uppercased. This is the standard Airflow notation for defining connections via environment variables:
connection_names = [
"proxy_postgres_tcp",
"proxy_postgres_socket",
"public_postgres_tcp",
"public_postgres_tcp_ssl",
"proxy_mysql_tcp",
"proxy_mysql_socket",
"public_mysql_tcp",
"public_mysql_tcp_ssl",
"public_mysql_tcp_ssl_no_project_id"
]
tasks = []
with models.DAG(
    dag_id='example_gcp_sql_query',
    default_args=default_args,
    schedule_interval=None
) as dag:
    prev_task = None
    for connection_name in connection_names:
        task = CloudSqlQueryOperator(
            gcp_cloudsql_conn_id=connection_name,
            task_id="example_gcp_sql_task_" + connection_name,
            sql=SQL
        )
        tasks.append(task)
        if prev_task:
            prev_task >> task
        prev_task = task
Templating¶
template_fields = ('sql', 'gcp_cloudsql_conn_id', 'gcp_conn_id')
template_ext = ('.sql',)
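Because template_ext includes .sql, the sql argument can also point to a .sql file (resolved against the DAG folder or template_searchpath), whose contents are rendered as a Jinja template before execution. A hypothetical sketch, assuming a delete_old_rows.sql file stored next to the DAG file:
sql_file_task = CloudSqlQueryOperator(
    gcp_cloudsql_conn_id="public_mysql_tcp",
    task_id="example_gcp_sql_task_from_file",
    # The file contents are loaded and rendered with Jinja because of the
    # '.sql' extension listed in template_ext.
    sql="delete_old_rows.sql"
)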
Google Cloud Storage Operators¶
GoogleCloudStorageBucketCreateAclEntryOperator¶
Creates a new ACL entry on the specified bucket.
For parameter definition, take a look at GoogleCloudStorageBucketCreateAclEntryOperator.
Arguments¶
Some arguments in the example DAG are taken from the OS environment variables:
GCS_ACL_BUCKET = os.environ.get('GCS_ACL_BUCKET', 'example-bucket')
GCS_ACL_OBJECT = os.environ.get('GCS_ACL_OBJECT', 'example-object')
GCS_ACL_ENTITY = os.environ.get('GCS_ACL_ENTITY', 'example-entity')
GCS_ACL_BUCKET_ROLE = os.environ.get('GCS_ACL_BUCKET_ROLE', 'example-bucket-role')
GCS_ACL_OBJECT_ROLE = os.environ.get('GCS_ACL_OBJECT_ROLE', 'example-object-role')
Using the operator¶
gcs_bucket_create_acl_entry_task = GoogleCloudStorageBucketCreateAclEntryOperator(
bucket=GCS_ACL_BUCKET,
entity=GCS_ACL_ENTITY,
role=GCS_ACL_BUCKET_ROLE,
task_id="gcs_bucket_create_acl_entry_task"
)
Templating¶
template_fields = ('bucket', 'entity', 'role', 'user_project')
GoogleCloudStorageObjectCreateAclEntryOperator¶
Creates a new ACL entry on the specified object.
For parameter definition, take a look at GoogleCloudStorageObjectCreateAclEntryOperator.
Arguments¶
Some arguments in the example DAG are taken from the OS environment variables:
GCS_ACL_BUCKET = os.environ.get('GCS_ACL_BUCKET', 'example-bucket')
GCS_ACL_OBJECT = os.environ.get('GCS_ACL_OBJECT', 'example-object')
GCS_ACL_ENTITY = os.environ.get('GCS_ACL_ENTITY', 'example-entity')
GCS_ACL_BUCKET_ROLE = os.environ.get('GCS_ACL_BUCKET_ROLE', 'example-bucket-role')
GCS_ACL_OBJECT_ROLE = os.environ.get('GCS_ACL_OBJECT_ROLE', 'example-object-role')
Using the operator¶
gcs_object_create_acl_entry_task = GoogleCloudStorageObjectCreateAclEntryOperator(
bucket=GCS_ACL_BUCKET,
object_name=GCS_ACL_OBJECT,
entity=GCS_ACL_ENTITY,
role=GCS_ACL_OBJECT_ROLE,
task_id="gcs_object_create_acl_entry_task"
)
Templating¶
template_fields = ('bucket', 'object_name', 'entity', 'role', 'generation',
'user_project')