Source code for airflow.providers.databricks.hooks.databricks_sql
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
import re
from contextlib import closing
from copy import copy
from typing import Any, Dict, List, Optional, Union

from databricks import sql  # type: ignore[attr-defined]
from databricks.sql.client import Connection  # type: ignore[attr-defined]

from airflow.exceptions import AirflowException
from airflow.hooks.dbapi import DbApiHook
from airflow.providers.databricks.hooks.databricks_base import BaseDatabricksHook

# REST API route used to enumerate SQL endpoints in the workspace
LIST_SQL_ENDPOINTS_ENDPOINT = ('GET', 'api/2.0/sql/endpoints')
class DatabricksSqlHook(BaseDatabricksHook, DbApiHook):
    """
    Interact with Databricks SQL.

    :param databricks_conn_id: Reference to the
        :ref:`Databricks connection <howto/connection:databricks>`.
    :param http_path: Optional string specifying HTTP path of Databricks SQL Endpoint or cluster.
        If not specified, it should be either specified in the Databricks connection's extra parameters,
        or ``sql_endpoint_name`` must be specified.
    :param sql_endpoint_name: Optional name of Databricks SQL Endpoint. If not specified, ``http_path``
        must be provided as described above.
    :param session_configuration: An optional dictionary of Spark session parameters. Defaults to None.
        If not specified, it could be specified in the Databricks connection's extra parameters.
    """
    def __init__(
        self,
        databricks_conn_id: str = BaseDatabricksHook.default_conn_name,
        http_path: Optional[str] = None,
        sql_endpoint_name: Optional[str] = None,
        session_configuration: Optional[Dict[str, str]] = None,
    ) -> None:
        super().__init__(databricks_conn_id)
        self._sql_conn = None
        self._token: Optional[str] = None
        self._http_path = http_path
        self._sql_endpoint_name = sql_endpoint_name
        self.supports_autocommit = True
        self.session_config = session_configuration

    def _get_extra_config(self) -> Dict[str, Optional[Any]]:
        # Pass everything from the connection's extra field through to the SQL
        # connector, except the keys this hook consumes itself.
        extra_params = copy(self.databricks_conn.extra_dejson)
        for arg in ['http_path', 'session_configuration'] + self.extra_parameters:
            if arg in extra_params:
                del extra_params[arg]
        return extra_params

    def _get_sql_endpoint_by_name(self, endpoint_name) -> Dict[str, Any]:
        result = self._do_api_call(LIST_SQL_ENDPOINTS_ENDPOINT)
        if 'endpoints' not in result:
            raise AirflowException("Can't list Databricks SQL endpoints")
        lst = [endpoint for endpoint in result['endpoints'] if endpoint['name'] == endpoint_name]
        if len(lst) == 0:
            raise AirflowException(f"Can't find Databricks SQL endpoint with name '{endpoint_name}'")
        return lst[0]
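    # A minimal usage sketch (the endpoint name and session parameter below are
    # illustrative, not part of this module): the hook can be pointed at a SQL
    # endpoint either via ``http_path`` or by endpoint name.
    #
    #   hook = DatabricksSqlHook(
    #       databricks_conn_id='databricks_default',
    #       sql_endpoint_name='my-endpoint',
    #       session_configuration={'spark.sql.shuffle.partitions': '8'},
    #   )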
    def get_conn(self) -> Connection:
        """Returns a Databricks SQL connection object"""
        if not self._http_path:
            if self._sql_endpoint_name:
                endpoint = self._get_sql_endpoint_by_name(self._sql_endpoint_name)
                self._http_path = endpoint['odbc_params']['path']
            elif 'http_path' in self.databricks_conn.extra_dejson:
                self._http_path = self.databricks_conn.extra_dejson['http_path']
            else:
                raise AirflowException(
                    "http_path should be provided either explicitly, "
                    "or in extra parameter of Databricks connection, "
                    "or sql_endpoint_name should be specified"
                )

        requires_init = True
        if not self._token:
            self._token = self._get_token(raise_error=True)
        else:
            new_token = self._get_token(raise_error=True)
            if new_token != self._token:
                self._token = new_token
            else:
                requires_init = False

        if not self.session_config:
            self.session_config = self.databricks_conn.extra_dejson.get('session_configuration')

        if not self._sql_conn or requires_init:
            if self._sql_conn:  # close the existing connection before re-creating it
                self._sql_conn.close()
            self._sql_conn = sql.connect(
                self.host,
                self._http_path,
                self._token,
                session_configuration=self.session_config,
                **self._get_extra_config(),
            )
        return self._sql_conn
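    # The returned object follows the DB-API implemented by
    # ``databricks-sql-connector``; a sketch of direct use (assuming a
    # reachable endpoint and a configured ``hook``):
    #
    #   with closing(hook.get_conn().cursor()) as cur:
    #       cur.execute('SELECT 1')
    #       print(cur.fetchall())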
    @staticmethod
    def maybe_split_sql_string(sql: str) -> List[str]:
        """
        Splits strings consisting of multiple SQL expressions into a list of individual expressions.

        TODO: do we need something more sophisticated?

        :param sql: SQL string potentially consisting of multiple expressions
        :return: list of individual expressions
        """
        splits = [s.strip() for s in re.split(r";\s*\r?\n", sql) if s.strip() != ""]
        return splits
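    # For illustration: the split point is a semicolon followed by a (possibly
    # CRLF) line break, so semicolons inside a single line survive:
    #
    #   maybe_split_sql_string("SELECT 1;\nSELECT 2;")
    #   -> ['SELECT 1', 'SELECT 2;']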
    def run(self, sql: Union[str, List[str]], autocommit=True, parameters=None, handler=None):
        """
        Runs a command or a list of commands. Pass a list of SQL
        statements to the sql parameter to get them to execute
        sequentially.

        :param sql: the sql statement to be executed (str) or a list of sql statements to execute
        :param autocommit: What to set the connection's autocommit setting to
            before executing the query.
        :param parameters: The parameters to render the SQL query with.
        :param handler: The result handler which is called with the result of each statement.
        :return: query results.
        """
        if isinstance(sql, str):
            sql = self.maybe_split_sql_string(sql)
        self.log.debug("Executing %d statements", len(sql))

        conn = None
        for sql_statement in sql:
            # when using AAD tokens, the token could expire if the previous query
            # ran longer than the token lifetime, so re-fetch the connection here
            conn = self.get_conn()
            with closing(conn.cursor()) as cur:
                self.log.info("Executing statement: '%s', parameters: '%s'", sql_statement, parameters)
                if parameters:
                    cur.execute(sql_statement, parameters)
                else:
                    cur.execute(sql_statement)
                schema = cur.description
                results = []
                if handler is not None:
                    cur = handler(cur)
                for row in cur:
                    self.log.debug("Statement results: %s", row)
                    results.append(row)

                self.log.info("Rows affected: %s", cur.rowcount)
        if conn:
            conn.close()
            self._sql_conn = None

        # Return only the result of the last SQL expression
        return schema, results
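    # A sketch of a typical call (the table name and handler are illustrative):
    #
    #   schema, results = hook.run(
    #       sql="SELECT * FROM my_table;\nSELECT 42",
    #       handler=lambda cur: cur,  # optional post-processing of each cursor
    #   )
    #
    # Only the schema and rows of the last statement are returned.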
    def test_connection(self):
        """Test the Databricks SQL connection by running a simple query."""
        try:
            self.run(sql="select 42")
        except Exception as e:
            return False, str(e)
        return True, "Connection successfully checked"
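# A sketch of invoking the check directly (assuming a valid Databricks
# connection is configured in Airflow):
#
#   ok, message = DatabricksSqlHook('databricks_default').test_connection()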