airflow.providers.cncf.kubernetes.operators.spark_kubernetes

Classes

SparkKubernetesOperator

Creates a sparkApplication object in a Kubernetes cluster.

Module Contents

class airflow.providers.cncf.kubernetes.operators.spark_kubernetes.SparkKubernetesOperator(*, image=None, code_path=None, namespace='default', name=None, application_file=None, template_spec=None, get_logs=True, do_xcom_push=False, success_run_history_limit=1, startup_timeout_seconds=600, log_events_on_failure=False, reattach_on_restart=True, delete_on_termination=True, kubernetes_conn_id='kubernetes_default', random_name_suffix=True, **kwargs)[source]

Bases: airflow.providers.cncf.kubernetes.operators.pod.KubernetesPodOperator

Creates a sparkApplication object in a Kubernetes cluster.

See also

For more detail about the SparkApplication object, see the reference: https://github.com/GoogleCloudPlatform/spark-on-k8s-operator/blob/v1beta2-1.3.3-3.1.1/docs/api-docs.md#sparkapplication

Parameters:
  • image (str | None) – Docker image you wish to launch. Defaults to hub.docker.com.

  • code_path (str | None) – path to the Spark code in the image.

  • namespace (str) – Kubernetes namespace in which to create the sparkApplication.

  • name (str | None) – name of the pod in which the task will run; used (with a random suffix appended if random_name_suffix is True) to generate the pod id (a DNS-1123 subdomain containing only [a-z0-9.-]).

  • application_file (str | None) – path to a file containing the Kubernetes custom resource definition of the sparkApplication.

  • template_spec – Kubernetes sparkApplication specification.

  • get_logs (bool) – get the stdout of the container as the logs of the task.

  • do_xcom_push (bool) – If True, the content of the file /airflow/xcom/return.json in the container will also be pushed to an XCom when the container completes.

  • success_run_history_limit (int) – Number of past successful runs of the application to keep.

  • startup_timeout_seconds – timeout in seconds for the pod to start up.

  • log_events_on_failure (bool) – Log the pod’s events if a failure occurs.

  • reattach_on_restart (bool) – if the scheduler dies while the pod is running, reattach and monitor. When enabled, the operator automatically adds Airflow task context labels (dag_id, task_id, run_id) to the driver and executor pods to enable finding them for reattachment.

  • delete_on_termination (bool) – What to do when the pod reaches its final state, or the execution is interrupted. If True (default), delete the pod; if False, leave the pod.

  • kubernetes_conn_id (str) – ID of the connection to the Kubernetes cluster.

  • random_name_suffix (bool) – If True, adds a random suffix to the pod name.
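The following is a minimal usage sketch, not taken from the provider itself: it assumes a SparkApplication manifest already exists at the hypothetical path spark/spark_pi.yaml, and the DAG id, task id and namespace are placeholders.

from __future__ import annotations

from datetime import datetime

from airflow import DAG
from airflow.providers.cncf.kubernetes.operators.spark_kubernetes import SparkKubernetesOperator

with DAG(
    dag_id="spark_pi_example",            # hypothetical DAG id
    start_date=datetime(2024, 1, 1),
    schedule=None,
    catchup=False,
) as dag:
    submit_spark_pi = SparkKubernetesOperator(
        task_id="submit_spark_pi",
        namespace="spark-jobs",            # hypothetical namespace
        application_file="spark/spark_pi.yaml",  # hypothetical manifest path
        kubernetes_conn_id="kubernetes_default",
        get_logs=True,                     # stream driver stdout into the task log
        delete_on_termination=True,        # remove pods once the job reaches a final state
    )

Because application_file is a templated field and .yaml, .yml and .json are listed in template_ext, the manifest is rendered with Jinja before the sparkApplication is submitted.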

template_fields = ['application_file', 'namespace', 'template_spec', 'kubernetes_conn_id'][source]
template_fields_renderers[source]
template_ext = ('yaml', 'yml', 'json')[source]
ui_color = '#f4a460'[source]
BASE_CONTAINER_NAME = 'spark-kubernetes-driver'[source]
image = None[source]
code_path = None[source]
application_file = None[source]
template_spec = None[source]
kubernetes_conn_id = 'kubernetes_default'[source]
startup_timeout_seconds = 600[source]
reattach_on_restart = True[source]
delete_on_termination = True[source]
do_xcom_push = False[source]
namespace = 'default'[source]
get_logs = True[source]
log_events_on_failure = False[source]
success_run_history_limit = 1[source]
random_name_suffix = True[source]
base_container_name: str[source]
container_logs: list[str][source]
manage_template_specs()[source]
create_job_name()[source]
property pod_manager: airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager[source]
property template_body[source]

Templated body for CustomObjectLauncher.

find_spark_job(context, exclude_checked=True)[source]

Find an existing Spark driver pod for this task instance.

The pod is identified using Airflow task context labels. If multiple driver pods match the same labels (which can occur if cleanup did not run after an abrupt failure), a single pod is selected deterministically for reattachment, preferring a Running driver pod when present.
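As a conceptual sketch only (not the operator’s internal implementation), the same task-context labels could be used to locate driver pods with the Kubernetes Python client. The label keys and the preference for a Running pod mirror the description above; the helper name and the label-sanitisation detail are assumptions.

from kubernetes import client, config


def find_driver_pods(namespace: str, dag_id: str, task_id: str, run_id: str):
    """Hypothetical helper: list driver pods carrying Airflow task-context labels."""
    config.load_kube_config()  # use config.load_incluster_config() when running in-cluster
    core_v1 = client.CoreV1Api()
    # run_id may need sanitising into a valid label value in practice
    selector = f"dag_id={dag_id},task_id={task_id},run_id={run_id}"
    pods = core_v1.list_namespaced_pod(namespace=namespace, label_selector=selector).items
    # Prefer a Running driver pod when several match; otherwise order deterministically by name.
    return sorted(pods, key=lambda p: (p.status.phase != "Running", p.metadata.name))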

process_pod_deletion(pod, *, reraise=True)[source]
property hook: airflow.providers.cncf.kubernetes.hooks.kubernetes.KubernetesHook[source]
property client: kubernetes.client.CoreV1Api[source]
property custom_obj_api: kubernetes.client.CustomObjectsApi[source]
property launcher: airflow.providers.cncf.kubernetes.operators.custom_object_launcher.CustomObjectLauncher[source]
get_or_create_spark_crd(launcher, context)[source]
execute(context)[source]

Run the pod synchronously or asynchronously, depending on the deferrable parameter.
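A hedged sketch of the deferrable path: the deferrable flag is not part of this operator’s explicit signature and is assumed here to be passed through **kwargs to KubernetesPodOperator; the task id and manifest path are placeholders, and the task would sit inside a DAG such as the one sketched after the parameter list.

from airflow.providers.cncf.kubernetes.operators.spark_kubernetes import SparkKubernetesOperator

submit_spark_pi_async = SparkKubernetesOperator(
    task_id="submit_spark_pi_async",
    application_file="spark/spark_pi.yaml",  # hypothetical manifest path
    deferrable=True,  # defer the wait to the triggerer instead of occupying a worker slot
)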

find_pod(namespace, context, *, exclude_checked=True)[source]

Override parent’s find_pod to use our Spark-specific find_spark_job method.

on_kill()[source]

Override this method to clean up subprocesses when a task instance gets killed.

Any use of the threading, subprocess or multiprocessing module within an operator needs to be cleaned up, or it will leave ghost processes behind.

patch_already_checked(pod, *, reraise=True)[source]

Add an “already checked” annotation to ensure we don’t reattach on retries.

dry_run()[source]

Print out the spark job that would be created by this operator.
