Source code for airflow.providers.apache.druid.hooks.druid
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

import time
from typing import Any, Iterable

import requests
from pydruid.db import connect

from airflow.exceptions import AirflowException
from airflow.hooks.base import BaseHook
from airflow.providers.common.sql.hooks.sql import DbApiHook
class DruidHook(BaseHook):
    """
    Connection to Druid overlord for ingestion

    To connect to a Druid cluster that is secured with the druid-basic-security
    extension, add the username and password to the druid ingestion connection.

    :param druid_ingest_conn_id: The connection id to the Druid overlord machine
        which accepts index jobs
    :param timeout: The interval, in seconds, between polling the Druid job for
        the status of the ingestion job. Must be greater than or equal to 1
    :param max_ingestion_time: The maximum ingestion time, in seconds, before
        assuming the job failed. ``None`` (or ``0``) disables the limit
    """

    def __init__(
        self,
        druid_ingest_conn_id: str = 'druid_ingest_default',
        timeout: int = 1,
        max_ingestion_time: int | None = None,
    ) -> None:
        super().__init__()
        self.druid_ingest_conn_id = druid_ingest_conn_id
        self.timeout = timeout
        self.max_ingestion_time = max_ingestion_time
        # Sent with the job submission so the overlord parses the body as JSON.
        self.header = {'content-type': 'application/json'}

        if self.timeout < 1:
            raise ValueError("Druid timeout should be equal or greater than 1")

    def get_auth(self) -> requests.auth.HTTPBasicAuth | None:
        """
        Return username and password from connections tab as
        requests.auth.HTTPBasicAuth object.

        If these details have not been set then returns None.
        """
        conn = self.get_connection(self.druid_ingest_conn_id)
        user = conn.login
        password = conn.password
        # Both values must be present; a lone username or password means "no auth".
        if user is not None and password is not None:
            return requests.auth.HTTPBasicAuth(user, password)
        return None

    def submit_indexing_job(self, json_index_spec: dict[str, Any] | str) -> None:
        """
        Submit Druid ingestion job and block until it finishes.

        NOTE(review): this method relies on ``self.get_conn_url()``, which is not
        defined in the portion of the class visible here -- confirm it exists.

        :param json_index_spec: The ingestion spec, either as an already
            serialized JSON string or as a dict that is serialized to JSON
            when the request is sent
        :raises AirflowException: if the submission does not return HTTP 200,
            if ``max_ingestion_time`` is exceeded (the task is asked to shut
            down first), if the task reports ``FAILED``, or if it reports any
            status other than ``RUNNING`` / ``SUCCESS``
        """
        url = self.get_conn_url()
        # The credentials do not change while we poll, so look them up once.
        auth = self.get_auth()

        self.log.info("Druid ingestion spec: %s", json_index_spec)
        # ``data=`` form-encodes a dict, which would not match the JSON
        # content-type header; ``json=`` makes requests serialize it as JSON.
        # Strings (and anything else) are passed through untouched via ``data=``.
        if isinstance(json_index_spec, dict):
            body = {'json': json_index_spec}
        else:
            body = {'data': json_index_spec}
        req_index = requests.post(url, headers=self.header, auth=auth, **body)
        if req_index.status_code != 200:
            raise AirflowException(f'Did not get 200 when submitting the Druid job to {url}')

        req_json = req_index.json()
        # Wait until the job is completed
        druid_task_id = req_json['task']
        self.log.info("Druid indexing task-id: %s", druid_task_id)

        status_url = f"{url}/{druid_task_id}/status"
        sec = 0
        while True:
            req_status = requests.get(status_url, auth=auth)

            self.log.info("Job still running for %s seconds...", sec)

            if self.max_ingestion_time and sec > self.max_ingestion_time:
                # ensure that the job gets killed if the max ingestion time is exceeded
                requests.post(f"{url}/{druid_task_id}/shutdown", auth=auth)
                raise AirflowException(f'Druid ingestion took more than {self.max_ingestion_time} seconds')

            time.sleep(self.timeout)
            sec += self.timeout

            # The response parsed here was fetched before the sleep above.
            status = req_status.json()['status']['status']
            if status == 'SUCCESS':
                break  # Great success!
            if status == 'FAILED':
                raise AirflowException('Druid indexing job failed, check console for more info')
            if status != 'RUNNING':
                raise AirflowException(f'Could not get status of the job, got {status}')

        self.log.info('Successful index')
class DruidDbApiHook(DbApiHook):
    """
    Interact with Druid broker

    This hook is purely for users to query druid broker.
    For ingestion, please use DruidHook.
    """

    def get_conn(self) -> connect:
        """
        Establish a connection to druid broker.

        The broker path is read from the ``endpoint`` key of the connection
        extras (default ``/druid/v2/sql``) and the URL scheme from the
        ``schema`` key (default ``http``).

        :return: the object returned by ``pydruid.db.connect``
        """
        # The attribute holding the connection id is looked up indirectly via
        # ``conn_name_attr``; its value is not set in the code visible here.
        conn = self.get_connection(getattr(self, self.conn_name_attr))
        druid_broker_conn = connect(
            host=conn.host,
            port=conn.port,
            path=conn.extra_dejson.get('endpoint', '/druid/v2/sql'),
            # NOTE: the extras key really is 'schema', not 'scheme'.
            scheme=conn.extra_dejson.get('schema', 'http'),
            user=conn.login,
            password=conn.password,
        )
        self.log.info('Get the connection to druid broker on %s using user %s', conn.host, conn.login)
        return druid_broker_conn

    def get_uri(self) -> str:
        """
        Get the connection uri for druid broker.

        e.g: druid://localhost:8082/druid/v2/sql/

        Leading slashes on the configured ``endpoint`` are dropped so that an
        endpoint written as ``/druid/v2/sql`` (the form ``get_conn`` defaults
        to) does not produce a double slash after the host.
        """
        conn = self.get_connection(getattr(self, self.conn_name_attr))
        host = conn.host
        if conn.port is not None:
            host += f':{conn.port}'
        conn_type = conn.conn_type or 'druid'
        # The f-string below already supplies the separating '/'.
        endpoint = conn.extra_dejson.get('endpoint', 'druid/v2/sql').lstrip('/')
        return f'{conn_type}://{host}/{endpoint}'