Google Cloud Run Operators

Cloud Run is used to build and deploy scalable containerized apps written in any language (including Go, Python, Java, Node.js, .NET, and Ruby) on a fully managed platform.

For more information about the service, see the Google Cloud Run documentation.

Prerequisite Tasks

To use these operators, you must do a few things:

Create a job

Before you create a job in Cloud Run, you need to define it. For more information about the Job object fields, see the Google Cloud Run Job description.

A simple job configuration can look as follows:

tests/system/providers/google/cloud/cloud_run/example_cloud_run.py

def _create_job():
    job = Job()
    container = k8s_min.Container()
    container.image = "us-docker.pkg.dev/cloudrun/container/job:latest"
    job.template.template.containers.append(container)
    return job


With this configuration, you can create the job using CloudRunCreateJobOperator:

tests/system/providers/google/cloud/cloud_run/example_cloud_run.py

create1 = CloudRunCreateJobOperator(
    task_id=create1_task_name,
    project_id=PROJECT_ID,
    region=region,
    job_name=job1_name,
    job=_create_job(),
    dag=dag,
)

Note that this operator only creates the job without executing it. The Job’s dictionary representation is pushed to XCom.
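Because the job is pushed to XCom as a dictionary, a downstream task can inspect it with ordinary dictionary access. The following is a minimal sketch, not part of the provider: the helper name container_images is hypothetical, and it assumes the dictionary mirrors the nesting used in the job definition above (template, then template, then containers) with snake_case keys. Check the actual XCom value in your environment before relying on this shape.

```python
from typing import Any


def container_images(job_dict: dict[str, Any]) -> list[str]:
    """Return the image of every container found in a Job dictionary.

    Assumes the layout job_dict["template"]["template"]["containers"],
    mirroring job.template.template.containers on the Job object.
    Missing levels are treated as empty rather than raising.
    """
    execution_template = job_dict.get("template", {})
    task_template = execution_template.get("template", {})
    containers = task_template.get("containers", [])
    return [c["image"] for c in containers if "image" in c]


# Hand-written sample standing in for the value pulled from XCom.
sample_job = {
    "template": {
        "template": {
            "containers": [
                {"image": "us-docker.pkg.dev/cloudrun/container/job:latest"}
            ]
        }
    }
}

print(container_images(sample_job))
```

In a DAG, the dictionary would typically come from the create task's XCom value rather than a literal.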

Execute a job

To execute a job, you can use:

CloudRunExecuteJobOperator

tests/system/providers/google/cloud/cloud_run/example_cloud_run.py

execute1 = CloudRunExecuteJobOperator(
    task_id=execute1_task_name,
    project_id=PROJECT_ID,
    region=region,
    job_name=job1_name,
    dag=dag,
    deferrable=False,
)

Alternatively, you can define the same operator in deferrable mode:

CloudRunExecuteJobOperator

tests/system/providers/google/cloud/cloud_run/example_cloud_run.py

execute2 = CloudRunExecuteJobOperator(
    task_id=execute2_task_name,
    project_id=PROJECT_ID,
    region=region,
    job_name=job2_name,
    dag=dag,
    deferrable=True,
)

You can also specify overrides, which let you change the container arguments, environment variables, task count, and timeout for a single execution:

CloudRunExecuteJobOperator

tests/system/providers/google/cloud/cloud_run/example_cloud_run.py

overrides = {
    "container_overrides": [
        {
            "name": "job",
            "args": ["python", "main.py"],
            "env": [{"name": "ENV_VAR", "value": "value"}],
            "clear_args": False,
        }
    ],
    "task_count": 1,
    "timeout": "60s",
}

execute3 = CloudRunExecuteJobOperator(
    task_id=execute3_task_name,
    project_id=PROJECT_ID,
    region=region,
    overrides=overrides,
    job_name=job3_name,
    dag=dag,
    deferrable=False,
)
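When several tasks need overrides that differ only in their arguments or environment, it can help to build the dictionary in one place. The sketch below is an illustration under stated assumptions: build_overrides is a hypothetical helper, not a provider function, and it only reproduces the keys shown in the example above (container_overrides, task_count, timeout).

```python
from typing import Any, Optional


def build_overrides(
    container_name: str,
    args: list[str],
    env: Optional[dict[str, str]] = None,
    task_count: int = 1,
    timeout_seconds: int = 60,
) -> dict[str, Any]:
    """Build an overrides dictionary shaped like the example above.

    The env mapping is expanded into the list of name/value pairs
    that the container override expects.
    """
    env_entries = [{"name": k, "value": v} for k, v in (env or {}).items()]
    return {
        "container_overrides": [
            {"name": container_name, "args": list(args), "env": env_entries}
        ],
        "task_count": task_count,
        # The example above expresses the timeout as a string of seconds.
        "timeout": f"{timeout_seconds}s",
    }


example_overrides = build_overrides(
    "job", ["python", "main.py"], env={"ENV_VAR": "value"}
)
print(example_overrides)
```

The resulting dictionary can be passed as the overrides argument in the same way as the literal shown above.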

Update a job

To update a job, you can use:

CloudRunUpdateJobOperator

tests/system/providers/google/cloud/cloud_run/example_cloud_run.py

update_job1 = CloudRunUpdateJobOperator(
    task_id=update_job1_task_name,
    project_id=PROJECT_ID,
    region=region,
    job_name=job1_name,
    job=_create_job_with_label(),
    dag=dag,
)

The Job’s dictionary representation is pushed to XCom.
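The helper _create_job_with_label used above is not shown on this page. As a rough idea of what an updated definition might contain, the sketch below builds a job definition as a plain dictionary with a label added. It rests on several assumptions: job_dict_with_label is a hypothetical name, the label key and value are made up for illustration, and passing a dictionary instead of a Job object is assumed to be supported by the job parameter, which should be checked against the operator signature in your provider version.

```python
from typing import Any


def job_dict_with_label(image: str, labels: dict[str, str]) -> dict[str, Any]:
    """Return a Job definition as a dictionary, including labels.

    The nesting mirrors job.template.template.containers from the
    job definition shown earlier; "labels" is a top-level Job field.
    """
    return {
        "labels": dict(labels),
        "template": {"template": {"containers": [{"image": image}]}},
    }


# Hypothetical label used only for illustration.
updated_job = job_dict_with_label(
    "us-docker.pkg.dev/cloudrun/container/job:latest",
    {"somelabel": "label1"},
)
print(updated_job["labels"])
```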

List jobs

To list the jobs, you can use:

CloudRunListJobsOperator

tests/system/providers/google/cloud/cloud_run/example_cloud_run.py

list_jobs = CloudRunListJobsOperator(
    task_id=list_jobs_task_name, project_id=PROJECT_ID, region=region, dag=dag
)

The operator takes two optional parameters: “limit”, which caps the number of jobs returned, and “show_deleted”, which includes deleted jobs in the result.

Delete a job

To delete a job, you can use:

CloudRunDeleteJobOperator

tests/system/providers/google/cloud/cloud_run/example_cloud_run.py

delete_job1 = CloudRunDeleteJobOperator(
    task_id="delete-job1",
    project_id=PROJECT_ID,
    region=region,
    job_name=job1_name,
    dag=dag,
    trigger_rule=TriggerRule.ALL_DONE,
)

Note that this operator waits for the job to be deleted. The deleted Job’s dictionary representation is pushed to XCom.
