
Google Cloud Batch Operators

Cloud Batch is a fully managed batch service to schedule, queue, and execute batch jobs on Google’s infrastructure.

For more information about the service, visit the Google Cloud Batch documentation.

Prerequisite Tasks

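To use these operators, you need a Google Cloud project with billing and the Batch API enabled, the Google provider package installed (for example with pip install 'apache-airflow[google]'), and an Airflow connection configured for Google Cloud.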

Submit a job

Before you submit a job to Cloud Batch, you need to define it. For more information about the Job object fields, see the Google Cloud Batch Job description.

A simple job configuration can look as follows:

tests/system/google/cloud/cloud_batch/example_cloud_batch.py

from google.cloud import batch_v1


def _create_job():
    runnable = batch_v1.Runnable()
    runnable.container = batch_v1.Runnable.Container()
    runnable.container.image_uri = "gcr.io/google-containers/busybox"
    runnable.container.entrypoint = "/bin/sh"
    runnable.container.commands = [
        "-c",
        "echo Hello world! This is task ${BATCH_TASK_INDEX}.\
          This job has a total of ${BATCH_TASK_COUNT} tasks.",
    ]

    task = batch_v1.TaskSpec()
    task.runnables = [runnable]

    resources = batch_v1.ComputeResource()
    resources.cpu_milli = 2000
    resources.memory_mib = 16
    task.compute_resource = resources
    task.max_retry_count = 2

    group = batch_v1.TaskGroup()
    group.task_count = 2
    group.task_spec = task
    policy = batch_v1.AllocationPolicy.InstancePolicy()
    policy.machine_type = "e2-standard-4"
    instances = batch_v1.AllocationPolicy.InstancePolicyOrTemplate()
    instances.policy = policy
    allocation_policy = batch_v1.AllocationPolicy()
    allocation_policy.instances = [instances]

    job = batch_v1.Job()
    job.task_groups = [group]
    job.allocation_policy = allocation_policy
    job.labels = {"env": "testing", "type": "container"}

    job.logs_policy = batch_v1.LogsPolicy()
    job.logs_policy.destination = batch_v1.LogsPolicy.Destination.CLOUD_LOGGING

    return job


With this configuration, you can submit the job using CloudBatchSubmitJobOperator:

tests/system/google/cloud/cloud_batch/example_cloud_batch.py

submit1 = CloudBatchSubmitJobOperator(
    task_id="submit-job1",
    project_id=PROJECT_ID,
    region=REGION,
    job_name=job1_name,
    job=_create_job(),
    dag=dag,
    deferrable=False,
)

Alternatively, you can run the same operator, CloudBatchSubmitJobOperator, in deferrable mode:

tests/system/google/cloud/cloud_batch/example_cloud_batch.py

submit2 = CloudBatchSubmitJobOperator(
    task_id="submit-job2",
    project_id=PROJECT_ID,
    region=REGION,
    job_name=job2_name,
    job=batch_v1.Job.to_dict(_create_job()),
    dag=dag,
    deferrable=True,
)
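
The deferrable example above passes the job as a dictionary rather than as a batch_v1.Job object. As a rough sketch, and assuming the dictionary uses the same snake_case field names as the batch_v1.Job message (which is what Job.to_dict produces), an equivalent definition could be written by hand. The helper name build_job_dict is hypothetical, and the result below is not validated against the Batch API.

```python
import json


def build_job_dict(task_count: int = 2) -> dict:
    """Build a plain-dict job definition mirroring _create_job().

    Assumption: keys follow the snake_case field names of batch_v1.Job.
    """
    command = (
        "echo Hello world! This is task ${BATCH_TASK_INDEX}. "
        "This job has a total of ${BATCH_TASK_COUNT} tasks."
    )
    runnable = {
        "container": {
            "image_uri": "gcr.io/google-containers/busybox",
            "entrypoint": "/bin/sh",
            "commands": ["-c", command],
        }
    }
    task_spec = {
        "runnables": [runnable],
        "compute_resource": {"cpu_milli": 2000, "memory_mib": 16},
        "max_retry_count": 2,
    }
    return {
        "task_groups": [{"task_count": task_count, "task_spec": task_spec}],
        "allocation_policy": {
            "instances": [{"policy": {"machine_type": "e2-standard-4"}}]
        },
        "labels": {"env": "testing", "type": "container"},
        "logs_policy": {"destination": "CLOUD_LOGGING"},
    }


job_dict = build_job_dict()
# Cheap sanity check: the definition should survive a JSON round trip.
print(json.dumps(job_dict, indent=2))
```

A dictionary like this would be passed through the job parameter in place of batch_v1.Job.to_dict(_create_job()).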

Note that this operator waits for the job to complete its execution, and the Job’s dictionary representation is pushed to XCom.
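
Because the result is pushed to XCom as a dictionary, a downstream task can inspect it with ordinary dictionary access. The following is a minimal sketch: describe_job is a hypothetical helper, and the sample dictionary is an illustrative stand-in for the pushed value (it assumes the "name" and "labels" keys of the Job message and made-up project and region values).

```python
def describe_job(job: dict) -> str:
    """Return a one-line summary of a job dictionary pulled from XCom."""
    name = job.get("name", "<unknown>")
    labels = job.get("labels") or {}
    # Sort labels so the summary is stable regardless of dict ordering.
    label_text = ", ".join(f"{key}={value}" for key, value in sorted(labels.items()))
    return f"{name} [{label_text}]" if label_text else name


# Hypothetical sample standing in for the value pushed by the submit task.
sample = {
    "name": "projects/my-project/locations/us-central1/jobs/job1",
    "labels": {"env": "testing", "type": "container"},
}
summary = describe_job(sample)
print(summary)
```

In a DAG, the dictionary would typically come from ti.xcom_pull(task_ids="submit-job1") inside a downstream Python task.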

List a job’s tasks

To list the tasks of a certain job, you can use:

CloudBatchListTasksOperator

tests/system/google/cloud/cloud_batch/example_cloud_batch.py

list_tasks = CloudBatchListTasksOperator(
    task_id=list_tasks_task_name, project_id=PROJECT_ID, region=REGION, job_name=job1_name, dag=dag
)

The operator takes two optional parameters: “limit” to limit the number of tasks returned, and “filter” to only list the tasks matching the filter.

List jobs

To list the jobs, you can use:

CloudBatchListJobsOperator

tests/system/google/cloud/cloud_batch/example_cloud_batch.py

list_jobs = CloudBatchListJobsOperator(
    task_id=list_jobs_task_name,
    project_id=PROJECT_ID,
    region=REGION,
    limit=10,
    filter=f"name:projects/{PROJECT_ID}/locations/{REGION}/jobs/{job_name_prefix}*",
    dag=dag,
)

The operator takes two optional parameters: “limit” to limit the number of jobs returned, and “filter” to only list the jobs matching the filter.
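
The filter string in the example above follows the pattern name:projects/<project>/locations/<region>/jobs/<prefix>*. A small helper can keep that pattern in one place. This is only a sketch: job_name_filter is a hypothetical function, and the project, region, and prefix values are placeholders.

```python
def job_name_filter(project_id: str, region: str, job_name_prefix: str) -> str:
    """Build a filter matching jobs whose name starts with the given prefix."""
    if not job_name_prefix:
        raise ValueError("job_name_prefix must not be empty")
    return f"name:projects/{project_id}/locations/{region}/jobs/{job_name_prefix}*"


# Placeholder values for illustration only.
example_filter = job_name_filter("my-project", "us-central1", "batch-job")
print(example_filter)
```

The returned string can then be passed as the filter argument of CloudBatchListJobsOperator.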

Delete a job

To delete a job you can use:

CloudBatchDeleteJobOperator

tests/system/google/cloud/cloud_batch/example_cloud_batch.py

delete_job1 = CloudBatchDeleteJobOperator(
    task_id="delete-job1",
    project_id=PROJECT_ID,
    region=REGION,
    job_name=job1_name,
    dag=dag,
    trigger_rule=TriggerRule.ALL_DONE,
)

Note that this operator waits for the job to be deleted, and the deleted Job’s dictionary representation is pushed to XCom.
