Source code for airflow.providers.dingding.hooks.dingding
## Licensed to the Apache Software Foundation (ASF) under one# or more contributor license agreements. See the NOTICE file# distributed with this work for additional information# regarding copyright ownership. The ASF licenses this file# to you under the Apache License, Version 2.0 (the# "License"); you may not use this file except in compliance# with the License. You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing,# software distributed under the License is distributed on an# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY# KIND, either express or implied. See the License for the# specific language governing permissions and limitations# under the License.from__future__importannotationsimportjsonimportrequestsfromrequestsimportSessionfromairflow.exceptionsimportAirflowExceptionfromairflow.providers.http.hooks.httpimportHttpHook
[docs]classDingdingHook(HttpHook):""" Send message using a DingTalk Custom Robot API. .. seealso:: `How to get webhook token <https://open.dingtalk.com/document/robots/custom-robot-access>`__ :param dingding_conn_id: Dingding connection id that has access token in the password field, and optional host name in host field, if host not set than default ``https://oapi.dingtalk.com`` will use. :param message_type: Message type you want to send to Dingding, support five type so far including ``text``, ``link``, ``markdown``, ``actionCard``, ``feedCard``. :param message: The message send to chat group :param at_mobiles: Remind specific users with this message :param at_all: Remind all people in group or not. If True, will overwrite ``at_mobiles`` """
def__init__(self,dingding_conn_id="dingding_default",message_type:str="text",message:str|dict|None=None,at_mobiles:list[str]|None=None,at_all:bool=False,*args,**kwargs,)->None:super().__init__(http_conn_id=dingding_conn_id,*args,**kwargs)# type: ignore[misc]self.message_type=message_typeself.message=messageself.at_mobiles=at_mobilesself.at_all=at_alldef_get_endpoint(self)->str:"""Get DingTalk Custom Robot endpoint for sending message."""conn=self.get_connection(self.http_conn_id)token=conn.passwordifnottoken:raiseAirflowException("Dingding token is requests but get nothing, check you conn_id configuration.")returnf"robot/send?access_token={token}"def_build_message(self)->str:"""Build different type of DingTalk custom robot messages."""ifself.message_typein["text","markdown"]:data={"msgtype":self.message_type,self.message_type:{"content":self.message}ifself.message_type=="text"elseself.message,"at":{"atMobiles":self.at_mobiles,"isAtAll":self.at_all},}else:data={"msgtype":self.message_type,self.message_type:self.message}returnjson.dumps(data)
[docs]defget_conn(self,headers:dict|None=None)->Session:""" Overwrite HttpHook get_conn. We just need base_url and headers, and not don't need generic params. :param headers: additional headers to be passed through as a dictionary """conn=self.get_connection(self.http_conn_id)self.base_url=conn.hostifconn.hostelse"https://oapi.dingtalk.com"session=requests.Session()ifheaders:session.headers.update(headers)returnsession
[docs]defsend(self)->None:"""Send DingTalk Custom Robot message."""support_type=["text","link","markdown","actionCard","feedCard"]ifself.message_typenotinsupport_type:raiseValueError(f"DingdingWebhookHook only support {support_type} so far, but receive {self.message_type}")data=self._build_message()self.log.info("Sending Dingding type %s message %s",self.message_type,data)resp=self.run(endpoint=self._get_endpoint(),data=data,headers={"Content-Type":"application/json"})# Success send message will return errcode = 0ifint(resp.json().get("errcode"))!=0:raiseAirflowException(f"Send Dingding message failed, receive error message {resp.text}")self.log.info("Success Send Dingding message")