Source code for airflow.providers.qubole.hooks.qubole_check
## Licensed to the Apache Software Foundation (ASF) under one# or more contributor license agreements. See the NOTICE file# distributed with this work for additional information# regarding copyright ownership. The ASF licenses this file# to you under the Apache License, Version 2.0 (the# "License"); you may not use this file except in compliance# with the License. You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing,# software distributed under the License is distributed on an# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY# KIND, either express or implied. See the License for the# specific language governing permissions and limitations# under the License.from__future__importannotationsimportloggingfromioimportStringIOfromqds_sdk.commandsimportCommandfromairflow.exceptionsimportAirflowExceptionfromairflow.providers.common.sql.hooks.sqlimportDbApiHookfromairflow.providers.qubole.hooks.quboleimportQuboleHook
def isint(value) -> bool:
    """Return True when ``value`` can be converted with ``int()``.

    Used to decide whether a Qubole result column should be parsed as an
    integer. A ``ValueError`` from the conversion means "not an integer".
    """
    try:
        int(value)
    except ValueError:
        return False
    return True
def isfloat(value) -> bool:
    """Return True when ``value`` can be converted with ``float()``.

    Used to decide whether a Qubole result column should be parsed as a
    float. A ``ValueError`` from the conversion means "not a float".
    """
    try:
        float(value)
    except ValueError:
        return False
    return True
def isbool(value) -> bool:
    """Return True when ``value`` is the text ``true`` or ``false`` (any case).

    Used to decide whether a Qubole result column should be parsed as a
    boolean. Values without a ``lower()`` method (for example ``None`` or a
    number) are reported as not boolean instead of raising.
    """
    try:
        return value.lower() in ("true", "false")
    except AttributeError:
        # Non-string input: ``lower()`` does not exist. The previous
        # ``except ValueError`` could never trigger for this expression.
        return False
def parse_first_row(row_list) -> list[bool | float | int | str]:
    """Parse the first row of a Qubole result into typed column values.

    The first entry of ``row_list`` (or an empty string when the list is
    empty) is split on ``COL_DELIM``. Each column is converted to ``int``,
    then ``float``, then ``bool`` - the first conversion that applies wins -
    and is otherwise kept as the original string.

    :param row_list: result rows, each a ``COL_DELIM``-separated string.
    :return: list of parsed column values for the first row.
    """
    columns = (row_list[0] if row_list else "").split(COL_DELIM)

    parsed = []
    for raw in columns:
        if isint(raw):
            parsed.append(int(raw))
        elif isfloat(raw):
            parsed.append(float(raw))
        elif isbool(raw):
            parsed.append(raw.lower() == "true")
        else:
            parsed.append(raw)
    return parsed
class QuboleCheckHook(QuboleHook, DbApiHook):
    """Qubole check hook.

    Runs a Qubole command and parses its result into a record for checking.

    :param context: task context; stored on the hook and passed to
        ``execute`` when ``get_first`` runs.
    :param results_parser_callable: optional keyword argument. A callable that
        receives the list of non-empty result rows and returns the record.
        Defaults to ``parse_first_row``.
    :raises AirflowException: if ``results_parser_callable`` is given but is
        not callable.
    """

    def __init__(self, context, *args, **kwargs) -> None:
        super().__init__(*args, **kwargs)
        self.results_parser_callable = parse_first_row
        results_parser = kwargs.get('results_parser_callable')
        if results_parser is not None:
            if not callable(results_parser):
                raise AirflowException('`results_parser_callable` param must be callable')
            self.results_parser_callable = results_parser
        self.context = context

    @staticmethod
    def handle_failure_retry(context) -> None:
        """Cancel the Qubole command recorded for the task instance if it is still running.

        The command id is read from the task instance's XCom under the key
        ``qbol_cmd_id``. Nothing happens when no id was pushed, the command
        cannot be found, or its status is not ``running``.

        :param context: task context; must contain the task instance as ``ti``.
        """
        ti = context['ti']
        cmd_id = ti.xcom_pull(key='qbol_cmd_id', task_ids=ti.task_id)
        if cmd_id is None:
            return

        cmd = Command.find(cmd_id)
        if cmd is not None and cmd.status == 'running':
            log.info('Cancelling the Qubole Command Id: %s', cmd_id)
            cmd.cancel()

    def get_first(self, sql):
        """Run the Qubole command and return its parsed record.

        :param sql: not used by this implementation; presumably kept for
            signature compatibility with ``DbApiHook.get_first`` - confirm.
        :return: whatever ``results_parser_callable`` returns for the list of
            non-empty result rows.
        :raises AirflowException: if no query result is available because the
            hook has no command.
        """
        self.execute(context=self.context)
        query_result = self.get_query_results()
        if query_result is None:
            # get_query_results() already logged the reason; fail with a clear
            # error instead of an AttributeError on ``None.split``.
            raise AirflowException('Qubole command not found, no query result to check')

        # Drop empty rows (e.g. the trailing one produced by a final delimiter).
        row_list = list(filter(None, query_result.split(ROW_DELIM)))
        return self.results_parser_callable(row_list)

    def get_query_results(self) -> str | None:
        """Fetch the raw result text of the hook's Qubole command.

        :return: the result as one string with columns separated by
            ``COL_DELIM``, or ``None`` (after logging an error) when the hook
            has no command.
        """
        if self.cmd is None:
            self.log.error("Qubole command not found")
            return None

        cmd_id = self.cmd.id
        self.log.info("command id: %d", cmd_id)
        # The context manager closes the buffer even if get_results() raises.
        with StringIO() as query_result_buffer:
            self.cmd.get_results(
                fp=query_result_buffer, inline=True, delim=COL_DELIM, arguments=['true']
            )
            return query_result_buffer.getvalue()