Source code for airflow.providers.mysql.hooks.mysql
## Licensed to the Apache Software Foundation (ASF) under one# or more contributor license agreements. See the NOTICE file# distributed with this work for additional information# regarding copyright ownership. The ASF licenses this file# to you under the Apache License, Version 2.0 (the# "License"); you may not use this file except in compliance# with the License. You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing,# software distributed under the License is distributed on an# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY# KIND, either express or implied. See the License for the# specific language governing permissions and limitations# under the License."""This module allows to connect to a MySQL database."""from__future__importannotationsimportjsonimportloggingfromtypingimportTYPE_CHECKING,Any,Unionfromairflow.exceptionsimportAirflowOptionalProviderFeatureExceptionfromairflow.providers.common.sql.hooks.sqlimportDbApiHook
ifTYPE_CHECKING:fromairflow.modelsimportConnectiontry:frommysql.connector.abstractsimportMySQLConnectionAbstractexceptModuleNotFoundError:logger.warning("The package 'mysql-connector-python' is not installed. Import skipped")try:fromMySQLdb.connectionsimportConnectionasMySQLdbConnectionexceptImportError:raiseRuntimeError("You do not have `mysqlclient` package installed. ""Please install it with `pip install mysqlclient` and make sure you have system ""mysql libraries installed, as well as well as `pkg-config` system package ""installed in case you see compilation error during installation.")
class MySqlHook(DbApiHook):
    """
    Interact with MySQL.

    You can specify charset in the extra field of your connection
    as ``{"charset": "utf8"}``. Also you can choose cursor as
    ``{"cursor": "SSCursor"}``. Refer to the MySQLdb.cursors for more details.
    The cursor names handled by this hook are ``SSCursor``, ``DictCursor``
    and ``SSDictCursor`` (matched case-insensitively).

    The driver is selected with the ``client`` extra: ``"mysqlclient"``
    (used when the extra is absent) or ``"mysql-connector-python"``.
    With *mysqlclient* the extras ``ssl`` (a dictionary, or a JSON string
    encoding one), ``ssl_mode`` and ``unix_socket`` are forwarded to the
    driver; with *mysql-connector-python* every extra whose name starts
    with ``ssl_`` is forwarded.

    Note: For AWS IAM authentication, use iam in the extra connection parameters
    and set it to true. Leave the password field empty. This will use the
    "aws_default" connection to get the temporary token unless you override
    in extras.
    extras example: ``{"iam":true, "aws_conn_id":"my_aws_conn"}``

    You can also add "local_infile" parameter to determine whether local_infile
    feature of MySQL client is going to be enabled (it is disabled by default).

    :param schema: The MySQL database schema to connect to.
    :param connection: The :ref:`MySQL connection id <howto/connection:mysql>` used for MySQL credentials.
    :param local_infile: Boolean flag determining if local_infile should be used
    :param init_command: Initial command to issue to MySQL server upon connection
    """
def set_autocommit(self, conn: MySQLConnectionTypes, autocommit: bool) -> None:
    """
    Apply the requested autocommit mode to *conn*.

    The supported drivers expose autocommit differently: some connection
    classes declare ``autocommit`` as a property, while *mysqlclient*
    exposes it as a method taking the flag. The class attribute is
    inspected to decide which form to use.

    :param conn: connection to set autocommit setting
    :param autocommit: autocommit setting
    """
    class_attr = getattr(conn.__class__, "autocommit", None)
    if not isinstance(class_attr, property):
        # Method-style API: pass the flag as an argument.
        conn.autocommit(autocommit)  # type: ignore[operator]
        return
    # Property-style API: plain attribute assignment.
    conn.autocommit = autocommit
