Google Cloud Batch Operators¶
Cloud Batch is a fully managed batch service to schedule, queue, and execute batch jobs on Google’s infrastructure.
For more information about the service, visit the Google Cloud Batch documentation.
Prerequisite Tasks¶
To use these operators, you must do a few things:
Select or create a Cloud Platform project using the Cloud Console.
Enable billing for your project, as described in the Google Cloud documentation.
Enable the API, as described in the Cloud Console documentation.
Install API libraries via pip.
pip install 'apache-airflow[google]'

Detailed information is available on the Installation page.
Submit a job¶
Before you submit a job in Cloud Batch, you need to define it. For more information about the Job object fields, see the Google Cloud Batch Job description.
A simple job configuration can look as follows:
from google.cloud import batch_v1


def _create_job():
    # A container runnable that echoes the task's index and the job's task count.
    runnable = batch_v1.Runnable()
    runnable.container = batch_v1.Runnable.Container()
    runnable.container.image_uri = "gcr.io/google-containers/busybox"
    runnable.container.entrypoint = "/bin/sh"
    runnable.container.commands = [
        "-c",
        "echo Hello world! This is task ${BATCH_TASK_INDEX}."
        " This job has a total of ${BATCH_TASK_COUNT} tasks.",
    ]

    # Each task runs the runnable above with 2 vCPUs, 16 MiB of memory,
    # and up to 2 retries on failure.
    task = batch_v1.TaskSpec()
    task.runnables = [runnable]
    resources = batch_v1.ComputeResource()
    resources.cpu_milli = 2000
    resources.memory_mib = 16
    task.compute_resource = resources
    task.max_retry_count = 2

    # The job consists of a single group of 2 such tasks.
    group = batch_v1.TaskGroup()
    group.task_count = 2
    group.task_spec = task

    # Run the tasks on e2-standard-4 instances and send logs to Cloud Logging.
    policy = batch_v1.AllocationPolicy.InstancePolicy()
    policy.machine_type = "e2-standard-4"
    instances = batch_v1.AllocationPolicy.InstancePolicyOrTemplate()
    instances.policy = policy
    allocation_policy = batch_v1.AllocationPolicy()
    allocation_policy.instances = [instances]

    job = batch_v1.Job()
    job.task_groups = [group]
    job.allocation_policy = allocation_policy
    job.labels = {"env": "testing", "type": "container"}
    job.logs_policy = batch_v1.LogsPolicy()
    job.logs_policy.destination = batch_v1.LogsPolicy.Destination.CLOUD_LOGGING
    return job
With this configuration, we can submit the job with the CloudBatchSubmitJobOperator:
from airflow.providers.google.cloud.operators.cloud_batch import CloudBatchSubmitJobOperator

submit1 = CloudBatchSubmitJobOperator(
    task_id=submit1_task_name,
    project_id=PROJECT_ID,
    region=region,
    job_name=job1_name,
    job=_create_job(),
    dag=dag,
    deferrable=False,
)
or you can define the same operator in deferrable mode:
submit2 = CloudBatchSubmitJobOperator(
    task_id=submit2_task_name,
    project_id=PROJECT_ID,
    region=region,
    job_name=job2_name,
    job=batch_v1.Job.to_dict(_create_job()),
    dag=dag,
    deferrable=True,
)
Note that this operator waits for the job to complete its execution, and the Job's dictionary representation is pushed to XCom.
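Because the finished Job's dictionary representation lands in XCom, a downstream task can inspect it. A minimal sketch using the TaskFlow API; the check_job_status task and the fields it reads are illustrative, not part of the example DAG:

from airflow.decorators import task


@task
def check_job_status(job: dict):
    # `job` is the dict the submit operator pushed to XCom (its return value).
    # `name` and `status.state` are fields of the Job proto's dict representation.
    print(f"Job {job['name']} finished in state {job['status']['state']}")


# `submit1.output` is an XCom reference to the operator's return value.
check_job_status(submit1.output)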
List a job’s tasks¶
To list the tasks of a certain job, you can use:
list_tasks = CloudBatchListTasksOperator(
    task_id=list_tasks_task_name, project_id=PROJECT_ID, region=region, job_name=job1_name, dag=dag
)
The operator takes two optional parameters: “limit” to limit the number of tasks returned, and “filter” to only list the tasks matching the filter.
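For example, a hypothetical variant of the task above could cap the result at five tasks in a given state. The task_id, limit value, and filter expression here are assumptions for illustration; check the Batch API documentation for the exact filter syntax:

list_succeeded_tasks = CloudBatchListTasksOperator(
    task_id="list-succeeded-tasks",  # illustrative name
    project_id=PROJECT_ID,
    region=region,
    job_name=job1_name,
    limit=5,  # return at most 5 tasks
    filter='STATUS.STATE="SUCCEEDED"',  # assumed filter expression; verify against the Batch API
    dag=dag,
)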
List jobs¶
To list the jobs, you can use:
list_jobs = CloudBatchListJobsOperator(
task_id=list_jobs_task_name,
project_id=PROJECT_ID,
region=region,
limit=2,
filter=f"name:projects/{PROJECT_ID}/locations/{region}/jobs/{job_name_prefix}*",
dag=dag,
)
The operator takes two optional parameters: “limit” to limit the number of jobs returned, and “filter” to only list the jobs matching the filter.
Delete a job¶
To delete a job, you can use:
delete_job1 = CloudBatchDeleteJobOperator(
    task_id="delete-job1",
    project_id=PROJECT_ID,
    region=region,
    job_name=job1_name,
    dag=dag,
    trigger_rule=TriggerRule.ALL_DONE,
)
Note that this operator waits for the job to be deleted, and the deleted Job’s dictionary representation is pushed to XCom.
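Because of trigger_rule=TriggerRule.ALL_DONE, the delete task runs once all of its upstream tasks have finished, whether they succeeded or failed, which makes it a natural cleanup step. A minimal wiring sketch reusing the tasks defined above:

# Submit the job, list its tasks, then clean the job up regardless of
# whether the upstream tasks succeeded (trigger_rule=ALL_DONE).
submit1 >> list_tasks >> delete_job1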