Source code for airflow.providers.apache.druid.hooks.druid
## Licensed to the Apache Software Foundation (ASF) under one# or more contributor license agreements. See the NOTICE file# distributed with this work for additional information# regarding copyright ownership. The ASF licenses this file# to you under the Apache License, Version 2.0 (the# "License"); you may not use this file except in compliance# with the License. You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing,# software distributed under the License is distributed on an# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY# KIND, either express or implied. See the License for the# specific language governing permissions and limitations# under the License.from__future__importannotationsimporttimefromenumimportEnumfromtypingimportAny,Iterableimportrequestsfrompydruid.dbimportconnectfromairflow.exceptionsimportAirflowExceptionfromairflow.hooks.baseimportBaseHookfromairflow.providers.common.sql.hooks.sqlimportDbApiHook
class IngestionType(Enum):
    """
    Druid Ingestion Type. Could be Native batch ingestion or SQL-based ingestion.

    https://druid.apache.org/docs/latest/ingestion/index.html
    """

    # NOTE(review): member names/values follow the published provider API
    # reference; ``BATCH`` is the default used by ``DruidHook.submit_indexing_job``.
    # Native batch ingestion.
    BATCH = 1
    # SQL-based ingestion.
    MSQ = 2
class DruidHook(BaseHook):
    """
    Connection to Druid overlord for ingestion.

    To connect to a Druid cluster that is secured with the druid-basic-security
    extension, add the username and password to the druid ingestion connection.

    :param druid_ingest_conn_id: The connection id to the Druid overlord machine
        which accepts index jobs
    :param timeout: The interval between polling the Druid job for the status
        of the ingestion job. Must be greater than or equal to 1
    :param max_ingestion_time: The maximum ingestion time before assuming the job failed
    """

    # NOTE(review): ``submit_indexing_job`` calls ``self.get_conn_url()``, which is
    # not defined in the visible part of this class -- confirm it exists.

    def __init__(
        self,
        druid_ingest_conn_id: str = "druid_ingest_default",
        timeout: int = 1,
        max_ingestion_time: int | None = None,
    ) -> None:
        super().__init__()
        self.druid_ingest_conn_id = druid_ingest_conn_id
        self.timeout = timeout
        self.max_ingestion_time = max_ingestion_time
        self.header = {"content-type": "application/json"}

        # The polling loop sleeps ``timeout`` seconds per iteration, so reject
        # values below one second up front.
        if self.timeout < 1:
            raise ValueError("Druid timeout should be equal or greater than 1")

    def get_auth(self) -> requests.auth.HTTPBasicAuth | None:
        """
        Return username and password from connections tab as requests.auth.HTTPBasicAuth object.

        If these details have not been set then returns None.
        """
        conn = self.get_connection(self.druid_ingest_conn_id)
        user = conn.login
        password = conn.password
        if user is not None and password is not None:
            return requests.auth.HTTPBasicAuth(user, password)
        return None

    def submit_indexing_job(
        self,
        json_index_spec: dict[str, Any] | str,
        ingestion_type: IngestionType = IngestionType.BATCH,
    ) -> None:
        """
        Submit Druid ingestion job and block until it finishes.

        :param json_index_spec: The ingestion spec, sent as the request body.
        :param ingestion_type: Selects the submission URL and the response key
            that carries the task id.
        :raises AirflowException: If the submission is not answered with a 2xx
            status, the task reports ``FAILED`` or an unknown status, or the
            task runs longer than ``max_ingestion_time``.
        """
        url = self.get_conn_url(ingestion_type)
        # Resolve the credentials once instead of on every poll.
        auth = self.get_auth()

        self.log.info("Druid ingestion spec: %s", json_index_spec)
        req_index = requests.post(url, data=json_index_spec, headers=self.header, auth=auth)

        code = req_index.status_code
        if not 200 <= code < 300:
            self.log.error("Error submitting the Druid job to %s (%s) %s", url, code, req_index.content)
            raise AirflowException(f"Did not get 200 or 202 when submitting the Druid job to {url}")

        req_json = req_index.json()
        # The task id is returned under a different key depending on the ingestion type.
        task_id_key = "task" if ingestion_type == IngestionType.BATCH else "taskId"
        druid_task_id = req_json[task_id_key]

        # Status polling and shutdown both go through the same task URL. The
        # submission ``url`` must not be used for shutdown: for non-batch
        # ingestion it is not the endpoint the task is tracked on.
        druid_task_url = f"{self.get_conn_url()}/{druid_task_id}"
        druid_task_status_url = f"{druid_task_url}/status"
        self.log.info("Druid indexing task-id: %s", druid_task_id)

        # Wait until the job is completed
        sec = 0
        while True:
            req_status = requests.get(druid_task_status_url, auth=auth)
            self.log.info("Job still running for %s seconds...", sec)

            if self.max_ingestion_time and sec > self.max_ingestion_time:
                # ensure that the job gets killed if the max ingestion time is exceeded
                requests.post(f"{druid_task_url}/shutdown", auth=auth)
                raise AirflowException(f"Druid ingestion took more than {self.max_ingestion_time} seconds")

            time.sleep(self.timeout)
            sec += self.timeout

            status = req_status.json()["status"]["status"]
            if status == "SUCCESS":
                break
            if status == "FAILED":
                raise AirflowException("Druid indexing job failed, check console for more info")
            if status != "RUNNING":
                raise AirflowException(f"Could not get status of the job, got {status}")

        self.log.info("Successful index")
class DruidDbApiHook(DbApiHook):
    """
    Interact with Druid broker.

    This hook is purely for users to query druid broker.
    For ingestion, please use druidHook.

    :param context: Optional query context parameters to pass to the SQL endpoint.
        Example: ``{"sqlFinalizeOuterSketches": True}``
        See: https://druid.apache.org/docs/latest/querying/sql-query-context/
    """

    # NOTE(review): the methods below read ``self.conn_name_attr`` and
    # ``self.context``; neither is set in the visible part of this class --
    # confirm they are defined elsewhere.

    def get_conn(self) -> connect:
        """Establish a connection to druid broker."""
        conn = self.get_connection(getattr(self, self.conn_name_attr))
        extras = conn.extra_dejson
        druid_broker_conn = connect(
            host=conn.host,
            port=conn.port,
            path=extras.get("endpoint", "/druid/v2/sql"),
            # The URL scheme is read from the ``schema`` key of the connection extras.
            scheme=extras.get("schema", "http"),
            user=conn.login,
            password=conn.password,
            context=self.context,
        )
        self.log.info("Get the connection to druid broker on %s using user %s", conn.host, conn.login)
        return druid_broker_conn

    def get_uri(self) -> str:
        """
        Get the connection uri for druid broker.

        e.g: druid://localhost:8082/druid/v2/sql
        """
        conn = self.get_connection(getattr(self, self.conn_name_attr))
        host = conn.host
        if conn.port is not None:
            host += f":{conn.port}"
        conn_type = conn.conn_type or "druid"
        endpoint = conn.extra_dejson.get("endpoint", "druid/v2/sql")
        # ``get_conn`` accepts the endpoint with a leading slash; strip it here
        # so the separator added below does not produce ``host//endpoint``.
        if isinstance(endpoint, str):
            endpoint = endpoint.lstrip("/")
        return f"{conn_type}://{host}/{endpoint}"