Google Cloud Composer Operators

Cloud Composer is a fully managed workflow orchestration service, enabling you to create, schedule, monitor, and manage workflows that span across clouds and on-premises data centers.

Cloud Composer is built on the popular Apache Airflow open source project and operates using the Python programming language.

By using Cloud Composer instead of a local instance of Apache Airflow, you can benefit from the best of Airflow with no installation or management overhead. Cloud Composer helps you create Airflow environments quickly and use Airflow-native tools, such as the powerful Airflow web interface and command-line tools, so you can focus on your workflows and not your infrastructure.

For more information about the service, visit the Cloud Composer product documentation.

Create an environment

Before you create a Cloud Composer environment, you need to define it. For more information about the fields you can pass when creating an environment, see the Cloud Composer create environment API.

A simple environment configuration can look as follows:

tests/system/providers/google/cloud/composer/example_cloud_composer.py[source]


ENVIRONMENT_ID = f"test-{DAG_ID}-{ENV_ID}".replace("_", "-")
ENVIRONMENT_ID_ASYNC = f"test-deferrable-{DAG_ID}-{ENV_ID}".replace("_", "-")

ENVIRONMENT = {
    "config": {
        "software_config": {"image_version": "composer-2.5.0-airflow-2.5.3"},
    }
}
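The two environment IDs above are built from names defined elsewhere in the example file, with underscores replaced by hyphens. The following is a minimal sketch of that derivation. The values of DAG_ID and ENV_ID and the helper name make_environment_id are hypothetical and are not part of the example file.

```python
# Hypothetical values; the example file defines DAG_ID and ENV_ID elsewhere.
DAG_ID = "composer_example"
ENV_ID = "dev_01"


def make_environment_id(dag_id: str, env_id: str, prefix: str = "test") -> str:
    """Build an environment ID the same way the snippet above does."""
    # Join the parts with hyphens, then replace any underscores with hyphens.
    return f"{prefix}-{dag_id}-{env_id}".replace("_", "-")


ENVIRONMENT_ID = make_environment_id(DAG_ID, ENV_ID)
ENVIRONMENT_ID_ASYNC = make_environment_id(DAG_ID, ENV_ID, prefix="test-deferrable")

# Same minimal configuration as above: only the image version is set.
ENVIRONMENT = {
    "config": {
        "software_config": {"image_version": "composer-2.5.0-airflow-2.5.3"},
    }
}
```

Using a separate ID for the deferrable variant keeps the two environments from colliding when both tasks run in the same DAG.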

With this configuration we can create the environment using CloudComposerCreateEnvironmentOperator:

tests/system/providers/google/cloud/composer/example_cloud_composer.py[source]

create_env = CloudComposerCreateEnvironmentOperator(
    task_id="create_env",
    project_id=PROJECT_ID,
    region=REGION,
    environment_id=ENVIRONMENT_ID,
    environment=ENVIRONMENT,
)

Alternatively, you can define the same operator in deferrable mode: CloudComposerCreateEnvironmentOperator

tests/system/providers/google/cloud/composer/example_cloud_composer.py[source]

defer_create_env = CloudComposerCreateEnvironmentOperator(
    task_id="defer_create_env",
    project_id=PROJECT_ID,
    region=REGION,
    environment_id=ENVIRONMENT_ID_ASYNC,
    environment=ENVIRONMENT,
    deferrable=True,
)

Get an environment

To get an environment you can use:

CloudComposerGetEnvironmentOperator

tests/system/providers/google/cloud/composer/example_cloud_composer.py[source]

get_env = CloudComposerGetEnvironmentOperator(
    task_id="get_env",
    project_id=PROJECT_ID,
    region=REGION,
    environment_id=ENVIRONMENT_ID,
)

List environments

To list environments you can use:

CloudComposerListEnvironmentsOperator

tests/system/providers/google/cloud/composer/example_cloud_composer.py[source]

list_envs = CloudComposerListEnvironmentsOperator(
    task_id="list_envs", project_id=PROJECT_ID, region=REGION
)

Update an environment

You can update an environment by providing an environment config and an updateMask. In the updateMask argument you specify the path, relative to Environment, of the field to update. For more information on updateMask and other parameters, see the Cloud Composer update environment API.

An example of a new environment config and the updateMask:

tests/system/providers/google/cloud/composer/example_cloud_composer.py[source]

UPDATED_ENVIRONMENT = {
    "labels": {
        "label": "testing",
    }
}
UPDATE_MASK = {"paths": ["labels.label"]}
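Because the mask path has to match the field being changed, it can help to build both values from one input. The sketch below covers label updates only and follows the "labels.<key>" path form used in the example above. The helper name labels_update is hypothetical, and mask paths for other fields are not covered here.

```python
from __future__ import annotations


def labels_update(labels: dict[str, str]) -> tuple[dict, dict]:
    """Return (environment, update_mask) for updating the given labels."""
    # The environment body carries the new label values.
    environment = {"labels": dict(labels)}
    # One mask path per label key, relative to Environment.
    update_mask = {"paths": [f"labels.{key}" for key in labels]}
    return environment, update_mask


# Reproduces the values shown above.
UPDATED_ENVIRONMENT, UPDATE_MASK = labels_update({"label": "testing"})
```

Generating the mask from the same dictionary avoids a mismatch between the labels sent and the paths listed.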

To update an environment you can use: CloudComposerUpdateEnvironmentOperator

tests/system/providers/google/cloud/composer/example_cloud_composer.py[source]

update_env = CloudComposerUpdateEnvironmentOperator(
    task_id="update_env",
    project_id=PROJECT_ID,
    region=REGION,
    environment_id=ENVIRONMENT_ID,
    update_mask=UPDATE_MASK,
    environment=UPDATED_ENVIRONMENT,
)

Alternatively, you can define the same operator in deferrable mode: CloudComposerUpdateEnvironmentOperator

tests/system/providers/google/cloud/composer/example_cloud_composer.py[source]

defer_update_env = CloudComposerUpdateEnvironmentOperator(
    task_id="defer_update_env",
    project_id=PROJECT_ID,
    region=REGION,
    environment_id=ENVIRONMENT_ID_ASYNC,
    update_mask=UPDATE_MASK,
    environment=UPDATED_ENVIRONMENT,
    deferrable=True,
)

Delete an environment

To delete an environment you can use:

CloudComposerDeleteEnvironmentOperator

tests/system/providers/google/cloud/composer/example_cloud_composer.py[source]

delete_env = CloudComposerDeleteEnvironmentOperator(
    task_id="delete_env",
    project_id=PROJECT_ID,
    region=REGION,
    environment_id=ENVIRONMENT_ID,
)

Alternatively, you can define the same operator in deferrable mode: CloudComposerDeleteEnvironmentOperator

tests/system/providers/google/cloud/composer/example_cloud_composer.py[source]

defer_delete_env = CloudComposerDeleteEnvironmentOperator(
    task_id="defer_delete_env",
    project_id=PROJECT_ID,
    region=REGION,
    environment_id=ENVIRONMENT_ID_ASYNC,
    deferrable=True,
)

List Composer image versions

You can also list all supported Cloud Composer images:

CloudComposerListImageVersionsOperator

tests/system/providers/google/cloud/composer/example_cloud_composer.py[source]

image_versions = CloudComposerListImageVersionsOperator(
    task_id="image_versions",
    project_id=PROJECT_ID,
    region=REGION,
)
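Image version strings such as the one used in the environment configuration earlier combine a Composer version and an Airflow version. The following is a rough sketch of splitting such a string, assuming the "composer-<version>-airflow-<version>" form seen in this document. The names ImageVersion and parse_image_version are hypothetical, and the shape of the operator's return value is not shown here.

```python
import re
from typing import NamedTuple


class ImageVersion(NamedTuple):
    composer: str
    airflow: str


# Assumed format, taken from the example string "composer-2.5.0-airflow-2.5.3".
_IMAGE_VERSION_RE = re.compile(
    r"^composer-(?P<composer>[\w.]+)-airflow-(?P<airflow>[\w.]+)$"
)


def parse_image_version(image_version: str) -> ImageVersion:
    """Split an image version string into its Composer and Airflow parts."""
    match = _IMAGE_VERSION_RE.match(image_version)
    if match is None:
        # Strings in any other form are rejected rather than guessed at.
        raise ValueError(f"Unrecognized image version: {image_version!r}")
    return ImageVersion(match["composer"], match["airflow"])


parsed = parse_image_version("composer-2.5.0-airflow-2.5.3")
```

Image versions that follow a different naming scheme would need a different pattern.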

Run Airflow CLI commands

To run Airflow CLI commands in your environments you can use: CloudComposerRunAirflowCLICommandOperator

tests/system/providers/google/cloud/composer/example_cloud_composer.py[source]

run_airflow_cli_cmd = CloudComposerRunAirflowCLICommandOperator(
    task_id="run_airflow_cli_cmd",
    project_id=PROJECT_ID,
    region=REGION,
    environment_id=ENVIRONMENT_ID,
    command=COMMAND,
)

Alternatively, you can define the same operator in deferrable mode:

tests/system/providers/google/cloud/composer/example_cloud_composer.py[source]

defer_run_airflow_cli_cmd = CloudComposerRunAirflowCLICommandOperator(
    task_id="defer_run_airflow_cli_cmd",
    project_id=PROJECT_ID,
    region=REGION,
    environment_id=ENVIRONMENT_ID_ASYNC,
    command=COMMAND,
    deferrable=True,
)
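COMMAND is not defined in the snippets above. As a rough sketch, assuming it is a plain string of Airflow CLI arguments, the following shows one way such a string can be broken into a command, a subcommand, and the remaining parameters. The COMMAND value and the helper name split_airflow_command are hypothetical. This is an illustration only, not the operator's implementation.

```python
import shlex

# Hypothetical value; the example file defines COMMAND elsewhere.
COMMAND = "dags list -o json --verbose"


def split_airflow_command(command: str) -> tuple:
    """Split a CLI string into (command, subcommand, parameters)."""
    # shlex.split honors shell-style quoting, so quoted arguments stay intact.
    tokens = shlex.split(command)
    if not tokens:
        raise ValueError("The provided command is empty.")
    subcommand = tokens[1] if len(tokens) > 1 else None
    return tokens[0], subcommand, tokens[2:]


cmd, subcommand, parameters = split_airflow_command(COMMAND)
```

Splitting with shlex rather than str.split keeps an argument such as a quoted JSON value as a single parameter.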