def get_autocommit(self, conn: MySQLConnectionTypes) -> bool:
    """
    Report whether autocommit is active on *conn*.

    Connection classes that declare ``autocommit`` as a property are read
    through that property; otherwise (as with *mysqlclient*) the
    ``get_autocommit()`` method is called.

    :param conn: connection to get autocommit setting from.
    :return: connection autocommit setting
    """
    class_attr = getattr(conn.__class__, "autocommit", None)
    if isinstance(class_attr, property):
        return conn.autocommit
    return conn.get_autocommit()  # type: ignore[union-attr]
def _get_conn_config_mysql_client(self, conn: Connection) -> dict:
    """
    Build the ``MySQLdb.connect`` keyword arguments for *conn*.

    Credentials, host and schema come from the connection (the hook's own
    ``schema`` takes precedence); charset, cursor class, SSL settings and
    unix socket are read from the connection extras.

    :param conn: Airflow connection to translate.
    :return: keyword arguments for the *mysqlclient* driver.
    """
    conn_config = {
        "user": conn.login,
        "passwd": conn.password or "",
        "host": conn.host or "localhost",
        "db": self.schema or conn.schema or "",
    }
    extras = conn.extra_dejson

    # AWS IAM authentication: the temporary token replaces the password and
    # the port returned alongside it is stored back on the connection.
    if extras.get("iam", False):
        token, port = self.get_iam_token(conn)
        conn_config["passwd"] = token
        conn.port = port
        conn_config["read_default_group"] = "enable-cleartext-plugin"

    conn_config["port"] = 3306 if not conn.port else int(conn.port)

    charset = extras.get("charset", False)
    if charset:
        conn_config["charset"] = charset
        if charset.lower() in ("utf8", "utf-8"):
            conn_config["use_unicode"] = True

    requested_cursor = extras.get("cursor", False)
    if requested_cursor:
        try:
            import MySQLdb.cursors
        except ImportError:
            raise RuntimeError(
                "You do not have `mysqlclient` package installed. "
                "Please install it with `pip install mysqlclient` and make sure you have system "
                "mysql libraries installed, as well as well as `pkg-config` system package "
                "installed in case you see compilation error during installation."
            )

        # Lower-cased cursor name -> cursor class; unknown names are ignored.
        known_cursors = {
            "sscursor": MySQLdb.cursors.SSCursor,
            "dictcursor": MySQLdb.cursors.DictCursor,
            "ssdictcursor": MySQLdb.cursors.SSDictCursor,
        }
        cursor_key = requested_cursor.lower()
        if cursor_key in known_cursors:
            conn_config["cursorclass"] = known_cursors[cursor_key]

    ssl_settings = extras.get("ssl", False)
    if ssl_settings:
        # The driver needs a dictionary; extras passed via URL parameters
        # can deliver the value as a JSON string instead.
        if isinstance(ssl_settings, str):
            ssl_settings = json.loads(ssl_settings)
        conn_config["ssl"] = ssl_settings

    for passthrough_key in ("ssl_mode", "unix_socket"):
        passthrough_value = extras.get(passthrough_key)
        if passthrough_value:
            conn_config[passthrough_key] = passthrough_value

    if self.local_infile:
        conn_config["local_infile"] = 1
    if self.init_command:
        conn_config["init_command"] = self.init_command
    return conn_config


def _get_conn_config_mysql_connector_python(self, conn: Connection) -> dict:
    """
    Build the ``mysql.connector.connect`` keyword arguments for *conn*.

    :param conn: Airflow connection to translate.
    :return: keyword arguments for the *mysql-connector-python* driver.
    """
    conn_config = {
        "user": conn.login,
        "password": conn.password or "",
        "host": conn.host or "localhost",
        "database": self.schema or conn.schema or "",
        "port": 3306 if not conn.port else int(conn.port),
    }

    if self.local_infile:
        conn_config["allow_local_infile"] = True
    if self.init_command:
        conn_config["init_command"] = self.init_command

    # Forward every ``ssl_*`` extra unchanged.
    # Ref: https://dev.mysql.com/doc/connector-python/en/connector-python-connectargs.html
    conn_config.update(
        {name: setting for name, setting in conn.extra_dejson.items() if name.startswith("ssl_")}
    )
    return conn_config
