JdbcOperator

Java Database Connectivity (JDBC) is an application programming interface (API) for the programming language Java, which defines how a client may access a database.

Warning

JdbcOperator was previously used to perform this kind of operation. It is now deprecated and will be removed in a future version of the provider. Switch to SQLExecuteQueryOperator as soon as possible.

Prerequisite Tasks

To use this operator you need to:

  • Install the Python module jaydebeapi:

    pip install apache-airflow[jdbc]

  • Install a JVM and add a JAVA_HOME env variable.

  • Have the JDBC driver for your database installed.
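The three prerequisites above can be checked from plain Python before involving Airflow. The following is a minimal sketch using only the standard library. The helper name check_jdbc_prerequisites is hypothetical (it is not part of Airflow or jaydebeapi), and the driver path is the example path used on this page.

```python
import importlib.util
import os


def check_jdbc_prerequisites(driver_path, env=None):
    """Return a list of problems found; an empty list means all checks passed.

    Hypothetical helper for illustration only.
    """
    env = os.environ if env is None else env
    problems = []

    # 1. The jaydebeapi module must be importable.
    if importlib.util.find_spec("jaydebeapi") is None:
        problems.append("jaydebeapi is not installed")

    # 2. JAVA_HOME must be set and point at an existing directory.
    java_home = env.get("JAVA_HOME")
    if not java_home:
        problems.append("JAVA_HOME is not set")
    elif not os.path.isdir(java_home):
        problems.append(f"JAVA_HOME does not exist: {java_home}")

    # 3. The JDBC driver jar must exist at the given path.
    if not os.path.isfile(driver_path):
        problems.append(f"JDBC driver not found: {driver_path}")

    return problems


if __name__ == "__main__":
    # Example path from this page; replace it with your own driver jar.
    jar = "/opt/airflow/drivers/exasol/EXASolution_JDBC-7.0.2/exajdbc.jar"
    for problem in check_jdbc_prerequisites(jar):
        print(problem)
```

An empty result only shows that the files and modules are in place. It does not prove that the driver class loads or that the database is reachable.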

Once these prerequisites are satisfied you should be able to run the following Python snippet (replacing the variable values with the ones for your driver).

import jaydebeapi

driver_class = "com.exasol.jdbc.EXADriver"
driver_path = "/opt/airflow/drivers/exasol/EXASolution_JDBC-7.0.2/exajdbc.jar"
connection_url = "jdbc:exa:localhost"
credentials = ["", ""]  # [user, password]

# Open a connection through the JDBC driver to verify the setup.
conn = jaydebeapi.connect(
    driver_class,
    connection_url,
    credentials,
    driver_path,
)

If the jaydebeapi module is missing or the driver is not available, the error message will say so. A Connection Refused error means that the connection string points to a host where no database is listening for new connections.

Usage

Use the SQLExecuteQueryOperator to execute commands against a database (or data storage) accessible via a JDBC driver.

The JDBC Connection must be passed as conn_id.

tests/system/jdbc/example_jdbc_queries.py

    insert_data = SQLExecuteQueryOperator(
        task_id="insert_data",
        sql="insert into my_schema.my_table select dt, value from my_schema.source_data",
        conn_id="my_jdbc_connection",
        autocommit=True,
    )

The parameter sql accepts a string or a list of strings. Each string can be an SQL statement or a reference to a template file. Template references are recognized by their '.sql' suffix.
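To make the suffix rule concrete, the sketch below labels each entry of a sql argument as either an inline statement or a template file reference. The helper classify_sql and the file name queries/cleanup.sql are hypothetical. The code only illustrates the rule stated above and is not how Airflow loads templates.

```python
def classify_sql(sql):
    """Label each entry of `sql` as a template file reference or an inline statement."""
    # Accept a single string or a list of strings, as the sql parameter does.
    entries = [sql] if isinstance(sql, str) else list(sql)
    return [
        ("template file" if entry.endswith(".sql") else "inline statement", entry)
        for entry in entries
    ]


if __name__ == "__main__":
    examples = [
        "insert into my_schema.my_table select dt, value from my_schema.source_data",
        "queries/cleanup.sql",  # hypothetical template file name
    ]
    for kind, entry in classify_sql(examples):
        print(f"{kind}: {entry}")
```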

If the parameter autocommit is set to True, a commit is executed after each command (the default is False).

Templating

You can use Jinja templates to parameterize sql.

tests/system/jdbc/example_jdbc_queries.py

    delete_data = SQLExecuteQueryOperator(
        task_id="delete_data",
        sql="delete from my_schema.my_table where dt = '{{ ds }}'",
        conn_id="my_jdbc_connection",
        autocommit=True,
    )
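The template is rendered before the statement is sent to the database, and values are substituted as raw text. The sketch below imitates that substitution with a small regular expression instead of Jinja, so it runs without extra packages. The helper render_placeholders and the example date are assumptions for illustration. Airflow itself uses Jinja, which supports far more than plain variable names.

```python
import re


def render_placeholders(template, context):
    """Replace each {{ name }} placeholder with str(context[name]).

    Standard-library stand-in for Jinja; handles plain variable names only.
    """
    return re.sub(
        r"\{\{\s*(\w+)\s*\}\}",
        lambda match: str(context[match.group(1)]),
        template,
    )


if __name__ == "__main__":
    context = {"ds": "2024-01-01"}  # example value; ds has the form YYYY-MM-DD
    unquoted = "delete from my_schema.my_table where dt = {{ ds }}"
    quoted = "delete from my_schema.my_table where dt = '{{ ds }}'"
    print(render_placeholders(unquoted, context))
    print(render_placeholders(quoted, context))
```

Because the substitution is textual, a placeholder that is compared against a string or date literal usually needs quotes in the SQL itself. Otherwise the database receives a bare 2024-01-01.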
