Source code for airflow.providers.apache.sqoop.operators.sqoop

#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
"""This module contains a sqoop 1 operator."""
from __future__ import annotations

import os
import signal
from typing import TYPE_CHECKING, Any, Sequence

from airflow.exceptions import AirflowException
from airflow.models import BaseOperator
from airflow.providers.apache.sqoop.hooks.sqoop import SqoopHook

if TYPE_CHECKING:
    from airflow.utils.context import Context


class SqoopOperator(BaseOperator):
    """
    Execute a Sqoop job.

    Documentation for Apache Sqoop can be found here:
    https://sqoop.apache.org/docs/1.4.2/SqoopUserGuide.html

    :param conn_id: The Airflow connection used for Sqoop credentials
    :param cmd_type: Command to execute, either "export" or "import"
    :param schema: Schema name
    :param table: Table to read
    :param query: Import result of arbitrary SQL query. Instead of using the table,
        columns and where arguments, you can specify a SQL statement with the query
        argument. Must also specify a destination directory with target_dir.
    :param target_dir: HDFS destination directory where the data
        from the rdbms will be written
    :param append: Append data to an existing dataset in HDFS
    :param file_type: "avro", "sequence", "text" Imports data to the specified
        format. Defaults to text.
    :param columns: <col,col,col> Columns to import from table
    :param num_mappers: Use n mapper tasks to import/export in parallel
    :param split_by: Column of the table used to split work units
    :param where: WHERE clause to use during import
    :param export_dir: HDFS Hive database directory to export to the rdbms
    :param input_null_string: The string to be interpreted as null for string columns
    :param input_null_non_string: The string to be interpreted as null
        for non-string columns
    :param staging_table: The table in which data will be staged before
        being inserted into the destination table
    :param clear_staging_table: Indicate that any data present in the
        staging table can be deleted
    :param enclosed_by: Sets a required field enclosing character
    :param escaped_by: Sets the escape character
    :param input_fields_terminated_by: Sets the input field separator
    :param input_lines_terminated_by: Sets the input end-of-line character
    :param input_optionally_enclosed_by: Sets a field enclosing character
    :param batch: Use batch mode for underlying statement execution
    :param direct: Use direct export fast path
    :param driver: Manually specify JDBC driver class to use
    :param verbose: Switch to more verbose logging for debug purposes
    :param relaxed_isolation: Use read uncommitted isolation level
    :param hcatalog_database: Specifies the database name for the HCatalog table
    :param hcatalog_table: The argument value for this option is the HCatalog table
    :param create_hcatalog_table: Have sqoop create the hcatalog table passed in or not
    :param properties: Additional JVM properties passed to sqoop
    :param extra_options: Extra import/export options to pass as a dict to the SqoopHook.
        If a key doesn't have a value, just pass an empty string to it.
        Don't include the -- prefix for sqoop options.
    :param libjars: Optional comma-separated jar files to include in the classpath.
    """
    template_fields: Sequence[str] = (
        "conn_id",
        "cmd_type",
        "table",
        "query",
        "target_dir",
        "file_type",
        "columns",
        "split_by",
        "where",
        "export_dir",
        "input_null_string",
        "input_null_non_string",
        "staging_table",
        "enclosed_by",
        "escaped_by",
        "input_fields_terminated_by",
        "input_lines_terminated_by",
        "input_optionally_enclosed_by",
        "properties",
        "extra_options",
        "driver",
        "hcatalog_database",
        "hcatalog_table",
        "schema",
    )

    template_fields_renderers = {"query": "sql"}

    ui_color = "#7D8CA4"
    def __init__(
        self,
        *,
        conn_id: str = "sqoop_default",
        cmd_type: str = "import",
        table: str | None = None,
        query: str | None = None,
        target_dir: str | None = None,
        append: bool = False,
        file_type: str = "text",
        columns: str | None = None,
        num_mappers: int | None = None,
        split_by: str | None = None,
        where: str | None = None,
        export_dir: str | None = None,
        input_null_string: str | None = None,
        input_null_non_string: str | None = None,
        staging_table: str | None = None,
        clear_staging_table: bool = False,
        enclosed_by: str | None = None,
        escaped_by: str | None = None,
        input_fields_terminated_by: str | None = None,
        input_lines_terminated_by: str | None = None,
        input_optionally_enclosed_by: str | None = None,
        batch: bool = False,
        direct: bool = False,
        driver: Any | None = None,
        verbose: bool = False,
        relaxed_isolation: bool = False,
        properties: dict[str, Any] | None = None,
        hcatalog_database: str | None = None,
        hcatalog_table: str | None = None,
        create_hcatalog_table: bool = False,
        extra_options: dict[str, Any] | None = None,
        schema: str | None = None,
        libjars: str | None = None,
        **kwargs: Any,
    ) -> None:
        super().__init__(**kwargs)
        self.conn_id = conn_id
        self.cmd_type = cmd_type
        self.table = table
        self.query = query
        self.target_dir = target_dir
        self.append = append
        self.file_type = file_type
        self.columns = columns
        self.num_mappers = num_mappers
        self.split_by = split_by
        self.where = where
        self.export_dir = export_dir
        self.input_null_string = input_null_string
        self.input_null_non_string = input_null_non_string
        self.staging_table = staging_table
        self.clear_staging_table = clear_staging_table
        self.enclosed_by = enclosed_by
        self.escaped_by = escaped_by
        self.input_fields_terminated_by = input_fields_terminated_by
        self.input_lines_terminated_by = input_lines_terminated_by
        self.input_optionally_enclosed_by = input_optionally_enclosed_by
        self.batch = batch
        self.direct = direct
        self.driver = driver
        self.verbose = verbose
        self.relaxed_isolation = relaxed_isolation
        self.hcatalog_database = hcatalog_database
        self.hcatalog_table = hcatalog_table
        self.create_hcatalog_table = create_hcatalog_table
        self.properties = properties
        self.extra_options = extra_options or {}
        self.hook: SqoopHook | None = None
        self.schema = schema
        self.libjars = libjars
    def execute(self, context: Context) -> None:
        """Execute the Sqoop job."""
        if self.hook is None:
            self.hook = self._get_hook()

        if self.cmd_type == "export":
            self.hook.export_table(
                table=self.table,  # type: ignore
                export_dir=self.export_dir,
                input_null_string=self.input_null_string,
                input_null_non_string=self.input_null_non_string,
                staging_table=self.staging_table,
                clear_staging_table=self.clear_staging_table,
                enclosed_by=self.enclosed_by,
                escaped_by=self.escaped_by,
                input_fields_terminated_by=self.input_fields_terminated_by,
                input_lines_terminated_by=self.input_lines_terminated_by,
                input_optionally_enclosed_by=self.input_optionally_enclosed_by,
                batch=self.batch,
                relaxed_isolation=self.relaxed_isolation,
                schema=self.schema,
            )
        elif self.cmd_type == "import":
            if self.table and self.query:
                raise AirflowException(
                    "Cannot specify query and table together; specify one or the other."
                )
            if self.table:
                self.hook.import_table(
                    table=self.table,
                    target_dir=self.target_dir,
                    append=self.append,
                    file_type=self.file_type,
                    columns=self.columns,
                    split_by=self.split_by,
                    where=self.where,
                    direct=self.direct,
                    driver=self.driver,
                    schema=self.schema,
                )
            elif self.query:
                self.hook.import_query(
                    query=self.query,
                    target_dir=self.target_dir,
                    append=self.append,
                    file_type=self.file_type,
                    split_by=self.split_by,
                    direct=self.direct,
                    driver=self.driver,
                )
            else:
                raise AirflowException("Provide a query or table parameter to import using Sqoop")
        else:
            raise AirflowException("cmd_type should be 'import' or 'export'")
    def on_kill(self) -> None:
        """Terminate the spawned Sqoop job by signalling its process group."""
        if self.hook is None:
            self.hook = self._get_hook()
        self.log.info("Sending SIGTERM signal to bash process group")
        os.killpg(os.getpgid(self.hook.sub_process_pid), signal.SIGTERM)
    def _get_hook(self) -> SqoopHook:
        """Return a SqoopHook instance."""
        # Add `create-hcatalog-table` to the extra options if that option was passed to
        # the operator and the command is `import`. Similarly, if new parameters are
        # added to the operator, you can pass them through `extra_options` so that you
        # don't need to modify `SqoopHook` for each new parameter.
        if self.cmd_type == "import" and self.create_hcatalog_table:
            self.extra_options["create-hcatalog-table"] = ""

        return SqoopHook(
            conn_id=self.conn_id,
            verbose=self.verbose,
            num_mappers=self.num_mappers,
            hcatalog_database=self.hcatalog_database,
            hcatalog_table=self.hcatalog_table,
            properties=self.properties,
            libjars=self.libjars,
            extra_options=self.extra_options,
        )
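
# ---------------------------------------------------------------------------
# Usage sketch (illustrative, not part of the provider source): a minimal
# "import" task. The DAG id, table, and directory names below are hypothetical
# placeholders; `extra_options` shows how a bare Sqoop flag is passed as an
# empty-string value, without the leading "--".
#
#     import pendulum
#     from airflow import DAG
#     from airflow.providers.apache.sqoop.operators.sqoop import SqoopOperator
#
#     with DAG(
#         dag_id="example_sqoop_import",            # hypothetical DAG id
#         start_date=pendulum.datetime(2024, 1, 1, tz="UTC"),
#         schedule=None,
#     ) as dag:
#         import_customers = SqoopOperator(
#             task_id="sqoop_import_customers",
#             conn_id="sqoop_default",              # the operator's default connection
#             cmd_type="import",
#             table="customers",                    # hypothetical source table
#             target_dir="/data/customers",         # HDFS destination directory
#             num_mappers=4,
#             split_by="id",                        # column used to split work units
#             file_type="avro",
#             extra_options={"compress": ""},       # bare flag -> empty string value
#         )
# ---------------------------------------------------------------------------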
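
# ---------------------------------------------------------------------------
# A matching "export" sketch, also illustrative: it pushes an HDFS directory
# back to the RDBMS, staging rows first. The table, staging table, and
# directory names are hypothetical; `clear_staging_table=True` lets reruns
# wipe any rows left in the staging table by a failed attempt.
#
#         export_customers = SqoopOperator(
#             task_id="sqoop_export_customers",
#             conn_id="sqoop_default",
#             cmd_type="export",
#             table="customers",                    # hypothetical destination table
#             export_dir="/data/customers",         # HDFS directory to export
#             staging_table="customers_staging",    # hypothetical staging table
#             clear_staging_table=True,
#             batch=True,                           # batch JDBC statement execution
#         )
# ---------------------------------------------------------------------------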