Google Cloud Dataproc Operators

Dataproc is a managed Apache Spark and Apache Hadoop service that lets you take advantage of open source data tools for batch processing, querying, streaming and machine learning. Dataproc automation helps you create clusters quickly, manage them easily, and save money by turning clusters off when you don’t need them.

For more information about the service visit Dataproc production documentation <Product documentation

Prerequisite Tasks

To use these operators, you must do a few things:

Create a Cluster

When you create a Dataproc cluster, you have the option to choose Compute Engine as the deployment platform. In this configuration, Dataproc automatically provisions the required Compute Engine VM instances to run the cluster. The VM instances are used for the main node, primary worker and secondary worker nodes (if specified). These VM instances are created and managed by Compute Engine, while Dataproc takes care of configuring the software and orchestration required for the big data processing tasks. By providing the configuration for your nodes, you describe the configuration of primary and secondary nodes, and status of a cluster of Compute Engine instances. Configuring secondary worker nodes, you can specify the number of workers and their types. By enabling the Preemptible option to use Preemptible VMs (equivalent to Spot instances) for those nodes, you can take advantage of the cost savings provided by these instances for your Dataproc workloads. The primary node, which typically hosts the cluster main and various control services, does not have the Preemptible option because it’s crucial for the primary node to maintain stability and availability. Once a cluster is created, the configuration settings, including the preemptibility of secondary worker nodes, cannot be modified directly.

For more information about the available fields to pass when creating a cluster, visit Dataproc create cluster API.

A cluster configuration can look as followed:

tests/system/providers/google/cloud/dataproc/example_dataproc_hive.py[source]


CLUSTER_CONFIG = {
    "master_config": {
        "num_instances": 1,
        "machine_type_uri": "n1-standard-4",
        "disk_config": {"boot_disk_type": "pd-standard", "boot_disk_size_gb": 32},
    },
    "worker_config": {
        "num_instances": 2,
        "machine_type_uri": "n1-standard-4",
        "disk_config": {"boot_disk_type": "pd-standard", "boot_disk_size_gb": 32},
    },
    "secondary_worker_config": {
        "num_instances": 1,
        "machine_type_uri": "n1-standard-4",
        "disk_config": {
            "boot_disk_type": "pd-standard",
            "boot_disk_size_gb": 32,
        },
        "is_preemptible": True,
        "preemptibility": "PREEMPTIBLE",
    },
}

With this configuration we can create the cluster: DataprocCreateClusterOperator

tests/system/providers/google/cloud/dataproc/example_dataproc_hive.py[source]

create_cluster = DataprocCreateClusterOperator(
    task_id="create_cluster",
    project_id=PROJECT_ID,
    cluster_config=CLUSTER_CONFIG,
    region=REGION,
    cluster_name=CLUSTER_NAME,
)

Dataproc on GKE deploys Dataproc virtual clusters on a GKE cluster. Unlike Dataproc on Compute Engine clusters, Dataproc on GKE virtual clusters do not include separate main and worker VMs. Instead, when you create a Dataproc on GKE virtual cluster, Dataproc on GKE creates node pools within a GKE cluster. Dataproc on GKE jobs are run as pods on these node pools. The node pools and scheduling of pods on the node pools are managed by GKE.

When creating a GKE Dataproc cluster, you can specify the usage of Preemptible VMs for the underlying compute resources. GKE supports the use of Preemptible VMs as a cost-saving measure. By enabling Preemptible VMs, GKE will provision the cluster nodes using Preemptible VMs. Or you can create nodes as Spot VM instances, which are the latest update to legacy preemptible VMs. This can be beneficial for running Dataproc workloads on GKE while optimizing costs.

To create Dataproc cluster in Google Kubernetes Engine you could pass cluster configuration:

tests/system/providers/google/cloud/dataproc/example_dataproc_gke.py[source]


