Source code for airflow.providers.apache.hive.transfers.mysql_to_hive
## Licensed to the Apache Software Foundation (ASF) under one# or more contributor license agreements. See the NOTICE file# distributed with this work for additional information# regarding copyright ownership. The ASF licenses this file# to you under the Apache License, Version 2.0 (the# "License"); you may not use this file except in compliance# with the License. You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing,# software distributed under the License is distributed on an# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY# KIND, either express or implied. See the License for the# specific language governing permissions and limitations# under the License."""This module contains an operator to move data from MySQL to Hive."""from__future__importannotationsimportcsvfromcollections.abcimportSequencefromcontextlibimportclosingfromtempfileimportNamedTemporaryFilefromtypingimportTYPE_CHECKINGtry:importMySQLdbexceptImportError:raiseRuntimeError("You do not have `mysqlclient` package installed. ""Please install it with `pip install mysqlclient` and make sure you have system ""mysql libraries installed, as well as well as `pkg-config` system package ""installed in case you see compilation error during installation.")fromairflow.modelsimportBaseOperatorfromairflow.providers.apache.hive.hooks.hiveimportHiveCliHookfromairflow.providers.mysql.hooks.mysqlimportMySqlHookifTYPE_CHECKING:fromairflow.utils.contextimportContext
class MySqlToHiveOperator(BaseOperator):
    """
    Moves data from MySql to Hive.

    The operator runs your query against MySQL, stores the file locally
    before loading it into a Hive table. If the ``create`` or ``recreate``
    arguments are set to ``True``, ``CREATE TABLE`` and ``DROP TABLE``
    statements are generated.
    Hive data types are inferred from the cursor's metadata.

    Note that the table generated in Hive uses ``STORED AS textfile``,
    which isn't the most efficient serialization format. If a large amount
    of data is loaded and/or if the table gets queried considerably, you
    may want to use this operator only to stage the data into a temporary
    table before loading it into its final destination using a
    ``HiveOperator``.

    :param sql: SQL query to execute against the MySQL database. (templated)
    :param hive_table: target Hive table, use dot notation to target a
        specific database. (templated)
    :param create: whether to create the table if it doesn't exist
    :param recreate: whether to drop and recreate the table at every
        execution
    :param partition: target partition as a dict of partition columns
        and values. (templated)
    :param delimiter: field delimiter in the file
    :param quoting: controls when quotes should be generated by the csv
        writer. It can take on any of the ``csv.QUOTE_*`` constants.
    :param quotechar: one-character string used to quote fields
        containing special characters.
    :param escapechar: one-character string used by the csv writer to
        escape the delimiter or quotechar.
    :param mysql_conn_id: source mysql connection
    :param hive_cli_conn_id: Reference to the
        :ref:`Hive CLI connection id <howto/connection:hive_cli>`.
    :param hive_auth: optional authentication option passed for the Hive
        connection
    :param tblproperties: TBLPROPERTIES of the hive table being created
    """

    @classmethod
    def type_map(cls, mysql_type: int) -> str:
        """
        Map a MySQL field type code to a Hive column type.

        :param mysql_type: a type code from ``MySQLdb.constants.FIELD_TYPE``
            (``execute`` passes the second item of each
            ``cursor.description`` entry).
        :return: the Hive type name for the code, or ``"STRING"`` when the
            code is not one of the types listed below.
        """
        types = MySQLdb.constants.FIELD_TYPE
        hive_types = {
            types.BIT: "INT",
            types.DECIMAL: "DOUBLE",
            types.NEWDECIMAL: "DOUBLE",
            types.DOUBLE: "DOUBLE",
            types.FLOAT: "DOUBLE",
            types.INT24: "INT",
            types.LONG: "BIGINT",
            types.LONGLONG: "DECIMAL(38,0)",
            types.SHORT: "INT",
            types.TINY: "SMALLINT",
            types.YEAR: "INT",
            types.TIMESTAMP: "TIMESTAMP",
        }
        return hive_types.get(mysql_type, "STRING")

    def execute(self, context: Context):
        """
        Dump the result of ``self.sql`` to a temporary file and load it into Hive.

        The query is run on a connection obtained from ``MySqlHook``, the
        rows are written with ``csv.writer`` to a UTF-8 temporary file, and
        that file's path is then passed to ``HiveCliHook.load_file``
        together with the column types derived via ``type_map``.

        The method reads its settings (``sql``, ``hive_table``, ``create``,
        ``recreate``, ``partition``, ``delimiter``, ``quoting``,
        ``quotechar``, ``escapechar``, ``mysql_conn_id``,
        ``hive_cli_conn_id``, ``hive_auth``, ``tblproperties``) from
        attributes on ``self``.

        :param context: task execution context; not used by this method.
        """
        hive = HiveCliHook(hive_cli_conn_id=self.hive_cli_conn_id, auth=self.hive_auth)
        mysql = MySqlHook(mysql_conn_id=self.mysql_conn_id)

        self.log.info("Dumping MySQL query results to local file")
        with NamedTemporaryFile(mode="w", encoding="utf-8") as f:
            with closing(mysql.get_conn()) as conn, closing(conn.cursor()) as cursor:
                cursor.execute(self.sql)
                csv_writer = csv.writer(
                    f,
                    delimiter=self.delimiter,
                    quoting=self.quoting,
                    # No quote character is handed to the writer when quoting is disabled.
                    quotechar=self.quotechar if self.quoting != csv.QUOTE_NONE else None,
                    escapechar=self.escapechar,
                )
                # ``description`` may be None; in that case no column types are passed on.
                description = cursor.description
                field_dict = (
                    {}
                    if description is None
                    else {field[0]: self.type_map(field[1]) for field in description}
                )
                csv_writer.writerows(cursor)  # type: ignore[arg-type]
            # Flush buffered rows before the file's path is given to the Hive hook.
            f.flush()
            self.log.info("Loading file into Hive")
            hive.load_file(
                f.name,
                self.hive_table,
                field_dict=field_dict,
                create=self.create,
                partition=self.partition,
                delimiter=self.delimiter,
                recreate=self.recreate,
                tblproperties=self.tblproperties,
            )