Google Cloud Tasks¶
Cloud Tasks is a fully managed service that allows you to manage the execution, dispatch, and delivery of a large number of distributed tasks. Using Cloud Tasks, you can perform work asynchronously outside of a user or service-to-service request.
For more information about the service, visit the Cloud Tasks product documentation.
Prerequisite Tasks¶
To use these operators, you must do a few things:
Select or create a Cloud Platform project using the Cloud Console.
Enable billing for your project, as described in the Google Cloud documentation.
Enable the API, as described in the Cloud Console documentation.
Install API libraries via pip.
pip install 'apache-airflow[google]'
Detailed information is available for Installation.
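The snippets on this page reference a few module-level names (LOCATION, QUEUE_ID, TASK_NAME and TASK) that are not defined here. A minimal sketch of what they might look like follows. Every value is a hypothetical placeholder, and the task is written as a plain dictionary on the assumption that the task operators accept a dict in place of a Task message.

```python
# Hypothetical placeholder values; replace them with your own.
LOCATION = "europe-west2"    # region that hosts the queue
QUEUE_ID = "example-queue-"  # prefix; a random suffix is appended at runtime
TASK_NAME = "example-task-"  # prefix; a random suffix is appended at runtime

# An HTTP target task expressed as a plain dict. The URL is made up.
# No HTTP method is set, so the service default applies.
TASK = {
    "http_request": {
        "url": "https://example.com/task-handler",
        "headers": {"Content-Type": "application/json"},
    },
}
```

Keeping the names as prefixes leaves room for the templated suffix that the examples append to queue_name and task_name.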
Queue operations¶
Create queue¶
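To create a new queue, use CloudTasksQueueCreateOperator.
from google.api_core.retry import Retry
from google.cloud.tasks_v2.types import Queue

from airflow.providers.google.cloud.operators.tasks import CloudTasksQueueCreateOperator

create_queue = CloudTasksQueueCreateOperator(
    location=LOCATION,
    task_queue=Queue(stackdriver_logging_config=dict(sampling_ratio=0.5)),
    # The suffix is rendered at runtime from the XCom of the "random_string" task.
    queue_name=QUEUE_ID + "{{ task_instance.xcom_pull(task_ids='random_string') }}",
    retry=Retry(maximum=10.0),  # cap the delay between retries at 10 seconds
    timeout=5,
    task_id="create_queue",
)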
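The queue_name above appends a value pulled from the XCom of an upstream task with the ID random_string, which this page does not show. Cloud Tasks does not let a queue name be reused for some time after the queue is deleted, so a fresh suffix per run avoids name collisions. A minimal sketch of such a generator is given below as a plain function; in a DAG it would be wrapped as a task so that its return value lands in XCom. The function name and the suffix length are assumptions.

```python
import random
import string


def generate_random_string(length: int = 8) -> str:
    """Return a random suffix made of uppercase letters and digits."""
    alphabet = string.ascii_uppercase + string.digits
    return "".join(random.choices(alphabet, k=length))


# Hypothetical usage: build a unique queue ID from a fixed prefix.
suffix = generate_random_string()
queue_name = "example-queue-" + suffix
```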
Delete queue¶
To delete a queue, use CloudTasksQueueDeleteOperator.
delete_queue = CloudTasksQueueDeleteOperator(
    location=LOCATION,
    queue_name=QUEUE_ID + "{{ task_instance.xcom_pull(task_ids='random_string') }}",
    task_id="delete_queue",
)
Resume queue¶
To resume a paused queue, use CloudTasksQueueResumeOperator.
resume_queue = CloudTasksQueueResumeOperator(
    location=LOCATION,
    queue_name=QUEUE_ID + "{{ task_instance.xcom_pull(task_ids='random_string') }}",
    task_id="resume_queue",
)
Pause queue¶
To pause a queue, use CloudTasksQueuePauseOperator.
pause_queue = CloudTasksQueuePauseOperator(
    location=LOCATION,
    queue_name=QUEUE_ID + "{{ task_instance.xcom_pull(task_ids='random_string') }}",
    task_id="pause_queue",
)
Purge queue¶
To purge a queue, which deletes all of its tasks, use CloudTasksQueuePurgeOperator.
purge_queue = CloudTasksQueuePurgeOperator(
    location=LOCATION,
    queue_name=QUEUE_ID + "{{ task_instance.xcom_pull(task_ids='random_string') }}",
    task_id="purge_queue",
)
Get queue¶
To get a queue, use CloudTasksQueueGetOperator.
get_queue = CloudTasksQueueGetOperator(
    location=LOCATION,
    queue_name=QUEUE_ID + "{{ task_instance.xcom_pull(task_ids='random_string') }}",
    task_id="get_queue",
)
# get_queue.output refers to the operator's XCom result; here it is echoed by a downstream task.
get_queue_result = BashOperator(
    task_id="get_queue_result",
    bash_command=f"echo {get_queue.output}",
)
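The Cloud Tasks API identifies a queue by a fully qualified resource name of the form projects/PROJECT_ID/locations/LOCATION_ID/queues/QUEUE_ID, and that is the form of the name field in a queue returned by the API. The sketch below builds and splits such names with plain string handling. The helper names are hypothetical and are not part of Airflow or the Google client library.

```python
def queue_path(project: str, location: str, queue: str) -> str:
    """Build a fully qualified queue resource name."""
    return f"projects/{project}/locations/{location}/queues/{queue}"


def parse_queue_path(path: str) -> dict:
    """Split a fully qualified queue resource name into its components."""
    parts = path.split("/")
    # Expect exactly: projects/<p>/locations/<l>/queues/<q>
    if len(parts) != 6 or parts[0::2] != ["projects", "locations", "queues"]:
        raise ValueError(f"Not a queue resource name: {path!r}")
    return {"project": parts[1], "location": parts[3], "queue": parts[5]}


# Hypothetical project, location and queue IDs.
name = queue_path("my-project", "europe-west2", "example-queue-AB12CD34")
components = parse_queue_path(name)
```

Splitting the name this way is handy when a downstream task needs only the short queue ID from a returned queue.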
Update queue¶
To update a queue, use CloudTasksQueueUpdateOperator.
from google.protobuf.field_mask_pb2 import FieldMask

update_queue = CloudTasksQueueUpdateOperator(
    task_queue=Queue(stackdriver_logging_config=dict(sampling_ratio=1)),
    location=LOCATION,
    queue_name=QUEUE_ID + "{{ task_instance.xcom_pull(task_ids='random_string') }}",
    # Only the fields listed in the mask are updated.
    update_mask=FieldMask(paths=["stackdriver_logging_config.sampling_ratio"]),
    task_id="update_queue",
)
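The update_mask limits the update to the listed field paths, so fields of task_queue outside the mask keep their current server-side values. The following is a conceptual sketch of that behaviour using plain dictionaries. It is not the client library's implementation; apply_field_mask is a hypothetical helper and the rate_limits values are made up.

```python
import copy


def apply_field_mask(current: dict, update: dict, paths: list[str]) -> dict:
    """Copy only the dotted paths in `paths` from `update` into a copy of `current`."""
    result = copy.deepcopy(current)
    for path in paths:
        keys = path.split(".")
        src, dst = update, result
        # Walk down to the parent of the field named by the path.
        for key in keys[:-1]:
            src = src[key]
            dst = dst.setdefault(key, {})
        dst[keys[-1]] = src[keys[-1]]
    return result


current = {
    "stackdriver_logging_config": {"sampling_ratio": 0.5},
    "rate_limits": {"max_dispatches_per_second": 500},
}
update = {
    "stackdriver_logging_config": {"sampling_ratio": 1.0},
    "rate_limits": {"max_dispatches_per_second": 1},
}
merged = apply_field_mask(current, update, ["stackdriver_logging_config.sampling_ratio"])
```

In this sketch the sampling ratio changes to 1.0 while rate_limits keeps its original value, because its path is not in the mask.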
List queues¶
To list all queues in a location, use CloudTasksQueuesListOperator.
list_queue = CloudTasksQueuesListOperator(location=LOCATION, task_id="list_queue")
Tasks operations¶
Create task¶
To create a new task in a particular queue, use CloudTasksTaskCreateOperator.
create_task = CloudTasksTaskCreateOperator(
    location=LOCATION,
    queue_name=QUEUE_ID + "{{ task_instance.xcom_pull(task_ids='random_string') }}",
    task=TASK,
    task_name=TASK_NAME + "{{ task_instance.xcom_pull(task_ids='random_string') }}",
    retry=Retry(maximum=10.0),  # cap the delay between retries at 10 seconds
    timeout=5,
    task_id="create_task_to_run",
)
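Retry(maximum=10.0) in the example above caps the delay between retry attempts at 10 seconds. The sketch below illustrates capped exponential backoff in plain Python. It is a simplification: the real google.api_core Retry also randomizes each delay and stops at an overall deadline, and the initial delay of 1 second and the multiplier of 2 used here are assumed values chosen for illustration.

```python
from itertools import islice


def capped_backoff(initial: float, multiplier: float, maximum: float):
    """Yield retry delays that grow geometrically but never exceed `maximum`."""
    delay = min(initial, maximum)
    while True:
        yield delay
        delay = min(delay * multiplier, maximum)


# First six delays for an assumed initial delay of 1 s and multiplier of 2.
delays = list(islice(capped_backoff(1.0, 2.0, 10.0), 6))
print(delays)  # [1.0, 2.0, 4.0, 8.0, 10.0, 10.0]
```

A low cap keeps a failing API call from stalling the Airflow task for long stretches between attempts.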
Get task¶
To get a task in a particular queue, use CloudTasksTaskGetOperator.
tasks_get = CloudTasksTaskGetOperator(
    location=LOCATION,
    queue_name=QUEUE_ID + "{{ task_instance.xcom_pull(task_ids='random_string') }}",
    task_name=TASK_NAME + "{{ task_instance.xcom_pull(task_ids='random_string') }}",
    task_id="tasks_get",
)
Run task¶
To run a task in a particular queue, use CloudTasksTaskRunOperator.
run_task = CloudTasksTaskRunOperator(
    location=LOCATION,
    queue_name=QUEUE_ID + "{{ task_instance.xcom_pull(task_ids='random_string') }}",
    task_name=TASK_NAME + "{{ task_instance.xcom_pull(task_ids='random_string') }}",
    retry=Retry(maximum=10.0),
    task_id="run_task",
)
List tasks¶
To list all tasks in a particular queue, use CloudTasksTasksListOperator.
list_tasks = CloudTasksTasksListOperator(
    location=LOCATION,
    queue_name=QUEUE_ID + "{{ task_instance.xcom_pull(task_ids='random_string') }}",
    task_id="list_tasks",
)
Delete task¶
To delete a task from a particular queue, use CloudTasksTaskDeleteOperator.
delete_task = CloudTasksTaskDeleteOperator(
    location=LOCATION,
    queue_name=QUEUE_ID + "{{ task_instance.xcom_pull(task_ids='random_string') }}",
    task_name=TASK_NAME + "{{ task_instance.xcom_pull(task_ids='random_string') }}",
    task_id="delete_task",
)