Source code for airflow.hooks.mysql_hook

# -*- coding: utf-8 -*-
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

import MySQLdb
import MySQLdb.cursors
import json
import six

from airflow.hooks.dbapi_hook import DbApiHook


[docs]class MySqlHook(DbApiHook): """ Interact with MySQL. You can specify charset in the extra field of your connection as ``{"charset": "utf8"}``. Also you can choose cursor as ``{"cursor": "SSCursor"}``. Refer to the MySQLdb.cursors for more details. Note: For AWS IAM authentication, use iam in the extra connection parameters and set it to true. Leave the password field empty. This will use the the "aws_default" connection to get the temporary token unless you override in extras. extras example: ``{"iam":true, "aws_conn_id":"my_aws_conn"}`` """
[docs] conn_name_attr = 'mysql_conn_id'
[docs] default_conn_name = 'mysql_default'
[docs] supports_autocommit = True
def __init__(self, *args, **kwargs): super(MySqlHook, self).__init__(*args, **kwargs) self.schema = kwargs.pop("schema", None)
[docs] def set_autocommit(self, conn, autocommit): """ MySql connection sets autocommit in a different way. """ conn.autocommit(autocommit)
[docs] def get_autocommit(self, conn): """ MySql connection gets autocommit in a different way. :param conn: connection to get autocommit setting from. :type conn: connection object. :return: connection autocommit setting :rtype: bool """ return conn.get_autocommit()
[docs] def get_conn(self): """ Returns a mysql connection object """ conn = self.get_connection(self.mysql_conn_id) conn_config = { "user": conn.login, "passwd": conn.password or '', "host": conn.host or 'localhost', "db": self.schema or conn.schema or '' } # check for authentication via AWS IAM if conn.extra_dejson.get('iam', False): conn_config['passwd'], conn.port = self.get_iam_token(conn) conn_config["read_default_group"] = 'enable-cleartext-plugin' if not conn.port: conn_config["port"] = 3306 else: conn_config["port"] = int(conn.port) if conn.extra_dejson.get('charset', False): conn_config["charset"] = conn.extra_dejson["charset"] if (conn_config["charset"]).lower() == 'utf8' or\ (conn_config["charset"]).lower() == 'utf-8': conn_config["use_unicode"] = True if conn.extra_dejson.get('cursor', False): if (conn.extra_dejson["cursor"]).lower() == 'sscursor': conn_config["cursorclass"] = MySQLdb.cursors.SSCursor elif (conn.extra_dejson["cursor"]).lower() == 'dictcursor': conn_config["cursorclass"] = MySQLdb.cursors.DictCursor elif (conn.extra_dejson["cursor"]).lower() == 'ssdictcursor': conn_config["cursorclass"] = MySQLdb.cursors.SSDictCursor local_infile = conn.extra_dejson.get('local_infile', False) if conn.extra_dejson.get('ssl', False): # SSL parameter for MySQL has to be a dictionary and in case # of extra/dejson we can get string if extra is passed via # URL parameters dejson_ssl = conn.extra_dejson['ssl'] if isinstance(dejson_ssl, six.string_types): dejson_ssl = json.loads(dejson_ssl) conn_config['ssl'] = dejson_ssl if conn.extra_dejson.get('unix_socket'): conn_config['unix_socket'] = conn.extra_dejson['unix_socket'] if local_infile: conn_config["local_infile"] = 1 conn = MySQLdb.connect(**conn_config) return conn
[docs] def get_uri(self): conn = self.get_connection(getattr(self, self.conn_name_attr)) uri = super(MySqlHook, self).get_uri() if conn.extra_dejson.get('charset', False): charset = conn.extra_dejson["charset"] return "{uri}?charset={charset}".format(uri=uri, charset=charset) return uri
[docs] def bulk_load(self, table, tmp_file): """ Loads a tab-delimited file into a database table """ conn = self.get_conn() cur = conn.cursor() cur.execute(""" LOAD DATA LOCAL INFILE '{tmp_file}' INTO TABLE {table} """.format(tmp_file=tmp_file, table=table)) conn.commit()
[docs] def bulk_dump(self, table, tmp_file): """ Dumps a database table into a tab-delimited file """ conn = self.get_conn() cur = conn.cursor() cur.execute(""" SELECT * INTO OUTFILE '{tmp_file}' FROM {table} """.format(tmp_file=tmp_file, table=table)) conn.commit()
@staticmethod
[docs] def _serialize_cell(cell, conn): """ MySQLdb converts an argument to a literal when passing those separately to execute. Hence, this method does nothing. :param cell: The cell to insert into the table :type cell: object :param conn: The database connection :type conn: connection object :return: The same cell :rtype: object """ return cell
[docs] def get_iam_token(self, conn): """ Uses AWSHook to retrieve a temporary password to connect to MySQL Port is required. If none is provided, default 3306 is used """ from airflow.contrib.hooks.aws_hook import AwsHook aws_conn_id = conn.extra_dejson.get('aws_conn_id', 'aws_default') aws_hook = AwsHook(aws_conn_id) if conn.port is None: port = 3306 else: port = conn.port client = aws_hook.get_client_type('rds') token = client.generate_db_auth_token(conn.host, port, conn.login) return token, port
[docs] def bulk_load_custom(self, table, tmp_file, duplicate_key_handling='IGNORE', extra_options=''): """ A more configurable way to load local data from a file into the database. .. warning:: According to the mysql docs using this function is a `security risk <https://dev.mysql.com/doc/refman/8.0/en/load-data-local.html>`_. If you want to use it anyway you can do so by setting a client-side + server-side option. This depends on the mysql client library used. :param table: The table were the file will be loaded into. :type table: str :param tmp_file: The file (name) that contains the data. :type tmp_file: str :param duplicate_key_handling: Specify what should happen to duplicate data. You can choose either `IGNORE` or `REPLACE`. .. seealso:: https://dev.mysql.com/doc/refman/8.0/en/load-data.html#load-data-duplicate-key-handling :type duplicate_key_handling: str :param extra_options: More sql options to specify exactly how to load the data. .. seealso:: https://dev.mysql.com/doc/refman/8.0/en/load-data.html :type extra_options: str """ conn = self.get_conn() cursor = conn.cursor() cursor.execute(""" LOAD DATA LOCAL INFILE '{tmp_file}' {duplicate_key_handling} INTO TABLE {table} {extra_options} """.format( tmp_file=tmp_file, table=table, duplicate_key_handling=duplicate_key_handling, extra_options=extra_options )) cursor.close() conn.commit()

Was this entry helpful?