Google Cloud Workflows Operators¶
You can use Workflows to create serverless workflows that link a series of serverless tasks together in an order you define. Combine the power of Google Cloud's APIs, serverless products like Cloud Functions and Cloud Run, and calls to external APIs to create flexible serverless applications.
For more information about the service, visit the Workflows product documentation.
Prerequisite Tasks¶
To use these operators, you must do a few things:
Select or create a Cloud Platform project using the Cloud Console.
Enable billing for your project, as described in the Google Cloud documentation.
Enable the API, as described in the Cloud Console documentation.
Install API libraries via pip.
pip install 'apache-airflow[google]'
Detailed information is available in the Installation guide.
Create workflow¶
To create a workflow, use WorkflowsCreateWorkflowOperator.
create_workflow = WorkflowsCreateWorkflowOperator(
    task_id="create_workflow",
    location=LOCATION,
    project_id=PROJECT_ID,
    workflow=WORKFLOW,
    workflow_id=WORKFLOW_ID,
)
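The snippets on this page refer to constants such as LOCATION, PROJECT_ID and WORKFLOW_ID without defining them. A minimal sketch of one way to set them up follows. The environment variable names, the default values, and the helper `workflow_resource_name` are assumptions chosen for illustration, not names the provider requires.

```python
import os

# Hypothetical environment variable names and defaults; adjust to your setup.
PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "example-project")
LOCATION = os.environ.get("GCP_WORKFLOWS_LOCATION", "us-central1")
WORKFLOW_ID = os.environ.get("GCP_WORKFLOW_ID", "example-workflow")


def workflow_resource_name(project_id: str, location: str, workflow_id: str) -> str:
    """Build the fully qualified resource name used by the Workflows API."""
    return f"projects/{project_id}/locations/{location}/workflows/{workflow_id}"
```

The helper is only a convenience for logging or debugging; the operators take the three values separately.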
The workflow should be defined in a similar way to this example:
WORKFLOW_CONTENT = """
- getCurrentTime:
    call: http.get
    args:
        url: https://us-central1-workflowsample.cloudfunctions.net/datetime
    result: currentTime
- readWikipedia:
    call: http.get
    args:
        url: https://en.wikipedia.org/w/api.php
        query:
            action: opensearch
            search: ${currentTime.body.dayOfTheWeek}
    result: wikiResult
- returnResult:
    return: ${wikiResult.body[1]}
"""
WORKFLOW = {
    "description": "Test workflow",
    "labels": {"airflow-version": "dev"},
    "source_contents": WORKFLOW_CONTENT,
}
For more information about authoring workflows, check the official product documentation.
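Workflows accepts source definitions written in JSON as well as YAML, so the same three steps can also be assembled as Python data and serialized. The sketch below illustrates that alternative and is not part of the provider's example; `STEPS` and `WORKFLOW_FROM_JSON` are hypothetical names.

```python
import json

# The same steps as the YAML definition, expressed as Python data.
STEPS = [
    {
        "getCurrentTime": {
            "call": "http.get",
            "args": {"url": "https://us-central1-workflowsample.cloudfunctions.net/datetime"},
            "result": "currentTime",
        }
    },
    {
        "readWikipedia": {
            "call": "http.get",
            "args": {
                "url": "https://en.wikipedia.org/w/api.php",
                "query": {
                    "action": "opensearch",
                    "search": "${currentTime.body.dayOfTheWeek}",
                },
            },
            "result": "wikiResult",
        }
    },
    {"returnResult": {"return": "${wikiResult.body[1]}"}},
]

# Hypothetical counterpart of WORKFLOW, with JSON source instead of YAML.
WORKFLOW_FROM_JSON = {
    "description": "Test workflow",
    "labels": {"airflow-version": "dev"},
    "source_contents": json.dumps(STEPS, indent=2),
}
```

Building the source from data avoids indentation mistakes in a hand-written YAML string, at the cost of a more verbose definition.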
Update workflow¶
To update a workflow, use WorkflowsUpdateWorkflowOperator.
from google.protobuf.field_mask_pb2 import FieldMask

update_workflow = WorkflowsUpdateWorkflowOperator(
    task_id="update_workflow",
    location=LOCATION,
    project_id=PROJECT_ID,
    workflow_id=WORKFLOW_ID,
    update_mask=FieldMask(paths=["name", "description"]),
)
Get workflow¶
To get a workflow, use WorkflowsGetWorkflowOperator.
get_workflow = WorkflowsGetWorkflowOperator(
    task_id="get_workflow", location=LOCATION, project_id=PROJECT_ID, workflow_id=WORKFLOW_ID
)
List workflows¶
To list workflows, use WorkflowsListWorkflowsOperator.
list_workflows = WorkflowsListWorkflowsOperator(
    task_id="list_workflows",
    location=LOCATION,
    project_id=PROJECT_ID,
)
Delete workflow¶
To delete a workflow, use WorkflowsDeleteWorkflowOperator.
delete_workflow = WorkflowsDeleteWorkflowOperator(
    task_id="delete_workflow", location=LOCATION, project_id=PROJECT_ID, workflow_id=WORKFLOW_ID
)
Create execution¶
To create an execution, use WorkflowsCreateExecutionOperator.
This operator is not idempotent due to an API limitation, so each run of the task starts a new execution.
create_execution = WorkflowsCreateExecutionOperator(
    task_id="create_execution",
    location=LOCATION,
    project_id=PROJECT_ID,
    execution=EXECUTION,
    workflow_id=WORKFLOW_ID,
)
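EXECUTION is not defined in the snippet above. In the Workflows Executions API, an execution's `argument` field carries the workflow input as a JSON-encoded string. A minimal sketch follows; the input key `searchTerm` is made up for illustration and is only meaningful for a workflow that declares a matching parameter.

```python
import json

# Hypothetical workflow input; the key is illustrative only.
execution_input = {"searchTerm": "Monday"}

# The argument field expects a JSON string rather than a nested dict.
EXECUTION = {"argument": json.dumps(execution_input)}
```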
The create operator does not wait for the execution to complete. To wait for the execution result, use WorkflowExecutionSensor.
wait_for_execution = WorkflowExecutionSensor(
    task_id="wait_for_execution",
    location=LOCATION,
    project_id=PROJECT_ID,
    workflow_id=WORKFLOW_ID,
    execution_id=create_execution_id,
)
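Conceptually, waiting for an execution is a polling loop over its state. The sketch below is not the sensor's implementation. It is a plain-Python illustration built around a hypothetical `get_state` callable, and it assumes that SUCCEEDED counts as success and that FAILED or CANCELLED count as failure.

```python
import time
from typing import Callable

# State names follow the Workflows execution states; the grouping is an assumption.
SUCCESS_STATES = {"SUCCEEDED"}
FAILURE_STATES = {"FAILED", "CANCELLED"}


def wait_for_state(
    get_state: Callable[[], str],
    poke_interval: float = 1.0,
    max_pokes: int = 60,
) -> str:
    """Poll get_state until it reports a terminal state or the pokes run out."""
    for _ in range(max_pokes):
        state = get_state()
        if state in SUCCESS_STATES:
            return state
        if state in FAILURE_STATES:
            raise RuntimeError(f"Execution finished in state {state}")
        time.sleep(poke_interval)
    raise TimeoutError("Execution did not reach a terminal state in time")


# Usage with a fake state source standing in for an API call.
fake_states = iter(["ACTIVE", "ACTIVE", "SUCCEEDED"])
final_state = wait_for_state(lambda: next(fake_states), poke_interval=0)
```

Using a sensor instead of a hand-written loop lets Airflow handle the scheduling, timeout, and retry behaviour.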
Get execution¶
To get an execution, use WorkflowsGetExecutionOperator.
get_execution = WorkflowsGetExecutionOperator(
    task_id="get_execution",
    location=LOCATION,
    project_id=PROJECT_ID,
    workflow_id=WORKFLOW_ID,
    execution_id=create_execution_id,
)
List executions¶
To list executions, use WorkflowsListExecutionsOperator.
By default, this operator returns only executions from the last 60 minutes.
list_executions = WorkflowsListExecutionsOperator(
    task_id="list_executions", location=LOCATION, project_id=PROJECT_ID, workflow_id=WORKFLOW_ID
)
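The 60-minute default amounts to a cutoff on each execution's start time. The sketch below shows that kind of filtering in plain Python, using hypothetical dictionaries with a `start_time` key rather than the API's execution objects. It illustrates the idea and is not the operator's code.

```python
from datetime import datetime, timedelta, timezone


def recent_executions(executions, now=None, window=timedelta(minutes=60)):
    """Keep executions whose start_time falls within the window before now."""
    now = now or datetime.now(tz=timezone.utc)
    cutoff = now - window
    return [e for e in executions if e["start_time"] > cutoff]


# Usage with made-up data and a fixed reference time.
reference = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
sample = [
    {"name": "recent", "start_time": reference - timedelta(minutes=30)},
    {"name": "old", "start_time": reference - timedelta(minutes=90)},
]
kept = recent_executions(sample, now=reference)
```

Timezone-aware datetimes are used throughout so the comparison does not fail on mixed naive and aware values.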
Cancel execution¶
To cancel an execution, use WorkflowsCancelExecutionOperator.
cancel_execution = WorkflowsCancelExecutionOperator(
    task_id="cancel_execution",
    location=LOCATION,
    project_id=PROJECT_ID,
    workflow_id=SLEEP_WORKFLOW_ID,
    execution_id=cancel_execution_id,
)