Google Dataform Operators

Dataform is a service for data analysts to develop, test, version control, and schedule complex SQL workflows for data transformation in BigQuery.

Dataform lets you manage data transformation in the Extraction, Loading, and Transformation (ELT) process for data integration. After raw data is extracted from source systems and loaded into BigQuery, Dataform helps you to transform it into a well-defined, tested, and documented suite of data tables.

For more information about the service, visit the Dataform product documentation.

Configuration

Before you can use the Dataform operators, you need to initialize a repository and a workspace. For more information, visit the Dataform product documentation.

Create Repository

Creates a repository for tracking your code in the Dataform service. Example usage:

DataformCreateRepositoryOperator

tests/system/google/cloud/dataform/example_dataform.py

    make_repository = DataformCreateRepositoryOperator(
        task_id="make-repository",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
    )

Create Workspace

Creates a workspace for storing your code in the Dataform service. Example usage:

DataformCreateWorkspaceOperator

tests/system/google/cloud/dataform/example_dataform.py

    make_workspace = DataformCreateWorkspaceOperator(
        task_id="make-workspace",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        workspace_id=WORKSPACE_ID,
    )

Create Compilation Result

A simple configuration to create a Compilation Result can look as follows:

DataformCreateCompilationResultOperator

tests/system/google/cloud/dataform/example_dataform.py

    create_compilation_result = DataformCreateCompilationResultOperator(
        task_id="create-compilation-result",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        compilation_result={
            "git_commitish": "main",
            "workspace": (
                f"projects/{PROJECT_ID}/locations/{REGION}/repositories/{REPOSITORY_ID}/"
                f"workspaces/{WORKSPACE_ID}"
            ),
        },
    )
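The `workspace` value in the example above is a full resource name. As a minimal sketch, a small helper can assemble that path so the format is not repeated across tasks. The function `workspace_path` and the sample IDs are hypothetical and not part of the provider:

```python
def workspace_path(project_id: str, region: str, repository_id: str, workspace_id: str) -> str:
    """Build a workspace resource name in the format used by the example above."""
    return (
        f"projects/{project_id}/locations/{region}/repositories/{repository_id}/"
        f"workspaces/{workspace_id}"
    )


# Placeholder IDs, for illustration only.
example_path = workspace_path("my-project", "us-central1", "my-repo", "my-workspace")
print(example_path)
```

The returned string could then be passed as the `workspace` entry of the `compilation_result` dictionary.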

Get Compilation Result

To get a Compilation Result you can use:

DataformGetCompilationResultOperator

tests/system/google/cloud/dataform/example_dataform.py

get_compilation_result = DataformGetCompilationResultOperator(
    task_id="get-compilation-result",
    project_id=PROJECT_ID,
    region=REGION,
    repository_id=REPOSITORY_ID,
    compilation_result_id=(
        "{{ task_instance.xcom_pull('create-compilation-result')['name'].split('/')[-1] }}"
    ),
)
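The templated `compilation_result_id` above reads the `name` field from XCom and keeps only its last path segment. The same expression can be sketched in plain Python. The helper `resource_id` and the sample payload are hypothetical, and the sketch assumes the name is a slash-separated resource path ending in the ID:

```python
def resource_id(resource_name: str) -> str:
    """Return the last path segment of a slash-separated resource name."""
    return resource_name.split("/")[-1]


# Hypothetical XCom payload; real values come from the create task at run time.
xcom_value = {
    "name": (
        "projects/my-project/locations/us-central1/repositories/my-repo/"
        "compilationResults/abc123"
    )
}
compilation_result_id = resource_id(xcom_value["name"])
print(compilation_result_id)
```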

Create Workflow Invocation

To create a Workflow Invocation you can use:

DataformCreateWorkflowInvocationOperator

This operation can run in synchronous or asynchronous mode. For asynchronous runs, a sensor is also available: DataformWorkflowInvocationStateSensor

tests/system/google/cloud/dataform/example_dataform.py

create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
    task_id="create-workflow-invocation",
    project_id=PROJECT_ID,
    region=REGION,
    repository_id=REPOSITORY_ID,
    workflow_invocation={
        "compilation_result": "{{ task_instance.xcom_pull('create-compilation-result')['name'] }}"
    },
)

tests/system/google/cloud/dataform/example_dataform.py

create_workflow_invocation_async = DataformCreateWorkflowInvocationOperator(
    task_id="create-workflow-invocation-async",
    project_id=PROJECT_ID,
    region=REGION,
    repository_id=REPOSITORY_ID,
    asynchronous=True,
    workflow_invocation={
        "compilation_result": "{{ task_instance.xcom_pull('create-compilation-result')['name'] }}"
    },
)

is_workflow_invocation_done = DataformWorkflowInvocationStateSensor(
    task_id="is-workflow-invocation-done",
    project_id=PROJECT_ID,
    region=REGION,
    repository_id=REPOSITORY_ID,
    workflow_invocation_id=(
        "{{ task_instance.xcom_pull('create-workflow-invocation-async')['name'].split('/')[-1] }}"
    ),
    expected_statuses={WorkflowInvocation.State.SUCCEEDED},
)
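Conceptually, the sensor checks the invocation state repeatedly until it reaches one of the expected statuses. The following is a simplified, self-contained sketch of that polling pattern, not the sensor's actual implementation. The names `wait_for_state` and `get_state`, and the state strings, are hypothetical stand-ins:

```python
import time
from typing import Callable, Iterable


def wait_for_state(
    get_state: Callable[[], str],
    expected: Iterable[str],
    poke_interval: float = 0.0,
    max_pokes: int = 10,
) -> bool:
    """Poll get_state() until it returns an expected state or max_pokes is reached."""
    expected_states = set(expected)
    for _ in range(max_pokes):
        if get_state() in expected_states:
            return True
        time.sleep(poke_interval)
    return False


# Simulated state sequence standing in for calls to the Dataform API.
states = iter(["RUNNING", "RUNNING", "SUCCEEDED"])
done = wait_for_state(lambda: next(states), expected={"SUCCEEDED"})
print(done)
```

Separating the asynchronous create task from the waiting step this way lets the wait be retried or timed out independently of the request that started the invocation.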

Get Workflow Invocation

To get a Workflow Invocation you can use:

DataformGetWorkflowInvocationOperator

tests/system/google/cloud/dataform/example_dataform.py

get_workflow_invocation = DataformGetWorkflowInvocationOperator(
    task_id="get-workflow-invocation",
    project_id=PROJECT_ID,
    region=REGION,
    repository_id=REPOSITORY_ID,
    workflow_invocation_id=(
        "{{ task_instance.xcom_pull('create-workflow-invocation')['name'].split('/')[-1] }}"
    ),
)

Query Workflow Invocation Actions

To query Workflow Invocation Actions you can use:

DataformQueryWorkflowInvocationActionsOperator

tests/system/google/cloud/dataform/example_dataform.py

query_workflow_invocation_actions = DataformQueryWorkflowInvocationActionsOperator(
    task_id="query-workflow-invocation-actions",
    project_id=PROJECT_ID,
    region=REGION,
    repository_id=REPOSITORY_ID,
    workflow_invocation_id=(
        "{{ task_instance.xcom_pull('create-workflow-invocation')['name'].split('/')[-1] }}"
    ),
)

Cancel Workflow Invocation

To cancel a Workflow Invocation you can use:

DataformCancelWorkflowInvocationOperator

tests/system/google/cloud/dataform/example_dataform.py

cancel_workflow_invocation = DataformCancelWorkflowInvocationOperator(
    task_id="cancel-workflow-invocation",
    project_id=PROJECT_ID,
    region=REGION,
    repository_id=REPOSITORY_ID,
    workflow_invocation_id=(
        "{{ task_instance.xcom_pull('create-workflow-invocation-for-cancel')['name'].split('/')[-1] }}"
    ),
)

Delete Workspace

Deletes a workspace. Example usage:

DataformDeleteWorkspaceOperator

tests/system/google/cloud/dataform/example_dataform.py

    delete_workspace = DataformDeleteWorkspaceOperator(
        task_id="delete-workspace",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        workspace_id=WORKSPACE_ID,
        trigger_rule=TriggerRule.ALL_DONE,
    )

Delete Repository

Deletes a repository. Example usage:

DataformDeleteRepositoryOperator

tests/system/google/cloud/dataform/example_dataform.py

    delete_repository = DataformDeleteRepositoryOperator(
        task_id="delete-repository",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        trigger_rule=TriggerRule.ALL_DONE,
    )

Remove file

Removes a file from the specified workspace. Example usage:

DataformRemoveFileOperator

tests/system/google/cloud/dataform/example_dataform.py

    remove_test_file = DataformRemoveFileOperator(
        task_id="remove-test-file",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        workspace_id=WORKSPACE_ID,
        filepath="test/test.txt",
    )

Remove directory

Removes a directory from the specified workspace. Example usage:

DataformRemoveDirectoryOperator

tests/system/google/cloud/dataform/example_dataform.py

    remove_test_directory = DataformRemoveDirectoryOperator(
        task_id="remove-test-directory",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        workspace_id=WORKSPACE_ID,
        directory_path="test",
    )

Initialize workspace

Creates the default project structure for the provided workspace. The repository and workspace must be created first. Example usage:

make_initialization_workspace_flow

tests/system/google/cloud/dataform/example_dataform.py

    first_initialization_step, last_initialization_step = make_initialization_workspace_flow(
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        workspace_id=WORKSPACE_ID,
        package_name=f"dataform_package_{ENV_ID}",
        without_installation=True,
        dataform_schema_name=DATAFORM_SCHEMA_NAME,
    )

Write file to workspace

Writes a file with the given content to the specified workspace.

DataformWriteFileOperator

tests/system/google/cloud/dataform/example_dataform.py

test_file_content = b"""
test test for test file
"""
write_test_file = DataformWriteFileOperator(
    task_id="make-test-file",
    project_id=PROJECT_ID,
    region=REGION,
    repository_id=REPOSITORY_ID,
    workspace_id=WORKSPACE_ID,
    filepath="test/test.txt",
    contents=test_file_content,
)
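The `contents` argument in the example above is a bytes literal. When the file body starts out as a text string, it can be encoded first. A minimal sketch follows; the SQLX snippet and variable names are illustrative placeholders, not taken from the example DAG:

```python
# Illustrative SQLX-style definition held as ordinary text.
definition_text = """config { type: "view" }
SELECT 1 AS test_column
"""

# Encode to bytes before passing the value as file contents.
definition_bytes = definition_text.encode("utf-8")
print(type(definition_bytes).__name__, len(definition_bytes))
```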

Make directory in workspace

Creates a directory at the given path in the specified workspace.

DataformMakeDirectoryOperator

tests/system/google/cloud/dataform/example_dataform.py

make_test_directory = DataformMakeDirectoryOperator(
    task_id="make-test-directory",
    project_id=PROJECT_ID,
    region=REGION,
    repository_id=REPOSITORY_ID,
    workspace_id=WORKSPACE_ID,
    directory_path="test",
)

Install NPM packages

Installs npm packages for the specified workspace.

DataformInstallNpmPackagesOperator

tests/system/google/cloud/dataform/example_dataform.py

install_npm_packages = DataformInstallNpmPackagesOperator(
    task_id="install-npm-packages",
    project_id=PROJECT_ID,
    region=REGION,
    repository_id=REPOSITORY_ID,
    workspace_id=WORKSPACE_ID,
)
