Google Cloud Workflows Operators

You can use Workflows to create serverless workflows that link a series of serverless tasks together in an order you define. Combine the power of Google Cloud’s APIs, serverless products like Cloud Functions and Cloud Run, and calls to external APIs to create flexible serverless applications.

For more information about the service, see the Workflows product documentation.

Create workflow

To create a workflow use WorkflowsCreateWorkflowOperator.

airflow/providers/google/cloud/example_dags/example_workflows.py

create_workflow = WorkflowsCreateWorkflowOperator(
    task_id="create_workflow",
    location=LOCATION,
    project_id=PROJECT_ID,
    workflow=WORKFLOW,
    workflow_id=WORKFLOW_ID,
)
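
The snippets on this page refer to module-level constants such as LOCATION, PROJECT_ID and WORKFLOW_ID that are defined elsewhere in the example DAG file. A minimal sketch of how they might be set up follows. The environment variable names (GCP_PROJECT_ID, GCP_WORKFLOWS_LOCATION) and all default values are assumptions made for illustration, not taken from the example file.

```python
import os

# Hypothetical environment variable names and defaults; adjust to your project.
PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "example-project")
LOCATION = os.environ.get("GCP_WORKFLOWS_LOCATION", "us-central1")

# Workflow IDs are plain strings that you choose; these values are placeholders.
WORKFLOW_ID = "airflow-test-workflow"
SLEEP_WORKFLOW_ID = "sleep-workflow"
```

Reading the project and location from the environment keeps the DAG file free of deployment-specific values.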

The workflow should be defined in a similar way to this example:

airflow/providers/google/cloud/example_dags/example_workflows.py

WORKFLOW_CONTENT = """
- getCurrentTime:
    call: http.get
    args:
        url: https://us-central1-workflowsample.cloudfunctions.net/datetime
    result: currentTime
- readWikipedia:
    call: http.get
    args:
        url: https://en.wikipedia.org/w/api.php
        query:
            action: opensearch
            search: ${currentTime.body.dayOfTheWeek}
    result: wikiResult
- returnResult:
    return: ${wikiResult.body[1]}
"""

WORKFLOW = {
    "description": "Test workflow",
    "labels": {"airflow-version": "dev"},
    "source_contents": WORKFLOW_CONTENT,
}
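
Workflows also accepts definitions written in JSON, so the source can be built from Python data structures instead of a hand-written YAML string. The following is a sketch of that approach, mirroring the three steps of the example definition. The names WORKFLOW_STEPS and WORKFLOW_FROM_JSON are introduced here for illustration only.

```python
import json

# The same three steps as the YAML example, expressed as Python data.
WORKFLOW_STEPS = [
    {
        "getCurrentTime": {
            "call": "http.get",
            "args": {
                "url": "https://us-central1-workflowsample.cloudfunctions.net/datetime"
            },
            "result": "currentTime",
        }
    },
    {
        "readWikipedia": {
            "call": "http.get",
            "args": {
                "url": "https://en.wikipedia.org/w/api.php",
                "query": {
                    "action": "opensearch",
                    # Workflows expressions stay as plain strings in JSON.
                    "search": "${currentTime.body.dayOfTheWeek}",
                },
            },
            "result": "wikiResult",
        }
    },
    {"returnResult": {"return": "${wikiResult.body[1]}"}},
]

WORKFLOW_FROM_JSON = {
    "description": "Test workflow",
    "labels": {"airflow-version": "dev"},
    # Serialize the steps to a JSON string for the source_contents field.
    "source_contents": json.dumps(WORKFLOW_STEPS, indent=2),
}
```

Building the definition from data makes it easier to parameterize URLs or step names from Python, at the cost of being less readable than YAML for long workflows.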

For more information about authoring workflows, see the official product documentation.

Update workflow

To update a workflow use WorkflowsUpdateWorkflowOperator.

airflow/providers/google/cloud/example_dags/example_workflows.py

update_workflows = WorkflowsUpdateWorkflowOperator(
    task_id="update_workflows",
    location=LOCATION,
    project_id=PROJECT_ID,
    workflow_id=WORKFLOW_ID,
    update_mask={"paths": ["name", "description"]},
)

Get workflow

To get a workflow use WorkflowsGetWorkflowOperator.

airflow/providers/google/cloud/example_dags/example_workflows.py

get_workflow = WorkflowsGetWorkflowOperator(
    task_id="get_workflow", location=LOCATION, project_id=PROJECT_ID, workflow_id=WORKFLOW_ID
)

List workflows

To list workflows use WorkflowsListWorkflowsOperator.

airflow/providers/google/cloud/example_dags/example_workflows.py

list_workflows = WorkflowsListWorkflowsOperator(
    task_id="list_workflows",
    location=LOCATION,
    project_id=PROJECT_ID,
)

Delete workflow

To delete a workflow use WorkflowsDeleteWorkflowOperator.

airflow/providers/google/cloud/example_dags/example_workflows.py

delete_workflow = WorkflowsDeleteWorkflowOperator(
    task_id="delete_workflow", location=LOCATION, project_id=PROJECT_ID, workflow_id=WORKFLOW_ID
)

Create execution

To create an execution use WorkflowsCreateExecutionOperator. This operator is not idempotent due to an API limitation: each run of the task creates a new execution.

airflow/providers/google/cloud/example_dags/example_workflows.py

create_execution = WorkflowsCreateExecutionOperator(
    task_id="create_execution",
    location=LOCATION,
    project_id=PROJECT_ID,
    execution=EXECUTION,
    workflow_id=WORKFLOW_ID,
)
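
The EXECUTION value is not shown in the snippet above. In the Workflows API, an execution's input is carried in its argument field as a JSON-encoded string. A sketch of building such a value for a workflow that declares a runtime parameter is shown below. The searchTerm key and its value are hypothetical and do not come from the example file.

```python
import json

# Hypothetical runtime input for a workflow that declares a parameter.
execution_input = {"searchTerm": "Friday"}

# The argument field holds the input as a JSON string, not as a dict.
EXECUTION = {"argument": json.dumps(execution_input)}
```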

The create operator does not wait for the execution to complete. To wait for the execution result, use WorkflowExecutionSensor.

airflow/providers/google/cloud/example_dags/example_workflows.py

wait_for_execution = WorkflowExecutionSensor(
    task_id="wait_for_execution",
    location=LOCATION,
    project_id=PROJECT_ID,
    workflow_id=WORKFLOW_ID,
    execution_id='{{ task_instance.xcom_pull("create_execution", key="execution_id") }}',
)
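
The execution_id template is repeated by every task that refers to an execution created earlier in the DAG. One way to avoid copying the string is a small helper that builds it from the upstream task ID. This is only a sketch: execution_id_from is a hypothetical name and is not part of Airflow.

```python
def execution_id_from(task_id: str) -> str:
    """Return a Jinja template that pulls execution_id from the given task's XCom."""
    return '{{ task_instance.xcom_pull("' + task_id + '", key="execution_id") }}'


# Produces the same string that the sensor example passes as execution_id.
template = execution_id_from("create_execution")
```

The helper only builds a string; Airflow renders the template when the task runs, so the upstream task must have pushed execution_id to XCom by then.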

Get execution

To get an execution use WorkflowsGetExecutionOperator.

airflow/providers/google/cloud/example_dags/example_workflows.py

get_execution = WorkflowsGetExecutionOperator(
    task_id="get_execution",
    location=LOCATION,
    project_id=PROJECT_ID,
    workflow_id=WORKFLOW_ID,
    execution_id='{{ task_instance.xcom_pull("create_execution", key="execution_id") }}',
)

List executions

To list executions use WorkflowsListExecutionsOperator. By default, this operator returns only executions from the last 60 minutes.

airflow/providers/google/cloud/example_dags/example_workflows.py

list_executions = WorkflowsListExecutionsOperator(
    task_id="list_executions", location=LOCATION, project_id=PROJECT_ID, workflow_id=WORKFLOW_ID
)
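
The 60-minute default amounts to a cutoff on each execution's start time. The sketch below illustrates that kind of filtering on plain dictionaries. It is not the operator's implementation, and the sample records and the recent_executions helper are invented for illustration. The operator is understood to accept a start_date_filter argument for changing the cutoff; treat that parameter name as an assumption and confirm it against your installed provider version.

```python
from datetime import datetime, timedelta, timezone


def recent_executions(executions, now, window=timedelta(minutes=60)):
    """Keep only executions that started within `window` before `now`."""
    cutoff = now - window
    return [e for e in executions if e["start_time"] > cutoff]


# Invented sample data: one execution 10 minutes old, one 3 hours old.
now = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
sample = [
    {"name": "recent", "start_time": now - timedelta(minutes=10)},
    {"name": "old", "start_time": now - timedelta(hours=3)},
]
kept = [e["name"] for e in recent_executions(sample, now)]
```

With the default window only the 10-minute-old record is kept; widening the window to four hours would keep both.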

Cancel execution

To cancel an execution use WorkflowsCancelExecutionOperator.

airflow/providers/google/cloud/example_dags/example_workflows.py

cancel_execution = WorkflowsCancelExecutionOperator(
    task_id="cancel_execution",
    location=LOCATION,
    project_id=PROJECT_ID,
    workflow_id=SLEEP_WORKFLOW_ID,
    execution_id='{{ task_instance.xcom_pull("create_execution_for_cancel", key="execution_id") }}',
)