def get_conn(self) -> MySQLConnectionTypes:
    """
    Open a connection to a MySQL database.

    The connection parameters are derived from the Airflow connection
    (``self.connection`` when set, otherwise the one looked up by the
    hook's connection id).

    .. note::
        By default it connects to the database via the mysqlclient library.
        But you can also choose the mysql-connector-python library which lets you connect through ssl
        without any further ssl parameters required.

    :return: a mysql connection object
    """
    conn = self.connection or self.get_connection(self.get_conn_id())
    client_name = conn.extra_dejson.get("client", "mysqlclient")

    # Reject unsupported drivers before attempting any import.
    if client_name not in ("mysqlclient", "mysql-connector-python"):
        raise ValueError("Unknown MySQL client name provided!")

    if client_name == "mysqlclient":
        try:
            import MySQLdb
        except ImportError:
            raise RuntimeError(
                "You do not have `mysqlclient` package installed. "
                "Please install it with `pip install mysqlclient` and make sure you have system "
                "mysql libraries installed, as well as well as `pkg-config` system package "
                "installed in case you see compilation error during installation."
            )
        client_kwargs = self._get_conn_config_mysql_client(conn)
        return MySQLdb.connect(**client_kwargs)

    # Only "mysql-connector-python" can reach this point.
    try:
        import mysql.connector
    except ModuleNotFoundError:
        raise AirflowOptionalProviderFeatureException(
            "The pip package 'mysql-connector-python' is not installed, therefore the connection "
            "wasn't established. Please, consider using default driver or pip install the package "
            "'mysql-connector-python'. Warning! It might cause dependency conflicts."
        )
    connector_kwargs = self._get_conn_config_mysql_connector_python(conn)
    return mysql.connector.connect(**connector_kwargs)
def bulk_load(self, table: str, tmp_file: str) -> None:
    """
    Load a tab-delimited file into a database table.

    The file name is passed to the driver as a bound parameter. The table
    name is interpolated into the statement text (identifiers cannot be
    bound), so it must come from a trusted source.

    The cursor and the connection are always closed, even when the
    statement or the commit fails; the original exception propagates.

    :param table: name of the destination table.
    :param tmp_file: path of the file to load.
    """
    conn = self.get_conn()
    try:
        cur = conn.cursor()
        try:
            cur.execute(
                f"LOAD DATA LOCAL INFILE %s INTO TABLE {table}",
                (tmp_file,),
            )
        finally:
            cur.close()
        conn.commit()
    finally:
        # Release the connection on failure too, so it is not leaked.
        conn.close()  # type: ignore[misc]
def bulk_dump(self, table: str, tmp_file: str) -> None:
    """
    Dump a database table into a tab-delimited file.

    The file name is passed to the driver as a bound parameter. The table
    name is interpolated into the statement text (identifiers cannot be
    bound), so it must come from a trusted source.

    The cursor and the connection are always closed, even when the
    statement or the commit fails; the original exception propagates.

    :param table: name of the table to dump.
    :param tmp_file: path of the output file.
    """
    conn = self.get_conn()
    try:
        cur = conn.cursor()
        try:
            cur.execute(
                f"SELECT * INTO OUTFILE %s FROM {table}",
                (tmp_file,),
            )
        finally:
            cur.close()
        conn.commit()
    finally:
        # Release the connection on failure too, so it is not leaked.
        conn.close()  # type: ignore[misc]
@staticmethod
def _serialize_cell(cell: object, conn: Connection | None = None) -> Any:
    """
    Return *cell* untouched.

    MySQLdb turns arguments into SQL literals itself when they are passed
    separately to ``execute``, so no conversion is performed here.

    :param cell: The cell to insert into the table
    :param conn: The database connection (unused)
    :return: The same cell
    """
    return cell
