Google Cloud Workflows Operators

You can use Workflows to create serverless workflows that link a series of serverless tasks together in an order you define. Combine the power of Google Cloud’s APIs, serverless products like Cloud Functions and Cloud Run, and calls to external APIs to create flexible serverless applications.

For more information about the service, see the Workflows product documentation.

Prerequisite Tasks

Create workflow

To create a workflow use WorkflowsCreateWorkflowOperator.

airflow/providers/google/cloud/example_dags/example_workflows.py

create_workflow = WorkflowsCreateWorkflowOperator(
    task_id="create_workflow",
    location=LOCATION,
    project_id=PROJECT_ID,
    workflow=WORKFLOW,
    workflow_id=WORKFLOW_ID,
)

The workflow should be defined in a similar way to this example:

WORKFLOW_CONTENT = """
- getCurrentTime:
    call: http.get
    args:
        url: https://us-central1-workflowsample.cloudfunctions.net/datetime
    result: currentTime
- readWikipedia:
    call: http.get
    args:
        url: https://en.wikipedia.org/w/api.php
        query:
            action: opensearch
            search: ${currentTime.body.dayOfTheWeek}
    result: wikiResult
- returnResult:
    return: ${wikiResult.body[1]}
"""

WORKFLOW = {
    "description": "Test workflow",
    "labels": {"airflow-version": "dev"},
    "source_contents": WORKFLOW_CONTENT,
}
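
To make the data flow in WORKFLOW_CONTENT easier to follow, the sketch below mirrors its three steps in plain Python. It is only an analogue: Workflows executes the YAML itself, and run_workflow, fake_http_get, and the canned responses are hypothetical names and data introduced for illustration.

```python
from typing import Any, Callable, Dict, Optional

# Any callable that behaves like a simplified http.get and returns a response dict.
HttpGet = Callable[..., Dict[str, Any]]


def run_workflow(http_get: HttpGet) -> Any:
    """Plain-Python analogue of the three steps in WORKFLOW_CONTENT."""
    # getCurrentTime: call the datetime endpoint and keep the response.
    current_time = http_get(
        "https://us-central1-workflowsample.cloudfunctions.net/datetime"
    )
    # readWikipedia: search Wikipedia for the day of the week from step one.
    wiki_result = http_get(
        "https://en.wikipedia.org/w/api.php",
        query={
            "action": "opensearch",
            "search": current_time["body"]["dayOfTheWeek"],
        },
    )
    # returnResult: return the second element of the response body.
    return wiki_result["body"][1]


def fake_http_get(url: str, query: Optional[Dict[str, str]] = None) -> Dict[str, Any]:
    """Hypothetical stub with canned responses; it performs no network I/O."""
    if query is None:
        return {"body": {"dayOfTheWeek": "Monday"}}
    return {"body": [query["search"], [query["search"], "Monday Night Football"]]}


titles = run_workflow(fake_http_get)
```

Passing the HTTP function in as an argument keeps the sketch runnable offline; the stubbed data only demonstrates how the result of one step feeds the next.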

For more information about authoring workflows check official production documentation <Product documentation.

Update workflow

To update a workflow use WorkflowsUpdateWorkflowOperator.

update_workflows = WorkflowsUpdateWorkflowOperator(
    task_id="update_workflows",
    location=LOCATION,
    project_id=PROJECT_ID,
    workflow_id=WORKFLOW_ID,
    update_mask=FieldMask(paths=["name", "description"]),
)
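
The update_mask argument names the fields that the update should touch. As a rough illustration of general field-mask semantics (an assumption about the concept, not the Workflows API's implementation), the sketch below applies a list of top-level paths to plain dictionaries. apply_field_mask and the sample data are hypothetical.

```python
from typing import Any, Dict, List


def apply_field_mask(
    current: Dict[str, Any], update: Dict[str, Any], paths: List[str]
) -> Dict[str, Any]:
    """Return a copy of `current` where only the masked fields come from `update`."""
    merged = dict(current)
    for path in paths:
        # Fields outside the mask are ignored even if `update` contains them.
        if path in update:
            merged[path] = update[path]
    return merged


# Hypothetical data: only "description" is both masked and present in the update.
current = {"name": "wf", "description": "old", "labels": {"airflow-version": "dev"}}
update = {"description": "new", "labels": {"airflow-version": "changed"}}
result = apply_field_mask(current, update, ["name", "description"])
```

In this sketch the labels stay untouched because "labels" is not listed in the mask.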

Get workflow

To get a workflow use WorkflowsGetWorkflowOperator.

get_workflow = WorkflowsGetWorkflowOperator(
    task_id="get_workflow", location=LOCATION, project_id=PROJECT_ID, workflow_id=WORKFLOW_ID
)

List workflows

To list workflows use WorkflowsListWorkflowsOperator.

list_workflows = WorkflowsListWorkflowsOperator(
    task_id="list_workflows",
    location=LOCATION,
    project_id=PROJECT_ID,
)

Delete workflow

To delete a workflow use WorkflowsDeleteWorkflowOperator.

delete_workflow = WorkflowsDeleteWorkflowOperator(
    task_id="delete_workflow", location=LOCATION, project_id=PROJECT_ID, workflow_id=WORKFLOW_ID
)

Create execution

To create an execution, use WorkflowsCreateExecutionOperator. This operator is not idempotent due to an API limitation: each run of the task creates a new execution.

create_execution = WorkflowsCreateExecutionOperator(
    task_id="create_execution",
    location=LOCATION,
    project_id=PROJECT_ID,
    execution=EXECUTION,
    workflow_id=WORKFLOW_ID,
)

The create operator does not wait for the execution to complete. To wait for the execution result, use WorkflowExecutionSensor.

wait_for_execution = WorkflowExecutionSensor(
    task_id="wait_for_execution",
    location=LOCATION,
    project_id=PROJECT_ID,
    workflow_id=WORKFLOW_ID,
    execution_id=create_execution_id,
)
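
Conceptually, a sensor polls until a condition holds. The sketch below shows that polling pattern with the standard library only; it is not the sensor's implementation. wait_for_state, the get_state callable, and the split of state names into success and failure sets are assumptions made for illustration.

```python
import time
from typing import Callable, FrozenSet

# Assumed state names, grouped by how the poller should react to them.
SUCCESS_STATES: FrozenSet[str] = frozenset({"SUCCEEDED"})
FAILURE_STATES: FrozenSet[str] = frozenset({"FAILED", "CANCELLED"})


def wait_for_state(
    get_state: Callable[[], str], poke_interval: float = 1.0, max_pokes: int = 60
) -> str:
    """Poll `get_state` until it reports a success or failure state."""
    for _ in range(max_pokes):
        state = get_state()
        if state in FAILURE_STATES:
            raise RuntimeError(f"Execution ended in state {state}")
        if state in SUCCESS_STATES:
            return state
        # Any other state is treated as "still running": wait, then poll again.
        time.sleep(poke_interval)
    raise TimeoutError("Execution did not reach a terminal state in time")


# Hypothetical state source that succeeds on the third poll.
states = iter(["ACTIVE", "ACTIVE", "SUCCEEDED"])
final_state = wait_for_state(lambda: next(states), poke_interval=0.0)
```

Raising on failure states rather than continuing to poll lets a failed or cancelled execution surface immediately instead of running into the timeout.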

Get execution

To get an execution use WorkflowsGetExecutionOperator.

get_execution = WorkflowsGetExecutionOperator(
    task_id="get_execution",
    location=LOCATION,
    project_id=PROJECT_ID,
    workflow_id=WORKFLOW_ID,
    execution_id=create_execution_id,
)

List executions

To list executions, use WorkflowsListExecutionsOperator. By default, this operator returns only executions from the last 60 minutes.

list_executions = WorkflowsListExecutionsOperator(
    task_id="list_executions", location=LOCATION, project_id=PROJECT_ID, workflow_id=WORKFLOW_ID
)
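
The 60-minute default amounts to a cutoff on each execution's start time. The sketch below reproduces that kind of filter on plain dictionaries. recent_executions and the start_time key are hypothetical, and the operator's actual filtering logic may differ.

```python
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, List, Optional


def recent_executions(
    executions: List[Dict[str, Any]],
    now: Optional[datetime] = None,
    window: timedelta = timedelta(minutes=60),
) -> List[Dict[str, Any]]:
    """Keep only executions that started within `window` before `now`."""
    now = now or datetime.now(timezone.utc)
    cutoff = now - window
    return [e for e in executions if e["start_time"] > cutoff]


# Hypothetical data with a fixed "now" so the result is reproducible.
now = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
executions = [
    {"name": "old", "start_time": now - timedelta(minutes=90)},
    {"name": "new", "start_time": now - timedelta(minutes=5)},
]
recent = recent_executions(executions, now=now)
```

Timezone-aware datetimes are used throughout so that the comparison against the cutoff is unambiguous.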

Cancel execution

To cancel an execution use WorkflowsCancelExecutionOperator.

cancel_execution = WorkflowsCancelExecutionOperator(
    task_id="cancel_execution",
    location=LOCATION,
    project_id=PROJECT_ID,
    workflow_id=SLEEP_WORKFLOW_ID,
    execution_id=create_execution_id,
)
