Google Cloud Run Operators
Cloud Run is used to build and deploy scalable containerized apps written in any language (including Go, Python, Java, Node.js, .NET, and Ruby) on a fully managed platform.
For more information about the service, visit the Google Cloud Run documentation.
Prerequisite Tasks
To use these operators, you must do a few things:
Select or create a Cloud Platform project using the Cloud Console.
Enable billing for your project, as described in the Google Cloud documentation.
Enable the API, as described in the Cloud Console documentation.
Install API libraries via pip.
pip install 'apache-airflow[google]'
Detailed information is available for Installation.
Create a job
Before you create a job in Cloud Run, you need to define it. For more information about the Job object fields, visit the Google Cloud Run Job description.
A simple job configuration can look as follows:
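from google.cloud.run_v2 import Job
from google.cloud.run_v2.types import k8s_min


def _create_job():
    # Minimal job: a single container running the sample job image.
    job = Job()
    container = k8s_min.Container()
    container.image = "us-docker.pkg.dev/cloudrun/container/job:latest"
    job.template.template.containers.append(container)
    return job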
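The same configuration can also be sketched as nested dictionaries, which makes the field path `template.template.containers` easier to see. This is a minimal sketch, assuming the `job` argument of the operator also accepts a plain dict that mirrors the Job fields (worth confirming against the installed provider version); `create_job_dict` is a hypothetical helper name, not part of the provider.

```python
IMAGE = "us-docker.pkg.dev/cloudrun/container/job:latest"


def create_job_dict(image: str = IMAGE) -> dict:
    """Return a job definition as nested dicts.

    Hypothetical helper: the nesting mirrors the attribute path
    job.template.template.containers used in the Job-object version.
    """
    return {
        "template": {          # execution template of the job
            "template": {      # task template of each execution
                "containers": [{"image": image}],
            }
        }
    }


job_dict = create_job_dict()
print(job_dict["template"]["template"]["containers"][0]["image"])
```

A dict is convenient when the definition comes from a configuration file, while the Job object gives attribute access and type checking.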
With this configuration we can create the job using CloudRunCreateJobOperator:
create1 = CloudRunCreateJobOperator(
    task_id=create1_task_name,
    project_id=PROJECT_ID,
    region=region,
    job_name=job1_name,
    job=_create_job(),
    dag=dag,
)
Note that this operator only creates the job without executing it. The Job’s dictionary representation is pushed to XCom.
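A downstream task can read that dictionary from XCom and pick out the fields it needs. The sketch below is illustrative only: `summarize_job` is a hypothetical helper, the sample dict stands in for the value pulled from XCom, and the key layout (`name`, `template.template.containers`) is assumed to mirror the Job fields.

```python
from typing import Any


def summarize_job(job_dict: dict[str, Any]) -> dict[str, Any]:
    """Extract the short job name and container images from a Job dict."""
    # Assumption: "name" holds the full resource path, e.g.
    # "projects/<project>/locations/<region>/jobs/<job>".
    full_name = job_dict.get("name", "")
    short_name = full_name.rsplit("/", 1)[-1]

    # Assumption: containers sit under template.template.containers,
    # mirroring the attribute path used when the job was defined.
    task_template = job_dict.get("template", {}).get("template", {})
    images = [c.get("image", "") for c in task_template.get("containers", [])]

    return {"job": short_name, "images": images}


# Stand-in for the value a downstream task would pull from XCom.
sample = {
    "name": "projects/my-project/locations/us-central1/jobs/job1",
    "template": {
        "template": {
            "containers": [
                {"image": "us-docker.pkg.dev/cloudrun/container/job:latest"}
            ]
        }
    },
}

summary = summarize_job(sample)
print(summary)
```

The `.get` calls keep the helper from raising if a field is absent in the pushed dictionary.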
Execute a job
To execute a job, you can use:
execute1 = CloudRunExecuteJobOperator(
    task_id=execute1_task_name,
    project_id=PROJECT_ID,
    region=region,
    job_name=job1_name,
    dag=dag,
    deferrable=False,
)
or you can define the same operator in the deferrable mode:
execute2 = CloudRunExecuteJobOperator(
    task_id=execute2_task_name,
    project_id=PROJECT_ID,
    region=region,
    job_name=job2_name,
    dag=dag,
    deferrable=True,
)
You can also specify overrides that change settings for a single execution, such as the container arguments, environment variables, task count, and timeout:
overrides = {
    "container_overrides": [
        {
            "name": "job",
            "args": ["python", "main.py"],
            "env": [{"name": "ENV_VAR", "value": "value"}],
            "clearArgs": False,
        }
    ],
    "task_count": 1,
    "timeout": "60s",
}

execute3 = CloudRunExecuteJobOperator(
    task_id=execute3_task_name,
    project_id=PROJECT_ID,
    region=region,
    overrides=overrides,
    job_name=job3_name,
    dag=dag,
    deferrable=False,
)
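When several tasks need similar overrides, the dictionary can be assembled by a small function instead of being written out each time. The following is a minimal sketch: `build_overrides` and its parameters are hypothetical names, and it only produces the keys shown in the example above, leaving out the `clearArgs` flag.

```python
from typing import Optional, Sequence


def build_overrides(
    container_name: str,
    args: Optional[Sequence[str]] = None,
    env: Optional[dict] = None,
    task_count: Optional[int] = None,
    timeout_seconds: Optional[int] = None,
) -> dict:
    """Assemble an overrides dict in the shape used by the example above."""
    container = {"name": container_name}
    if args is not None:
        container["args"] = list(args)
    if env:
        # Convert {"KEY": "value"} into a list of name/value pairs.
        container["env"] = [{"name": k, "value": v} for k, v in env.items()]

    overrides = {"container_overrides": [container]}
    if task_count is not None:
        overrides["task_count"] = task_count
    if timeout_seconds is not None:
        # Durations are written as a number of seconds followed by "s".
        overrides["timeout"] = f"{timeout_seconds}s"
    return overrides


overrides = build_overrides(
    "job",
    args=["python", "main.py"],
    env={"ENV_VAR": "value"},
    task_count=1,
    timeout_seconds=60,
)
print(overrides)
```

Keys are emitted only for the settings that were actually passed, so anything left out keeps the value from the job definition.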
Update a job
To update a job, you can use:
update_job1 = CloudRunUpdateJobOperator(
    task_id=update_job1_task_name,
    project_id=PROJECT_ID,
    region=region,
    job_name=job1_name,
    job=_create_job_with_label(),
    dag=dag,
)
The Job’s dictionary representation is pushed to XCom.
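The `_create_job_with_label` helper passed to the update operator is not defined on this page. One plausible shape for it, sketched here with plain dicts, is to copy the base job definition and attach a label. This assumes the `job` argument accepts a dict and that labels live under a top-level `labels` key; the function name, the base definition, and the label values are all illustrative.

```python
import copy

# Illustrative base definition, matching the simple job shown earlier.
BASE_JOB = {
    "template": {
        "template": {
            "containers": [
                {"image": "us-docker.pkg.dev/cloudrun/container/job:latest"}
            ]
        }
    }
}


def create_job_with_label(base_job: dict, labels: dict) -> dict:
    """Return a copy of base_job with the given labels merged in."""
    # Deep-copy so the base definition can be reused unchanged.
    job = copy.deepcopy(base_job)
    job.setdefault("labels", {}).update(labels)
    return job


labelled = create_job_with_label(BASE_JOB, {"somelabel": "label1"})
print(labelled["labels"])
```

Copying before modifying keeps the definition used for job creation separate from the one used for the update.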
List jobs
To list the jobs, you can use:
list_jobs = CloudRunListJobsOperator(
    task_id=list_jobs_task_name, project_id=PROJECT_ID, region=region, dag=dag
)
The operator takes two optional parameters: “limit” to limit the number of jobs returned, and “show_deleted” to include deleted jobs in the result.
Delete a job
To delete a job you can use:
delete_job1 = CloudRunDeleteJobOperator(
    task_id="delete-job1",
    project_id=PROJECT_ID,
    region=region,
    job_name=job1_name,
    dag=dag,
    trigger_rule=TriggerRule.ALL_DONE,
)
Note that this operator waits for the job to be deleted. The deleted Job’s dictionary representation is pushed to XCom.