def get_iam_token(self, conn: Connection) -> tuple[str, int]:
    """
    Fetch a temporary password for connecting to MySQL.

    The token is generated by the RDS client of the AWS connection named
    in the ``aws_conn_id`` extra (``aws_default`` when the extra is absent).
    A port is required to build the token; 3306 is used when the
    connection does not define one.

    :param conn: Airflow connection providing host, login, port and extras.
    :return: tuple of the generated token and the port it was generated for.
    """
    from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook

    aws_conn_id = conn.extra_dejson.get("aws_conn_id", "aws_default")
    rds_hook = AwsBaseHook(aws_conn_id, client_type="rds")
    port = 3306 if conn.port is None else conn.port
    rds_client = rds_hook.get_conn()
    return rds_client.generate_db_auth_token(conn.host, port, conn.login), port
def bulk_load_custom(
    self,
    table: str,
    tmp_file: str,
    duplicate_key_handling: str = "IGNORE",
    extra_options: str = "",
) -> None:
    """
    Load local data from a file into the database in a more configurable way.

    .. warning:: According to the mysql docs using this function is a
        `security risk <https://dev.mysql.com/doc/refman/8.0/en/load-data-local.html>`_.
        If you want to use it anyway you can do so by setting a client-side + server-side option.
        This depends on the mysql client library used.

    Only the file name is sent as a bound parameter. ``duplicate_key_handling``
    and ``extra_options`` are SQL syntax, not values: binding them would make
    the driver quote them as string literals and produce an invalid statement.
    The keyword is therefore validated against the allowed set and placed in
    the statement text. ``table`` and ``extra_options`` are interpolated
    verbatim and must come from a trusted source.

    The cursor and the connection are always closed, even when the statement
    or the commit fails.

    :param table: The table were the file will be loaded into.
    :param tmp_file: The file (name) that contains the data.
    :param duplicate_key_handling: Specify what should happen to duplicate data.
        You can choose either `IGNORE` or `REPLACE` (case-insensitive); an
        empty string omits the clause.

        .. seealso::
            https://dev.mysql.com/doc/refman/8.0/en/load-data.html#load-data-duplicate-key-handling
    :param extra_options: More sql options to specify exactly how to load the data.

        .. seealso:: https://dev.mysql.com/doc/refman/8.0/en/load-data.html
    :raises ValueError: if ``duplicate_key_handling`` is not an allowed keyword.
    """
    keyword = (duplicate_key_handling or "").strip().upper()
    if keyword not in ("IGNORE", "REPLACE", ""):
        raise ValueError(
            f"Invalid duplicate_key_handling {duplicate_key_handling!r}; expected 'IGNORE' or 'REPLACE'."
        )

    # Assemble the statement from its non-empty clauses; %s stays as the
    # single placeholder for the file name.
    clauses = ("LOAD DATA LOCAL INFILE %s", keyword, f"INTO TABLE {table}", extra_options or "")
    statement = " ".join(clause for clause in clauses if clause)

    conn = self.get_conn()
    try:
        cursor = conn.cursor()
        try:
            cursor.execute(statement, (tmp_file,))
        finally:
            cursor.close()
        conn.commit()
    finally:
        # Release the connection on failure too, so it is not leaked.
        conn.close()  # type: ignore[misc]
def get_openlineage_database_info(self, connection):
    """
    Return MySQL specific information for OpenLineage.

    :param connection: connection used to derive the dialect and authority.
    :return: a ``DatabaseInfo`` describing the scheme, authority (default
        port 3306), the information-schema columns to query and the
        name-normalization function (upper-casing).
    """
    from airflow.providers.openlineage.sqlparser import DatabaseInfo

    schema_columns = [
        "table_schema",
        "table_name",
        "column_name",
        "ordinal_position",
        "column_type",
    ]
    dialect = self.get_openlineage_database_dialect(connection)
    authority = DbApiHook.get_openlineage_authority_part(connection, default_port=3306)
    return DatabaseInfo(
        scheme=dialect,
        authority=authority,
        information_schema_columns=schema_columns,
        normalize_name_method=lambda name: name.upper(),
    )