Apache Spark Operators

Prerequisite

To use SparkJDBCOperator and SparkSubmitOperator, you must configure a Spark Connection. For SparkJDBCOperator, you must also configure a JDBC Connection.
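Connections are usually created in the Airflow UI or with the `airflow connections` CLI. As a minimal sketch, a Spark connection can also be supplied through an `AIRFLOW_CONN_<CONN_ID>` environment variable; the master URL below is an assumption, and `spark_default` is the operator's default connection ID:

```python
# Minimal sketch: Airflow reads connections from environment variables named
# AIRFLOW_CONN_<CONN_ID>. The master host and port here are assumptions.
import os

# "spark_default" is the default conn_id used by the Spark operators.
os.environ["AIRFLOW_CONN_SPARK_DEFAULT"] = "spark://spark-master:7077"
```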

SparkSqlOperator gets all of its configuration from operator parameters instead.

SparkJDBCOperator

Launches applications on an Apache Spark server; it uses SparkSubmitOperator to perform data transfers to and from JDBC-based databases.

For parameter definitions, take a look at SparkJDBCOperator.

Using the operator

Using the cmd_type parameter, it is possible to transfer data from Spark to a database (spark_to_jdbc) or from a database to Spark (jdbc_to_spark), which will write the table to the metastore using the Spark saveAsTable command.

airflow/providers/apache/spark/example_dags/example_spark_dag.py[source]

jdbc_to_spark_job = SparkJDBCOperator(
    cmd_type="jdbc_to_spark",
    jdbc_table="foo",
    spark_jars="${SPARK_HOME}/jars/postgresql-42.2.12.jar",
    jdbc_driver="org.postgresql.Driver",
    metastore_table="bar",
    save_mode="overwrite",
    save_format="JSON",
    task_id="jdbc_to_spark_job",
)

spark_to_jdbc_job = SparkJDBCOperator(
    cmd_type="spark_to_jdbc",
    jdbc_table="foo",
    spark_jars="${SPARK_HOME}/jars/postgresql-42.2.12.jar",
    jdbc_driver="org.postgresql.Driver",
    metastore_table="bar",
    save_mode="append",
    task_id="spark_to_jdbc_job",
)

Reference

For further information, look at Apache Spark DataFrameWriter documentation.

SparkSqlOperator

Launches applications on an Apache Spark server; it requires that the spark-sql script is in the PATH. The operator runs the SQL query against the Spark Hive metastore service. The sql parameter can be templated and can point to a .sql or .hql file.

For parameter definitions, take a look at SparkSqlOperator.

Using the operator

airflow/providers/apache/spark/example_dags/example_spark_dag.py[source]

sql_job = SparkSqlOperator(sql="SELECT * FROM bar", master="local", task_id="sql_job")
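As a simplified illustration (not the hook's exact implementation), the operator shells out to spark-sql with a command line roughly like the following; an inline query is passed with -e, while a .sql or .hql file is passed with -f:

```python
# Simplified sketch of the spark-sql invocation the operator issues; the real
# hook builds a longer command with additional configuration flags.
sql = "SELECT * FROM bar"
master = "local"

# spark-sql takes an inline query via -e, or a .sql/.hql file via -f.
flag = "-f" if sql.strip().endswith((".sql", ".hql")) else "-e"
cmd = ["spark-sql", flag, sql, "--master", master]
```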

Reference

For further information, look at Running the Spark SQL CLI.

SparkSubmitOperator

Launches applications on an Apache Spark server; it uses the spark-submit script, which takes care of setting up the classpath with Spark and its dependencies, and supports the different cluster managers and deploy modes that Spark provides.

For parameter definitions, take a look at SparkSubmitOperator.

Using the operator

airflow/providers/apache/spark/example_dags/example_spark_dag.py[source]

submit_job = SparkSubmitOperator(
    application="${SPARK_HOME}/examples/src/main/python/pi.py", task_id="submit_job"
)
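The operator assembles a spark-submit command line from its parameters. As a rough sketch (the master and deploy mode below are assumptions, and the real hook supports many more options):

```python
# Simplified sketch of a spark-submit invocation; the master and deploy-mode
# values are assumptions, and the real SparkSubmitHook adds many more flags.
application = "${SPARK_HOME}/examples/src/main/python/pi.py"
conf = {"master": "yarn", "deploy_mode": "cluster"}  # assumed cluster settings

cmd = [
    "spark-submit",
    "--master", conf["master"],
    "--deploy-mode", conf["deploy_mode"],
    application,  # the application file follows the options
]
```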

Reference

For further information, look at Apache Spark submitting applications.