VIRTUAL_CLUSTER_CONFIG = {
    "kubernetes_cluster_config": {
        "gke_cluster_config": {
            "gke_cluster_target": f"projects/{PROJECT_ID}/locations/{REGION}/clusters/{GKE_CLUSTER_NAME}",
            "node_pool_target": [
                {
                    "node_pool": f"projects/{PROJECT_ID}/locations/{REGION}/clusters/{GKE_CLUSTER_NAME}/nodePools/dp",
                    "roles": ["DEFAULT"],
                    "node_pool_config": {
                        "config": {
                            "preemptible": True,
                        }
                    },
                }
            ],
        },
        "kubernetes_software_config": {"component_version": {"SPARK": b"3"}},
    },
    "staging_bucket": "test-staging-bucket",
}

With this configuration we can create the cluster: DataprocCreateClusterOperator

tests/system/providers/google/cloud/dataproc/example_dataproc_gke.py[source]

create_cluster_in_gke = DataprocCreateClusterOperator(
    task_id="create_cluster_in_gke",
    project_id=PROJECT_ID,
    region=REGION,
    cluster_name=CLUSTER_NAME,
    virtual_cluster_config=VIRTUAL_CLUSTER_CONFIG,
)

You can also create Dataproc cluster with optional component Presto. To do so, please use the following configuration. Note that default image might not support the chosen optional component. If this is your case, please specify correct image_version that you can find in the documentation.

tests/system/providers/google/cloud/dataproc/example_dataproc_presto.py[source]

CLUSTER_CONFIG = {
    "master_config": {
        "num_instances": 1,
        "machine_type_uri": "n1-standard-4",
        "disk_config": {"boot_disk_type": "pd-standard", "boot_disk_size_gb": 32},
    },
    "worker_config": {
        "num_instances": 2,
        "machine_type_uri": "n1-standard-4",
        "disk_config": {"boot_disk_type": "pd-standard", "boot_disk_size_gb": 32},
    },
    "software_config": {
        "optional_components": [
            "PRESTO",
        ],
        "image_version": "2.0",
    },
}

You can also create Dataproc cluster with optional component Trino. To do so, please use the following configuration. Note that default image might not support the chosen optional component. If this is your case, please specify correct image_version that you can find in the documentation.

tests/system/providers/google/cloud/dataproc/example_dataproc_trino.py[source]


CLUSTER_CONFIG = {
    "master_config": {
        "num_instances": 1,
        "machine_type_uri": "n1-standard-4",
        "disk_config": {"boot_disk_type": "pd-standard", "boot_disk_size_gb": 32},
    },
    "worker_config": {
        "num_instances": 2,
        "machine_type_uri": "n1-standard-4",
        "disk_config": {"boot_disk_type": "pd-standard", "boot_disk_size_gb": 32},
    },
    "software_config": {
        "optional_components": [
            "TRINO",
        ],
        "image_version": "2.1",
    },
}

You can use deferrable mode for this action in order to run the operator asynchronously:

tests/system/providers/google/cloud/dataproc/example_dataproc_cluster_deferrable.py[source]

create_cluster = DataprocCreateClusterOperator(
    task_id="create_cluster",
    project_id=PROJECT_ID,
    cluster_config=CLUSTER_CONFIG,
    region=REGION,
    cluster_name=CLUSTER_NAME,
    deferrable=True,
)

Generating Cluster Config

You can also generate CLUSTER_CONFIG using functional API, this could be easily done using make() of ClusterGenerator You can generate and use config as followed:

tests/system/providers/google/cloud/dataproc/example_dataproc_cluster_generator.py[source]


INIT_FILE = "pip-install.sh"

CLUSTER_GENERATOR_CONFIG = ClusterGenerator(
    project_id=PROJECT_ID,
    zone=ZONE,
    master_machine_type="n1-standard-4",
    master_disk_size=32,
    worker_machine_type="n1-standard-4",
    worker_disk_size=32,
    num_workers=2,
    storage_bucket=BUCKET_NAME,
    init_actions_uris=[f"gs://{BUCKET_NAME}/{INIT_FILE}"],
    metadata={"PIP_PACKAGES": "pyyaml requests pandas openpyxl"},
    num_preemptible_workers=1,
    preemptibility="PREEMPTIBLE",
).make()

Diagnose a cluster

Dataproc supports the collection of cluster diagnostic information like system, Spark, Hadoop, and Dataproc logs, cluster configuration files that can be used to troubleshoot a Dataproc cluster or job. It is important to note that this information can only be collected before the cluster is deleted. For more information about the available fields to pass when diagnosing a cluster, visit Dataproc diagnose cluster API.

To diagnose a Dataproc cluster use: `

tests/system/providers/google/cloud/dataproc/example_dataproc_cluster_diagnose.py[source]

    diagnose_cluster = DataprocDiagnoseClusterOperator(
        task_id="diagnose_cluster",
        region=REGION,
        project_id=PROJECT_ID,
        cluster_name=CLUSTER_NAME,
    )

You can also use deferrable mode in order to run the operator asynchronously:

tests/system/providers/google/cloud/dataproc/example_dataproc_cluster_diagnose.py[source]

    diagnose_cluster_deferrable = DataprocDiagnoseClusterOperator(
        task_id="diagnose_cluster_deferrable",
        region=REGION,
        project_id=PROJECT_ID,
        cluster_name=CLUSTER_NAME,
        deferrable=True,
    )

Update a cluster

You can scale the cluster up or down by providing a cluster config and a updateMask. In the updateMask argument you specifies the path, relative to Cluster, of the field to update. For more information on updateMask and other parameters take a look at Dataproc update cluster API.

An example of a new cluster config and the updateMask:

tests/system/providers/google/cloud/dataproc/example_dataproc_cluster_update.py[source]

CLUSTER_UPDATE = {
    "config": {"worker_config": {"num_instances": 3}, "secondary_worker_config": {"num_instances": 3}}
}
UPDATE_MASK = {
    "paths": ["config.worker_config.num_instances", "config.secondary_worker_config.num_instances"]
}

To update a cluster you can use: DataprocUpdateClusterOperator

tests/system/providers/google/cloud/dataproc/example_dataproc_cluster_update.py[source]

scale_cluster = DataprocUpdateClusterOperator(
    task_id="scale_cluster",
    cluster_name=CLUSTER_NAME,
    cluster=CLUSTER_UPDATE,
    update_mask=UPDATE_MASK,
    graceful_decommission_timeout=TIMEOUT,
    project_id=PROJECT_ID,
    region=REGION,
)

You can use deferrable mode for this action in order to run the operator asynchronously:

tests/system/providers/google/cloud/dataproc/example_dataproc_cluster_deferrable.py[source]

update_cluster = DataprocUpdateClusterOperator(
    task_id="update_cluster",
    cluster_name=CLUSTER_NAME,
    cluster=CLUSTER_UPDATE,
    update_mask=UPDATE_MASK,
    graceful_decommission_timeout=TIMEOUT,
    project_id=PROJECT_ID,
    region=REGION,
    deferrable=True,
)

Deleting a cluster

To delete a cluster you can use:

DataprocDeleteClusterOperator.

tests/system/providers/google/cloud/dataproc/example_dataproc_hive.py[source]

delete_cluster = DataprocDeleteClusterOperator(
    task_id="delete_cluster",
    project_id=PROJECT_ID,
    cluster_name=CLUSTER_NAME,
    region=REGION,
)

You can use deferrable mode for this action in order to run the operator asynchronously:

tests/system/providers/google/cloud/dataproc/example_dataproc_cluster_deferrable.py[source]

delete_cluster = DataprocDeleteClusterOperator(
    task_id="delete_cluster",
    project_id=PROJECT_ID,
    cluster_name=CLUSTER_NAME,
    region=REGION,
    trigger_rule=TriggerRule.ALL_DONE,
    deferrable=True,
)

Submit a job to a cluster

Dataproc supports submitting jobs of different big data components. The list currently includes Spark, Hadoop, Pig and Hive. For more information on versions and images take a look at Cloud Dataproc Image version list

To submit a job to the cluster you need to provide a job source file. The job source file can be on GCS, the cluster or on your local file system. You can specify a file:/// path to refer to a local file on a cluster’s primary node.

The job configuration can be submitted by using: DataprocSubmitJobOperator.

tests/system/providers/google/cloud/dataproc/example_dataproc_pyspark.py[source]

pyspark_task = DataprocSubmitJobOperator(
    task_id="pyspark_task", job=PYSPARK_JOB, region=REGION, project_id=PROJECT_ID
)

Examples of job configurations to submit

We have provided an example for every framework below. There are more arguments to provide in the jobs than the examples show. For the complete list of arguments take a look at DataProc Job arguments

Example of the configuration for a PySpark Job:

tests/system/providers/google/cloud/dataproc/example_dataproc_pyspark.py[source]

PYSPARK_JOB = {
    "reference": {"project_id": PROJECT_ID},
    "placement": {"cluster_name": CLUSTER_NAME},
    "pyspark_job": {"main_python_file_uri": JOB_FILE_URI},
}

Example of the configuration for a SparkSQl Job:

tests/system/providers/google/cloud/dataproc/example_dataproc_spark_sql.py[source]

SPARK_SQL_JOB = {
    "reference": {"project_id": PROJECT_ID},
    "placement": {"cluster_name": CLUSTER_NAME},
    "spark_sql_job": {"query_list": {"queries": ["SHOW DATABASES;"]}},
}

Example of the configuration for a Spark Job:

tests/system/providers/google/cloud/dataproc/example_dataproc_spark.py[source]

SPARK_JOB = {
    "reference": {"project_id": PROJECT_ID},
    "placement": {"cluster_name": CLUSTER_NAME},
    "spark_job": {
        "jar_file_uris": ["file:///usr/lib/spark/examples/jars/spark-examples.jar"],
        "main_class": "org.apache.spark.examples.SparkPi",
    },
}

Example of the configuration for a Spark Job running in deferrable mode:

tests/system/providers/google/cloud/dataproc/example_dataproc_spark_deferrable.py[source]

SPARK_JOB = {
    "reference": {"project_id": PROJECT_ID},
    "placement": {"cluster_name": CLUSTER_NAME},
    "spark_job": {
        "jar_file_uris": ["file:///usr/lib/spark/examples/jars/spark-examples.jar"],
        "main_class": "org.apache.spark.examples.SparkPi",
    },
}

Example of the configuration for a Hive Job:

tests/system/providers/google/cloud/dataproc/example_dataproc_hive.py[source]

HIVE_JOB = {
    "reference": {"project_id": PROJECT_ID},
    "placement": {"cluster_name": CLUSTER_NAME},
    "hive_job": {"query_list": {"queries": ["SHOW DATABASES;"]}},
}

Example of the configuration for a Hadoop Job:

tests/system/providers/google/cloud/dataproc/example_dataproc_hadoop.py[source]

HADOOP_JOB = {
    "reference": {"project_id": PROJECT_ID},
    "placement": {"cluster_name": CLUSTER_NAME},
    "hadoop_job": {
        "main_jar_file_uri": "file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar",
        "args": ["wordcount", "gs://pub/shakespeare/rose.txt", OUTPUT_PATH],
    },
}

Example of the configuration for a Pig Job:

tests/system/providers/google/cloud/dataproc/example_dataproc_pig.py[source]

PIG_JOB = {
    "reference": {"project_id": PROJECT_ID},
    "placement": {"cluster_name": CLUSTER_NAME},
    "pig_job": {"query_list": {"queries": ["define sin HiveUDF('sin');"]}},
}

Example of the configuration for a SparkR Job:

tests/system/providers/google/cloud/dataproc/example_dataproc_sparkr.py[source]

SPARKR_JOB = {
    "reference": {"project_id": PROJECT_ID},
    "placement": {"cluster_name": CLUSTER_NAME},
    "spark_r_job": {"main_r_file_uri": JOB_FILE_URI},
}

Example of the configuration for a Presto Job:

tests/system/providers/google/cloud/dataproc/example_dataproc_presto.py[source]

PRESTO_JOB = {
    "reference": {"project_id": PROJECT_ID},
    "placement": {"cluster_name": CLUSTER_NAME},
    "presto_job": {"query_list": {"queries": ["SHOW CATALOGS"]}},
}

Example of the configuration for a Trino Job:

tests/system/providers/google/cloud/dataproc/example_dataproc_trino.py[source]

TRINO_JOB = {
    "reference": {"project_id": PROJECT_ID},
    "placement": {"cluster_name": CLUSTER_NAME},
    "trino_job": {"query_list": {"queries": ["SHOW CATALOGS"]}},
}

Working with workflows templates

Dataproc supports creating workflow templates that can be triggered later on.

A workflow template can be created using: DataprocCreateWorkflowTemplateOperator.

tests/system/providers/google/cloud/dataproc/example_dataproc_workflow.py[source]

create_workflow_template = DataprocCreateWorkflowTemplateOperator(
    task_id="create_workflow_template",
    template=WORKFLOW_TEMPLATE,
    project_id=PROJECT_ID,
    region=REGION,
)

Once a workflow is created users can trigger it using DataprocInstantiateWorkflowTemplateOperator:

tests/system/providers/google/cloud/dataproc/example_dataproc_workflow.py[source]

trigger_workflow = DataprocInstantiateWorkflowTemplateOperator(
    task_id="trigger_workflow", region=REGION, project_id=PROJECT_ID, template_id=WORKFLOW_NAME
)

Also for all this action you can use operator in the deferrable mode:

tests/system/providers/google/cloud/dataproc/example_dataproc_workflow_deferrable.py[source]

trigger_workflow_async = DataprocInstantiateWorkflowTemplateOperator(
    task_id="trigger_workflow_async",
    region=REGION,
    project_id=PROJECT_ID,
    template_id=WORKFLOW_NAME,
    deferrable=True,
)

The inline operator is an alternative. It creates a workflow, run it, and delete it afterwards: DataprocInstantiateInlineWorkflowTemplateOperator:

tests/system/providers/google/cloud/dataproc/example_dataproc_workflow.py[source]

instantiate_inline_workflow_template = DataprocInstantiateInlineWorkflowTemplateOperator(
    task_id="instantiate_inline_workflow_template", template=WORKFLOW_TEMPLATE, region=REGION
)

Also for all this action you can use operator in the deferrable mode:

tests/system/providers/google/cloud/dataproc/example_dataproc_workflow_deferrable.py[source]

instantiate_inline_workflow_template_async = DataprocInstantiateInlineWorkflowTemplateOperator(
    task_id="instantiate_inline_workflow_template_async",
    template=WORKFLOW_TEMPLATE,
    region=REGION,
    deferrable=True,
)

Create a Batch

Dataproc supports creating a batch workload.

A batch can be created using: :class: ~airflow.providers.google.cloud.operators.dataproc.DataprocCreateBatchOperator.

tests/system/providers/google/cloud/dataproc/example_dataproc_batch.py[source]

create_batch = DataprocCreateBatchOperator(
    task_id="create_batch",
    project_id=PROJECT_ID,
    region=REGION,
    batch=BATCH_CONFIG,
    batch_id=BATCH_ID,
)

create_batch_2 = DataprocCreateBatchOperator(
    task_id="create_batch_2",
    project_id=PROJECT_ID,
    region=REGION,
    batch=BATCH_CONFIG,
    batch_id=BATCH_ID_2,
    result_retry=AsyncRetry(maximum=10.0, initial=10.0, multiplier=1.0),
)

create_batch_3 = DataprocCreateBatchOperator(
    task_id="create_batch_3",
    project_id=PROJECT_ID,
    region=REGION,
    batch=BATCH_CONFIG,
    batch_id=BATCH_ID_3,
    asynchronous=True,
)

For creating a batch with Persistent History Server first you should create a Dataproc Cluster with specific parameters. Documentation how create cluster you can find here: https://cloud.google.com/dataproc/docs/concepts/jobs/history-server#setting_up_a_persistent_history_server

tests/system/providers/google/cloud/dataproc/example_dataproc_batch_persistent.py[source]

create_cluster = DataprocCreateClusterOperator(
    task_id="create_cluster_for_phs",
    project_id=PROJECT_ID,
    cluster_config=CLUSTER_GENERATOR_CONFIG_FOR_PHS,
    region=REGION,
    cluster_name=CLUSTER_NAME,
)

After Cluster was created you should add it to the Batch configuration.

tests/system/providers/google/cloud/dataproc/example_dataproc_batch_persistent.py[source]

create_batch = DataprocCreateBatchOperator(
    task_id="create_batch_with_phs",
    project_id=PROJECT_ID,
    region=REGION,
    batch=BATCH_CONFIG_WITH_PHS,
    batch_id=BATCH_ID,
)

To check if operation succeeded you can use

DataprocBatchSensor.

tests/system/providers/google/cloud/dataproc/example_dataproc_batch.py[source]

batch_async_sensor = DataprocBatchSensor(
    task_id="batch_async_sensor",
    region=REGION,
    project_id=PROJECT_ID,
    batch_id=BATCH_ID_3,
    poke_interval=10,
)

Also for all this action you can use operator in the deferrable mode:

tests/system/providers/google/cloud/dataproc/example_dataproc_batch_deferrable.py[source]

create_batch = DataprocCreateBatchOperator(
    task_id="create_batch",
    project_id=PROJECT_ID,
    region=REGION,
    batch=BATCH_CONFIG,
    batch_id=BATCH_ID,
    deferrable=True,
)

Get a Batch

To get a batch you can use: :class: ~airflow.providers.google.cloud.operators.dataproc.DataprocGetBatchOperator.

tests/system/providers/google/cloud/dataproc/example_dataproc_batch.py[source]

get_batch = DataprocGetBatchOperator(
    task_id="get_batch", project_id=PROJECT_ID, region=REGION, batch_id=BATCH_ID
)

get_batch_2 = DataprocGetBatchOperator(
    task_id="get_batch_2", project_id=PROJECT_ID, region=REGION, batch_id=BATCH_ID_2
)

List a Batch

To get a list of exists batches you can use: :class: ~airflow.providers.google.cloud.operators.dataproc.DataprocListBatchesOperator.

tests/system/providers/google/cloud/dataproc/example_dataproc_batch.py[source]

list_batches = DataprocListBatchesOperator(
    task_id="list_batches",
    project_id=PROJECT_ID,
    region=REGION,
)

Delete a Batch

To delete a batch you can use: :class: ~airflow.providers.google.cloud.operators.dataproc.DataprocDeleteBatchOperator.

tests/system/providers/google/cloud/dataproc/example_dataproc_batch.py[source]

delete_batch = DataprocDeleteBatchOperator(
    task_id="delete_batch", project_id=PROJECT_ID, region=REGION, batch_id=BATCH_ID
)
delete_batch_2 = DataprocDeleteBatchOperator(
    task_id="delete_batch_2", project_id=PROJECT_ID, region=REGION, batch_id=BATCH_ID_2
)
delete_batch_3 = DataprocDeleteBatchOperator(
    task_id="delete_batch_3", project_id=PROJECT_ID, region=REGION, batch_id=BATCH_ID_3
)
delete_batch_4 = DataprocDeleteBatchOperator(
    task_id="delete_batch_4", project_id=PROJECT_ID, region=REGION, batch_id=BATCH_ID_4
)

Cancel a Batch Operation

To cancel a operation you can use: :class: ~airflow.providers.google.cloud.operators.dataproc.DataprocCancelOperationOperator.

tests/system/providers/google/cloud/dataproc/example_dataproc_batch.py[source]

cancel_operation = DataprocCancelOperationOperator(
    task_id="cancel_operation",
    project_id=PROJECT_ID,
    region=REGION,
    operation_name="{{ task_instance.xcom_pull('create_batch_4') }}",
)

References

For further information, take a look at:

Was this entry helpful?