Google Cloud Datastore Operators

Firestore in Datastore mode is a NoSQL document database built for automatic scaling, high performance, and ease of application development.

For more information about the service, see the Datastore product documentation.

Export Entities

To export entities from Google Cloud Datastore to Cloud Storage, use CloudDatastoreExportEntitiesOperator.

Source: airflow/providers/google/cloud/example_dags/example_datastore.py

export_task = CloudDatastoreExportEntitiesOperator(
    task_id="export_task",
    bucket=BUCKET,
    project_id=GCP_PROJECT_ID,
    overwrite_existing=True,
)

Import Entities

To import entities from Cloud Storage to Google Cloud Datastore, use CloudDatastoreImportEntitiesOperator.

import_task = CloudDatastoreImportEntitiesOperator(
    task_id="import_task",
    bucket="{{ task_instance.xcom_pull('export_task')['response']['outputUrl'].split('/')[2] }}",
    file="{{ '/'.join(task_instance.xcom_pull('export_task')['response']['outputUrl'].split('/')[3:]) }}",
    project_id=GCP_PROJECT_ID,
)
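
The bucket and file arguments above are Jinja templates that split the export task's outputUrl into a bucket name and an object path. The following is a minimal plain-Python sketch of what those two expressions compute, assuming the URL has the form gs://<bucket>/<path>. The function name and the sample URL are hypothetical and used only for illustration.

```python
from typing import Tuple


def split_output_url(output_url: str) -> Tuple[str, str]:
    """Split a gs:// URL into (bucket, object path), mirroring the templates."""
    parts = output_url.split("/")
    # "gs://bucket/a/b".split("/") gives ["gs:", "", "bucket", "a", "b"],
    # so index 2 is the bucket and everything from index 3 on is the path.
    bucket = parts[2]
    file_path = "/".join(parts[3:])
    return bucket, file_path


# Hypothetical output URL, not taken from a real export.
example_url = "gs://my-bucket/exports/run-1/run-1.overall_export_metadata"
bucket, file_path = split_output_url(example_url)
print(bucket)
print(file_path)
```

Doing the split inside the templates keeps the import task dependent only on the XCom value pushed by export_task, so no intermediate task is needed to parse the URL.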

Allocate IDs

To allocate IDs for incomplete keys, use CloudDatastoreAllocateIdsOperator.

allocate_ids = CloudDatastoreAllocateIdsOperator(
    task_id="allocate_ids", partial_keys=KEYS, project_id=GCP_PROJECT_ID
)

An example of the partial keys required by the operator:

KEYS = [
    {
        "partitionId": {"projectId": GCP_PROJECT_ID, "namespaceId": ""},
        "path": {"kind": "airflow"},
    }
]
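
When several kinds or namespaces need keys, the same structure can be built by a small helper. This is only a sketch that mirrors the shape of the KEYS example above; the helper name make_partial_key and the project ID used here are hypothetical.

```python
from typing import Any, Dict, List


def make_partial_key(project_id: str, kind: str, namespace_id: str = "") -> Dict[str, Any]:
    """Build one incomplete key in the same shape as the KEYS example."""
    return {
        "partitionId": {"projectId": project_id, "namespaceId": namespace_id},
        # No "id" or "name" is set, so the key is incomplete and needs an allocated ID.
        "path": {"kind": kind},
    }


# Hypothetical project ID, for illustration only.
keys: List[Dict[str, Any]] = [
    make_partial_key("example-project", kind) for kind in ("airflow", "dag_run")
]
print(keys)
```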

Begin transaction

To begin a new transaction, use CloudDatastoreBeginTransactionOperator.

begin_transaction_commit = CloudDatastoreBeginTransactionOperator(
    task_id="begin_transaction_commit",
    transaction_options=TRANSACTION_OPTIONS,
    project_id=GCP_PROJECT_ID,
)

An example of the transaction options required by the operator:

from typing import Any, Dict
TRANSACTION_OPTIONS: Dict[str, Any] = {"readWrite": {}}
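
The example above requests a read-write transaction. As a sketch, a helper can switch between that and a read-only transaction. The readOnly key is an assumption taken from the Datastore REST API's TransactionOptions message, not from this page, and the helper name is hypothetical.

```python
from typing import Any, Dict


def make_transaction_options(read_only: bool = False) -> Dict[str, Any]:
    """Return transaction options for a read-write or read-only transaction."""
    if read_only:
        # Assumption: "readOnly" is the read-only mode in the REST API's TransactionOptions.
        return {"readOnly": {}}
    # Same value as TRANSACTION_OPTIONS in the example above.
    return {"readWrite": {}}


print(make_transaction_options())
print(make_transaction_options(read_only=True))
```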

Commit transaction

To commit a transaction, optionally creating, deleting, or modifying some entities, use CloudDatastoreCommitOperator.

commit_task = CloudDatastoreCommitOperator(
    task_id="commit_task", body=COMMIT_BODY, project_id=GCP_PROJECT_ID
)

An example of the commit body required by the operator:

COMMIT_BODY = {
    "mode": "TRANSACTIONAL",
    "mutations": [
        {
            "insert": {
                "key": KEYS[0],
                "properties": {"string": {"stringValue": "airflow is awesome!"}},
            }
        }
    ],
    "transaction": "{{ task_instance.xcom_pull('begin_transaction_commit') }}",
}
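
The commit body above combines three things: a mode, a list of mutations, and the transaction ID pulled from XCom. A minimal sketch of a helper that assembles the same structure for a single insert follows; the helper name and the sample values are hypothetical, and only the insert mutation shown above is covered.

```python
from typing import Any, Dict


def make_insert_commit_body(
    key: Dict[str, Any], properties: Dict[str, Any], transaction: str
) -> Dict[str, Any]:
    """Build a transactional commit body holding one insert mutation."""
    return {
        "mode": "TRANSACTIONAL",
        "mutations": [{"insert": {"key": key, "properties": properties}}],
        # Usually a templated XCom pull of the begin-transaction task's result.
        "transaction": transaction,
    }


# Hypothetical key and project ID, shaped like the KEYS example.
sample_key = {
    "partitionId": {"projectId": "example-project", "namespaceId": ""},
    "path": {"kind": "airflow"},
}
body = make_insert_commit_body(
    key=sample_key,
    properties={"string": {"stringValue": "airflow is awesome!"}},
    transaction="{{ task_instance.xcom_pull('begin_transaction_commit') }}",
)
print(body)
```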

Run query

To run a query for entities, use CloudDatastoreRunQueryOperator.

run_query = CloudDatastoreRunQueryOperator(task_id="run_query", body=QUERY, project_id=GCP_PROJECT_ID)

An example of a query required by the operator:

QUERY = {
    "partitionId": {"projectId": GCP_PROJECT_ID, "namespaceId": ""},
    "readOptions": {"transaction": "{{ task_instance.xcom_pull('begin_transaction_query') }}"},
    "query": {},
}
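
The example above leaves "query" empty. As a sketch of a narrower query, the helper below restricts results to one kind and caps the result count. The kind and limit fields are assumptions taken from the Datastore REST API's Query message, not from this page; the helper name and sample values are hypothetical.

```python
from typing import Any, Dict


def make_kind_query(
    project_id: str, kind: str, limit: int, transaction: str = "", namespace_id: str = ""
) -> Dict[str, Any]:
    """Build a query body restricted to one kind, with a result limit."""
    body: Dict[str, Any] = {
        "partitionId": {"projectId": project_id, "namespaceId": namespace_id},
        # Assumption: "kind" is a list of {"name": ...} entries and "limit" is an integer.
        "query": {"kind": [{"name": kind}], "limit": limit},
    }
    if transaction:
        # Read inside an existing transaction, as in the QUERY example above.
        body["readOptions"] = {"transaction": transaction}
    return body


# Hypothetical project ID, for illustration only.
query = make_kind_query("example-project", kind="airflow", limit=10)
print(query)
```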

Roll back transaction

To roll back a transaction, use CloudDatastoreRollbackOperator.

rollback_transaction = CloudDatastoreRollbackOperator(
    task_id="rollback_transaction",
    transaction="{{ task_instance.xcom_pull('begin_transaction_to_rollback') }}",
)
begin_transaction_to_rollback >> rollback_transaction
