Google Cloud Composer Operators

Cloud Composer is a fully managed workflow orchestration service, enabling you to create, schedule, monitor, and manage workflows that span across clouds and on-premises data centers.

Cloud Composer is built on the popular Apache Airflow open source project and operates using the Python programming language.

By using Cloud Composer instead of a local instance of Apache Airflow, you can benefit from the best of Airflow with no installation or management overhead. Cloud Composer helps you create Airflow environments quickly and use Airflow-native tools, such as the powerful Airflow web interface and command-line tools, so you can focus on your workflows and not your infrastructure.

For more information about the service, visit the Cloud Composer product documentation.

Create an environment

Before you create a Cloud Composer environment, you need to define it. For more information about the fields you can pass when creating an environment, visit the Cloud Composer create environment API.

A simple environment configuration can look as follows:

airflow/providers/google/cloud/example_dags/example_cloud_composer.py[source]

import os

# Falls back to a placeholder when the ENVIRONMENT_ID variable is not set.
ENVIRONMENT_ID = os.environ.get("ENVIRONMENT_ID", "<ENVIRONMENT_ID>")
ENVIRONMENT = {
    "config": {
        "node_count": 3,
        "software_config": {"image_version": "composer-1.17.7-airflow-2.1.4"},
    }
}
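
The `image_version` value above packs the Composer version and the Airflow version into one string, so a typo there is easy to miss. The following is a minimal sketch of validating the string before building the configuration. It assumes the `composer-<version>-airflow-<version>` pattern inferred from the example value, which may not cover every image version the service accepts. The helper names `parse_image_version` and `build_environment` are hypothetical and are not part of the Google provider.

```python
import re

# Pattern inferred from the example value "composer-1.17.7-airflow-2.1.4".
# This is an assumption, not an official grammar for image versions.
_IMAGE_VERSION_RE = re.compile(
    r"^composer-(?P<composer>\d+(?:\.\d+)*)-airflow-(?P<airflow>\d+(?:\.\d+)*)$"
)


def parse_image_version(image_version: str) -> dict:
    """Split an image version string into its Composer and Airflow parts."""
    match = _IMAGE_VERSION_RE.match(image_version)
    if match is None:
        raise ValueError(f"Unexpected image version format: {image_version!r}")
    return {"composer": match.group("composer"), "airflow": match.group("airflow")}


def build_environment(node_count: int, image_version: str) -> dict:
    """Return an environment dict shaped like the ENVIRONMENT example above."""
    parse_image_version(image_version)  # raises ValueError on a malformed string
    if node_count < 1:
        raise ValueError("node_count must be a positive integer")
    return {
        "config": {
            "node_count": node_count,
            "software_config": {"image_version": image_version},
        }
    }


# Hypothetical helper producing the same dict as the hand-written example.
ENVIRONMENT = build_environment(3, "composer-1.17.7-airflow-2.1.4")
```

Failing early in the DAG file keeps a malformed version string from reaching the create request, where the error would only surface at task run time.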

With this configuration you can create the environment using CloudComposerCreateEnvironmentOperator:

airflow/providers/google/cloud/example_dags/example_cloud_composer.py[source]

create_env = CloudComposerCreateEnvironmentOperator(
    task_id="create_env",
    project_id=PROJECT_ID,
    region=REGION,
    environment_id=ENVIRONMENT_ID,
    environment=ENVIRONMENT,
)

Alternatively, you can define the same operator in deferrable mode using CloudComposerCreateEnvironmentOperator:

airflow/providers/google/cloud/example_dags/example_cloud_composer.py[source]

defer_create_env = CloudComposerCreateEnvironmentOperator(
    task_id="defer_create_env",
    project_id=PROJECT_ID,
    region=REGION,
    environment_id=ENVIRONMENT_ID,
    environment=ENVIRONMENT,
    deferrable=True,
)

Get an environment

To get an environment you can use:

CloudComposerGetEnvironmentOperator

airflow/providers/google/cloud/example_dags/example_cloud_composer.py[source]

get_env = CloudComposerGetEnvironmentOperator(
    task_id="get_env",
    project_id=PROJECT_ID,
    region=REGION,
    environment_id=ENVIRONMENT_ID,
)

List environments

To list environments you can use:

CloudComposerListEnvironmentsOperator

airflow/providers/google/cloud/example_dags/example_cloud_composer.py[source]

list_envs = CloudComposerListEnvironmentsOperator(
    task_id="list_envs", project_id=PROJECT_ID, region=REGION
)

Update an environment

You can update an environment by providing an environment config and an updateMask. In the updateMask argument you specify the path, relative to Environment, of the field to update. For more information on updateMask and other parameters, take a look at the Cloud Composer update environment API.

An example of a new environment config and the updateMask:

airflow/providers/google/cloud/example_dags/example_cloud_composer.py[source]

UPDATED_ENVIRONMENT = {
    "labels": {
        "label1": "testing",
    }
}
UPDATE_MASK = {"paths": ["labels.label1"]}
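
Because every path in the mask has to match a field in the updated config, the two can drift apart when they are written by hand. As a rough sketch, the paths can be derived from the config dict by walking it down to its leaf values. The helper name `mask_paths` is hypothetical and not part of the provider, and the API accepts only specific mask paths, so check the generated result against the update environment API reference before relying on it.

```python
def mask_paths(updated: dict, prefix: str = "") -> list:
    """Collect dotted paths to every leaf value in a nested dict."""
    paths = []
    for key, value in updated.items():
        path = f"{prefix}.{key}" if prefix else key
        if isinstance(value, dict) and value:
            # Descend into non-empty nested dicts to reach the leaf fields.
            paths.extend(mask_paths(value, path))
        else:
            paths.append(path)
    return paths


UPDATED_ENVIRONMENT = {"labels": {"label1": "testing"}}

# Hypothetical helper; yields the same mask as the hand-written example above.
UPDATE_MASK = {"paths": mask_paths(UPDATED_ENVIRONMENT)}
```

For the labels example this produces the single path `labels.label1`, matching the mask shown above.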

To update an environment you can use CloudComposerUpdateEnvironmentOperator:

airflow/providers/google/cloud/example_dags/example_cloud_composer.py[source]

update_env = CloudComposerUpdateEnvironmentOperator(
    task_id="update_env",
    project_id=PROJECT_ID,
    region=REGION,
    environment_id=ENVIRONMENT_ID,
    update_mask=UPDATE_MASK,
    environment=UPDATED_ENVIRONMENT,
)

Alternatively, you can define the same operator in deferrable mode using CloudComposerUpdateEnvironmentOperator:

airflow/providers/google/cloud/example_dags/example_cloud_composer.py[source]

defer_update_env = CloudComposerUpdateEnvironmentOperator(
    task_id="defer_update_env",
    project_id=PROJECT_ID,
    region=REGION,
    environment_id=ENVIRONMENT_ID,
    update_mask=UPDATE_MASK,
    environment=UPDATED_ENVIRONMENT,
    deferrable=True,
)

Delete an environment

To delete an environment you can use:

CloudComposerDeleteEnvironmentOperator

airflow/providers/google/cloud/example_dags/example_cloud_composer.py[source]

delete_env = CloudComposerDeleteEnvironmentOperator(
    task_id="delete_env",
    project_id=PROJECT_ID,
    region=REGION,
    environment_id=ENVIRONMENT_ID,
)

Alternatively, you can define the same operator in deferrable mode using CloudComposerDeleteEnvironmentOperator:

airflow/providers/google/cloud/example_dags/example_cloud_composer.py[source]

defer_delete_env = CloudComposerDeleteEnvironmentOperator(
    task_id="defer_delete_env",
    project_id=PROJECT_ID,
    region=REGION,
    environment_id=ENVIRONMENT_ID,
    deferrable=True,
)

List of Composer Images

You can also list all supported Cloud Composer images:

CloudComposerListImageVersionsOperator

airflow/providers/google/cloud/example_dags/example_cloud_composer.py[source]

image_versions = CloudComposerListImageVersionsOperator(
    task_id="image_versions",
    project_id=PROJECT_ID,
    region=REGION,
)
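
A downstream task will often want only the images for one Airflow release line. The sketch below filters such a list in plain Python. It assumes each item is a dict with an `image_version_id` key, which is an assumption about the operator's output rather than something stated on this page. The function name `airflow_image_ids` and the sample entries are made up for illustration.

```python
def airflow_image_ids(image_versions: list, airflow_prefix: str) -> list:
    """Return IDs of images whose Airflow version starts with airflow_prefix."""
    marker = f"-airflow-{airflow_prefix}"
    return [
        image["image_version_id"]
        for image in image_versions
        if marker in image.get("image_version_id", "")
    ]


# Made-up sample data standing in for the operator's result (assumed shape).
sample = [
    {"image_version_id": "composer-1.17.7-airflow-2.1.4"},
    {"image_version_id": "composer-1.17.7-airflow-1.10.15"},
]

airflow_2_images = airflow_image_ids(sample, "2.")
```

Passing the prefix with a trailing dot, as in `"2."`, avoids matching a hypothetical `20.x` release by accident.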
