Google Cloud Workflows Operators

You can use Workflows to create serverless workflows that link a series of serverless tasks together in an order you define. Combine the power of Google Cloud's APIs, serverless products like Cloud Functions and Cloud Run, and calls to external APIs to create flexible serverless applications.

For more information about the service, visit the Workflows product documentation.

Create workflow

To create a workflow use WorkflowsCreateWorkflowOperator.

airflow/providers/google/cloud/example_dags/example_workflows.py

create_workflow = WorkflowsCreateWorkflowOperator(
    task_id="create_workflow",
    location=LOCATION,
    project_id=PROJECT_ID,
    workflow=WORKFLOW,
    workflow_id=WORKFLOW_ID,
)

The workflow should be defined in a similar way to this example:

airflow/providers/google/cloud/example_dags/example_workflows.py

WORKFLOW_CONTENT = """
- getCurrentTime:
    call: http.get
    args:
        url: https://us-central1-workflowsample.cloudfunctions.net/datetime
    result: currentTime
- readWikipedia:
    call: http.get
    args:
        url: https://en.wikipedia.org/w/api.php
        query:
            action: opensearch
            search: ${currentTime.body.dayOfTheWeek}
    result: wikiResult
- returnResult:
    return: ${wikiResult.body[1]}
"""

WORKFLOW = {
    "description": "Test workflow",
    "labels": {"airflow-version": "dev"},
    "source_contents": WORKFLOW_CONTENT,
}

For more information about authoring workflows, check the official product documentation.

Update workflow

To update a workflow use WorkflowsUpdateWorkflowOperator.

airflow/providers/google/cloud/example_dags/example_workflows.py

update_workflows = WorkflowsUpdateWorkflowOperator(
    task_id="update_workflows",
    location=LOCATION,
    project_id=PROJECT_ID,
    workflow_id=WORKFLOW_ID,
    update_mask={"paths": ["name", "description"]},
)
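The update_mask argument names the fields that the update is allowed to touch. As a rough, library-independent illustration of that field-mask idea (not the operator's or the API's actual implementation), the sketch below applies only the listed top-level paths to a stored definition. The names apply_update_mask, stored, and requested are hypothetical and exist only for this example.

```python
from __future__ import annotations

from copy import deepcopy


def apply_update_mask(current: dict, updates: dict, paths: list[str]) -> dict:
    """Return a copy of `current` where only the masked top-level fields are replaced.

    Simplification (assumption): nested paths such as "labels.team" are not handled.
    """
    result = deepcopy(current)
    for path in paths:
        # Fields missing from `updates` are left as they are.
        if path in updates:
            result[path] = deepcopy(updates[path])
    return result


# Hypothetical stored definition and requested changes.
stored = {"description": "Test workflow", "labels": {"airflow-version": "dev"}}
requested = {"description": "Updated description", "labels": {"team": "data"}}

# Only "description" is in the mask, so "labels" keeps its stored value.
updated = apply_update_mask(stored, requested, paths=["description"])
print(updated)
```

In this sketch, a field that appears in the requested changes but not in the mask is ignored, which is the behavior a field mask is meant to provide.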

Get workflow

To get a workflow use WorkflowsGetWorkflowOperator.

airflow/providers/google/cloud/example_dags/example_workflows.py

get_workflow = WorkflowsGetWorkflowOperator(
    task_id="get_workflow", location=LOCATION, project_id=PROJECT_ID, workflow_id=WORKFLOW_ID
)

List workflows

To list workflows use WorkflowsListWorkflowsOperator.

airflow/providers/google/cloud/example_dags/example_workflows.py

list_workflows = WorkflowsListWorkflowsOperator(
    task_id="list_workflows",
    location=LOCATION,
    project_id=PROJECT_ID,
)

Delete workflow

To delete a workflow use WorkflowsDeleteWorkflowOperator.

airflow/providers/google/cloud/example_dags/example_workflows.py

delete_workflow = WorkflowsDeleteWorkflowOperator(
    task_id="delete_workflow", location=LOCATION, project_id=PROJECT_ID, workflow_id=WORKFLOW_ID
)

Create execution

To create an execution, use WorkflowsCreateExecutionOperator. This operator is not idempotent due to an API limitation.

airflow/providers/google/cloud/example_dags/example_workflows.py

create_execution = WorkflowsCreateExecutionOperator(
    task_id="create_execution",
    location=LOCATION,
    project_id=PROJECT_ID,
    execution=EXECUTION,
    workflow_id=WORKFLOW_ID,
)

The create operator does not wait for the execution to complete. To wait for the execution result, use WorkflowExecutionSensor.

airflow/providers/google/cloud/example_dags/example_workflows.py

wait_for_execution = WorkflowExecutionSensor(
    task_id="wait_for_execution",
    location=LOCATION,
    project_id=PROJECT_ID,
    workflow_id=WORKFLOW_ID,
    execution_id='{{ task_instance.xcom_pull("create_execution", key="execution_id") }}',
)
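Conceptually, waiting for an execution means polling its state until it reaches a terminal value. The following is a minimal sketch of such a polling loop in plain Python, not the sensor's actual code. The function wait_for_state and the get_state callable are hypothetical, and the split of states into success and failure sets is an assumption made for illustration.

```python
import time
from typing import Callable

# Assumed grouping of terminal states for this sketch.
SUCCESS_STATES = {"SUCCEEDED"}
FAILURE_STATES = {"FAILED", "CANCELLED"}


def wait_for_state(
    get_state: Callable[[], str],
    poke_interval: float = 1.0,
    timeout: float = 60.0,
) -> str:
    """Poll `get_state` until a terminal state is seen or the timeout expires."""
    deadline = time.monotonic() + timeout
    while True:
        state = get_state()
        if state in SUCCESS_STATES:
            return state
        if state in FAILURE_STATES:
            raise RuntimeError(f"Execution finished in state {state}")
        if time.monotonic() >= deadline:
            raise TimeoutError("Execution did not finish before the timeout")
        # Still running: pause before checking again.
        time.sleep(poke_interval)


# Simulated sequence of states standing in for real API responses.
states = iter(["ACTIVE", "ACTIVE", "SUCCEEDED"])
final_state = wait_for_state(lambda: next(states), poke_interval=0.01)
print(final_state)
```

Raising on a failure state rather than continuing to poll keeps a failed or cancelled execution from blocking downstream tasks until the timeout.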

Get execution

To get an execution use WorkflowsGetExecutionOperator.

airflow/providers/google/cloud/example_dags/example_workflows.py

get_execution = WorkflowsGetExecutionOperator(
    task_id="get_execution",
    location=LOCATION,
    project_id=PROJECT_ID,
    workflow_id=WORKFLOW_ID,
    execution_id='{{ task_instance.xcom_pull("create_execution", key="execution_id") }}',
)
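The execution_id template above pulls a short ID from XCom, while the Workflows API addresses an execution by a full resource name. As a hedged illustration of how the two relate, the sketch below builds such a name and recovers the ID from its last path segment. The helper names and the sample values are hypothetical, and how the operator itself derives the pushed value is not shown in this guide.

```python
def execution_name(project_id: str, location: str, workflow_id: str, execution_id: str) -> str:
    """Build a full execution resource name from its parts."""
    return (
        f"projects/{project_id}/locations/{location}"
        f"/workflows/{workflow_id}/executions/{execution_id}"
    )


def execution_id_from_name(name: str) -> str:
    """Return the last path segment of a resource name (assumed to be the ID)."""
    return name.rsplit("/", 1)[-1]


# Hypothetical sample values.
name = execution_name("my-project", "us-central1", "my-workflow", "abc123")
print(name)
print(execution_id_from_name(name))
```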

List executions

To list executions, use WorkflowsListExecutionsOperator. By default, this operator returns only executions from the last 60 minutes.

airflow/providers/google/cloud/example_dags/example_workflows.py

list_executions = WorkflowsListExecutionsOperator(
    task_id="list_executions", location=LOCATION, project_id=PROJECT_ID, workflow_id=WORKFLOW_ID
)
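To make the 60-minute default concrete, here is a small sketch of a time-window filter over execution records. It is an illustration only, not the operator's code: the executions are plain dictionaries with a hypothetical start_time key, and recent_executions is a name introduced for this example.

```python
from datetime import datetime, timedelta, timezone


def recent_executions(executions, now, window=timedelta(minutes=60)):
    """Keep only the executions that started after `now - window`."""
    cutoff = now - window
    return [e for e in executions if e["start_time"] > cutoff]


# Fixed reference time so the example is reproducible.
now = datetime(2021, 1, 1, 12, 0, tzinfo=timezone.utc)
executions = [
    {"id": "old", "start_time": now - timedelta(hours=3)},
    {"id": "recent", "start_time": now - timedelta(minutes=10)},
]

kept = recent_executions(executions, now)
print([e["id"] for e in kept])
```

Passing a larger window, for example timedelta(hours=6), would also keep the older record in this sketch.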

Cancel execution

To cancel an execution use WorkflowsCancelExecutionOperator.

airflow/providers/google/cloud/example_dags/example_workflows.py

cancel_execution = WorkflowsCancelExecutionOperator(
    task_id="cancel_execution",
    location=LOCATION,
    project_id=PROJECT_ID,
    workflow_id=SLEEP_WORKFLOW_ID,
    execution_id='{{ task_instance.xcom_pull("create_execution_for_cancel", key="execution_id") }}',
)
