Google Cloud Run Operators

Cloud Run is used to build and deploy scalable containerized apps written in any language (including Go, Python, Java, Node.js, .NET, and Ruby) on a fully managed platform.

For more information about the service, visit the Google Cloud Run documentation.

Prerequisite Tasks

To use these operators, you must do a few things:

Create a job

Before you create a job in Cloud Run, you need to define it. For more information about the Job object fields, visit the Google Cloud Run Job description.

To create the Cloud Run client against a regional endpoint, set the use_regional_endpoint flag to True. The API endpoint is then configured with the location you specified in the operator.
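As a rough illustration of what the flag changes, the sketch below derives the API host from the location. The helper name cloud_run_api_endpoint is hypothetical, and the "<location>-run.googleapis.com" host pattern is an assumption based on how Cloud Run regional endpoints are usually written; the provider may build the endpoint differently (for example, with an explicit port).

```python
def cloud_run_api_endpoint(location: str, use_regional_endpoint: bool = False) -> str:
    """Return the API host a Cloud Run client would target (illustrative helper only)."""
    if use_regional_endpoint:
        if not location:
            raise ValueError("a location is required to build a regional endpoint")
        # Assumed pattern for regional endpoints: "<location>-run.googleapis.com".
        return f"{location}-run.googleapis.com"
    # Without the flag, the client talks to the global endpoint.
    return "run.googleapis.com"


print(cloud_run_api_endpoint("europe-west1", use_regional_endpoint=True))
print(cloud_run_api_endpoint("europe-west1"))
```

In a DAG, you would not call such a helper yourself; passing use_regional_endpoint=True together with region to the operator is enough.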

A simple job configuration can be created with a Job object:

tests/system/google/cloud/cloud_run/example_cloud_run.py[source]

from google.cloud.run_v2 import Job
from google.cloud.run_v2.types import k8s_min


def _create_job_instance() -> Job:
    """
    Create a Cloud Run job configuration with google.cloud.run_v2.Job object.

    As a minimum the configuration must contain a container image name in its template.
    The rest of the configuration parameters are optional and will be populated with default values if not set.
    """
    job = Job()
    container = k8s_min.Container()
    container.image = "us-docker.pkg.dev/cloudrun/container/job:latest"
    container.resources.limits = {"cpu": "2", "memory": "1Gi"}
    job.template.template.containers.append(container)
    return job


or with a Python dictionary:

tests/system/google/cloud/cloud_run/example_cloud_run.py[source]

def _create_job_dict() -> dict:
    """
    Create a Cloud Run job configuration with a Python dict.

    As a minimum the configuration must contain a container image name in its template.
    """
    return {
        "template": {
            "template": {
                "containers": [
                    {
                        "image": "us-docker.pkg.dev/cloudrun/container/job:latest",
                        "resources": {
                            "limits": {"cpu": "1", "memory": "512Mi"},
                            "cpu_idle": False,
                            "startup_cpu_boost": False,
                        },
                        "name": "",
                        "command": [],
                        "args": [],
                        "env": [],
                        "ports": [],
                        "volume_mounts": [],
                        "working_dir": "",
                        "depends_on": [],
                    }
                ],
                "volumes": [],
                "execution_environment": 0,
                "encryption_key": "",
            },
            "labels": {},
            "annotations": {},
            "parallelism": 0,
            "task_count": 0,
        },
        "name": "",
        "uid": "",
        "generation": "0",
        "labels": {},
        "annotations": {},
        "creator": "",
        "last_modifier": "",
        "client": "",
        "client_version": "",
        "launch_stage": 0,
        "observed_generation": "0",
        "conditions": [],
        "execution_count": 0,
        "reconciling": False,
        "satisfies_pzs": False,
        "etag": "",
    }
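The dictionary above spells out many fields with their default values. Since the docstring states that only the container image is required, a much shorter dictionary should be enough in most cases. The sketch below assumes the omitted fields are filled with defaults, as the docstring describes; the helper name minimal_job_dict is hypothetical.

```python
def minimal_job_dict(image: str) -> dict:
    """Build the smallest job configuration: a single container with an image."""
    if not image:
        raise ValueError("a container image name is required")
    # Only the nested template -> template -> containers path is populated;
    # every other field is left out and is assumed to take its default value.
    return {"template": {"template": {"containers": [{"image": image}]}}}


job_config = minimal_job_dict("us-docker.pkg.dev/cloudrun/container/job:latest")
print(job_config)
```

The nesting mirrors the Job object: the outer template describes the execution, and the inner template describes the task that runs the containers.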


You can create a Cloud Run Job with any of these configurations using CloudRunCreateJobOperator:

tests/system/google/cloud/cloud_run/example_cloud_run.py[source]

create1 = CloudRunCreateJobOperator(
    task_id=create1_task_name,
    project_id=PROJECT_ID,
    region=region,
    job_name=job1_name,
    job=_create_job_instance(),
    use_regional_endpoint=False,
    dag=dag,
)

Note that this operator only creates the job without executing it. The Job’s dictionary representation is pushed to XCom.

Create a service

Before you create a service in Cloud Run, you need to define it. For more information about the Service object fields, visit the Google Cloud Run Service description.

To create the Cloud Run client against a regional endpoint, set the use_regional_endpoint flag to True. The API endpoint is then configured with the location you specified in the operator.

A simple service configuration can look as follows:

tests/system/google/cloud/cloud_run/example_cloud_run_service.py[source]

from google.cloud.run_v2 import Service
from google.cloud.run_v2.types import k8s_min


def _create_service() -> Service:
    service = Service()
    container = k8s_min.Container()
    container.image = "us-docker.pkg.dev/cloudrun/container/placeholder:latest"
    service.template.containers.append(container)
    return service


With this configuration we can create the service: CloudRunCreateServiceOperator

tests/system/google/cloud/cloud_run/example_cloud_run_service.py[source]

create_cloud_run_service = CloudRunCreateServiceOperator(
    task_id="create-cloud-run-service",
    project_id=PROJECT_ID,
    region=REGION,
    service=_create_service(),
    service_name=SERVICE_NAME,
    use_regional_endpoint=False,
)

Note that this operator only creates the service without executing it. The Service’s dictionary representation is pushed to XCom.

Delete a service

To create the Cloud Run client against a regional endpoint, set the use_regional_endpoint flag to True. The API endpoint is then configured with the location you specified in the operator.

To delete a service, you can use: CloudRunDeleteServiceOperator

tests/system/google/cloud/cloud_run/example_cloud_run_service.py[source]

delete_cloud_run_service = CloudRunDeleteServiceOperator(
    task_id="delete-cloud-run-service",
    project_id=PROJECT_ID,
    region=REGION,
    service_name=SERVICE_NAME,
    use_regional_endpoint=False,
    dag=dag,
)

Note that this operator waits for the service to be deleted. The deleted Service’s dictionary representation is pushed to XCom.

Execute a job

To create the Cloud Run client against a regional endpoint, set the use_regional_endpoint flag to True. The API endpoint is then configured with the location you specified in the operator.

To execute a job, you can use:

CloudRunExecuteJobOperator

tests/system/google/cloud/cloud_run/example_cloud_run.py[source]

execute1 = CloudRunExecuteJobOperator(
    task_id=execute1_task_name,
    project_id=PROJECT_ID,
    region=region,
    job_name=job1_name,
    dag=dag,
    deferrable=False,
)

or you can define the same operator in the deferrable mode:

CloudRunExecuteJobOperator

tests/system/google/cloud/cloud_run/example_cloud_run.py[source]

execute2 = CloudRunExecuteJobOperator(
    task_id=execute2_task_name,
    project_id=PROJECT_ID,
    region=region,
    job_name=job2_name,
    dag=dag,
    deferrable=True,
)

Transport

The CloudRunExecuteJobOperator accepts an optional transport parameter to choose the underlying API transport.

  • transport="grpc" (default): use gRPC transport. If transport is not set, gRPC is used.

  • transport="rest": use REST/HTTP transport.

In deferrable mode, when using gRPC (explicitly or by default), the trigger uses an async gRPC client internally; for non-deferrable execution, the operator uses the regular (synchronous) gRPC client.

In general, it is better to use gRPC (or leave transport unset) unless there is a specific reason you must use REST (for example, if gRPC is not available or fails in your environment).
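The choice described above can be captured in a small helper that prepares keyword arguments for the operator. This is only a sketch: execute_job_kwargs is a hypothetical function, not part of the provider, and it simply leaves transport out when it is unset so that the gRPC default applies.

```python
from typing import Any, Optional

# The two transport values described in this section.
ALLOWED_TRANSPORTS = ("grpc", "rest")


def execute_job_kwargs(
    job_name: str,
    deferrable: bool = False,
    transport: Optional[str] = None,
) -> dict:
    """Assemble keyword arguments for CloudRunExecuteJobOperator (illustrative)."""
    if transport is not None and transport not in ALLOWED_TRANSPORTS:
        raise ValueError(f"transport must be one of {ALLOWED_TRANSPORTS}, got {transport!r}")
    kwargs: dict = {"job_name": job_name, "deferrable": deferrable}
    if transport is not None:
        # Only pass the parameter when a transport was chosen explicitly;
        # otherwise the operator falls back to gRPC.
        kwargs["transport"] = transport
    return kwargs


print(execute_job_kwargs("my-job"))
print(execute_job_kwargs("my-job", transport="rest"))
```

The resulting dictionary could be unpacked into the operator call alongside task_id, project_id, and region.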

Deferrable mode considerations

When using deferrable mode, the operator defers to an async trigger that polls the long-running operation status.

  • With gRPC (explicitly or by default), the trigger uses the native async gRPC client internally. The grpc_asyncio transport is an implementation detail of the Google client library and is not a user-facing transport value.

  • With REST, the REST transport is synchronous-only in the Google Cloud library. To remain compatible with deferrable mode, the trigger performs REST calls using the synchronous client wrapped in a background thread.

REST can be used with deferrable mode, but it may be less efficient than gRPC and is generally best reserved for cases where gRPC cannot be used.

Capturing container logs in the Airflow task log

By default the operator only reports its own status messages; the container’s stdout / stderr is only available in Cloud Logging. Pass verbose=True to have the operator fetch the container log entries from Cloud Logging once the execution finishes and forward each line into the Airflow task log. This works in both eager and deferrable mode.

CloudRunExecuteJobOperator(
    task_id="execute_cloud_run_job",
    project_id=PROJECT_ID,
    region=region,
    job_name=job_name,
    verbose=True,
)

When enabling verbose:

  • The service account used by gcp_conn_id (or by impersonation_chain) must have the roles/logging.viewer role on the project that runs the job.

  • Each task instance issues at least one Cloud Logging Read API request, with additional pages fetched automatically when the execution produces enough log entries to span multiple pages. Failed executions wait briefly before fetching logs to catch entries that arrive just after the Cloud Run operation reports failure. Plan around the project-wide quota of 60 read requests per minute documented at https://cloud.google.com/logging/quotas#api-limits.

  • If the log fetch itself fails (for example, because of a missing IAM permission or an exhausted quota), the operator emits a warning and the task result is unaffected.
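To plan around the read quota, a back-of-the-envelope calculation can help. The sketch below uses the figure of 60 read requests per minute quoted above; the page_size argument and both function names are assumptions for illustration, since the page size the operator actually requests is not stated here.

```python
import math

# Project-wide Cloud Logging read quota quoted in this section.
LOGGING_READ_QUOTA_PER_MINUTE = 60


def read_requests_per_task(log_entries: int, page_size: int) -> int:
    """Estimate read requests for one task: at least one, plus one per extra page."""
    if page_size <= 0:
        raise ValueError("page_size must be positive")
    return max(1, math.ceil(log_entries / page_size))


def max_verbose_tasks_per_minute(log_entries: int, page_size: int) -> int:
    """Estimate how many verbose task instances fit into one minute of quota."""
    return LOGGING_READ_QUOTA_PER_MINUTE // read_requests_per_task(log_entries, page_size)


# Hypothetical workload: 2500 log entries per execution, 1000 entries per page.
print(read_requests_per_task(2500, 1000))
print(max_verbose_tasks_per_minute(2500, 1000))
```

The estimate ignores other consumers of the same quota in the project, so treat the result as an upper bound.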

You can also specify overrides that allow you to give a new entrypoint command to the job and more:

CloudRunExecuteJobOperator

tests/system/google/cloud/cloud_run/example_cloud_run.py[source]

overrides = {
    "container_overrides": [
        {
            "name": "job",
            "args": ["python", "main.py"],
            "env": [{"name": "ENV_VAR", "value": "value"}],
            "clear_args": False,
        }
    ],
    "task_count": 1,
    "timeout": "60s",
}

execute3 = CloudRunExecuteJobOperator(
    task_id=execute3_task_name,
    project_id=PROJECT_ID,
    region=region,
    overrides=overrides,
    job_name=job3_name,
    dag=dag,
    deferrable=False,
)
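When several tasks need similar overrides, the dictionary can be assembled by a small helper. The function build_overrides below is hypothetical; it only reproduces the structure shown in the example above and converts a plain mapping of environment variables into the list of name/value pairs.

```python
from typing import Mapping, Optional, Sequence


def build_overrides(
    container_name: str,
    args: Sequence[str],
    env: Optional[Mapping[str, str]] = None,
    task_count: Optional[int] = None,
    timeout: Optional[str] = None,
) -> dict:
    """Build an overrides dictionary shaped like the example above (illustrative)."""
    container: dict = {
        "name": container_name,
        "args": list(args),
        "clear_args": False,
    }
    if env:
        # Environment variables are expressed as a list of {"name", "value"} entries.
        container["env"] = [{"name": key, "value": value} for key, value in env.items()]
    overrides: dict = {"container_overrides": [container]}
    if task_count is not None:
        overrides["task_count"] = task_count
    if timeout is not None:
        overrides["timeout"] = timeout
    return overrides


overrides = build_overrides(
    "job",
    ["python", "main.py"],
    env={"ENV_VAR": "value"},
    task_count=1,
    timeout="60s",
)
print(overrides)
```

With these arguments, the helper returns the same dictionary as the overrides variable in the example.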

Update a job

To create the Cloud Run client against a regional endpoint, set the use_regional_endpoint flag to True. The API endpoint is then configured with the location you specified in the operator.

To update a job, you can use:

CloudRunUpdateJobOperator

tests/system/google/cloud/cloud_run/example_cloud_run.py[source]

update_job1 = CloudRunUpdateJobOperator(
    task_id=update_job1_task_name,
    project_id=PROJECT_ID,
    region=region,
    job_name=job1_name,
    job=_create_job_instance_with_label(),
    dag=dag,
)

The Job’s dictionary representation is pushed to XCom.

List jobs

To create the Cloud Run client against a regional endpoint, set the use_regional_endpoint flag to True. The API endpoint is then configured with the location you specified in the operator.

To list the jobs, you can use:

CloudRunListJobsOperator

tests/system/google/cloud/cloud_run/example_cloud_run.py[source]

list_jobs = CloudRunListJobsOperator(
    task_id=list_jobs_task_name, project_id=PROJECT_ID, region=region, dag=dag
)

The operator takes two optional parameters: limit, which caps the number of jobs returned, and show_deleted, which includes deleted jobs in the result.
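The effect of the two parameters can be illustrated on plain Python data. This is a sketch under stated assumptions rather than the operator's implementation: take_jobs is a hypothetical helper, the jobs are stand-in dictionaries with a made-up "deleted" key, and in the real service the filtering of deleted jobs happens on the API side.

```python
from itertools import islice
from typing import Iterable, Optional


def take_jobs(
    jobs: Iterable[dict],
    limit: Optional[int] = None,
    show_deleted: bool = False,
) -> list:
    """Return at most `limit` jobs, skipping deleted ones unless show_deleted is set."""
    if limit is not None and limit < 0:
        raise ValueError("limit must be greater than or equal to zero")
    visible = (job for job in jobs if show_deleted or not job.get("deleted", False))
    # islice stops consuming the iterable once `limit` items were taken;
    # a limit of None means "take everything".
    return list(islice(visible, limit))


# Stand-in data for illustration only.
sample_jobs = [
    {"name": "job-a"},
    {"name": "job-b", "deleted": True},
    {"name": "job-c"},
]

print(take_jobs(sample_jobs))
print(take_jobs(sample_jobs, limit=1))
print(take_jobs(sample_jobs, show_deleted=True))
```

Leaving both parameters unset returns every non-deleted job.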

Delete a job

To create the Cloud Run client against a regional endpoint, set the use_regional_endpoint flag to True. The API endpoint is then configured with the location you specified in the operator.

To delete a job, you can use:

CloudRunDeleteJobOperator

tests/system/google/cloud/cloud_run/example_cloud_run.py[source]

delete_job1 = CloudRunDeleteJobOperator(
    task_id="delete-job1",
    project_id=PROJECT_ID,
    region=region,
    job_name=job1_name,
    dag=dag,
    trigger_rule=TriggerRule.ALL_DONE,
)

Note that this operator waits for the job to be deleted. The deleted Job’s dictionary representation is pushed to XCom.
