Google Cloud Datastore Operators

Firestore in Datastore mode is a NoSQL document database built for automatic scaling, high performance, and ease of application development.

For more information about the service visit Datastore product documentation

Export Entities

To export entities from Google Cloud Datastore to Cloud Storage use CloudDatastoreExportEntitiesOperator

airflow/providers/google/cloud/example_dags/example_datastore.pyView Source

export_task = CloudDatastoreExportEntitiesOperator(
    task_id="export_task",
    bucket=BUCKET,
    project_id=GCP_PROJECT_ID,
    overwrite_existing=True,
)

Import Entities

To import entities from Cloud Storage to Google Cloud Datastore use CloudDatastoreImportEntitiesOperator

airflow/providers/google/cloud/example_dags/example_datastore.pyView Source

import_task = CloudDatastoreImportEntitiesOperator(
    task_id="import_task",
    bucket="{{ task_instance.xcom_pull('export_task')['response']['outputUrl'].split('/')[2] }}",
    file="{{ '/'.join(task_instance.xcom_pull('export_task')['response']['outputUrl'].split('/')[3:]) }}",
    project_id=GCP_PROJECT_ID,
)

Allocate Ids

To allocate IDs for incomplete keys use CloudDatastoreAllocateIdsOperator

airflow/providers/google/cloud/example_dags/example_datastore.pyView Source

allocate_ids = CloudDatastoreAllocateIdsOperator(
    task_id="allocate_ids", partial_keys=KEYS, project_id=GCP_PROJECT_ID
)

An example of a partial keys required by the operator:

airflow/providers/google/cloud/example_dags/example_datastore.pyView Source

KEYS = [
    {
        "partitionId": {"projectId": GCP_PROJECT_ID, "namespaceId": ""},
        "path": {"kind": "airflow"},
    }
]

Begin transaction

To begin a new transaction use CloudDatastoreBeginTransactionOperator

airflow/providers/google/cloud/example_dags/example_datastore.pyView Source

begin_transaction_commit = CloudDatastoreBeginTransactionOperator(
    task_id="begin_transaction_commit",
    transaction_options=TRANSACTION_OPTIONS,
    project_id=GCP_PROJECT_ID,
)

An example of a transaction options required by the operator:

airflow/providers/google/cloud/example_dags/example_datastore.pyView Source

TRANSACTION_OPTIONS: Dict[str, Any] = {"readWrite": {}}

Commit transaction

To commit a transaction, optionally creating, deleting or modifying some entities use CloudDatastoreCommitOperator

airflow/providers/google/cloud/example_dags/example_datastore.pyView Source

commit_task = CloudDatastoreCommitOperator(
    task_id="commit_task", body=COMMIT_BODY, project_id=GCP_PROJECT_ID
)

An example of a commit information required by the operator:

airflow/providers/google/cloud/example_dags/example_datastore.pyView Source

COMMIT_BODY = {
    "mode": "TRANSACTIONAL",
    "mutations": [
        {
            "insert": {
                "key": KEYS[0],
                "properties": {"string": {"stringValue": "airflow is awesome!"}},
            }
        }
    ],
    "transaction": "{{ task_instance.xcom_pull('begin_transaction_commit') }}",
}

Run query

To run a query for entities use CloudDatastoreRunQueryOperator

airflow/providers/google/cloud/example_dags/example_datastore.pyView Source

run_query = CloudDatastoreRunQueryOperator(task_id="run_query", body=QUERY, project_id=GCP_PROJECT_ID)

An example of a query required by the operator:

airflow/providers/google/cloud/example_dags/example_datastore.pyView Source

QUERY = {
    "partitionId": {"projectId": GCP_PROJECT_ID, "namespaceId": ""},
    "readOptions": {"transaction": "{{ task_instance.xcom_pull('begin_transaction_query') }}"},
    "query": {},
}

Roll back transaction

To roll back a transaction use CloudDatastoreRollbackOperator

airflow/providers/google/cloud/example_dags/example_datastore.pyView Source

rollback_transaction = CloudDatastoreRollbackOperator(
    task_id="rollback_transaction",
    transaction="{{ task_instance.xcom_pull('begin_transaction_to_rollback') }}",
)
begin_transaction_to_rollback >> rollback_transaction

References

For further information, take a look at:

Was this entry helpful?