Google Dataform Operators

Dataform is a service for data analysts to develop, test, version control, and schedule complex SQL workflows for data transformation in BigQuery.

Dataform lets you manage data transformation in the Extraction, Loading, and Transformation (ELT) process for data integration. After raw data is extracted from source systems and loaded into BigQuery, Dataform helps you to transform it into a well-defined, tested, and documented suite of data tables.

For more information about the service, see the Dataform product documentation.

Configuration

Before you can use the Dataform operators, you need to initialize a repository and a workspace. For more information, see the Dataform product documentation.

Create Compilation Result

A simple configuration to create a Compilation Result can look as follows:

DataformCreateCompilationResultOperator

tests/system/providers/google/cloud/dataform/example_dataform.py

    create_compilation_result = DataformCreateCompilationResultOperator(
        task_id="create_compilation_result",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        compilation_result={
            "git_commitish": "main",
            "workspace": (
                f"projects/{PROJECT_ID}/locations/{REGION}/repositories/{REPOSITORY_ID}/"
                f"workspaces/{WORKSPACE_ID}"
            ),
        },
    )
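The `workspace` value above is a fully qualified resource name assembled from the project, region, repository, and workspace IDs. If a DAG builds such names in several places, a small helper keeps the layout in one spot. The helper names below are hypothetical (they are not part of the provider); the path layout is copied from the f-string in the example.

```python
def repository_resource_name(project_id: str, region: str, repository_id: str) -> str:
    """Return the repository path (hypothetical helper, layout taken from the example)."""
    return f"projects/{project_id}/locations/{region}/repositories/{repository_id}"


def workspace_resource_name(
    project_id: str, region: str, repository_id: str, workspace_id: str
) -> str:
    """Return the workspace path used in ``compilation_result["workspace"]``."""
    repository = repository_resource_name(project_id, region, repository_id)
    return f"{repository}/workspaces/{workspace_id}"


if __name__ == "__main__":
    # Placeholder IDs for illustration only.
    print(workspace_resource_name("my-project", "us-central1", "my-repo", "my-workspace"))
```

In the DAG, the dictionary entry would then read `"workspace": workspace_resource_name(PROJECT_ID, REGION, REPOSITORY_ID, WORKSPACE_ID)`.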

Get Compilation Result

To get a Compilation Result you can use:

DataformGetCompilationResultOperator

tests/system/providers/google/cloud/dataform/example_dataform.py

get_compilation_result = DataformGetCompilationResultOperator(
    task_id="get_compilation_result",
    project_id=PROJECT_ID,
    region=REGION,
    repository_id=REPOSITORY_ID,
    compilation_result_id=(
        "{{ task_instance.xcom_pull('create_compilation_result')['name'].split('/')[-1] }}"
    ),
)
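The template above reads the `name` key of the XCom value pushed by `create_compilation_result` and keeps only the last path segment, which is the ID that `compilation_result_id` expects. The same split written as plain Python looks like this. The function name and the sample `name` string are illustrative assumptions; in particular, the `compilationResults` path segment is assumed, not taken from this page.

```python
def resource_id(resource_name: str) -> str:
    """Return the last path segment of a resource name.

    Same split as the Jinja expression ``...['name'].split('/')[-1]`` above.
    """
    return resource_name.split("/")[-1]


# Hypothetical XCom value from the create task; only "name" is used here.
xcom_value = {
    "name": "projects/my-project/locations/us-central1/repositories/my-repo/compilationResults/abc123"
}
compilation_result_id = resource_id(xcom_value["name"])
print(compilation_result_id)  # abc123
```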

Create Workflow Invocation

To create a Workflow Invocation you can use:

DataformCreateWorkflowInvocationOperator

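This operation can run in synchronous or asynchronous mode. In asynchronous mode (asynchronous=True), the operator returns as soon as the invocation is created, and you can use the DataformWorkflowInvocationStateSensor to wait for it to reach the expected state.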

tests/system/providers/google/cloud/dataform/example_dataform.py

create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
    task_id="create_workflow_invocation",
    project_id=PROJECT_ID,
    region=REGION,
    repository_id=REPOSITORY_ID,
    workflow_invocation={
        "compilation_result": "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}"
    },
)
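Unlike the get operator, `workflow_invocation["compilation_result"]` receives the full resource name from XCom, without the `.split('/')[-1]` step. The sketch below builds the same payload in plain Python and adds a hypothetical sanity check that the compilation result belongs to the target repository; that check, the function name, and the sample values are assumptions for illustration, not provider behavior.

```python
def workflow_invocation_payload(compilation_result: dict, repository_name: str) -> dict:
    """Build the ``workflow_invocation`` argument from the create task's XCom value.

    The full ``name`` is passed through unchanged, as in the template above.
    """
    name = compilation_result["name"]
    # Hypothetical check: the compilation result should live under the
    # repository in which the invocation is created.
    if not name.startswith(repository_name + "/"):
        raise ValueError(f"{name!r} does not belong to {repository_name!r}")
    return {"compilation_result": name}


repository = "projects/my-project/locations/us-central1/repositories/my-repo"
xcom_value = {"name": f"{repository}/compilationResults/abc123"}  # hypothetical XCom value
print(workflow_invocation_payload(xcom_value, repository))
```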

tests/system/providers/google/cloud/dataform/example_dataform.py

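from google.cloud.dataform_v1beta1 import WorkflowInvocation

from airflow.providers.google.cloud.operators.dataform import DataformCreateWorkflowInvocationOperator
from airflow.providers.google.cloud.sensors.dataform import DataformWorkflowInvocationStateSensor

# PROJECT_ID, REGION and REPOSITORY_ID are defined elsewhere in the example DAG.
create_workflow_invocation_async = DataformCreateWorkflowInvocationOperator(
    task_id="create_workflow_invocation_async",
    project_id=PROJECT_ID,
    region=REGION,
    repository_id=REPOSITORY_ID,
    asynchronous=True,
    workflow_invocation={
        "compilation_result": "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}"
    },
)

# Wait for the invocation started by the asynchronous task above.
is_workflow_invocation_done = DataformWorkflowInvocationStateSensor(
    task_id="is_workflow_invocation_done",
    project_id=PROJECT_ID,
    region=REGION,
    repository_id=REPOSITORY_ID,
    workflow_invocation_id=(
        "{{ task_instance.xcom_pull('create_workflow_invocation_async')['name'].split('/')[-1] }}"
    ),
    expected_statuses={WorkflowInvocation.State.SUCCEEDED},
)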
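In synchronous mode the operator itself waits for the invocation to finish; in asynchronous mode that waiting moves to the sensor, which re-checks the state until it is one of `expected_statuses`. The loop below is a simplified model of that polling, not the sensor's implementation. Assumptions: `get_state` is a hypothetical callable standing in for the API call, states are plain strings named after `WorkflowInvocation.State` members, and the default failure set is chosen for illustration.

```python
import time
from typing import Callable, Iterable


def wait_for_state(
    get_state: Callable[[], str],
    expected: Iterable[str],
    failure: Iterable[str] = ("FAILED", "CANCELLED"),
    poke_interval: float = 10.0,
    timeout: float = 3600.0,
) -> str:
    """Poll ``get_state`` until it returns one of the ``expected`` states.

    Raises RuntimeError on a failure state and TimeoutError once ``timeout``
    seconds pass without reaching an expected state.
    """
    expected, failure = set(expected), set(failure)
    deadline = time.monotonic() + timeout
    while True:
        state = get_state()
        if state in expected:
            return state
        if state in failure:
            raise RuntimeError(f"workflow invocation ended in state {state}")
        if time.monotonic() >= deadline:
            raise TimeoutError(f"still in state {state} after {timeout} seconds")
        time.sleep(poke_interval)


# Usage with a fake sequence of states instead of a real API call.
states = iter(["RUNNING", "RUNNING", "SUCCEEDED"])
print(wait_for_state(lambda: next(states), expected={"SUCCEEDED"}, poke_interval=0))
```

Failing fast on a terminal failure state avoids waiting for the full timeout when the invocation can no longer succeed.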

Get Workflow Invocation

To get a Workflow Invocation you can use:

DataformGetWorkflowInvocationOperator

tests/system/providers/google/cloud/dataform/example_dataform.py

get_workflow_invocation = DataformGetWorkflowInvocationOperator(
    task_id="get_workflow_invocation",
    project_id=PROJECT_ID,
    region=REGION,
    repository_id=REPOSITORY_ID,
    workflow_invocation_id=(
        "{{ task_instance.xcom_pull('create_workflow_invocation')['name'].split('/')[-1] }}"
    ),
)

Cancel Workflow Invocation

To cancel a Workflow Invocation you can use:

DataformCancelWorkflowInvocationOperator

tests/system/providers/google/cloud/dataform/example_dataform.py

cancel_workflow_invocation = DataformCancelWorkflowInvocationOperator(
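    task_id="cancel_workflow_invocation",
    project_id=PROJECT_ID,
    region=REGION,
    repository_id=REPOSITORY_ID,
    # Cancels a separate invocation created by the create_second_workflow_invocation
    # task, which is defined elsewhere in the example DAG.
    workflow_invocation_id=(
        "{{ task_instance.xcom_pull('create_second_workflow_invocation')['name'].split('/')[-1] }}"
    ),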
)
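Cancelling only makes sense for an invocation that has not finished yet. The guard below is a hypothetical check that a custom task might apply before cancelling; it is not part of the provider. It assumes state strings named after `WorkflowInvocation.State` members, treats SUCCEEDED, FAILED, and CANCELLED as terminal, and skips CANCELING because a cancellation is already in progress.

```python
TERMINAL_STATES = frozenset({"SUCCEEDED", "FAILED", "CANCELLED"})


def should_cancel(state: str) -> bool:
    """Return True if an invocation in ``state`` is still worth cancelling."""
    # Hypothetical rule: skip finished invocations and ones already being cancelled.
    return state not in TERMINAL_STATES and state != "CANCELING"


for state in ("RUNNING", "CANCELING", "SUCCEEDED"):
    print(state, should_cancel(state))
```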
