Google Managed Service for Apache Spark Operators

Managed Service for Apache Spark is a managed Apache Spark and Apache Hadoop service that lets you take advantage of open source data tools for batch processing, querying, streaming and machine learning. Managed Spark automation helps you create clusters quickly, manage them easily, and save money by turning clusters off when you don’t need them.

For more information about the service, visit the Managed Spark product documentation.

Prerequisite Tasks

To use these operators, you must do a few things:

Sovereign Cloud from Google guidance

Sovereign Cloud from Google is a Google Cloud deployment with a separate service endpoint surface from standard Google Cloud regions. Dataproc support in Sovereign Cloud from Google can therefore differ from Dataproc support in standard Google Cloud. If a Dataproc API method or job type is not implemented in Sovereign Cloud from Google, the corresponding Airflow operator or job configuration will not run there. This is a Dataproc service availability limitation, not a Cloud Composer or Airflow operator limitation.

The following Dataproc job configurations were available in the tested Sovereign Cloud from Google environment:

  • Hadoop jobs.

  • Hive jobs.

  • PySpark jobs.

  • Spark jobs.

  • Spark SQL jobs.

  • Trino jobs.

The following Dataproc job configurations were not available in the tested environment:

  • PigJob.

  • PrestoJob.

  • SparkRJob.

  • FlinkJob.

The following operators do not work in Sovereign Cloud from Google because the corresponding Dataproc API methods are not implemented there yet:

The same limitation was observed with gcloud commands, so this is not a Cloud Composer or Airflow operator limitation.

When running in Sovereign Cloud from Google, verify that the Dataproc API method and job type are available in that environment before relying on the corresponding Airflow operator in a production DAG.
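One way to apply this check is to validate job dictionaries before handing them to an operator. The sketch below is only an illustration: the helper name check_job_type and the two sets are hypothetical, and the sets simply mirror the lists above for the tested environment, so they may not match yours.

```python
# Job-type keys that were available in the tested environment (mirrors the list above).
AVAILABLE_JOB_KEYS = {
    "hadoop_job",
    "hive_job",
    "pyspark_job",
    "spark_job",
    "spark_sql_job",
    "trino_job",
}
# Job-type keys that were not available in the tested environment.
UNAVAILABLE_JOB_KEYS = {"pig_job", "presto_job", "spark_r_job", "flink_job"}


def check_job_type(job):
    """Return the job-type key of a job dict, or raise ValueError if it looks unsupported."""
    found = [key for key in job if key.endswith("_job")]
    if len(found) != 1:
        raise ValueError(f"expected exactly one *_job key, found {found}")
    key = found[0]
    if key in UNAVAILABLE_JOB_KEYS:
        raise ValueError(f"{key} was not available in the tested environment")
    if key not in AVAILABLE_JOB_KEYS:
        raise ValueError(f"{key} was not covered by the test; verify it manually")
    return key
```

Calling check_job_type on a job dictionary at DAG parse time makes an unsupported job type fail early instead of at submission.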

Create a Cluster

When you create a Managed Spark cluster, you can choose Compute Engine as the deployment platform. In this configuration, Managed Spark automatically provisions the Compute Engine VM instances required to run the cluster. The VM instances are used for the main node, the primary workers, and the secondary workers (if specified). Compute Engine creates and manages these VM instances, while Managed Spark configures the software and orchestration required for big data processing. By providing the node configuration, you describe the primary and secondary nodes and the status of the cluster of Compute Engine instances.

When configuring secondary worker nodes, you can specify the number of workers and their type. Enabling the Preemptible option makes those nodes run on Preemptible VMs (equivalent to Spot instances), which lowers the cost of your Managed Spark workloads. The primary node, which typically hosts the cluster main and various control services, does not have the Preemptible option because it must remain stable and available. Once a cluster is created, its configuration settings, including the preemptibility of secondary worker nodes, cannot be modified directly.

For more information about the available fields to pass when creating a cluster, visit Managed Spark create cluster API.

A cluster configuration can look as follows:

tests/system/google/cloud/dataproc/example_dataproc_hive.py[source]


CLUSTER_CONFIG = {
    "cluster_type": "STANDARD",
    "cluster_tier": "CLUSTER_TIER_STANDARD",
    "engine": "DEFAULT",
    "master_config": {
        "num_instances": 1,
        "machine_type_uri": "n1-standard-4",
        "disk_config": {"boot_disk_type": "pd-standard", "boot_disk_size_gb": 32},
    },
    "worker_config": {
        "num_instances": 2,
        "machine_type_uri": "n1-standard-4",
        "disk_config": {"boot_disk_type": "pd-standard", "boot_disk_size_gb": 32},
    },
    "secondary_worker_config": {
        "num_instances": 1,
        "machine_type_uri": "n1-standard-4",
        "disk_config": {
            "boot_disk_type": "pd-standard",
            "boot_disk_size_gb": 32,
        },
        "is_preemptible": True,
        "preemptibility": "PREEMPTIBLE",
    },
}

With this configuration we can create the cluster: DataprocCreateClusterOperator

The executable example below still imports the compatibility name DataprocCreateClusterOperator. The preferred alias for new code is ManagedSparkCreateClusterOperator.

tests/system/google/cloud/dataproc/example_dataproc_hive.py[source]

create_cluster = DataprocCreateClusterOperator(
    task_id="create_cluster",
    project_id=PROJECT_ID,
    cluster_config=CLUSTER_CONFIG,
    region=REGION,
    cluster_name=CLUSTER_NAME,
    retry=Retry(maximum=100.0, initial=10.0, multiplier=1.0),
    num_retries_if_resource_is_not_ready=3,
)
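The node settings described in this section can also be assembled by a small helper, so that the secondary worker group and its preemptibility are set in one place. This is a minimal sketch, not part of the provider: build_cluster_config is a hypothetical name, and "NON_PREEMPTIBLE" is assumed to be the value for non-preemptible secondary workers.

```python
def build_cluster_config(num_workers, num_secondary_workers=0, preemptible=True,
                         machine_type="n1-standard-4"):
    """Build a cluster config dict shaped like the CLUSTER_CONFIG example (hypothetical helper)."""
    disk = {"boot_disk_type": "pd-standard", "boot_disk_size_gb": 32}

    def group(count):
        # Each instance group gets its own copy of the disk settings.
        return {
            "num_instances": count,
            "machine_type_uri": machine_type,
            "disk_config": dict(disk),
        }

    config = {"master_config": group(1), "worker_config": group(num_workers)}
    if num_secondary_workers > 0:
        secondary = group(num_secondary_workers)
        # Only the secondary worker group carries a preemptibility setting.
        # "NON_PREEMPTIBLE" is an assumed value; check the create cluster API.
        secondary["preemptibility"] = "PREEMPTIBLE" if preemptible else "NON_PREEMPTIBLE"
        config["secondary_worker_config"] = secondary
    return config


CLUSTER_CONFIG = build_cluster_config(num_workers=2, num_secondary_workers=1)
```

The resulting dictionary can be passed as cluster_config in the same way as the hand-written one.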

Managed Spark on GKE deploys Managed Spark virtual clusters on a GKE cluster. Unlike Managed Spark on Compute Engine clusters, Managed Spark on GKE virtual clusters do not include separate main and worker VMs. Instead, when you create a Managed Spark on GKE virtual cluster, Managed Spark on GKE creates node pools within a GKE cluster. Managed Spark on GKE jobs are run as pods on these node pools. The node pools and scheduling of pods on the node pools are managed by GKE.

When creating a GKE Managed Spark cluster, you can specify the usage of Preemptible VMs for the underlying compute resources. GKE supports the use of Preemptible VMs as a cost-saving measure. By enabling Preemptible VMs, GKE will provision the cluster nodes using Preemptible VMs. Or you can create nodes as Spot VM instances, which are the latest update to legacy preemptible VMs. This can be beneficial for running Managed Spark workloads on GKE while optimizing costs.

To create a Managed Spark cluster in Google Kubernetes Engine, you can pass a virtual cluster configuration:

tests/system/google/cloud/dataproc/example_dataproc_gke.py[source]


VIRTUAL_CLUSTER_CONFIG = {
    "kubernetes_cluster_config": {
        "gke_cluster_config": {
            "gke_cluster_target": f"projects/{PROJECT_ID}/locations/{REGION}/clusters/{GKE_CLUSTER_NAME}",
            "node_pool_target": [
                {
                    "node_pool": f"projects/{PROJECT_ID}/locations/{REGION}/clusters/{GKE_CLUSTER_NAME}/nodePools/dp",
                    "roles": ["DEFAULT"],
                    "node_pool_config": {
                        "config": {
                            "preemptible": False,
                            "machine_type": "e2-standard-4",
                        }
                    },
                }
            ],
        },
        "kubernetes_software_config": {"component_version": {"SPARK": "3"}},
    },
    "staging_bucket": "test-staging-bucket",
}

With this configuration we can create the cluster: DataprocCreateClusterOperator

The executable example below still imports the compatibility name DataprocCreateClusterOperator. The preferred alias for new code is ManagedSparkCreateClusterOperator.

tests/system/google/cloud/dataproc/example_dataproc_gke.py[source]

create_cluster_in_gke = DataprocCreateClusterOperator(
    task_id="create_cluster_in_gke",
    project_id=PROJECT_ID,
    region=REGION,
    cluster_name=CLUSTER_NAME,
    virtual_cluster_config=VIRTUAL_CLUSTER_CONFIG,
    retry=Retry(maximum=100.0, initial=10.0, multiplier=1.0),
    num_retries_if_resource_is_not_ready=3,
)
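The virtual cluster configuration repeats the GKE cluster path in both the cluster target and the node pool name. A small helper can keep the two consistent. The sketch below is illustrative only: the function names and default values are hypothetical, and the dictionary shape is copied from the VIRTUAL_CLUSTER_CONFIG example.

```python
def gke_cluster_path(project_id, region, gke_cluster_name):
    """Return the GKE cluster resource path used in the example configuration."""
    return f"projects/{project_id}/locations/{region}/clusters/{gke_cluster_name}"


def build_virtual_cluster_config(project_id, region, gke_cluster_name, staging_bucket,
                                 node_pool="dp", preemptible=False):
    """Build a dict shaped like VIRTUAL_CLUSTER_CONFIG (hypothetical helper)."""
    cluster_path = gke_cluster_path(project_id, region, gke_cluster_name)
    return {
        "kubernetes_cluster_config": {
            "gke_cluster_config": {
                "gke_cluster_target": cluster_path,
                "node_pool_target": [
                    {
                        # The node pool path is derived from the cluster path.
                        "node_pool": f"{cluster_path}/nodePools/{node_pool}",
                        "roles": ["DEFAULT"],
                        "node_pool_config": {
                            "config": {
                                "preemptible": preemptible,
                                "machine_type": "e2-standard-4",
                            }
                        },
                    }
                ],
            },
            "kubernetes_software_config": {"component_version": {"SPARK": "3"}},
        },
        "staging_bucket": staging_bucket,
    }
```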

You can also create a Managed Spark cluster with the optional Presto component. To do so, use the following configuration. Note that the default image might not support the chosen optional component. If that is the case, specify a suitable image_version, which you can find in the documentation.

tests/system/google/cloud/dataproc/example_dataproc_presto.py[source]

CLUSTER_CONFIG = {
    "master_config": {
        "num_instances": 1,
        "machine_type_uri": "n1-standard-4",
        "disk_config": {"boot_disk_type": "pd-standard", "boot_disk_size_gb": 32},
    },
    "worker_config": {
        "num_instances": 2,
        "machine_type_uri": "n1-standard-4",
        "disk_config": {"boot_disk_type": "pd-standard", "boot_disk_size_gb": 32},
    },
    "software_config": {
        "optional_components": [
            "PRESTO",
        ],
        "image_version": "2.0",
    },
}

You can also create a Managed Spark cluster with the optional Trino component. To do so, use the following configuration. Note that the default image might not support the chosen optional component. If that is the case, specify a suitable image_version, which you can find in the documentation.

tests/system/google/cloud/dataproc/example_dataproc_trino.py[source]


CLUSTER_CONFIG = {
    "master_config": {
        "num_instances": 1,
        "machine_type_uri": "n1-standard-4",
        "disk_config": {"boot_disk_type": "pd-standard", "boot_disk_size_gb": 32},
    },
    "worker_config": {
        "num_instances": 2,
        "machine_type_uri": "n1-standard-4",
        "disk_config": {"boot_disk_type": "pd-standard", "boot_disk_size_gb": 32},
    },
    "software_config": {
        "optional_components": [
            "TRINO",
        ],
        "image_version": "2.1",
    },
}
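The Presto and Trino configurations differ from a plain cluster configuration only in their software_config entry. As a hedged sketch, a hypothetical helper such as with_optional_component below could add a component and an image version to an existing configuration without modifying the original dictionary.

```python
import copy


def with_optional_component(cluster_config, component, image_version):
    """Return a copy of cluster_config with an optional component enabled (hypothetical helper)."""
    config = copy.deepcopy(cluster_config)  # leave the caller's dict untouched
    software = config.setdefault("software_config", {})
    components = software.setdefault("optional_components", [])
    if component not in components:
        components.append(component)
    # The image version must support the chosen component; see the image version list.
    software["image_version"] = image_version
    return config


BASE_CONFIG = {
    "master_config": {"num_instances": 1, "machine_type_uri": "n1-standard-4"},
    "worker_config": {"num_instances": 2, "machine_type_uri": "n1-standard-4"},
}
TRINO_CLUSTER_CONFIG = with_optional_component(BASE_CONFIG, "TRINO", "2.1")
```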

You can use deferrable mode for this action in order to run the operator asynchronously:

tests/system/google/cloud/dataproc/example_dataproc_cluster_deferrable.py[source]

create_cluster = DataprocCreateClusterOperator(
    task_id="create_cluster",
    project_id=PROJECT_ID,
    cluster_config=CLUSTER_CONFIG,
    region=REGION,
    cluster_name=CLUSTER_NAME,
    deferrable=True,
    retry=Retry(maximum=100.0, initial=10.0, multiplier=1.0),
    num_retries_if_resource_is_not_ready=3,
)
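The retry argument in these examples takes initial, maximum, and multiplier values, which describe a capped exponential backoff in seconds. The following is a simplified sketch of how such a schedule grows. It is not the library's implementation, which may also apply random jitter and an overall timeout, and nominal_delays is a hypothetical name.

```python
from itertools import islice


def nominal_delays(initial, maximum, multiplier):
    """Yield capped backoff delays in seconds, ignoring jitter (simplified sketch)."""
    delay = initial
    while True:
        yield min(delay, maximum)
        delay *= multiplier


# With multiplier=1.0, as in the examples, the nominal delay stays constant.
constant = list(islice(nominal_delays(10.0, 100.0, 1.0), 4))
# With a larger multiplier the delay grows until it reaches the maximum.
growing = list(islice(nominal_delays(10.0, 100.0, 2.0), 5))
```

Under these assumptions, constant is four delays of 10.0 seconds, and growing is 10.0, 20.0, 40.0, 80.0, then 100.0 once the cap applies.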

Generating Cluster Config

You can also generate CLUSTER_CONFIG using the functional API, by calling make() on ClusterGenerator. You can generate and use the config as follows:

tests/system/google/cloud/dataproc/example_dataproc_cluster_generator.py[source]

CLUSTER_GENERATOR_CONFIG = ClusterGenerator(
    project_id=PROJECT_ID,
    zone=ZONE,
    master_machine_type="n1-standard-4",
    master_disk_size=32,
    worker_machine_type="n1-standard-4",
    worker_disk_size=32,
    num_workers=2,
    storage_bucket=BUCKET_NAME,
    init_actions_uris=[GCS_INIT_FILE],
    metadata={"PIP_PACKAGES": "pyyaml requests pandas openpyxl"},
    num_preemptible_workers=1,
    preemptibility="PREEMPTIBLE",
    internal_ip_only=False,
    cluster_tier="CLUSTER_TIER_STANDARD",
    cluster_type="STANDARD",
    engine="DEFAULT",
).make()

Diagnose a cluster

Managed Spark supports collecting cluster diagnostic information, such as system, Spark, Hadoop, and Managed Spark logs and cluster configuration files, which can be used to troubleshoot a Managed Spark cluster or job. Note that this information can only be collected before the cluster is deleted. For more information about the available fields to pass when diagnosing a cluster, visit the Managed Spark diagnose cluster API.

To diagnose a Managed Spark cluster use: DataprocDiagnoseClusterOperator.

The executable example below still imports the compatibility name DataprocDiagnoseClusterOperator. The preferred alias for new code is ManagedSparkDiagnoseClusterOperator.

tests/system/google/cloud/dataproc/example_dataproc_cluster_diagnose.py[source]

diagnose_cluster = DataprocDiagnoseClusterOperator(
    task_id="diagnose_cluster",
    region=REGION,
    project_id=PROJECT_ID,
    cluster_name=CLUSTER_NAME,
    gcp_conn_id="google_cloud_default",
)

You can also use deferrable mode in order to run the operator asynchronously:

tests/system/google/cloud/dataproc/example_dataproc_cluster_diagnose.py[source]

diagnose_cluster_deferrable = DataprocDiagnoseClusterOperator(
    task_id="diagnose_cluster_deferrable",
    region=REGION,
    project_id=PROJECT_ID,
    cluster_name=CLUSTER_NAME,
    gcp_conn_id="google_cloud_default",
    deferrable=True,
)

Update a cluster

You can scale the cluster up or down by providing a cluster config and an updateMask. In the updateMask argument, you specify the path, relative to Cluster, of the field to update. For more information on updateMask and other parameters, take a look at the Managed Spark update cluster API.

An example of a new cluster config and the updateMask:

tests/system/google/cloud/dataproc/example_dataproc_cluster_update.py[source]

CLUSTER_UPDATE = {
    "config": {"worker_config": {"num_instances": 3}, "secondary_worker_config": {"num_instances": 3}}
}
UPDATE_MASK = {
    "paths": ["config.worker_config.num_instances", "config.secondary_worker_config.num_instances"]
}

To update a cluster you can use: DataprocUpdateClusterOperator

The executable example below still imports the compatibility name DataprocUpdateClusterOperator. The preferred alias for new code is ManagedSparkUpdateClusterOperator.

tests/system/google/cloud/dataproc/example_dataproc_cluster_update.py[source]

scale_cluster = DataprocUpdateClusterOperator(
    task_id="scale_cluster",
    cluster_name=CLUSTER_NAME,
    cluster=CLUSTER_UPDATE,
    update_mask=UPDATE_MASK,
    graceful_decommission_timeout=TIMEOUT,
    project_id=PROJECT_ID,
    region=REGION,
    retry=Retry(maximum=100.0, initial=10.0, multiplier=1.0),
)
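Because each updateMask path is the dotted location of a field inside the cluster dictionary, the mask can be derived from the update itself rather than written by hand. The sketch below assumes every leaf in the update dictionary should be included in the mask; mask_paths is a hypothetical helper, not part of the provider.

```python
def mask_paths(update, prefix=""):
    """Return the dotted path of every leaf value in a nested update dict."""
    paths = []
    for key, value in update.items():
        path = f"{prefix}.{key}" if prefix else key
        if isinstance(value, dict) and value:
            # Descend into nested dicts to reach the leaf fields.
            paths.extend(mask_paths(value, path))
        else:
            paths.append(path)
    return paths


CLUSTER_UPDATE = {
    "config": {"worker_config": {"num_instances": 3}, "secondary_worker_config": {"num_instances": 3}}
}
UPDATE_MASK = {"paths": mask_paths(CLUSTER_UPDATE)}
```

For the CLUSTER_UPDATE shown here, this produces the same two paths as the hand-written UPDATE_MASK example.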

You can use deferrable mode for this action in order to run the operator asynchronously:

tests/system/google/cloud/dataproc/example_dataproc_cluster_deferrable.py[source]

update_cluster = DataprocUpdateClusterOperator(
    task_id="update_cluster",
    cluster_name=CLUSTER_NAME,
    cluster=CLUSTER_UPDATE,
    update_mask=UPDATE_MASK,
    graceful_decommission_timeout=TIMEOUT,
    project_id=PROJECT_ID,
    region=REGION,
    deferrable=True,
    retry=Retry(maximum=100.0, initial=10.0, multiplier=1.0),
)

Starting a cluster

To start a cluster you can use the DataprocStartClusterOperator:

The executable example below still imports the compatibility name DataprocStartClusterOperator. The preferred alias for new code is ManagedSparkStartClusterOperator.

tests/system/google/cloud/dataproc/example_dataproc_cluster_start_stop.py[source]

start_cluster = DataprocStartClusterOperator(
    task_id="start_cluster",
    project_id=PROJECT_ID,
    region=REGION,
    cluster_name=CLUSTER_NAME,
)

Stopping a cluster

To stop a cluster you can use the DataprocStopClusterOperator:

The executable example below still imports the compatibility name DataprocStopClusterOperator. The preferred alias for new code is ManagedSparkStopClusterOperator.

tests/system/google/cloud/dataproc/example_dataproc_cluster_start_stop.py[source]

stop_cluster = DataprocStopClusterOperator(
    task_id="stop_cluster",
    project_id=PROJECT_ID,
    region=REGION,
    cluster_name=CLUSTER_NAME,
)

Deleting a cluster

To delete a cluster you can use: DataprocDeleteClusterOperator.

The executable example below still imports the compatibility name DataprocDeleteClusterOperator. The preferred alias for new code is ManagedSparkDeleteClusterOperator.

tests/system/google/cloud/dataproc/example_dataproc_hive.py[source]

delete_cluster = DataprocDeleteClusterOperator(
    task_id="delete_cluster",
    project_id=PROJECT_ID,
    cluster_name=CLUSTER_NAME,
    region=REGION,
)

You can use deferrable mode for this action in order to run the operator asynchronously:

tests/system/google/cloud/dataproc/example_dataproc_cluster_deferrable.py[source]

delete_cluster = DataprocDeleteClusterOperator(
    task_id="delete_cluster",
    project_id=PROJECT_ID,
    cluster_name=CLUSTER_NAME,
    region=REGION,
    trigger_rule=TriggerRule.ALL_DONE,
    deferrable=True,
)

Submit a job to a cluster

Managed Spark supports submitting jobs of different big data components. The list currently includes Spark, PySpark, Hadoop, Trino, Pig, Flink and Hive. For more information on versions and images, take a look at the Cloud Managed Spark image version list.

To submit a job to the cluster, you need to provide a job source file. The job source file can be on GCS, on the cluster, or on your local file system. You can specify a file:/// path to refer to a local file on a cluster’s primary node.

The job configuration can be submitted by using: DataprocSubmitJobOperator.

The executable example below still imports the compatibility name DataprocSubmitJobOperator. The preferred alias for new code is ManagedSparkSubmitJobOperator.

tests/system/google/cloud/dataproc/example_dataproc_pyspark.py[source]

pyspark_task = DataprocSubmitJobOperator(
    task_id="pyspark_task", job=PYSPARK_JOB, region=REGION, project_id=PROJECT_ID
)

Examples of job configurations to submit

We have provided an example for every framework below. There are more arguments to provide in the jobs than the examples show. For the complete list of arguments, take a look at the Dataproc job arguments.

Example of the configuration for a PySpark Job:

tests/system/google/cloud/dataproc/example_dataproc_pyspark.py[source]

PYSPARK_JOB = {
    "reference": {"project_id": PROJECT_ID},
    "placement": {"cluster_name": CLUSTER_NAME},
    "pyspark_job": {"main_python_file_uri": GCS_JOB_FILE},
}

Example of the configuration for a Spark SQL Job:

tests/system/google/cloud/dataproc/example_dataproc_spark_sql.py[source]

SPARK_SQL_JOB = {
    "reference": {"project_id": PROJECT_ID},
    "placement": {"cluster_name": CLUSTER_NAME},
    "spark_sql_job": {"query_list": {"queries": ["SHOW DATABASES;"]}},
}

Example of the configuration for a Spark Job:

tests/system/google/cloud/dataproc/example_dataproc_spark.py[source]

SPARK_JOB = {
    "reference": {"project_id": PROJECT_ID},
    "placement": {"cluster_name": CLUSTER_NAME},
    "spark_job": {
        "jar_file_uris": ["file:///usr/lib/spark/examples/jars/spark-examples.jar"],
        "main_class": "org.apache.spark.examples.SparkPi",
    },
}

Example of the configuration for a Spark Job running in deferrable mode:

tests/system/google/cloud/dataproc/example_dataproc_spark_deferrable.py[source]

SPARK_JOB = {
    "reference": {"project_id": PROJECT_ID},
    "placement": {"cluster_name": CLUSTER_NAME},
    "spark_job": {
        "jar_file_uris": ["file:///usr/lib/spark/examples/jars/spark-examples.jar"],
        "main_class": "org.apache.spark.examples.SparkPi",
    },
}

Example of the configuration for a Hive Job:

tests/system/google/cloud/dataproc/example_dataproc_hive.py[source]

HIVE_JOB = {
    "reference": {"project_id": PROJECT_ID},
    "placement": {"cluster_name": CLUSTER_NAME},
    "hive_job": {"query_list": {"queries": ["SHOW DATABASES;"]}},
}

Example of the configuration for a Hadoop Job:

tests/system/google/cloud/dataproc/example_dataproc_hadoop.py[source]

HADOOP_JOB = {
    "reference": {"project_id": PROJECT_ID},
    "placement": {"cluster_name": CLUSTER_NAME},
    "hadoop_job": {
        "main_jar_file_uri": "file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar",
        "args": ["wordcount", "gs://pub/shakespeare/rose.txt", OUTPUT_PATH],
    },
}

Example of the configuration for a Pig Job:

tests/system/google/cloud/dataproc/example_dataproc_pig.py[source]

PIG_JOB = {
    "reference": {"project_id": PROJECT_ID},
    "placement": {"cluster_name": CLUSTER_NAME},
    "pig_job": {"query_list": {"queries": ["define sin HiveUDF('sin');"]}},
}

Example of the configuration for a SparkR Job:

tests/system/google/cloud/dataproc/example_dataproc_sparkr.py[source]

SPARKR_JOB = {
    "reference": {"project_id": PROJECT_ID},
    "placement": {"cluster_name": CLUSTER_NAME},
    "spark_r_job": {"main_r_file_uri": GCS_JOB_FILE},
}

Example of the configuration for a Presto Job:

tests/system/google/cloud/dataproc/example_dataproc_presto.py[source]

PRESTO_JOB = {
    "reference": {"project_id": PROJECT_ID},
    "placement": {"cluster_name": CLUSTER_NAME},
    "presto_job": {"query_list": {"queries": ["SHOW CATALOGS"]}},
}

Example of the configuration for a Trino Job:

tests/system/google/cloud/dataproc/example_dataproc_trino.py[source]

TRINO_JOB = {
    "reference": {"project_id": PROJECT_ID},
    "placement": {"cluster_name": CLUSTER_NAME},
    "trino_job": {"query_list": {"queries": ["SHOW CATALOGS"]}},
}

Example of the configuration for a Flink Job:

tests/system/google/cloud/dataproc/example_dataproc_flink.py[source]

FLINK_JOB = {
    "reference": {"project_id": PROJECT_ID},
    "placement": {"cluster_name": CLUSTER_NAME},
    "flink_job": {
        "main_class": "org.apache.flink.examples.java.wordcount.WordCount",
        "jar_file_uris": ["file:///usr/lib/flink/examples/batch/WordCount.jar"],
    },
}
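All of the job configurations above share the same envelope: a reference, a placement, and exactly one job-type key. As a hedged illustration, a hypothetical helper such as make_job below could build that envelope so that only the job-specific body changes between frameworks.

```python
def make_job(project_id, cluster_name, job_key, job_body):
    """Wrap a job-specific body in the common envelope used by the examples (hypothetical helper)."""
    if not job_key.endswith("_job"):
        raise ValueError(f"job_key should look like 'hive_job' or 'spark_job', got {job_key!r}")
    return {
        "reference": {"project_id": project_id},
        "placement": {"cluster_name": cluster_name},
        job_key: job_body,
    }


# Placeholder values for illustration only.
HIVE_JOB = make_job(
    "my-project",
    "my-cluster",
    "hive_job",
    {"query_list": {"queries": ["SHOW DATABASES;"]}},
)
```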

Working with workflow templates

Managed Spark supports creating workflow templates that can be triggered later on.

A workflow template can be created using: DataprocCreateWorkflowTemplateOperator.

The executable example below still imports the compatibility name DataprocCreateWorkflowTemplateOperator. The preferred alias for new code is ManagedSparkCreateWorkflowTemplateOperator.

tests/system/google/cloud/dataproc/example_dataproc_workflow.py[source]

create_workflow_template = DataprocCreateWorkflowTemplateOperator(
    task_id="create_workflow_template",
    template=WORKFLOW_TEMPLATE,
    project_id=PROJECT_ID,
    region=REGION,
)
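The WORKFLOW_TEMPLATE value passed above is not shown on this page. The following is a minimal sketch of what such a template could look like, assuming the field names id, placement, managed_cluster, jobs, step_id, and prerequisite_step_ids from the workflow template API. The helper name and all identifiers and values are placeholders.

```python
def build_workflow_template(template_id, cluster_name, cluster_config, jobs):
    """Build a workflow template dict and check that step dependencies are known (sketch)."""
    step_ids = {job["step_id"] for job in jobs}
    for job in jobs:
        for dependency in job.get("prerequisite_step_ids", []):
            if dependency not in step_ids:
                raise ValueError(f"unknown prerequisite step: {dependency}")
    return {
        "id": template_id,
        # The managed cluster is created for the workflow run.
        "placement": {"managed_cluster": {"cluster_name": cluster_name, "config": cluster_config}},
        "jobs": jobs,
    }


WORKFLOW_TEMPLATE = build_workflow_template(
    template_id="example-workflow",  # placeholder
    cluster_name="example-cluster",  # placeholder
    cluster_config={"worker_config": {"num_instances": 2}},
    jobs=[
        {"step_id": "hive_step", "hive_job": {"query_list": {"queries": ["SHOW DATABASES;"]}}},
        {
            "step_id": "spark_sql_step",
            "prerequisite_step_ids": ["hive_step"],  # runs after hive_step
            "spark_sql_job": {"query_list": {"queries": ["SHOW DATABASES;"]}},
        },
    ],
)
```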

Once a workflow template is created, users can trigger it using DataprocInstantiateWorkflowTemplateOperator:

The executable example below still imports the compatibility name DataprocInstantiateWorkflowTemplateOperator. The preferred alias for new code is ManagedSparkInstantiateWorkflowTemplateOperator.

tests/system/google/cloud/dataproc/example_dataproc_workflow.py[source]

trigger_workflow = DataprocInstantiateWorkflowTemplateOperator(
    task_id="trigger_workflow", region=REGION, project_id=PROJECT_ID, template_id=WORKFLOW_NAME
)

You can also use this operator in deferrable mode:

tests/system/google/cloud/dataproc/example_dataproc_workflow_deferrable.py[source]

trigger_workflow_async = DataprocInstantiateWorkflowTemplateOperator(
    task_id="trigger_workflow_async",
    region=REGION,
    project_id=PROJECT_ID,
    template_id=WORKFLOW_NAME,
    deferrable=True,
)

The inline operator, DataprocInstantiateInlineWorkflowTemplateOperator, is an alternative. It creates a workflow, runs it, and deletes it afterwards.

The executable example below still imports the compatibility name DataprocInstantiateInlineWorkflowTemplateOperator. The preferred alias for new code is ManagedSparkInstantiateInlineWorkflowTemplateOperator.

tests/system/google/cloud/dataproc/example_dataproc_workflow.py[source]

instantiate_inline_workflow_template = DataprocInstantiateInlineWorkflowTemplateOperator(
    task_id="instantiate_inline_workflow_template", template=WORKFLOW_TEMPLATE, region=REGION
)

You can also use this operator in deferrable mode:

tests/system/google/cloud/dataproc/example_dataproc_workflow_deferrable.py[source]

instantiate_inline_workflow_template_async = DataprocInstantiateInlineWorkflowTemplateOperator(
    task_id="instantiate_inline_workflow_template_async",
    template=WORKFLOW_TEMPLATE,
    region=REGION,
    deferrable=True,
)

Create a Batch

Managed Spark supports creating a batch workload.

A batch can be created using: DataprocCreateBatchOperator.

The executable example below still imports the compatibility name DataprocCreateBatchOperator. The preferred alias for new code is ManagedSparkCreateBatchOperator.

tests/system/google/cloud/dataproc/example_dataproc_batch.py[source]

create_batch = DataprocCreateBatchOperator(
    task_id="create_batch",
    project_id=PROJECT_ID,
    region=REGION,
    batch=BATCH_CONFIG,
    batch_id=BATCH_ID,
    result_retry=Retry(maximum=100.0, initial=10.0, multiplier=1.0),
    num_retries_if_resource_is_not_ready=3,
)

create_batch_2 = DataprocCreateBatchOperator(
    task_id="create_batch_2",
    project_id=PROJECT_ID,
    region=REGION,
    batch=BATCH_CONFIG,
    batch_id=BATCH_ID_2,
    result_retry=Retry(maximum=100.0, initial=10.0, multiplier=1.0),
    num_retries_if_resource_is_not_ready=3,
)

create_batch_3 = DataprocCreateBatchOperator(
    task_id="create_batch_3",
    project_id=PROJECT_ID,
    region=REGION,
    batch=BATCH_CONFIG,
    batch_id=BATCH_ID_3,
    asynchronous=True,
    result_retry=Retry(maximum=100.0, initial=10.0, multiplier=1.0),
    num_retries_if_resource_is_not_ready=3,
)

To create a batch with a Persistent History Server, you should first create a Managed Spark cluster with specific parameters. Documentation on how to create such a cluster can be found here:

tests/system/google/cloud/dataproc/example_dataproc_batch_persistent.py[source]

create_cluster = DataprocCreateClusterOperator(
    task_id="create_cluster_for_phs",
    project_id=PROJECT_ID,
    cluster_config=CLUSTER_GENERATOR_CONFIG_FOR_PHS,
    region=REGION,
    cluster_name=CLUSTER_NAME,
    retry=Retry(maximum=100.0, initial=10.0, multiplier=1.0),
    num_retries_if_resource_is_not_ready=3,
)

After the cluster is created, add it to the batch configuration.

tests/system/google/cloud/dataproc/example_dataproc_batch_persistent.py[source]

create_batch = DataprocCreateBatchOperator(
    task_id="create_batch_with_phs",
    project_id=PROJECT_ID,
    region=REGION,
    batch=BATCH_CONFIG_WITH_PHS,
    batch_id=BATCH_ID,
    result_retry=Retry(maximum=100.0, initial=10.0, multiplier=1.0),
    num_retries_if_resource_is_not_ready=3,
)
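The batch configurations used above are not shown on this page. As a hedged sketch, a Spark batch configuration that points at a Persistent History Server cluster could be built as follows. The field names spark_batch, environment_config, peripherals_config, and spark_history_server_config are assumed from the batch API, and the helper names and values are placeholders.

```python
def cluster_resource_name(project_id, region, cluster_name):
    """Return the full resource name of a cluster (assumed format)."""
    return f"projects/{project_id}/regions/{region}/clusters/{cluster_name}"


def spark_batch_config(phs_cluster=None):
    """Build a Spark batch config, optionally attached to a history server cluster (sketch)."""
    batch = {
        "spark_batch": {
            "jar_file_uris": ["file:///usr/lib/spark/examples/jars/spark-examples.jar"],
            "main_class": "org.apache.spark.examples.SparkPi",
        }
    }
    if phs_cluster is not None:
        # Attach the existing cluster that runs the Persistent History Server.
        batch["environment_config"] = {
            "peripherals_config": {
                "spark_history_server_config": {"dataproc_cluster": phs_cluster}
            }
        }
    return batch


BATCH_CONFIG = spark_batch_config()
BATCH_CONFIG_WITH_PHS = spark_batch_config(
    cluster_resource_name("my-project", "us-central1", "phs-cluster")  # placeholders
)
```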

To check whether the operation succeeded, you can use DataprocBatchSensor.

tests/system/google/cloud/dataproc/example_dataproc_batch.py[source]

batch_async_sensor = DataprocBatchSensor(
    task_id="batch_async_sensor",
    region=REGION,
    project_id=PROJECT_ID,
    batch_id=BATCH_ID_3,
    poke_interval=10,
)

You can also use this operator in deferrable mode:

tests/system/google/cloud/dataproc/example_dataproc_batch_deferrable.py[source]

create_batch = DataprocCreateBatchOperator(
    task_id="create_batch",
    project_id=PROJECT_ID,
    region=REGION,
    batch=BATCH_CONFIG,
    batch_id=BATCH_ID,
    deferrable=True,
    result_retry=Retry(maximum=100.0, initial=10.0, multiplier=1.0),
    num_retries_if_resource_is_not_ready=3,
)

Get a Batch

To get a batch you can use: DataprocGetBatchOperator.

The executable example below still imports the compatibility name DataprocGetBatchOperator. The preferred alias for new code is ManagedSparkGetBatchOperator.

tests/system/google/cloud/dataproc/example_dataproc_batch.py[source]

get_batch = DataprocGetBatchOperator(
    task_id="get_batch", project_id=PROJECT_ID, region=REGION, batch_id=BATCH_ID
)

List a Batch

To get a list of existing batches you can use: DataprocListBatchesOperator.

The executable example below still imports the compatibility name DataprocListBatchesOperator. The preferred alias for new code is ManagedSparkListBatchesOperator.

tests/system/google/cloud/dataproc/example_dataproc_batch.py[source]

list_batches = DataprocListBatchesOperator(
    task_id="list_batches",
    project_id=PROJECT_ID,
    region=REGION,
)

Delete a Batch

To delete a batch you can use: DataprocDeleteBatchOperator.

The executable example below still imports the compatibility name DataprocDeleteBatchOperator. The preferred alias for new code is ManagedSparkDeleteBatchOperator.

tests/system/google/cloud/dataproc/example_dataproc_batch.py[source]

delete_batch = DataprocDeleteBatchOperator(
    task_id="delete_batch", project_id=PROJECT_ID, region=REGION, batch_id=BATCH_ID
)
delete_batch_2 = DataprocDeleteBatchOperator(
    task_id="delete_batch_2", project_id=PROJECT_ID, region=REGION, batch_id=BATCH_ID_2
)
delete_batch_3 = DataprocDeleteBatchOperator(
    task_id="delete_batch_3", project_id=PROJECT_ID, region=REGION, batch_id=BATCH_ID_3
)
delete_batch_4 = DataprocDeleteBatchOperator(
    task_id="delete_batch_4", project_id=PROJECT_ID, region=REGION, batch_id=BATCH_ID_4
)

Cancel a Batch Operation

To cancel an operation you can use: DataprocCancelOperationOperator.

The executable example below still imports the compatibility name DataprocCancelOperationOperator. The preferred alias for new code is ManagedSparkCancelOperationOperator.

tests/system/google/cloud/dataproc/example_dataproc_batch.py[source]

cancel_operation = DataprocCancelOperationOperator(
    task_id="cancel_operation",
    project_id=PROJECT_ID,
    region=REGION,
    operation_name="{{ task_instance.xcom_pull('create_batch_4')['operation'] }}",
)

