Google Cloud Dataproc Operators¶
Dataproc is a managed Apache Spark and Apache Hadoop service that lets you take advantage of open source data tools for batch processing, querying, streaming and machine learning. Dataproc automation helps you create clusters quickly, manage them easily, and save money by turning clusters off when you don’t need them.
For more information about the service visit Dataproc production documentation <Product documentation
Prerequisite Tasks¶
To use these operators, you must do a few things:
Select or create a Cloud Platform project using the Cloud Console.
Enable billing for your project, as described in the Google Cloud documentation.
Enable the API, as described in the Cloud Console documentation.
Install API libraries via pip.
pip install 'apache-airflow[google]'Detailed information is available for Installation.
Create a Cluster¶
When you create a Dataproc cluster, you have the option to choose Compute Engine as the deployment platform. In this configuration, Dataproc automatically provisions the required Compute Engine VM instances to run the cluster. The VM instances are used for the main node, primary worker and secondary worker nodes (if specified). These VM instances are created and managed by Compute Engine, while Dataproc takes care of configuring the software and orchestration required for the big data processing tasks. By providing the configuration for your nodes, you describe the configuration of primary and secondary nodes, and status of a cluster of Compute Engine instances. Configuring secondary worker nodes, you can specify the number of workers and their types. By enabling the Preemptible option to use Preemptible VMs (equivalent to Spot instances) for those nodes, you can take advantage of the cost savings provided by these instances for your Dataproc workloads. The primary node, which typically hosts the cluster main and various control services, does not have the Preemptible option because it’s crucial for the primary node to maintain stability and availability. Once a cluster is created, the configuration settings, including the preemptibility of secondary worker nodes, cannot be modified directly.
For more information about the available fields to pass when creating a cluster, visit Dataproc create cluster API.
A cluster configuration can look as followed:
CLUSTER_CONFIG = {
    "master_config": {
        "num_instances": 1,
        "machine_type_uri": "n1-standard-4",
        "disk_config": {"boot_disk_type": "pd-standard", "boot_disk_size_gb": 1024},
    },
    "worker_config": {
        "num_instances": 2,
        "machine_type_uri": "n1-standard-4",
        "disk_config": {"boot_disk_type": "pd-standard", "boot_disk_size_gb": 1024},
    },
    "secondary_worker_config": {
        "num_instances": 1,
        "machine_type_uri": "n1-standard-4",
        "disk_config": {
            "boot_disk_type": "pd-standard",
            "boot_disk_size_gb": 1024,
        },
        "is_preemptible": True,
        "preemptibility": "PREEMPTIBLE",
    },
}
With this configuration we can create the cluster:
DataprocCreateClusterOperator
create_cluster = DataprocCreateClusterOperator(
    task_id="create_cluster",
    project_id=PROJECT_ID,
    cluster_config=CLUSTER_CONFIG,
    region=REGION,
    cluster_name=CLUSTER_NAME,
)
Dataproc on GKE deploys Dataproc virtual clusters on a GKE cluster. Unlike Dataproc on Compute Engine clusters, Dataproc on GKE virtual clusters do not include separate main and worker VMs. Instead, when you create a Dataproc on GKE virtual cluster, Dataproc on GKE creates node pools within a GKE cluster. Dataproc on GKE jobs are run as pods on these node pools. The node pools and scheduling of pods on the node pools are managed by GKE.
When creating a GKE Dataproc cluster, you can specify the usage of Preemptible VMs for the underlying compute resources. GKE supports the use of Preemptible VMs as a cost-saving measure. By enabling Preemptible VMs, GKE will provision the cluster nodes using Preemptible VMs. Or you can create nodes as Spot VM instances, which are the latest update to legacy preemptible VMs. This can be beneficial for running Dataproc workloads on GKE while optimizing costs.
To create Dataproc cluster in Google Kubernetes Engine you could pass cluster configuration:
VIRTUAL_CLUSTER_CONFIG = {
    "kubernetes_cluster_config": {
        "gke_cluster_config": {
            "gke_cluster_target": f"projects/{PROJECT_ID}/locations/{REGION}/clusters/{GKE_CLUSTER_NAME}",
            "node_pool_target": [
                {
                    "node_pool": f"projects/{PROJECT_ID}/locations/{REGION}/clusters/{GKE_CLUSTER_NAME}/nodePools/dp",  # noqa
                    "roles": ["DEFAULT"],
                    "node_pool_config": {
                        "config": {
                            "preemptible": True,
                        }
                    },
                }
            ],
        },
        "kubernetes_software_config": {"component_version": {"SPARK": b"3"}},
    },
    "staging_bucket": "test-staging-bucket",
}
With this configuration we can create the cluster:
DataprocCreateClusterOperator
create_cluster_in_gke = DataprocCreateClusterOperator(
    task_id="create_cluster_in_gke",
    project_id=PROJECT_ID,
    region=REGION,
    cluster_name=CLUSTER_NAME,
    virtual_cluster_config=VIRTUAL_CLUSTER_CONFIG,
)
You can also create Dataproc cluster with optional component Presto.
To do so, please use the following configuration.
Note that default image might not support the chosen optional component.
If this is your case, please specify correct image_version that you can find in the
documentation.
CLUSTER_CONFIG = {
    "master_config": {
        "num_instances": 1,
        "machine_type_uri": "n1-standard-4",
        "disk_config": {"boot_disk_type": "pd-standard", "boot_disk_size_gb": 1024},
    },
    "worker_config": {
        "num_instances": 2,
        "machine_type_uri": "n1-standard-4",
        "disk_config": {"boot_disk_type": "pd-standard", "boot_disk_size_gb": 1024},
    },
    "software_config": {
        "optional_components": [
            "PRESTO",
        ],
        "image_version": "2.0",
    },
}
You can also create Dataproc cluster with optional component Trino.
To do so, please use the following configuration.
Note that default image might not support the chosen optional component.
If this is your case, please specify correct image_version that you can find in the
documentation.
CLUSTER_CONFIG = {
    "master_config": {
        "num_instances": 1,
        "machine_type_uri": "n1-standard-4",
        "disk_config": {"boot_disk_type": "pd-standard", "boot_disk_size_gb": 1024},
    },
    "worker_config": {
        "num_instances": 2,
        "machine_type_uri": "n1-standard-4",
        "disk_config": {"boot_disk_type": "pd-standard", "boot_disk_size_gb": 1024},
    },
    "software_config": {
        "optional_components": [
            "TRINO",
        ],
        "image_version": "2.1",
    },
}
You can use deferrable mode for this action in order to run the operator asynchronously:
create_cluster = DataprocCreateClusterOperator(
    task_id="create_cluster",
    project_id=PROJECT_ID,
    cluster_config=CLUSTER_CONFIG,
    region=REGION,
    cluster_name=CLUSTER_NAME,
    deferrable=True,
)
Generating Cluster Config¶
You can also generate CLUSTER_CONFIG using functional API,
this could be easily done using make() of
ClusterGenerator
You can generate and use config as followed:
INIT_FILE = "pip-install.sh"
CLUSTER_GENERATOR_CONFIG = ClusterGenerator(
    project_id=PROJECT_ID,
    zone=ZONE,
    master_machine_type="n1-standard-4",
    worker_machine_type="n1-standard-4",
    num_workers=2,
    storage_bucket=BUCKET_NAME,
    init_actions_uris=[f"gs://{BUCKET_NAME}/{INIT_FILE}"],
    metadata={"PIP_PACKAGES": "pyyaml requests pandas openpyxl"},
    num_preemptible_workers=1,
    preemptibility="PREEMPTIBLE",
).make()
Update a cluster¶
You can scale the cluster up or down by providing a cluster config and a updateMask. In the updateMask argument you specifies the path, relative to Cluster, of the field to update. For more information on updateMask and other parameters take a look at Dataproc update cluster API.
An example of a new cluster config and the updateMask:
CLUSTER_UPDATE = {
    "config": {"worker_config": {"num_instances": 3}, "secondary_worker_config": {"num_instances": 3}}
}
UPDATE_MASK = {
    "paths": ["config.worker_config.num_instances", "config.secondary_worker_config.num_instances"]
}
To update a cluster you can use:
DataprocUpdateClusterOperator
scale_cluster = DataprocUpdateClusterOperator(
    task_id="scale_cluster",
    cluster_name=CLUSTER_NAME,
    cluster=CLUSTER_UPDATE,
    update_mask=UPDATE_MASK,
    graceful_decommission_timeout=TIMEOUT,
    project_id=PROJECT_ID,
    region=REGION,
)
You can use deferrable mode for this action in order to run the operator asynchronously:
update_cluster = DataprocUpdateClusterOperator(
    task_id="update_cluster",
    cluster_name=CLUSTER_NAME,
    cluster=CLUSTER_UPDATE,
    update_mask=UPDATE_MASK,
    graceful_decommission_timeout=TIMEOUT,
    project_id=PROJECT_ID,
    region=REGION,
    deferrable=True,
)
Deleting a cluster¶
To delete a cluster you can use:
DataprocDeleteClusterOperator.
delete_cluster = DataprocDeleteClusterOperator(
    task_id="delete_cluster",
    project_id=PROJECT_ID,
    cluster_name=CLUSTER_NAME,
    region=REGION,
)
You can use deferrable mode for this action in order to run the operator asynchronously:
delete_cluster = DataprocDeleteClusterOperator(
    task_id="delete_cluster",
    project_id=PROJECT_ID,
    cluster_name=CLUSTER_NAME,
    region=REGION,
    trigger_rule=TriggerRule.ALL_DONE,
    deferrable=True,
)
Submit a job to a cluster¶
Dataproc supports submitting jobs of different big data components. The list currently includes Spark, Hadoop, Pig and Hive. For more information on versions and images take a look at Cloud Dataproc Image version list
To submit a job to the cluster you need to provide a job source file. The job source file can be on GCS, the cluster or on your local file system. You can specify a file:/// path to refer to a local file on a cluster’s primary node.
The job configuration can be submitted by using:
DataprocSubmitJobOperator.
pyspark_task = DataprocSubmitJobOperator(
    task_id="pyspark_task", job=PYSPARK_JOB, region=REGION, project_id=PROJECT_ID
)
Examples of job configurations to submit¶
We have provided an example for every framework below. There are more arguments to provide in the jobs than the examples show. For the complete list of arguments take a look at DataProc Job arguments
Example of the configuration for a PySpark Job:
PYSPARK_JOB = {
    "reference": {"project_id": PROJECT_ID},
    "placement": {"cluster_name": CLUSTER_NAME},
    "pyspark_job": {"main_python_file_uri": JOB_FILE_URI},
}
Example of the configuration for a SparkSQl Job:
SPARK_SQL_JOB = {
    "reference": {"project_id": PROJECT_ID},
    "placement": {"cluster_name": CLUSTER_NAME},
    "spark_sql_job": {"query_list": {"queries": ["SHOW DATABASES;"]}},
}
Example of the configuration for a Spark Job:
SPARK_JOB = {
    "reference": {"project_id": PROJECT_ID},
    "placement": {"cluster_name": CLUSTER_NAME},
    "spark_job": {
        "jar_file_uris": ["file:///usr/lib/spark/examples/jars/spark-examples.jar"],
        "main_class": "org.apache.spark.examples.SparkPi",
    },
}
Example of the configuration for a Spark Job running in deferrable mode:
SPARK_JOB = {
    "reference": {"project_id": PROJECT_ID},
    "placement": {"cluster_name": CLUSTER_NAME},
    "spark_job": {
        "jar_file_uris": ["file:///usr/lib/spark/examples/jars/spark-examples.jar"],
        "main_class": "org.apache.spark.examples.SparkPi",
    },
}
Example of the configuration for a Hive Job:
HIVE_JOB = {
    "reference": {"project_id": PROJECT_ID},
    "placement": {"cluster_name": CLUSTER_NAME},
    "hive_job": {"query_list": {"queries": ["SHOW DATABASES;"]}},
}
Example of the configuration for a Hadoop Job:
HADOOP_JOB = {
    "reference": {"project_id": PROJECT_ID},
    "placement": {"cluster_name": CLUSTER_NAME},
    "hadoop_job": {
        "main_jar_file_uri": "file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar",
        "args": ["wordcount", "gs://pub/shakespeare/rose.txt", OUTPUT_PATH],
    },
}
Example of the configuration for a Pig Job:
PIG_JOB = {
    "reference": {"project_id": PROJECT_ID},
    "placement": {"cluster_name": CLUSTER_NAME},
    "pig_job": {"query_list": {"queries": ["define sin HiveUDF('sin');"]}},
}
Example of the configuration for a SparkR Job:
SPARKR_JOB = {
    "reference": {"project_id": PROJECT_ID},
    "placement": {"cluster_name": CLUSTER_NAME},
    "spark_r_job": {"main_r_file_uri": JOB_FILE_URI},
}
Example of the configuration for a Presto Job:
PRESTO_JOB = {
    "reference": {"project_id": PROJECT_ID},
    "placement": {"cluster_name": CLUSTER_NAME},
    "presto_job": {"query_list": {"queries": ["SHOW CATALOGS"]}},
}
Example of the configuration for a Trino Job:
TRINO_JOB = {
    "reference": {"project_id": PROJECT_ID},
    "placement": {"cluster_name": CLUSTER_NAME},
    "trino_job": {"query_list": {"queries": ["SHOW CATALOGS"]}},
}
Working with workflows templates¶
Dataproc supports creating workflow templates that can be triggered later on.
A workflow template can be created using:
DataprocCreateWorkflowTemplateOperator.
create_workflow_template = DataprocCreateWorkflowTemplateOperator(
    task_id="create_workflow_template",
    template=WORKFLOW_TEMPLATE,
    project_id=PROJECT_ID,
    region=REGION,
)
Once a workflow is created users can trigger it using
DataprocInstantiateWorkflowTemplateOperator:
trigger_workflow = DataprocInstantiateWorkflowTemplateOperator(
    task_id="trigger_workflow", region=REGION, project_id=PROJECT_ID, template_id=WORKFLOW_NAME
)
Also for all this action you can use operator in the deferrable mode:
trigger_workflow_async = DataprocInstantiateWorkflowTemplateOperator(
    task_id="trigger_workflow_async",
    region=REGION,
    project_id=PROJECT_ID,
    template_id=WORKFLOW_NAME,
    deferrable=True,
)
The inline operator is an alternative. It creates a workflow, run it, and delete it afterwards:
DataprocInstantiateInlineWorkflowTemplateOperator:
instantiate_inline_workflow_template = DataprocInstantiateInlineWorkflowTemplateOperator(
    task_id="instantiate_inline_workflow_template", template=WORKFLOW_TEMPLATE, region=REGION
)
Also for all this action you can use operator in the deferrable mode:
instantiate_inline_workflow_template_async = DataprocInstantiateInlineWorkflowTemplateOperator(
    task_id="instantiate_inline_workflow_template_async",
    template=WORKFLOW_TEMPLATE,
    region=REGION,
    deferrable=True,
)
Create a Batch¶
Dataproc supports creating a batch workload.
A batch can be created using:
:class: ~airflow.providers.google.cloud.operators.dataproc.DataprocCreateBatchOperator.
create_batch = DataprocCreateBatchOperator(
    task_id="create_batch",
    project_id=PROJECT_ID,
    region=REGION,
    batch=BATCH_CONFIG,
    batch_id=BATCH_ID,
)
create_batch_2 = DataprocCreateBatchOperator(
    task_id="create_batch_2",
    project_id=PROJECT_ID,
    region=REGION,
    batch=BATCH_CONFIG,
    batch_id=BATCH_ID_2,
    result_retry=Retry(maximum=10.0, initial=10.0, multiplier=1.0),
)
create_batch_3 = DataprocCreateBatchOperator(
    task_id="create_batch_3",
    project_id=PROJECT_ID,
    region=REGION,
    batch=BATCH_CONFIG,
    batch_id=BATCH_ID_3,
    asynchronous=True,
)
For creating a batch with Persistent History Server first you should create a Dataproc Cluster with specific parameters. Documentation how create cluster you can find here: https://cloud.google.com/dataproc/docs/concepts/jobs/history-server#setting_up_a_persistent_history_server
create_cluster = DataprocCreateClusterOperator(
    task_id="create_cluster_for_phs",
    project_id=PROJECT_ID,
    cluster_config=CLUSTER_GENERATOR_CONFIG_FOR_PHS,
    region=REGION,
    cluster_name=CLUSTER_NAME,
)
After Cluster was created you should add it to the Batch configuration.
create_batch = DataprocCreateBatchOperator(
    task_id="create_batch_with_phs",
    project_id=PROJECT_ID,
    region=REGION,
    batch=BATCH_CONFIG_WITH_PHS,
    batch_id=BATCH_ID,
)
To check if operation succeeded you can use
batch_async_sensor = DataprocBatchSensor(
    task_id="batch_async_sensor",
    region=REGION,
    project_id=PROJECT_ID,
    batch_id=BATCH_ID_3,
    poke_interval=10,
)
Also for all this action you can use operator in the deferrable mode:
create_batch = DataprocCreateBatchOperator(
    task_id="create_batch",
    project_id=PROJECT_ID,
    region=REGION,
    batch=BATCH_CONFIG,
    batch_id=BATCH_ID,
    deferrable=True,
)
Get a Batch¶
To get a batch you can use:
:class: ~airflow.providers.google.cloud.operators.dataproc.DataprocGetBatchOperator.
get_batch = DataprocGetBatchOperator(
    task_id="get_batch", project_id=PROJECT_ID, region=REGION, batch_id=BATCH_ID
)
get_batch_2 = DataprocGetBatchOperator(
    task_id="get_batch_2", project_id=PROJECT_ID, region=REGION, batch_id=BATCH_ID_2
)
List a Batch¶
To get a list of exists batches you can use:
:class: ~airflow.providers.google.cloud.operators.dataproc.DataprocListBatchesOperator.
list_batches = DataprocListBatchesOperator(
    task_id="list_batches",
    project_id=PROJECT_ID,
    region=REGION,
)
Delete a Batch¶
To delete a batch you can use:
:class: ~airflow.providers.google.cloud.operators.dataproc.DataprocDeleteBatchOperator.
delete_batch = DataprocDeleteBatchOperator(
    task_id="delete_batch", project_id=PROJECT_ID, region=REGION, batch_id=BATCH_ID
)
delete_batch_2 = DataprocDeleteBatchOperator(
    task_id="delete_batch_2", project_id=PROJECT_ID, region=REGION, batch_id=BATCH_ID_2
)
delete_batch_3 = DataprocDeleteBatchOperator(
    task_id="delete_batch_3", project_id=PROJECT_ID, region=REGION, batch_id=BATCH_ID_3
)
delete_batch_4 = DataprocDeleteBatchOperator(
    task_id="delete_batch_4", project_id=PROJECT_ID, region=REGION, batch_id=BATCH_ID_4
)
Cancel a Batch Operation¶
To cancel a operation you can use:
:class: ~airflow.providers.google.cloud.operators.dataproc.DataprocCancelOperationOperator.
cancel_operation = DataprocCancelOperationOperator(
    task_id="cancel_operation",
    project_id=PROJECT_ID,
    region=REGION,
    operation_name="{{ task_instance.xcom_pull('create_batch_4') }}",
)
References¶
For further information, take a look at:
