## Licensed to the Apache Software Foundation (ASF) under one# or more contributor license agreements. See the NOTICE file# distributed with this work for additional information# regarding copyright ownership. The ASF licenses this file# to you under the Apache License, Version 2.0 (the# "License"); you may not use this file except in compliance# with the License. You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing,# software distributed under the License is distributed on an# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY# KIND, either express or implied. See the License for the# specific language governing permissions and limitations# under the License."""Search in emails for a specific attachment and also to download it.It uses the smtplib library that is already integrated in python 3."""from__future__importannotationsimportcollections.abcimportosimportreimportsmtplibimportsslfromemail.mime.applicationimportMIMEApplicationfromemail.mime.multipartimportMIMEMultipartfromemail.mime.textimportMIMETextfromemail.utilsimportformatdatefromtypingimportTYPE_CHECKING,Any,Iterablefromairflow.exceptionsimportAirflowException,AirflowNotFoundExceptionfromairflow.hooks.baseimportBaseHookifTYPE_CHECKING:fromairflow.models.connectionimportConnection
[docs]classSmtpHook(BaseHook):""" This hook connects to a mail server by using the smtp protocol. .. note:: Please call this Hook as context manager via `with` to automatically open and close the connection to the mail server. :param smtp_conn_id: The :ref:`smtp connection id <howto/connection:smtp>` that contains the information used to authenticate the client. """
[docs]defget_conn(self)->SmtpHook:""" Login to the smtp server. .. note:: Please call this Hook as context manager via `with` to automatically open and close the connection to the smtp server. :return: an authorized SmtpHook object. """ifnotself.smtp_client:try:self.smtp_connection=self.get_connection(self.smtp_conn_id)exceptAirflowNotFoundException:raiseAirflowException("SMTP connection is not found.")forattemptinrange(1,self.smtp_retry_limit+1):try:self.smtp_client=self._build_client()exceptsmtplib.SMTPServerDisconnected:ifattempt==self.smtp_retry_limit:raiseAirflowException("Unable to connect to smtp server")else:ifself.smtp_starttls:self.smtp_client.starttls()ifself.smtp_userandself.smtp_password:self.smtp_client.login(self.smtp_user,self.smtp_password)breakreturnself
def_build_client(self)->smtplib.SMTP_SSL|smtplib.SMTP:SMTP:type[smtplib.SMTP_SSL]|type[smtplib.SMTP]ifself.use_ssl:SMTP=smtplib.SMTP_SSLelse:SMTP=smtplib.SMTPsmtp_kwargs:dict[str,Any]={"host":self.host}ifself.port:smtp_kwargs["port"]=self.portsmtp_kwargs["timeout"]=self.timeoutifself.use_ssl:fromairflow.configurationimportconfextra_ssl_context=self.conn.extra_dejson.get("ssl_context",None)ifextra_ssl_context:ssl_context_string=extra_ssl_contextelse:ssl_context_string=conf.get("smtp_provider","SSL_CONTEXT",fallback=None)ifssl_context_stringisNone:ssl_context_string=conf.get("email","SSL_CONTEXT",fallback=None)ifssl_context_stringisNone:ssl_context_string="default"ifssl_context_string=="default":ssl_context=ssl.create_default_context()elifssl_context_string=="none":ssl_context=Noneelse:raiseRuntimeError(f"The email.ssl_context configuration variable must "f"be set to 'default' or 'none' and is '{ssl_context_string}'.")smtp_kwargs["context"]=ssl_contextreturnSMTP(**smtp_kwargs)@classmethod
[docs]defget_connection_form_widgets(cls)->dict[str,Any]:"""Return connection widgets to add to connection form."""fromflask_appbuilder.fieldwidgetsimportBS3TextFieldWidgetfromflask_babelimportlazy_gettextfromwtformsimportBooleanField,IntegerField,StringFieldfromwtforms.validatorsimportNumberRangereturn{"from_email":StringField(lazy_gettext("From email"),widget=BS3TextFieldWidget()),"timeout":IntegerField(lazy_gettext("Connection timeout"),validators=[NumberRange(min=0)],widget=BS3TextFieldWidget(),default=30,),"retry_limit":IntegerField(lazy_gettext("Number of Retries"),validators=[NumberRange(min=0)],widget=BS3TextFieldWidget(),default=5,),"disable_tls":BooleanField(lazy_gettext("Disable TLS"),default=False),"disable_ssl":BooleanField(lazy_gettext("Disable SSL"),default=False),}
[docs]deftest_connection(self)->tuple[bool,str]:"""Test SMTP connectivity from UI."""try:smtp_client=self.get_conn().smtp_clientifsmtp_client:status=smtp_client.noop()[0]ifstatus==250:returnTrue,"Connection successfully tested"exceptExceptionase:returnFalse,str(e)returnFalse,"Failed to establish connection"
[docs]defsend_email_smtp(self,*,to:str|Iterable[str],subject:str,html_content:str,from_email:str|None=None,files:list[str]|None=None,dryrun:bool=False,cc:str|Iterable[str]|None=None,bcc:str|Iterable[str]|None=None,mime_subtype:str="mixed",mime_charset:str="utf-8",custom_headers:dict[str,Any]|None=None,**kwargs,)->None:""" Send an email with html content. :param to: Recipient email address or list of addresses. :param subject: Email subject. :param html_content: Email body in HTML format. :param from_email: Sender email address. If it's None, the hook will check if there is an email provided in the connection, and raises an exception if not. :param files: List of file paths to attach to the email. :param dryrun: If True, the email will not be sent, but all other actions will be performed. :param cc: Carbon copy recipient email address or list of addresses. :param bcc: Blind carbon copy recipient email address or list of addresses. :param mime_subtype: MIME subtype of the email. :param mime_charset: MIME charset of the email. :param custom_headers: Dictionary of custom headers to include in the email. :param kwargs: Additional keyword arguments. >>> send_email_smtp( 'test@example.com', 'foo', '<b>Foo</b> bar', ['/dev/null'], dryrun=True ) """ifnotself.smtp_client:raiseAirflowException("The 'smtp_client' should be initialized before!")from_email=from_emailorself.from_emailifnotfrom_email:raiseAirflowException("You should provide `from_email` or define it in the connection.")mime_msg,recipients=self._build_mime_message(mail_from=from_email,to=to,subject=subject,html_content=html_content,files=files,cc=cc,bcc=bcc,mime_subtype=mime_subtype,mime_charset=mime_charset,custom_headers=custom_headers,)ifnotdryrun:forattemptinrange(1,self.smtp_retry_limit+1):try:self.smtp_client.sendmail(from_addr=from_email,to_addrs=recipients,msg=mime_msg.as_string())exceptsmtplib.SMTPServerDisconnectedase:ifattempt==self.smtp_retry_limit:raiseeelse:break
def_build_mime_message(self,mail_from:str|None,to:str|Iterable[str],subject:str,html_content:str,files:list[str]|None=None,cc:str|Iterable[str]|None=None,bcc:str|Iterable[str]|None=None,mime_subtype:str="mixed",mime_charset:str="utf-8",custom_headers:dict[str,Any]|None=None,)->tuple[MIMEMultipart,list[str]]:""" Build a MIME message that can be used to send an email and returns a full list of recipients. :param mail_from: Email address to set as the email's "From" field. :param to: A string or iterable of strings containing email addresses to set as the email's "To" field. :param subject: The subject of the email. :param html_content: The content of the email in HTML format. :param files: A list of paths to files to be attached to the email. :param cc: A string or iterable of strings containing email addresses to set as the email's "CC" field. :param bcc: A string or iterable of strings containing email addresses to set as the email's "BCC" field. :param mime_subtype: The subtype of the MIME message. Default: "mixed". :param mime_charset: The charset of the email. Default: "utf-8". :param custom_headers: Additional headers to add to the MIME message. No validations are run on these values, and they should be able to be encoded. :return: A tuple containing the email as a MIMEMultipart object and a list of recipient email addresses. """to=self._get_email_address_list(to)msg=MIMEMultipart(mime_subtype)msg["Subject"]=subjectifmail_from:msg["From"]=mail_frommsg["To"]=", ".join(to)recipients=toifcc:cc=self._get_email_address_list(cc)msg["CC"]=", ".join(cc)recipients+=ccifbcc:# don't add bcc in headerbcc=self._get_email_address_list(bcc)recipients+=bccmsg["Date"]=formatdate(localtime=True)mime_text=MIMEText(html_content,"html",mime_charset)msg.attach(mime_text)forfnameinfilesor[]:basename=os.path.basename(fname)withopen(fname,"rb")asfile:part=MIMEApplication(file.read(),Name=basename)part["Content-Disposition"]=f'attachment; filename="{basename}"'part["Content-ID"]=f"<{basename}>"msg.attach(part)ifcustom_headers:forheader_key,header_valueincustom_headers.items():msg[header_key]=header_valuereturnmsg,recipientsdef_get_email_address_list(self,addresses:str|Iterable[str])->list[str]:""" Return a list of email addresses from the provided input. :param addresses: A string or iterable of strings containing email addresses. :return: A list of email addresses. :raises TypeError: If the input is not a string or iterable of strings. """ifisinstance(addresses,str):returnself._get_email_list_from_str(addresses)elifisinstance(addresses,collections.abc.Iterable):ifnotall(isinstance(item,str)foriteminaddresses):raiseTypeError("The items in your iterable must be strings.")returnlist(addresses)else:raiseTypeError(f"Unexpected argument type: Received '{type(addresses).__name__}'.")def_get_email_list_from_str(self,addresses:str)->list[str]:""" Extract a list of email addresses from a string. The string can contain multiple email addresses separated by any of the following delimiters: ',' or ';'. :param addresses: A string containing one or more email addresses. :return: A list of email addresses. """pattern=r"\s*[,;]\s*"returnre.split(pattern,addresses)@property
[docs]defconn(self)->Connection:ifnotself.smtp_connection:raiseAirflowException("The smtp connection should be loaded before!")returnself.smtp_connection