Source code for airflow.providers.dingding.hooks.dingding
## Licensed to the Apache Software Foundation (ASF) under one# or more contributor license agreements. See the NOTICE file# distributed with this work for additional information# regarding copyright ownership. The ASF licenses this file# to you under the Apache License, Version 2.0 (the# "License"); you may not use this file except in compliance# with the License. You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing,# software distributed under the License is distributed on an# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY# KIND, either express or implied. See the License for the# specific language governing permissions and limitations# under the License.from__future__importannotationsimportjsonimportrequestsfromrequestsimportSessionfromairflow.exceptionsimportAirflowExceptionfromairflow.providers.http.hooks.httpimportHttpHook
[docs]classDingdingHook(HttpHook):""" This hook allows you send Dingding message using Dingding custom bot. Get Dingding token from conn_id.password. And prefer set domain to conn_id.host, if not will use default ``https://oapi.dingtalk.com``. For more detail message in `Dingding custom bot <https://open-doc.dingtalk.com/microapp/serverapi2/qf2nxq>`_ :param dingding_conn_id: The name of the Dingding connection to use :param message_type: Message type you want to send to Dingding, support five type so far including text, link, markdown, actionCard, feedCard :param message: The message send to Dingding chat group :param at_mobiles: Remind specific users with this message :param at_all: Remind all people in group or not. If True, will overwrite ``at_mobiles`` """
def__init__(self,dingding_conn_id='dingding_default',message_type:str='text',message:str|dict|None=None,at_mobiles:list[str]|None=None,at_all:bool=False,*args,**kwargs,)->None:super().__init__(http_conn_id=dingding_conn_id,*args,**kwargs)# type: ignore[misc]self.message_type=message_typeself.message=messageself.at_mobiles=at_mobilesself.at_all=at_alldef_get_endpoint(self)->str:"""Get Dingding endpoint for sending message."""conn=self.get_connection(self.http_conn_id)token=conn.passwordifnottoken:raiseAirflowException('Dingding token is requests but get nothing, check you conn_id configuration.')returnf'robot/send?access_token={token}'def_build_message(self)->str:""" Build different type of Dingding message As most commonly used type, text message just need post message content rather than a dict like ``{'content': 'message'}`` """ifself.message_typein['text','markdown']:data={'msgtype':self.message_type,self.message_type:{'content':self.message}ifself.message_type=='text'elseself.message,'at':{'atMobiles':self.at_mobiles,'isAtAll':self.at_all},}else:data={'msgtype':self.message_type,self.message_type:self.message}returnjson.dumps(data)
[docs]defget_conn(self,headers:dict|None=None)->Session:""" Overwrite HttpHook get_conn because just need base_url and headers and not don't need generic params :param headers: additional headers to be passed through as a dictionary """conn=self.get_connection(self.http_conn_id)self.base_url=conn.hostifconn.hostelse'https://oapi.dingtalk.com'session=requests.Session()ifheaders:session.headers.update(headers)returnsession
[docs]defsend(self)->None:"""Send Dingding message"""support_type=['text','link','markdown','actionCard','feedCard']ifself.message_typenotinsupport_type:raiseValueError(f'DingdingWebhookHook only support {support_type} so far, but receive {self.message_type}')data=self._build_message()self.log.info('Sending Dingding type %s message %s',self.message_type,data)resp=self.run(endpoint=self._get_endpoint(),data=data,headers={'Content-Type':'application/json'})# Dingding success send message will with errcode equal to 0ifint(resp.json().get('errcode'))!=0:raiseAirflowException(f'Send Dingding message failed, receive error message {resp.text}')self.log.info('Success Send Dingding message')