Google Managed Service for Apache Spark Operators¶
Managed Service for Apache Spark is a managed Apache Spark and Apache Hadoop service that lets you take advantage of open source data tools for batch processing, querying, streaming and machine learning. Managed Spark automation helps you create clusters quickly, manage them easily, and save money by turning clusters off when you don’t need them.
For more information about the service, visit the Managed Spark product documentation.
Prerequisite Tasks¶
To use these operators, you must do a few things:
Select or create a Cloud Platform project using the Cloud Console.
Enable billing for your project, as described in the Google Cloud documentation.
Enable the API, as described in the Cloud Console documentation.
Install API libraries via pip.
pip install 'apache-airflow[google]'
Detailed information is available for Installation.
Sovereign Cloud from Google guidance¶
Sovereign Cloud from Google is a Google Cloud deployment with a separate service endpoint surface from standard Google Cloud regions. Dataproc support in Sovereign Cloud from Google can therefore differ from Dataproc support in standard Google Cloud. If a Dataproc API method or job type is not implemented in Sovereign Cloud from Google, the corresponding Airflow operator or job configuration will not run there. This is a Dataproc service availability limitation, not a Cloud Composer or Airflow operator limitation.
The following Dataproc job configurations were available in the tested Sovereign Cloud from Google environment:
Hadoop jobs.
Hive jobs.
PySpark jobs.
Spark jobs.
Spark SQL jobs.
Trino jobs.
The following Dataproc job configurations were not available in the tested environment:
PigJob.
PrestoJob.
SparkRJob.
FlinkJob.
The following operators do not work in Sovereign Cloud from Google because the corresponding Dataproc API methods are not implemented there yet:
Workflow template operators:
DataprocCreateWorkflowTemplateOperator, DataprocInstantiateWorkflowTemplateOperator, and DataprocInstantiateInlineWorkflowTemplateOperator.
Batch operators:
DataprocCreateBatchOperator, DataprocGetBatchOperator, DataprocListBatchesOperator, DataprocDeleteBatchOperator, and DataprocCancelOperationOperator.
The same limitation was observed with gcloud commands, so this is not a Cloud Composer or Airflow
operator limitation.
When running in Sovereign Cloud from Google, verify that the Dataproc API method and job type are available in that environment before relying on the corresponding Airflow operator in a production DAG.
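The availability lists above can be turned into a simple pre-flight check. The sketch below is a hypothetical helper: `job_type_of` and `is_available_in_sovereign_cloud` are illustrative names, not part of Airflow or the Dataproc client. It assumes the job dictionary layout used later in this guide and hard-codes the job types observed in the tested environment, which may not match yours.

```python
# Job-type keys observed as available in the tested Sovereign Cloud environment.
# These sets mirror the lists above; treat them as an assumption, not an API contract.
AVAILABLE_JOB_KEYS = {
    "hadoop_job",
    "hive_job",
    "pyspark_job",
    "spark_job",
    "spark_sql_job",
    "trino_job",
}
UNAVAILABLE_JOB_KEYS = {"pig_job", "presto_job", "spark_r_job", "flink_job"}


def job_type_of(job: dict) -> str:
    """Return the single job-type key (for example "pyspark_job") of a job dict."""
    known = AVAILABLE_JOB_KEYS | UNAVAILABLE_JOB_KEYS
    found = [key for key in job if key in known]
    if len(found) != 1:
        raise ValueError(f"expected exactly one job type, found {found}")
    return found[0]


def is_available_in_sovereign_cloud(job: dict) -> bool:
    """Check a job dict against the job types seen in the tested environment."""
    return job_type_of(job) in AVAILABLE_JOB_KEYS
```

A DAG file could call `is_available_in_sovereign_cloud(PYSPARK_JOB)` before defining the task and fail fast at parse time instead of at run time.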
Create a Cluster¶
When you create a Managed Spark cluster, you can choose Compute Engine as the deployment platform. In this configuration, Managed Spark automatically provisions the Compute Engine VM instances required to run the cluster. The VM instances are used for the main node, the primary worker nodes, and the secondary worker nodes (if specified). Compute Engine creates and manages these VM instances, while Managed Spark configures the software and orchestration required for big data processing tasks.
The node configuration you provide describes the primary and secondary nodes and the state of the cluster of Compute Engine instances. For secondary worker nodes, you can specify the number of workers and their types. By enabling the Preemptible option, you run those nodes on Preemptible VMs (equivalent to Spot instances) and benefit from their lower cost for your Managed Spark workloads. The primary node, which typically hosts the cluster main and various control services, does not have the Preemptible option because it must remain stable and available. Once a cluster is created, its configuration settings, including the preemptibility of secondary worker nodes, cannot be modified directly.
For more information about the available fields to pass when creating a cluster, visit Managed Spark create cluster API.
A cluster configuration can look as follows:
CLUSTER_CONFIG = {
"cluster_type": "STANDARD",
"cluster_tier": "CLUSTER_TIER_STANDARD",
"engine": "DEFAULT",
"master_config": {
"num_instances": 1,
"machine_type_uri": "n1-standard-4",
"disk_config": {"boot_disk_type": "pd-standard", "boot_disk_size_gb": 32},
},
"worker_config": {
"num_instances": 2,
"machine_type_uri": "n1-standard-4",
"disk_config": {"boot_disk_type": "pd-standard", "boot_disk_size_gb": 32},
},
"secondary_worker_config": {
"num_instances": 1,
"machine_type_uri": "n1-standard-4",
"disk_config": {
"boot_disk_type": "pd-standard",
"boot_disk_size_gb": 32,
},
"is_preemptible": True,
"preemptibility": "PREEMPTIBLE",
},
}
With this configuration we can create the cluster:
DataprocCreateClusterOperator
The executable example below still imports the compatibility name
DataprocCreateClusterOperator. The preferred alias for new code is
ManagedSparkCreateClusterOperator.
create_cluster = DataprocCreateClusterOperator(
task_id="create_cluster",
project_id=PROJECT_ID,
cluster_config=CLUSTER_CONFIG,
region=REGION,
cluster_name=CLUSTER_NAME,
retry=Retry(maximum=100.0, initial=10.0, multiplier=1.0),
num_retries_if_resource_is_not_ready=3,
)
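Because the three instance groups in the configuration above share most of their settings, the dictionary can also be assembled by a small helper. This is a minimal sketch: `build_cluster_config` and `_instance_group` are hypothetical names, the machine type and disk values are copied from the example above, and the secondary worker section is only added when secondary workers are requested.

```python
def _instance_group(num_instances: int, machine_type: str = "n1-standard-4") -> dict:
    """Build one instance-group section with the disk settings from the example above."""
    return {
        "num_instances": num_instances,
        "machine_type_uri": machine_type,
        "disk_config": {"boot_disk_type": "pd-standard", "boot_disk_size_gb": 32},
    }


def build_cluster_config(
    num_workers: int = 2,
    num_secondary_workers: int = 0,
    preemptibility: str = "PREEMPTIBLE",
) -> dict:
    """Return a cluster config dict; secondary workers are optional."""
    config = {
        "master_config": _instance_group(1),
        "worker_config": _instance_group(num_workers),
    }
    if num_secondary_workers > 0:
        secondary = _instance_group(num_secondary_workers)
        # Only secondary workers carry a preemptibility setting.
        secondary["preemptibility"] = preemptibility
        config["secondary_worker_config"] = secondary
    return config
```

The result can be passed as `cluster_config` to the operator shown above, for example `build_cluster_config(num_secondary_workers=1)`.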
Managed Spark on GKE deploys Managed Spark virtual clusters on a GKE cluster. Unlike Managed Spark on Compute Engine clusters, Managed Spark on GKE virtual clusters do not include separate main and worker VMs. Instead, when you create a Managed Spark on GKE virtual cluster, Managed Spark on GKE creates node pools within a GKE cluster. Managed Spark on GKE jobs are run as pods on these node pools. The node pools and scheduling of pods on the node pools are managed by GKE.
When creating a Managed Spark cluster on GKE, you can specify Preemptible VMs for the underlying compute resources as a cost-saving measure. When Preemptible VMs are enabled, GKE provisions the cluster nodes using Preemptible VMs. Alternatively, you can create nodes as Spot VM instances, which are the latest update to legacy preemptible VMs. Either option can reduce the cost of running Managed Spark workloads on GKE.
To create a Managed Spark cluster in Google Kubernetes Engine, you can pass the following cluster configuration:
VIRTUAL_CLUSTER_CONFIG = {
"kubernetes_cluster_config": {
"gke_cluster_config": {
"gke_cluster_target": f"projects/{PROJECT_ID}/locations/{REGION}/clusters/{GKE_CLUSTER_NAME}",
"node_pool_target": [
{
"node_pool": f"projects/{PROJECT_ID}/locations/{REGION}/clusters/{GKE_CLUSTER_NAME}/nodePools/dp",
"roles": ["DEFAULT"],
"node_pool_config": {
"config": {
"preemptible": False,
"machine_type": "e2-standard-4",
}
},
}
],
},
"kubernetes_software_config": {"component_version": {"SPARK": "3"}},
},
"staging_bucket": "test-staging-bucket",
}
With this configuration we can create the cluster:
DataprocCreateClusterOperator
The executable example below still imports the compatibility name
DataprocCreateClusterOperator. The preferred alias for new code is
ManagedSparkCreateClusterOperator.
create_cluster_in_gke = DataprocCreateClusterOperator(
task_id="create_cluster_in_gke",
project_id=PROJECT_ID,
region=REGION,
cluster_name=CLUSTER_NAME,
virtual_cluster_config=VIRTUAL_CLUSTER_CONFIG,
retry=Retry(maximum=100.0, initial=10.0, multiplier=1.0),
num_retries_if_resource_is_not_ready=3,
)
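The virtual cluster configuration above repeats the same GKE resource path for the cluster target and the node pool. The sketch below factors those paths into helpers; the function names are hypothetical, and the path layout, node pool name `dp`, machine type, and Spark component version are taken from the example above rather than from any additional API guarantee.

```python
def gke_cluster_path(project_id: str, region: str, gke_cluster_name: str) -> str:
    """Resource path of the GKE cluster, as used in gke_cluster_target above."""
    return f"projects/{project_id}/locations/{region}/clusters/{gke_cluster_name}"


def node_pool_path(project_id: str, region: str, gke_cluster_name: str, node_pool: str) -> str:
    """Resource path of a node pool inside that GKE cluster."""
    return f"{gke_cluster_path(project_id, region, gke_cluster_name)}/nodePools/{node_pool}"


def build_virtual_cluster_config(
    project_id: str,
    region: str,
    gke_cluster_name: str,
    staging_bucket: str,
    node_pool: str = "dp",
    preemptible: bool = False,
) -> dict:
    """Assemble the same structure as VIRTUAL_CLUSTER_CONFIG above."""
    return {
        "kubernetes_cluster_config": {
            "gke_cluster_config": {
                "gke_cluster_target": gke_cluster_path(project_id, region, gke_cluster_name),
                "node_pool_target": [
                    {
                        "node_pool": node_pool_path(project_id, region, gke_cluster_name, node_pool),
                        "roles": ["DEFAULT"],
                        "node_pool_config": {
                            "config": {"preemptible": preemptible, "machine_type": "e2-standard-4"}
                        },
                    }
                ],
            },
            "kubernetes_software_config": {"component_version": {"SPARK": "3"}},
        },
        "staging_bucket": staging_bucket,
    }
```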
You can also create a Managed Spark cluster with the optional Presto component.
To do so, use the following configuration.
Note that the default image might not support the chosen optional component.
If that is the case, specify an image_version that supports it; supported versions are listed in the
documentation.
CLUSTER_CONFIG = {
"master_config": {
"num_instances": 1,
"machine_type_uri": "n1-standard-4",
"disk_config": {"boot_disk_type": "pd-standard", "boot_disk_size_gb": 32},
},
"worker_config": {
"num_instances": 2,
"machine_type_uri": "n1-standard-4",
"disk_config": {"boot_disk_type": "pd-standard", "boot_disk_size_gb": 32},
},
"software_config": {
"optional_components": [
"PRESTO",
],
"image_version": "2.0",
},
}
You can also create a Managed Spark cluster with the optional Trino component.
To do so, use the following configuration.
Note that the default image might not support the chosen optional component.
If that is the case, specify an image_version that supports it; supported versions are listed in the
documentation.
CLUSTER_CONFIG = {
"master_config": {
"num_instances": 1,
"machine_type_uri": "n1-standard-4",
"disk_config": {"boot_disk_type": "pd-standard", "boot_disk_size_gb": 32},
},
"worker_config": {
"num_instances": 2,
"machine_type_uri": "n1-standard-4",
"disk_config": {"boot_disk_type": "pd-standard", "boot_disk_size_gb": 32},
},
"software_config": {
"optional_components": [
"TRINO",
],
"image_version": "2.1",
},
}
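The Presto and Trino configurations above differ only in their software_config section. As a sketch, a shared base configuration can be extended with an optional component; `with_optional_component` is a hypothetical helper, and the image versions are the ones used in the examples above, not a statement about which images support which components.

```python
import copy

# Base configuration shared by both examples above.
BASE_CLUSTER_CONFIG = {
    "master_config": {
        "num_instances": 1,
        "machine_type_uri": "n1-standard-4",
        "disk_config": {"boot_disk_type": "pd-standard", "boot_disk_size_gb": 32},
    },
    "worker_config": {
        "num_instances": 2,
        "machine_type_uri": "n1-standard-4",
        "disk_config": {"boot_disk_type": "pd-standard", "boot_disk_size_gb": 32},
    },
}


def with_optional_component(base_config: dict, component: str, image_version: str) -> dict:
    """Return a copy of base_config with one optional component and an image version."""
    config = copy.deepcopy(base_config)  # leave the shared base untouched
    software = config.setdefault("software_config", {})
    components = software.setdefault("optional_components", [])
    if component not in components:
        components.append(component)
    software["image_version"] = image_version
    return config


PRESTO_CLUSTER_CONFIG = with_optional_component(BASE_CLUSTER_CONFIG, "PRESTO", "2.0")
TRINO_CLUSTER_CONFIG = with_optional_component(BASE_CLUSTER_CONFIG, "TRINO", "2.1")
```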
You can create a cluster in deferrable mode in order to run the operator asynchronously:
create_cluster = DataprocCreateClusterOperator(
task_id="create_cluster",
project_id=PROJECT_ID,
cluster_config=CLUSTER_CONFIG,
region=REGION,
cluster_name=CLUSTER_NAME,
deferrable=True,
retry=Retry(maximum=100.0, initial=10.0, multiplier=1.0),
num_retries_if_resource_is_not_ready=3,
)
Generating Cluster Config¶
You can also generate CLUSTER_CONFIG using the functional API.
This is done by calling make() on
ClusterGenerator.
You can generate and use the config as follows:
CLUSTER_GENERATOR_CONFIG = ClusterGenerator(
project_id=PROJECT_ID,
zone=ZONE,
master_machine_type="n1-standard-4",
master_disk_size=32,
worker_machine_type="n1-standard-4",
worker_disk_size=32,
num_workers=2,
storage_bucket=BUCKET_NAME,
init_actions_uris=[GCS_INIT_FILE],
metadata={"PIP_PACKAGES": "pyyaml requests pandas openpyxl"},
num_preemptible_workers=1,
preemptibility="PREEMPTIBLE",
internal_ip_only=False,
cluster_tier="CLUSTER_TIER_STANDARD",
cluster_type="STANDARD",
engine="DEFAULT",
).make()
Diagnose a cluster¶
Managed Spark supports collecting cluster diagnostic information, such as system, Spark, Hadoop, and Managed Spark logs and cluster configuration files, that can be used to troubleshoot a Managed Spark cluster or job. This information can only be collected before the cluster is deleted. For more information about the available fields to pass when diagnosing a cluster, visit Managed Spark diagnose cluster API.
To diagnose a Managed Spark cluster use:
DataprocDiagnoseClusterOperator.
The executable example below still imports the compatibility name
DataprocDiagnoseClusterOperator. The preferred alias for new code is
ManagedSparkDiagnoseClusterOperator.
diagnose_cluster = DataprocDiagnoseClusterOperator(
task_id="diagnose_cluster",
region=REGION,
project_id=PROJECT_ID,
cluster_name=CLUSTER_NAME,
gcp_conn_id="google_cloud_default",
)
You can also use deferrable mode in order to run the operator asynchronously:
diagnose_cluster_deferrable = DataprocDiagnoseClusterOperator(
task_id="diagnose_cluster_deferrable",
region=REGION,
project_id=PROJECT_ID,
cluster_name=CLUSTER_NAME,
gcp_conn_id="google_cloud_default",
deferrable=True,
)
Update a cluster¶
You can scale the cluster up or down by providing a cluster config and an updateMask. In the updateMask argument, you specify the path, relative to Cluster, of the field to update. For more information on updateMask and other parameters, take a look at the Managed Spark update cluster API.
An example of a new cluster config and the updateMask:
CLUSTER_UPDATE = {
"config": {"worker_config": {"num_instances": 3}, "secondary_worker_config": {"num_instances": 3}}
}
UPDATE_MASK = {
"paths": ["config.worker_config.num_instances", "config.secondary_worker_config.num_instances"]
}
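In the example above, each entry in UPDATE_MASK is the dotted path to a leaf value in CLUSTER_UPDATE. The sketch below derives those paths from the nested dictionary so the two cannot drift apart. `update_mask_paths` is a hypothetical helper, and it only flattens the dictionary; it does not check whether the API accepts a given path for updates.

```python
def update_mask_paths(cluster_update: dict, prefix: str = "") -> list[str]:
    """Flatten a nested dict into dotted paths to its leaf values."""
    paths = []
    for key, value in cluster_update.items():
        path = f"{prefix}.{key}" if prefix else key
        if isinstance(value, dict) and value:
            # Descend into nested sections such as "config" and "worker_config".
            paths.extend(update_mask_paths(value, path))
        else:
            paths.append(path)
    return paths


CLUSTER_UPDATE = {
    "config": {"worker_config": {"num_instances": 3}, "secondary_worker_config": {"num_instances": 3}}
}
UPDATE_MASK = {"paths": update_mask_paths(CLUSTER_UPDATE)}
```

With the CLUSTER_UPDATE shown here, UPDATE_MASK contains the same two paths as the hand-written version above.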
To update a cluster you can use:
DataprocUpdateClusterOperator
The executable example below still imports the compatibility name
DataprocUpdateClusterOperator. The preferred alias for new code is
ManagedSparkUpdateClusterOperator.
scale_cluster = DataprocUpdateClusterOperator(
task_id="scale_cluster",
cluster_name=CLUSTER_NAME,
cluster=CLUSTER_UPDATE,
update_mask=UPDATE_MASK,
graceful_decommission_timeout=TIMEOUT,
project_id=PROJECT_ID,
region=REGION,
retry=Retry(maximum=100.0, initial=10.0, multiplier=1.0),
)
You can use deferrable mode for this action in order to run the operator asynchronously:
update_cluster = DataprocUpdateClusterOperator(
task_id="update_cluster",
cluster_name=CLUSTER_NAME,
cluster=CLUSTER_UPDATE,
update_mask=UPDATE_MASK,
graceful_decommission_timeout=TIMEOUT,
project_id=PROJECT_ID,
region=REGION,
deferrable=True,
retry=Retry(maximum=100.0, initial=10.0, multiplier=1.0),
)
Starting a cluster¶
To start a cluster you can use the
DataprocStartClusterOperator:
The executable example below still imports the compatibility name
DataprocStartClusterOperator. The preferred alias for new code is
ManagedSparkStartClusterOperator.
start_cluster = DataprocStartClusterOperator(
task_id="start_cluster",
project_id=PROJECT_ID,
region=REGION,
cluster_name=CLUSTER_NAME,
)
Stopping a cluster¶
To stop a cluster you can use the
DataprocStopClusterOperator:
The executable example below still imports the compatibility name
DataprocStopClusterOperator. The preferred alias for new code is
ManagedSparkStopClusterOperator.
stop_cluster = DataprocStopClusterOperator(
task_id="stop_cluster",
project_id=PROJECT_ID,
region=REGION,
cluster_name=CLUSTER_NAME,
)
Deleting a cluster¶
To delete a cluster you can use:
DataprocDeleteClusterOperator.
The executable example below still imports the compatibility name
DataprocDeleteClusterOperator. The preferred alias for new code is
ManagedSparkDeleteClusterOperator.
delete_cluster = DataprocDeleteClusterOperator(
task_id="delete_cluster",
project_id=PROJECT_ID,
cluster_name=CLUSTER_NAME,
region=REGION,
)
You can use deferrable mode for this action in order to run the operator asynchronously:
delete_cluster = DataprocDeleteClusterOperator(
task_id="delete_cluster",
project_id=PROJECT_ID,
cluster_name=CLUSTER_NAME,
region=REGION,
trigger_rule=TriggerRule.ALL_DONE,
deferrable=True,
)
Submit a job to a cluster¶
Managed Spark supports submitting jobs of different big data components. The list currently includes Spark, PySpark, Hadoop, Trino, Pig, Flink and Hive. For more information on versions and images, take a look at the Cloud Managed Spark Image version list.
To submit a job to the cluster you need to provide a job source file. The job source file can be on GCS, the cluster or on your local file system. You can specify a file:/// path to refer to a local file on a cluster’s primary node.
The job configuration can be submitted by using:
DataprocSubmitJobOperator.
The executable example below still imports the compatibility name
DataprocSubmitJobOperator. The preferred alias for new code is
ManagedSparkSubmitJobOperator.
pyspark_task = DataprocSubmitJobOperator(
task_id="pyspark_task", job=PYSPARK_JOB, region=REGION, project_id=PROJECT_ID
)
Examples of job configurations to submit¶
We have provided an example for every framework below. The jobs accept more arguments than the examples show. For the complete list of arguments, take a look at the Dataproc Job arguments.
Example of the configuration for a PySpark Job:
PYSPARK_JOB = {
"reference": {"project_id": PROJECT_ID},
"placement": {"cluster_name": CLUSTER_NAME},
"pyspark_job": {"main_python_file_uri": GCS_JOB_FILE},
}
Example of the configuration for a Spark SQL Job:
SPARK_SQL_JOB = {
"reference": {"project_id": PROJECT_ID},
"placement": {"cluster_name": CLUSTER_NAME},
"spark_sql_job": {"query_list": {"queries": ["SHOW DATABASES;"]}},
}
Example of the configuration for a Spark Job:
SPARK_JOB = {
"reference": {"project_id": PROJECT_ID},
"placement": {"cluster_name": CLUSTER_NAME},
"spark_job": {
"jar_file_uris": ["file:///usr/lib/spark/examples/jars/spark-examples.jar"],
"main_class": "org.apache.spark.examples.SparkPi",
},
}
Example of the configuration for a Spark Job running in deferrable mode:
SPARK_JOB = {
"reference": {"project_id": PROJECT_ID},
"placement": {"cluster_name": CLUSTER_NAME},
"spark_job": {
"jar_file_uris": ["file:///usr/lib/spark/examples/jars/spark-examples.jar"],
"main_class": "org.apache.spark.examples.SparkPi",
},
}
Example of the configuration for a Hive Job:
HIVE_JOB = {
"reference": {"project_id": PROJECT_ID},
"placement": {"cluster_name": CLUSTER_NAME},
"hive_job": {"query_list": {"queries": ["SHOW DATABASES;"]}},
}
Example of the configuration for a Hadoop Job:
HADOOP_JOB = {
"reference": {"project_id": PROJECT_ID},
"placement": {"cluster_name": CLUSTER_NAME},
"hadoop_job": {
"main_jar_file_uri": "file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar",
"args": ["wordcount", "gs://pub/shakespeare/rose.txt", OUTPUT_PATH],
},
}
Example of the configuration for a Pig Job:
PIG_JOB = {
"reference": {"project_id": PROJECT_ID},
"placement": {"cluster_name": CLUSTER_NAME},
"pig_job": {"query_list": {"queries": ["define sin HiveUDF('sin');"]}},
}
Example of the configuration for a SparkR Job:
SPARKR_JOB = {
"reference": {"project_id": PROJECT_ID},
"placement": {"cluster_name": CLUSTER_NAME},
"spark_r_job": {"main_r_file_uri": GCS_JOB_FILE},
}
Example of the configuration for a Presto Job:
PRESTO_JOB = {
"reference": {"project_id": PROJECT_ID},
"placement": {"cluster_name": CLUSTER_NAME},
"presto_job": {"query_list": {"queries": ["SHOW CATALOGS"]}},
}
Example of the configuration for a Trino Job:
TRINO_JOB = {
"reference": {"project_id": PROJECT_ID},
"placement": {"cluster_name": CLUSTER_NAME},
"trino_job": {"query_list": {"queries": ["SHOW CATALOGS"]}},
}
Example of the configuration for a Flink Job:
FLINK_JOB = {
"reference": {"project_id": PROJECT_ID},
"placement": {"cluster_name": CLUSTER_NAME},
"flink_job": {
"main_class": "org.apache.flink.examples.java.wordcount.WordCount",
"jar_file_uris": ["file:///usr/lib/flink/examples/batch/WordCount.jar"],
},
}
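All of the job configurations above share the same reference and placement sections and differ only in the job-type key and its payload. A minimal sketch of a builder that captures this pattern follows; `make_job` is a hypothetical helper, and the project and cluster names are placeholders.

```python
def make_job(project_id: str, cluster_name: str, job_type: str, payload: dict) -> dict:
    """Wrap a job-type payload in the reference and placement sections used above."""
    return {
        "reference": {"project_id": project_id},
        "placement": {"cluster_name": cluster_name},
        job_type: payload,
    }


# Placeholder values for illustration only.
PROJECT_ID = "my-project"
CLUSTER_NAME = "my-cluster"

HIVE_JOB = make_job(
    PROJECT_ID, CLUSTER_NAME, "hive_job", {"query_list": {"queries": ["SHOW DATABASES;"]}}
)
TRINO_JOB = make_job(
    PROJECT_ID, CLUSTER_NAME, "trino_job", {"query_list": {"queries": ["SHOW CATALOGS"]}}
)
```

The resulting dictionaries have the same shape as the hand-written examples and can be passed as `job` to DataprocSubmitJobOperator.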
Working with workflow templates¶
Managed Spark supports creating workflow templates that can be triggered later on.
A workflow template can be created using:
DataprocCreateWorkflowTemplateOperator.
The executable example below still imports the compatibility name
DataprocCreateWorkflowTemplateOperator. The preferred alias for new code is
ManagedSparkCreateWorkflowTemplateOperator.
create_workflow_template = DataprocCreateWorkflowTemplateOperator(
task_id="create_workflow_template",
template=WORKFLOW_TEMPLATE,
project_id=PROJECT_ID,
region=REGION,
)
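The WORKFLOW_TEMPLATE passed above is not shown in this guide. As a sketch, a template is a dictionary with an id, a placement that describes the cluster to run on, and a list of job steps. The helper name, the placeholder values, and the single Hive step below are assumptions for illustration; the exact fields you need depend on your workload.

```python
def build_workflow_template(
    template_id: str, cluster_name: str, cluster_config: dict, jobs: list
) -> dict:
    """Assemble a workflow template that runs its jobs on a managed cluster."""
    return {
        "id": template_id,
        "placement": {
            "managed_cluster": {
                "cluster_name": cluster_name,
                "config": cluster_config,
            }
        },
        "jobs": jobs,
    }


# Placeholder values for illustration only.
WORKFLOW_NAME = "example-workflow"
CLUSTER_NAME = "example-cluster"
CLUSTER_CONFIG = {
    "master_config": {"num_instances": 1, "machine_type_uri": "n1-standard-4"},
    "worker_config": {"num_instances": 2, "machine_type_uri": "n1-standard-4"},
}

WORKFLOW_TEMPLATE = build_workflow_template(
    WORKFLOW_NAME,
    CLUSTER_NAME,
    CLUSTER_CONFIG,
    # Each step needs a step_id plus one job-type payload.
    jobs=[{"step_id": "hive_job_1", "hive_job": {"query_list": {"queries": ["SHOW DATABASES;"]}}}],
)
```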
Once a workflow template is created, users can trigger it using
DataprocInstantiateWorkflowTemplateOperator:
The executable example below still imports the compatibility name
DataprocInstantiateWorkflowTemplateOperator. The preferred alias for new code is
ManagedSparkInstantiateWorkflowTemplateOperator.
trigger_workflow = DataprocInstantiateWorkflowTemplateOperator(
task_id="trigger_workflow", region=REGION, project_id=PROJECT_ID, template_id=WORKFLOW_NAME
)
You can also use the operator in deferrable mode:
trigger_workflow_async = DataprocInstantiateWorkflowTemplateOperator(
task_id="trigger_workflow_async",
region=REGION,
project_id=PROJECT_ID,
template_id=WORKFLOW_NAME,
deferrable=True,
)
The inline operator is an alternative. It creates a workflow, runs it, and deletes it afterwards:
DataprocInstantiateInlineWorkflowTemplateOperator:
The executable example below still imports the compatibility name
DataprocInstantiateInlineWorkflowTemplateOperator. The preferred alias for new code is
ManagedSparkInstantiateInlineWorkflowTemplateOperator.
instantiate_inline_workflow_template = DataprocInstantiateInlineWorkflowTemplateOperator(
task_id="instantiate_inline_workflow_template", template=WORKFLOW_TEMPLATE, region=REGION
)
You can also use the operator in deferrable mode:
instantiate_inline_workflow_template_async = DataprocInstantiateInlineWorkflowTemplateOperator(
task_id="instantiate_inline_workflow_template_async",
template=WORKFLOW_TEMPLATE,
region=REGION,
deferrable=True,
)
Create a Batch¶
Managed Spark supports creating a batch workload.
A batch can be created using:
DataprocCreateBatchOperator.
The executable example below still imports the compatibility name
DataprocCreateBatchOperator. The preferred alias for new code is
ManagedSparkCreateBatchOperator.
create_batch = DataprocCreateBatchOperator(
task_id="create_batch",
project_id=PROJECT_ID,
region=REGION,
batch=BATCH_CONFIG,
batch_id=BATCH_ID,
result_retry=Retry(maximum=100.0, initial=10.0, multiplier=1.0),
num_retries_if_resource_is_not_ready=3,
)
create_batch_2 = DataprocCreateBatchOperator(
task_id="create_batch_2",
project_id=PROJECT_ID,
region=REGION,
batch=BATCH_CONFIG,
batch_id=BATCH_ID_2,
result_retry=Retry(maximum=100.0, initial=10.0, multiplier=1.0),
num_retries_if_resource_is_not_ready=3,
)
create_batch_3 = DataprocCreateBatchOperator(
task_id="create_batch_3",
project_id=PROJECT_ID,
region=REGION,
batch=BATCH_CONFIG,
batch_id=BATCH_ID_3,
asynchronous=True,
result_retry=Retry(maximum=100.0, initial=10.0, multiplier=1.0),
num_retries_if_resource_is_not_ready=3,
)
To create a batch with a Persistent History Server, first create a Managed Spark cluster with specific parameters. Documentation on how to create a cluster can be found here:
create_cluster = DataprocCreateClusterOperator(
task_id="create_cluster_for_phs",
project_id=PROJECT_ID,
cluster_config=CLUSTER_GENERATOR_CONFIG_FOR_PHS,
region=REGION,
cluster_name=CLUSTER_NAME,
retry=Retry(maximum=100.0, initial=10.0, multiplier=1.0),
num_retries_if_resource_is_not_ready=3,
)
After the cluster is created, add it to the batch configuration.
create_batch = DataprocCreateBatchOperator(
task_id="create_batch_with_phs",
project_id=PROJECT_ID,
region=REGION,
batch=BATCH_CONFIG_WITH_PHS,
batch_id=BATCH_ID,
result_retry=Retry(maximum=100.0, initial=10.0, multiplier=1.0),
num_retries_if_resource_is_not_ready=3,
)
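The BATCH_CONFIG_WITH_PHS dictionary used above is not shown in this guide. The sketch below illustrates one way the history server cluster could be referenced from the batch configuration: the cluster's resource name goes under environment_config, in the Spark history server section of the peripherals config. The helper name and placeholder values are assumptions, and the Spark workload reuses the SparkPi example from the job configurations earlier in this guide.

```python
def phs_cluster_path(project_id: str, region: str, cluster_name: str) -> str:
    """Resource name of the cluster that hosts the Persistent History Server."""
    return f"projects/{project_id}/regions/{region}/clusters/{cluster_name}"


# Placeholder values for illustration only.
PROJECT_ID = "my-project"
REGION = "us-central1"
CLUSTER_NAME = "phs-cluster"

BATCH_CONFIG_WITH_PHS = {
    "spark_batch": {
        "jar_file_uris": ["file:///usr/lib/spark/examples/jars/spark-examples.jar"],
        "main_class": "org.apache.spark.examples.SparkPi",
    },
    "environment_config": {
        "peripherals_config": {
            "spark_history_server_config": {
                # Points the batch at the cluster created in the previous step.
                "dataproc_cluster": phs_cluster_path(PROJECT_ID, REGION, CLUSTER_NAME)
            }
        }
    },
}
```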
To check whether the operation succeeded, you can use
DataprocBatchSensor.
batch_async_sensor = DataprocBatchSensor(
task_id="batch_async_sensor",
region=REGION,
project_id=PROJECT_ID,
batch_id=BATCH_ID_3,
poke_interval=10,
)
You can also use the operator in deferrable mode:
create_batch = DataprocCreateBatchOperator(
task_id="create_batch",
project_id=PROJECT_ID,
region=REGION,
batch=BATCH_CONFIG,
batch_id=BATCH_ID,
deferrable=True,
result_retry=Retry(maximum=100.0, initial=10.0, multiplier=1.0),
num_retries_if_resource_is_not_ready=3,
)
Get a Batch¶
To get a batch you can use:
DataprocGetBatchOperator.
The executable example below still imports the compatibility name
DataprocGetBatchOperator. The preferred alias for new code is
ManagedSparkGetBatchOperator.
get_batch = DataprocGetBatchOperator(
task_id="get_batch", project_id=PROJECT_ID, region=REGION, batch_id=BATCH_ID
)
List a Batch¶
To get a list of existing batches you can use:
DataprocListBatchesOperator.
The executable example below still imports the compatibility name
DataprocListBatchesOperator. The preferred alias for new code is
ManagedSparkListBatchesOperator.
list_batches = DataprocListBatchesOperator(
task_id="list_batches",
project_id=PROJECT_ID,
region=REGION,
)
Delete a Batch¶
To delete a batch you can use:
DataprocDeleteBatchOperator.
The executable example below still imports the compatibility name
DataprocDeleteBatchOperator. The preferred alias for new code is
ManagedSparkDeleteBatchOperator.
delete_batch = DataprocDeleteBatchOperator(
task_id="delete_batch", project_id=PROJECT_ID, region=REGION, batch_id=BATCH_ID
)
delete_batch_2 = DataprocDeleteBatchOperator(
task_id="delete_batch_2", project_id=PROJECT_ID, region=REGION, batch_id=BATCH_ID_2
)
delete_batch_3 = DataprocDeleteBatchOperator(
task_id="delete_batch_3", project_id=PROJECT_ID, region=REGION, batch_id=BATCH_ID_3
)
delete_batch_4 = DataprocDeleteBatchOperator(
task_id="delete_batch_4", project_id=PROJECT_ID, region=REGION, batch_id=BATCH_ID_4
)
Cancel a Batch Operation¶
To cancel an operation you can use:
DataprocCancelOperationOperator.
The executable example below still imports the compatibility name
DataprocCancelOperationOperator. The preferred alias for new code is
ManagedSparkCancelOperationOperator.
cancel_operation = DataprocCancelOperationOperator(
task_id="cancel_operation",
project_id=PROJECT_ID,
region=REGION,
operation_name="{{ task_instance.xcom_pull('create_batch_4')['operation'] }}",
)
References¶
For further information, take a look at: