Google Cloud Datastore Operators
Firestore in Datastore mode is a NoSQL document database built for automatic scaling, high performance, and ease of application development.
For more information about the service, visit the Datastore product documentation.
Prerequisite Tasks
To use these operators, you must do a few things:
Select or create a Cloud Platform project using the Cloud Console.
Enable billing for your project, as described in the Google Cloud documentation.
Enable the API, as described in the Cloud Console documentation.
Install API libraries via pip.
pip install 'apache-airflow[google]'
Detailed information is available in the Installation documentation.
Export Entities
To export entities from Google Cloud Datastore to Cloud Storage, use CloudDatastoreExportEntitiesOperator.
google/tests/system/google/cloud/datastore/example_datastore_commit.py
export_task = CloudDatastoreExportEntitiesOperator(
task_id="export_task",
bucket=BUCKET_NAME,
project_id=PROJECT_ID,
overwrite_existing=True,
)
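The export can also be narrowed to particular kinds or namespaces. As far as we know, CloudDatastoreExportEntitiesOperator accepts an entity_filter dictionary shaped like the Datastore Admin API's EntityFilter, which has kinds and namespaceIds fields. The helper below, make_entity_filter, is hypothetical. It is a minimal sketch that builds such a dictionary. Check the parameter against the operator reference before relying on it.

```python
from typing import Any, Optional


def make_entity_filter(
    kinds: Optional[list[str]] = None,
    namespace_ids: Optional[list[str]] = None,
) -> dict[str, Any]:
    """Hypothetical helper: build an EntityFilter-shaped dict for an export.

    Omitting a key means "all kinds" or "all namespaces"; the empty string
    in namespace_ids stands for the default namespace.
    """
    entity_filter: dict[str, Any] = {}
    if kinds:
        entity_filter["kinds"] = list(kinds)
    if namespace_ids:
        entity_filter["namespaceIds"] = list(namespace_ids)
    return entity_filter


# Export only the "airflow" kind from the default namespace.
print(make_entity_filter(kinds=["airflow"], namespace_ids=[""]))
```

You could then pass the result as entity_filter=... alongside bucket and project_id.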
Import Entities
To import entities from Cloud Storage to Google Cloud Datastore, use CloudDatastoreImportEntitiesOperator.
google/tests/system/google/cloud/datastore/example_datastore_commit.py
import_task = CloudDatastoreImportEntitiesOperator(
task_id="import_task",
bucket="{{ task_instance.xcom_pull('export_task')['response']['outputUrl'].split('/')[2] }}",
file="{{ '/'.join(task_instance.xcom_pull('export_task')['response']['outputUrl'].split('/')[3:]) }}",
project_id=PROJECT_ID,
)
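The bucket and file templates above both take apart the export's outputUrl, which has the form gs://bucket/object/path. Splitting on "/" gives ["gs:", "", "bucket", ...]. Element 2 is therefore the bucket, and everything from element 3 onward, joined back together, is the object path. The plain-Python sketch below reproduces that logic outside Jinja. The function name split_gcs_url and the sample URL are hypothetical.

```python
def split_gcs_url(output_url: str) -> tuple[str, str]:
    """Split gs://bucket/object/path into (bucket, object path).

    Mirrors the Jinja expressions used for bucket= and file= above.
    """
    if not output_url.startswith("gs://"):
        raise ValueError(f"not a Cloud Storage URL: {output_url!r}")
    parts = output_url.split("/")  # ["gs:", "", "bucket", "object", "path", ...]
    bucket = parts[2]
    object_path = "/".join(parts[3:])
    return bucket, object_path


# Hypothetical outputUrl value, for illustration only.
bucket, file_path = split_gcs_url("gs://my-bucket/exports/run1/run1.overall_export_metadata")
print(bucket)     # my-bucket
print(file_path)  # exports/run1/run1.overall_export_metadata
```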
Allocate IDs
To allocate IDs for incomplete keys, use CloudDatastoreAllocateIdsOperator.
google/tests/system/google/cloud/datastore/example_datastore_commit.py
allocate_ids = CloudDatastoreAllocateIdsOperator(
task_id="allocate_ids", partial_keys=KEYS, project_id=PROJECT_ID
)
An example of the partial keys required by the operator:
google/tests/system/google/cloud/datastore/example_datastore_commit.py
KEYS = [
{
"partitionId": {"projectId": PROJECT_ID, "namespaceId": ""},
"path": {"kind": "airflow"},
}
]
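As we understand it, the allocate IDs operator pushes the completed keys to XCom. Assume that value is the list of keys from the AllocateIds REST response. In that response, each key's path is a list of path elements and int64 IDs are serialized as JSON strings. Under those assumptions, a downstream task could pull out the numeric IDs as sketched below. The helper name and the sample response are made up for illustration.

```python
from typing import Any


def allocated_ids(keys: list[dict[str, Any]]) -> list[int]:
    """Return the numeric ID assigned to the last path element of each key."""
    ids = []
    for key in keys:
        last_element = key["path"][-1]
        # The REST API encodes int64 values as strings in JSON.
        ids.append(int(last_element["id"]))
    return ids


# Made-up response, shaped like the AllocateIds REST output.
sample_keys = [
    {
        "partitionId": {"projectId": "my-project"},
        "path": [{"kind": "airflow", "id": "5629499534213120"}],
    }
]
print(allocated_ids(sample_keys))  # [5629499534213120]
```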
Begin transaction
To begin a new transaction, use CloudDatastoreBeginTransactionOperator.
google/tests/system/google/cloud/datastore/example_datastore_commit.py
begin_transaction_commit = CloudDatastoreBeginTransactionOperator(
task_id="begin_transaction_commit",
transaction_options=TRANSACTION_OPTIONS,
project_id=PROJECT_ID,
)
An example of the transaction options required by the operator:
google/tests/system/google/cloud/datastore/example_datastore_commit.py
from typing import Any
TRANSACTION_OPTIONS: dict[str, Any] = {"readWrite": {}}
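TRANSACTION_OPTIONS here requests a read-write transaction. In the Datastore REST API, TransactionOptions holds exactly one of readWrite or readOnly. When you retry a transaction, readWrite may also carry a previousTransaction ID. The hypothetical helper below builds either form. It is a sketch based on those REST field names and is not part of the provider.

```python
from typing import Any, Optional


def make_transaction_options(
    read_only: bool = False,
    previous_transaction: Optional[str] = None,
) -> dict[str, Any]:
    """Hypothetical helper: build a TransactionOptions dict."""
    if read_only:
        if previous_transaction is not None:
            raise ValueError("previousTransaction applies only to read-write transactions")
        return {"readOnly": {}}
    read_write: dict[str, Any] = {}
    if previous_transaction is not None:
        # ID of the transaction being retried.
        read_write["previousTransaction"] = previous_transaction
    return {"readWrite": read_write}


print(make_transaction_options())                # {'readWrite': {}}
print(make_transaction_options(read_only=True))  # {'readOnly': {}}
```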
Commit transaction
To commit a transaction, optionally creating, deleting, or modifying some entities, use CloudDatastoreCommitOperator.
google/tests/system/google/cloud/datastore/example_datastore_commit.py
commit_task = CloudDatastoreCommitOperator(task_id="commit_task", body=COMMIT_BODY, project_id=PROJECT_ID)
An example of the commit request body required by the operator:
google/tests/system/google/cloud/datastore/example_datastore_commit.py
COMMIT_BODY = {
"mode": "TRANSACTIONAL",
"mutations": [
{
"insert": {
"key": KEYS[0],
"properties": {"string": {"stringValue": "airflow is awesome!"}},
}
}
],
"singleUseTransaction": {"readWrite": {}},
}
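COMMIT_BODY uses a single-use transaction. Suppose a transaction has already been started with CloudDatastoreBeginTransactionOperator. In that case the REST CommitRequest carries the transaction's ID in its transaction field instead. The hypothetical helpers below build either variant, along with insert and delete mutations. The function names are illustrative only. The sample key reuses the shape of KEYS from the allocate IDs example.

```python
from typing import Any, Optional


def insert_mutation(key: dict[str, Any], properties: dict[str, Any]) -> dict[str, Any]:
    """Mutation that inserts a new entity."""
    return {"insert": {"key": key, "properties": properties}}


def delete_mutation(key: dict[str, Any]) -> dict[str, Any]:
    """Mutation that deletes the entity with the given key."""
    return {"delete": key}


def build_commit_body(
    mutations: list[dict[str, Any]],
    transaction: Optional[str] = None,
) -> dict[str, Any]:
    """Build a TRANSACTIONAL commit body.

    With a transaction ID, commit inside that transaction; otherwise
    fall back to a single-use read-write transaction like COMMIT_BODY.
    """
    body: dict[str, Any] = {"mode": "TRANSACTIONAL", "mutations": list(mutations)}
    if transaction is not None:
        body["transaction"] = transaction
    else:
        body["singleUseTransaction"] = {"readWrite": {}}
    return body


key = {"partitionId": {"projectId": "my-project", "namespaceId": ""}, "path": {"kind": "airflow"}}
body = build_commit_body([insert_mutation(key, {"string": {"stringValue": "airflow is awesome!"}})])
print(body["singleUseTransaction"])  # {'readWrite': {}}
```

Passing transaction=begin_transaction_commit.output would commit inside the transaction that the begin-transaction task opened. That wiring is an assumption and is not shown in the original example.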
Run query
To run a query for entities, use CloudDatastoreRunQueryOperator.
google/tests/system/google/cloud/datastore/example_datastore_query.py
run_query = CloudDatastoreRunQueryOperator(task_id="run_query", body=QUERY, project_id=PROJECT_ID)
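An example of the query required by the operator. Here, begin_transaction_query is a CloudDatastoreBeginTransactionOperator task in the same example DAG, and its output supplies the transaction ID used for the read: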
google/tests/system/google/cloud/datastore/example_datastore_query.py
QUERY = {
"partitionId": {"projectId": PROJECT_ID, "namespaceId": "query"},
"readOptions": {"transaction": begin_transaction_query.output},
"query": {},
}
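QUERY leaves the query itself empty. The REST RunQueryRequest also lets the query name a kind and set a limit. The hypothetical builder below assembles such a body and adds readOptions only when a transaction ID is supplied. It is a sketch based on the REST field names, not a provider API.

```python
from typing import Any, Optional


def build_run_query_body(
    project_id: str,
    namespace_id: str = "",
    kind: Optional[str] = None,
    limit: Optional[int] = None,
    transaction: Optional[str] = None,
) -> dict[str, Any]:
    """Hypothetical helper: build a RunQueryRequest body."""
    query: dict[str, Any] = {}
    if kind is not None:
        query["kind"] = [{"name": kind}]  # "kind" is a list of kind expressions
    if limit is not None:
        query["limit"] = limit
    body: dict[str, Any] = {
        "partitionId": {"projectId": project_id, "namespaceId": namespace_id},
        "query": query,
    }
    if transaction is not None:
        body["readOptions"] = {"transaction": transaction}
    return body


print(build_run_query_body("my-project", "query", kind="airflow", limit=10))
```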
Roll back transaction
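To roll back a transaction, use CloudDatastoreRollbackOperator. In this example, begin_transaction_to_rollback is a CloudDatastoreBeginTransactionOperator task whose output supplies the ID of the transaction to roll back.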
google/tests/system/google/cloud/datastore/example_datastore_rollback.py
rollback_transaction = CloudDatastoreRollbackOperator(
task_id="rollback_transaction",
transaction=begin_transaction_to_rollback.output,
)
Get operation state
To get the current state of a long-running operation, use CloudDatastoreGetOperationOperator.
google/tests/system/google/cloud/datastore/example_datastore_commit.py
get_operation = CloudDatastoreGetOperationOperator(
task_id="get_operation", name="{{ task_instance.xcom_pull('export_task')['name'] }}"
)
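CloudDatastoreGetOperationOperator returns the operation resource. Assume it follows the google.longrunning Operation JSON shape used by the Datastore Admin API. Progress is then reported in metadata.common.state, with values such as PROCESSING, SUCCESSFUL, or FAILED. Under that assumption, a downstream check might look like the sketch below. The helper names and sample dictionaries are made up.

```python
from typing import Any

TERMINAL_STATES = {"SUCCESSFUL", "FAILED", "CANCELLED"}


def operation_state(operation: dict[str, Any]) -> str:
    """Read metadata.common.state, defaulting to STATE_UNSPECIFIED."""
    common = operation.get("metadata", {}).get("common", {})
    return common.get("state", "STATE_UNSPECIFIED")


def operation_succeeded(operation: dict[str, Any]) -> bool:
    """True only if the operation is done, has no error, and succeeded."""
    return (
        bool(operation.get("done"))
        and "error" not in operation
        and operation_state(operation) == "SUCCESSFUL"
    )


running = {"name": "projects/my-project/operations/abc", "metadata": {"common": {"state": "PROCESSING"}}}
finished = {
    "name": "projects/my-project/operations/abc",
    "done": True,
    "metadata": {"common": {"state": "SUCCESSFUL"}},
}
print(operation_state(running) in TERMINAL_STATES)  # False
print(operation_succeeded(finished))  # True
```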
Delete operation
To delete a long-running operation, use CloudDatastoreDeleteOperationOperator.
google/tests/system/google/cloud/datastore/example_datastore_commit.py
from airflow.utils.trigger_rule import TriggerRule
delete_export_operation = CloudDatastoreDeleteOperationOperator(
task_id="delete_export_operation",
name="{{ task_instance.xcom_pull('export_task')['name'] }}",
trigger_rule=TriggerRule.ALL_DONE,
)
References
For further information, take a look at: