Google Cloud Run Operators

Cloud Run is used to build and deploy scalable containerized apps written in any language (including Go, Python, Java, Node.js, .NET, and Ruby) on a fully managed platform.

For more information about the service visit Google Cloud Run documentation.

Prerequisite Tasks

To use these operators, you must do a few things:

Create a job

Before you create a job in Cloud Run, you need to define it. For more information about the Job object fields, visit Google Cloud Run Job description

A simple job configuration can be created with a Job object:

tests/system/providers/google/cloud/cloud_run/example_cloud_run.py[source]

def _create_job_instance() -> Job:
    """
    Create a Cloud Run job configuration with google.cloud.run_v2.Job object.

    As a minimum the configuration must contain a container image name in its template.
    The rest of the configuration parameters are optional and will be populated with default values if not set.
    """
    job = Job()
    container = k8s_min.Container()
    container.image = "us-docker.pkg.dev/cloudrun/container/job:latest"
    container.resources.limits = {"cpu": "2", "memory": "1Gi"}
    job.template.template.containers.append(container)
    return job


or with a Python dictionary:

tests/system/providers/google/cloud/cloud_run/example_cloud_run.py[source]

def _create_job_dict() -> dict:
    """
    Create a Cloud Run job configuration with a Python dict.

    As a minimum the configuration must contain a container image name in its template.
    """
    return {
        "template": {
            "template": {
                "containers": [
                    {
                        "image": "us-docker.pkg.dev/cloudrun/container/job:latest",
                        "resources": {
                            "limits": {"cpu": "1", "memory": "512Mi"},
                            "cpu_idle": False,
                            "startup_cpu_boost": False,
                        },
                        "name": "",
                        "command": [],
                        "args": [],
                        "env": [],
                        "ports": [],
                        "volume_mounts": [],
                        "working_dir": "",
                        "depends_on": [],
                    }
                ],
                "volumes": [],
                "execution_environment": 0,
                "encryption_key": "",
            },
            "labels": {},
            "annotations": {},
            "parallelism": 0,
            "task_count": 0,
        },
        "name": "",
        "uid": "",
        "generation": "0",
        "labels": {},
        "annotations": {},
        "creator": "",
        "last_modifier": "",
        "client": "",
        "client_version": "",
        "launch_stage": 0,
        "observed_generation": "0",
        "conditions": [],
        "execution_count": 0,
        "reconciling": False,
        "satisfies_pzs": False,
        "etag": "",
    }


You can create a Cloud Run Job with any of these configurations : CloudRunCreateJobOperator

tests/system/providers/google/cloud/cloud_run/example_cloud_run.py[source]

create1 = CloudRunCreateJobOperator(
    task_id=create1_task_name,
    project_id=PROJECT_ID,
    region=region,
    job_name=job1_name,
    job=_create_job_instance(),
    dag=dag,
)

Note that this operator only creates the job without executing it. The Job’s dictionary representation is pushed to XCom.

Execute a job

To execute a job, you can use:

CloudRunExecuteJobOperator

tests/system/providers/google/cloud/cloud_run/example_cloud_run.py[source]

execute1 = CloudRunExecuteJobOperator(
    task_id=execute1_task_name,
    project_id=PROJECT_ID,
    region=region,
    job_name=job1_name,
    dag=dag,
    deferrable=False,
)

or you can define the same operator in the deferrable mode:

CloudRunExecuteJobOperator

tests/system/providers/google/cloud/cloud_run/example_cloud_run.py[source]

execute2 = CloudRunExecuteJobOperator(
    task_id=execute2_task_name,
    project_id=PROJECT_ID,
    region=region,
    job_name=job2_name,
    dag=dag,
    deferrable=True,
)

You can also specify overrides that allow you to give a new entrypoint command to the job and more:

CloudRunExecuteJobOperator

tests/system/providers/google/cloud/cloud_run/example_cloud_run.py[source]

overrides = {
    "container_overrides": [
        {
            "name": "job",
            "args": ["python", "main.py"],
            "env": [{"name": "ENV_VAR", "value": "value"}],
            "clear_args": False,
        }
    ],
    "task_count": 1,
    "timeout": "60s",
}

execute3 = CloudRunExecuteJobOperator(
    task_id=execute3_task_name,
    project_id=PROJECT_ID,
    region=region,
    overrides=overrides,
    job_name=job3_name,
    dag=dag,
    deferrable=False,
)

Update a job

To update a job, you can use:

CloudRunUpdateJobOperator

tests/system/providers/google/cloud/cloud_run/example_cloud_run.py[source]

update_job1 = CloudRunUpdateJobOperator(
    task_id=update_job1_task_name,
    project_id=PROJECT_ID,
    region=region,
    job_name=job1_name,
    job=_create_job_instance_with_label(),
    dag=dag,
)

The Job’s dictionary representation is pushed to XCom.

List jobs

To list the jobs, you can use:

CloudRunListJobsOperator

tests/system/providers/google/cloud/cloud_run/example_cloud_run.py[source]

list_jobs = CloudRunListJobsOperator(
    task_id=list_jobs_task_name, project_id=PROJECT_ID, region=region, dag=dag
)

The operator takes two optional parameters: “limit” to limit the number of tasks returned, and “show_deleted” to include deleted jobs in the result.

Delete a job

To delete a job you can use:

CloudRunDeleteJobOperator

tests/system/providers/google/cloud/cloud_run/example_cloud_run.py[source]

delete_job1 = CloudRunDeleteJobOperator(
    task_id="delete-job1",
    project_id=PROJECT_ID,
    region=region,
    job_name=job1_name,
    dag=dag,
    trigger_rule=TriggerRule.ALL_DONE,
)

Note this operator waits for the job to be deleted, and the deleted Job’s dictionary representation is pushed to XCom.

Was this entry helpful?