airflow.providers.apache.spark.operators.spark_jdbc

Module Contents

Classes

SparkJDBCOperator

This operator extends the SparkSubmitOperator specifically for performing data transfers to/from JDBC-based databases with Apache Spark.

class airflow.providers.apache.spark.operators.spark_jdbc.SparkJDBCOperator(*, spark_app_name='airflow-spark-jdbc', spark_conn_id='spark-default', spark_conf=None, spark_py_files=None, spark_files=None, spark_jars=None, num_executors=None, executor_cores=None, executor_memory=None, driver_memory=None, verbose=False, principal=None, keytab=None, cmd_type='spark_to_jdbc', jdbc_table=None, jdbc_conn_id='jdbc-default', jdbc_driver=None, metastore_table=None, jdbc_truncate=False, save_mode=None, save_format=None, batch_size=None, fetch_size=None, num_partitions=None, partition_column=None, lower_bound=None, upper_bound=None, create_table_column_types=None, **kwargs)

Bases: airflow.providers.apache.spark.operators.spark_submit.SparkSubmitOperator

This operator extends the SparkSubmitOperator specifically for performing data transfers to/from JDBC-based databases with Apache Spark. As with the SparkSubmitOperator, it assumes that the “spark-submit” binary is available on the PATH.

See also

For more information on how to use this operator, take a look at the guide: SparkJDBCOperator

Parameters
  • spark_app_name (str) – Name of the job (default airflow-spark-jdbc)

  • spark_conn_id (str) – The Spark connection id as configured in Airflow administration

  • spark_conf (Optional[Dict[str, Any]]) – Any additional Spark configuration properties

  • spark_py_files (Optional[str]) – Additional Python files used by the job (.zip, .egg, or .py)

  • spark_files (Optional[str]) – Additional files to upload to the container running the job

  • spark_jars (Optional[str]) – Additional jars to upload and add to the driver and executor classpath

  • num_executors (Optional[int]) – Number of executors to run. Set this to control the number of connections made to the JDBC database

  • executor_cores (Optional[int]) – Number of cores per executor

  • executor_memory (Optional[str]) – Memory per executor (e.g. 1000M, 2G)

  • driver_memory (Optional[str]) – Memory allocated to the driver (e.g. 1000M, 2G)

  • verbose (bool) – Whether to pass the verbose flag to spark-submit for debugging

  • keytab (Optional[str]) – Full path to the file that contains the keytab

  • principal (Optional[str]) – The name of the Kerberos principal used for the keytab

  • cmd_type (str) – Which way the data should flow. Two possible values:

      • spark_to_jdbc: data written by Spark from the metastore to JDBC
      • jdbc_to_spark: data written by Spark from JDBC to the metastore

  • jdbc_table (Optional[str]) – The name of the JDBC table

  • jdbc_conn_id (str) – Connection id used for connection to JDBC database

  • jdbc_driver (Optional[str]) – Name of the JDBC driver to use for the JDBC connection. This driver (usually a jar) should be passed in the spark_jars parameter

  • metastore_table (Optional[str]) – The name of the metastore table

  • jdbc_truncate (bool) – (spark_to_jdbc only) Whether Spark should truncate the JDBC table instead of dropping and recreating it. This only takes effect if ‘save_mode’ is set to overwrite. If the schema is different, Spark cannot truncate and will drop and recreate the table instead

  • save_mode (Optional[str]) – The Spark save-mode to use (e.g. overwrite, append, etc.)

  • save_format (Optional[str]) – (jdbc_to_spark-only) The Spark save-format to use (e.g. parquet)

  • batch_size (Optional[int]) – (spark_to_jdbc only) The size of the batch to insert per round trip to the JDBC database. Defaults to 1000

  • fetch_size (Optional[int]) – (jdbc_to_spark only) The size of the batch to fetch per round trip from the JDBC database. Default depends on the JDBC driver

  • num_partitions (Optional[int]) – The maximum number of partitions that can be used by Spark simultaneously, both for spark_to_jdbc and jdbc_to_spark operations. This will also cap the number of JDBC connections that can be opened

  • partition_column (Optional[str]) – (jdbc_to_spark-only) A numeric column to be used to partition the metastore table by. If specified, you must also specify: num_partitions, lower_bound, upper_bound

  • lower_bound (Optional[str]) – (jdbc_to_spark-only) Lower bound of the range of the numeric partition column to fetch. If specified, you must also specify: num_partitions, partition_column, upper_bound

  • upper_bound (Optional[str]) – (jdbc_to_spark-only) Upper bound of the range of the numeric partition column to fetch. If specified, you must also specify: num_partitions, partition_column, lower_bound

  • create_table_column_types (Optional[str]) – (spark_to_jdbc-only) The database column data types to use instead of the defaults when creating the table. Data type information should be specified in the same format as CREATE TABLE column syntax (e.g. “name CHAR(64), comments VARCHAR(1024)”). The specified types should be valid Spark SQL data types.
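Several parameters above depend on one another: cmd_type accepts only two values, and partition_column, lower_bound and upper_bound each require the other partitioning settings. A minimal sketch of a pre-flight check for those rules follows. The helper check_jdbc_kwargs and all table, column, driver and path values are hypothetical and used for illustration only; the check is not part of the operator.

```python
from typing import Any, Dict

VALID_CMD_TYPES = {"spark_to_jdbc", "jdbc_to_spark"}
RANGE_KEYS = ("partition_column", "lower_bound", "upper_bound")
PARTITION_KEYS = RANGE_KEYS + ("num_partitions",)


def check_jdbc_kwargs(kwargs: Dict[str, Any]) -> None:
    """Hypothetical check mirroring the rules stated in the parameter list."""
    cmd_type = kwargs.get("cmd_type", "spark_to_jdbc")
    if cmd_type not in VALID_CMD_TYPES:
        raise ValueError(
            f"cmd_type must be one of {sorted(VALID_CMD_TYPES)}, got {cmd_type!r}"
        )

    # partition_column, lower_bound and upper_bound must be given together
    # with num_partitions; num_partitions on its own is allowed.
    if any(kwargs.get(key) is not None for key in RANGE_KEYS):
        missing = [key for key in PARTITION_KEYS if kwargs.get(key) is None]
        if missing:
            raise ValueError(f"partitioned reads also need: {', '.join(missing)}")


# Illustrative keyword arguments for a partitioned JDBC-to-metastore read.
# Every value below is an assumption, not something the operator prescribes.
read_kwargs = {
    "cmd_type": "jdbc_to_spark",
    "jdbc_table": "orders",
    "metastore_table": "staging.orders",
    "jdbc_driver": "org.postgresql.Driver",
    "spark_jars": "/path/to/postgresql.jar",
    "save_mode": "overwrite",
    "save_format": "parquet",
    "fetch_size": 5000,
    "num_partitions": 4,
    "partition_column": "order_id",
    "lower_bound": "1",
    "upper_bound": "1000000",
}

check_jdbc_kwargs(read_kwargs)  # raises ValueError if a rule is violated
```

If the check passes, the same dictionary could be passed on as keyword arguments, for example SparkJDBCOperator(task_id=..., **read_kwargs), assuming the connections named by spark_conn_id and jdbc_conn_id exist in Airflow.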

execute(self, context)

Call the SparkSubmitHook to run the provided Spark job.
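To give a rough idea of what running a job through spark-submit involves, the sketch below assembles an argument list from a few of the operator's settings. It is a simplified illustration, not the hook's actual code: build_spark_submit_args is a hypothetical helper, and the application path and configuration values are assumptions. Only standard spark-submit flags are used.

```python
from typing import Dict, List, Optional


def build_spark_submit_args(
    app_name: str,
    application: str,
    conf: Optional[Dict[str, str]] = None,
    jars: Optional[str] = None,
    num_executors: Optional[int] = None,
    executor_memory: Optional[str] = None,
    verbose: bool = False,
) -> List[str]:
    """Hypothetical helper: turn a few settings into spark-submit arguments."""
    args = ["spark-submit", "--name", app_name]
    # Each Spark configuration property becomes its own --conf key=value pair.
    for key, value in (conf or {}).items():
        args += ["--conf", f"{key}={value}"]
    if jars:
        args += ["--jars", jars]
    if num_executors is not None:
        args += ["--num-executors", str(num_executors)]
    if executor_memory:
        args += ["--executor-memory", executor_memory]
    if verbose:
        args.append("--verbose")
    # The application to run goes last, after all options.
    args.append(application)
    return args


example_args = build_spark_submit_args(
    app_name="airflow-spark-jdbc",
    application="job.py",  # illustrative application path
    conf={"spark.ui.enabled": "false"},
    jars="driver.jar",  # illustrative JDBC driver jar
    num_executors=2,
    verbose=True,
)
print(" ".join(example_args))
```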

on_kill(self)

Override this method to clean up subprocesses when a task instance gets killed. Any use of the threading, subprocess or multiprocessing module within an operator needs to be cleaned up, or it will leave ghost processes behind.
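The cleanup pattern described here can be sketched with the standard library alone. ChildProcessRunner below is a hypothetical stand-in for an operator that launches a child process; it is not the operator's or the hook's implementation, and the 5-second grace period is an arbitrary assumption.

```python
import subprocess
import sys
from typing import Optional


class ChildProcessRunner:
    """Hypothetical stand-in for an operator that starts a subprocess."""

    def __init__(self) -> None:
        self._proc: Optional[subprocess.Popen] = None

    def execute(self) -> None:
        # Start a long-running child; a real operator would launch its job here.
        self._proc = subprocess.Popen(
            [sys.executable, "-c", "import time; time.sleep(60)"]
        )

    def on_kill(self) -> None:
        proc = self._proc
        # Nothing to do if no child was started or it has already exited.
        if proc is None or proc.poll() is not None:
            return
        proc.terminate()  # ask the child to stop
        try:
            proc.wait(timeout=5)
        except subprocess.TimeoutExpired:
            proc.kill()  # force-stop a child that ignored the request
            proc.wait()


runner = ChildProcessRunner()
runner.execute()
runner.on_kill()  # the child is reaped instead of lingering as a ghost process
```

Calling on_kill a second time is harmless in this sketch, because the method returns early once the child has exited.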
