# -*- coding: utf-8 -*-
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import email
import imaplib
import os
import re
from airflow import LoggingMixin
from airflow.hooks.base_hook import BaseHook
class ImapHook(BaseHook):
    """
    This hook connects to a mail server by using the imap protocol.

    The SSL connection to the server is opened when the hook is instantiated.
    Logging in and out is done by using the hook as a context manager.

    :param imap_conn_id: The connection id that contains the information
                         used to authenticate the client.
                         The default value is 'imap_default'.
    :type imap_conn_id: str
    """

    def __init__(self, imap_conn_id='imap_default'):
        super(ImapHook, self).__init__(imap_conn_id)
        self.conn = self.get_connection(imap_conn_id)
        # Honour a port configured on the connection instead of silently
        # ignoring it; fall back to the standard IMAP-over-SSL port otherwise.
        # NOTE(review): assumes the connection object may expose ``port`` - confirm.
        port = getattr(self.conn, 'port', None) or imaplib.IMAP4_SSL_PORT
        self.mail_client = imaplib.IMAP4_SSL(self.conn.host, port)

    def __enter__(self):
        """Logs in with the connection's credentials and returns the hook."""
        self.mail_client.login(self.conn.login, self.conn.password)
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Logs out from the mail server."""
        self.mail_client.logout()

    def has_mail_attachment(self, name, mail_folder='INBOX', check_regex=False):
        """
        Checks the mail folder for mails containing attachments with the given name.

        :param name: The name of the attachment that will be searched for.
        :type name: str
        :param mail_folder: The mail folder where to look at.
                            The default value is 'INBOX'.
        :type mail_folder: str
        :param check_regex: Checks the name for a regular expression.
                            The default value is False.
        :type check_regex: bool
        :returns: True if there is an attachment with the given name and False if not.
        :rtype: bool
        """
        # One match is enough to answer the question, so stop at the first one.
        mail_attachments = self._retrieve_mails_attachments_by_name(name, mail_folder,
                                                                    check_regex,
                                                                    latest_only=True)
        return len(mail_attachments) > 0

    def retrieve_mail_attachments(self, name, mail_folder='INBOX', check_regex=False,
                                  latest_only=False):
        """
        Retrieves mail's attachments in the mail folder by its name.

        :param name: The name of the attachment that will be downloaded.
        :type name: str
        :param mail_folder: The mail folder where to look at.
                            The default value is 'INBOX'.
        :type mail_folder: str
        :param check_regex: Checks the name for a regular expression.
                            The default value is False.
        :type check_regex: bool
        :param latest_only: If set to True it will only retrieve
                            the first matched attachment.
                            The default value is False.
        :type latest_only: bool
        :returns: a list of tuple each containing the attachment filename and its payload.
        :rtype: a list of tuple
        """
        mail_attachments = self._retrieve_mails_attachments_by_name(name, mail_folder,
                                                                    check_regex,
                                                                    latest_only)
        return mail_attachments

    def download_mail_attachments(self, name, local_output_directory, mail_folder='INBOX',
                                  check_regex=False, latest_only=False):
        """
        Downloads mail's attachments in the mail folder by its name
        to the local directory.

        :param name: The name of the attachment that will be downloaded.
        :type name: str
        :param local_output_directory: The output directory on the local machine
                                       where the files will be downloaded to.
        :type local_output_directory: str
        :param mail_folder: The mail folder where to look at.
                            The default value is 'INBOX'.
        :type mail_folder: str
        :param check_regex: Checks the name for a regular expression.
                            The default value is False.
        :type check_regex: bool
        :param latest_only: If set to True it will only download
                            the first matched attachment.
                            The default value is False.
        :type latest_only: bool
        """
        mail_attachments = self._retrieve_mails_attachments_by_name(name, mail_folder,
                                                                    check_regex, latest_only)
        self._create_files(mail_attachments, local_output_directory)

    def _retrieve_mails_attachments_by_name(self, name, mail_folder, check_regex,
                                            latest_only):
        """
        Selects the mail folder and collects the matching attachments of its mails.

        :returns: a list of (filename, payload) tuples; empty if nothing matched.
        :rtype: list of tuple
        """
        all_matching_attachments = []

        self.mail_client.select(mail_folder)

        for mail_id in self._list_mail_ids_desc():
            response_mail_body = self._fetch_mail_body(mail_id)
            matching_attachments = self._check_mail_body(response_mail_body, name,
                                                         check_regex, latest_only)

            # _check_mail_body yields None for mails without attachments.
            if matching_attachments:
                all_matching_attachments.extend(matching_attachments)
                if latest_only:
                    break

        self.mail_client.close()

        return all_matching_attachments

    def _list_mail_ids_desc(self):
        """Returns the ids of all mails in the selected folder in reversed order."""
        result, data = self.mail_client.search(None, 'All')
        mail_ids = data[0].split()
        return reversed(mail_ids)

    def _fetch_mail_body(self, mail_id):
        """Fetches the complete mail (RFC822) for the id and returns it as str."""
        result, data = self.mail_client.fetch(mail_id, '(RFC822)')
        mail_body = data[0][1]  # The mail body is always in this specific location
        mail_body_str = mail_body.decode('utf-8')
        return mail_body_str

    def _check_mail_body(self, response_mail_body, name, check_regex, latest_only):
        """
        Returns the matching attachments of the mail, or None if the mail
        has no attachments at all.
        """
        mail = Mail(response_mail_body)
        if mail.has_attachments():
            return mail.get_attachments_by_name(name, check_regex, find_first=latest_only)

    def _create_files(self, mail_attachments, local_output_directory):
        """
        Writes every attachment into the output directory.

        Attachments whose destination is a symlink or whose name tries to leave
        the output directory are skipped and an error is logged instead.
        """
        for name, payload in mail_attachments:
            # The symlink check has to look at the path that is going to be
            # written; checking the bare name would resolve it against the
            # current working directory instead of the output directory.
            if self._is_symlink(self._correct_path(name, local_output_directory)):
                self.log.error('Can not create file because it is a symlink!')
            elif self._is_escaping_current_directory(name):
                self.log.error('Can not create file because it is escaping the current directory!')
            else:
                self._create_file(name, payload, local_output_directory)

    def _is_symlink(self, name):
        """Checks if the given path refers to a symbolic link."""
        return os.path.islink(name)

    def _is_escaping_current_directory(self, name):
        """Checks if the name contains a parent directory reference."""
        return '../' in name

    def _correct_path(self, name, local_output_directory):
        """Builds the destination path of the file inside the output directory."""
        # Plain concatenation is deliberate: os.path.join would discard the
        # output directory when the attachment name is an absolute path.
        return local_output_directory + name if local_output_directory.endswith('/') \
            else local_output_directory + '/' + name

    def _create_file(self, name, payload, local_output_directory):
        """Writes the payload as binary file into the output directory."""
        file_path = self._correct_path(name, local_output_directory)

        with open(file_path, 'wb') as file:
            file.write(payload)
class Mail(LoggingMixin):
    """
    Convenience wrapper around a mail as it is returned by the imaplib client.

    :param mail_body: The mail body of a mail received from imaplib client.
    :type mail_body: str
    """

    def __init__(self, mail_body):
        super(Mail, self).__init__()
        # Parse the raw text once; all other methods work on the parsed message.
        self.mail = email.message_from_string(mail_body)

    def has_attachments(self):
        """
        Tells whether the mail can carry attachments, i.e. whether its
        main content type is 'multipart'.

        :returns: True if it has attachments and False if not.
        :rtype: bool
        """
        main_type = self.mail.get_content_maintype()
        return main_type == 'multipart'

    def get_attachments_by_name(self, name, check_regex, find_first=False):
        """
        Collects the attachments of the mail whose file name fits the given name.

        :param name: The name of the attachment to look for.
        :type name: str
        :param check_regex: If True the name is used as a regular expression,
                            otherwise it is compared for equality.
        :type check_regex: bool
        :param find_first: If set to True the search stops after the first match.
                           The default value is False.
        :type find_first: bool
        :returns: a list of tuples each containing name and payload
                  where the attachments name matches the given name.
        :rtype: list of tuple
        """
        matches = []

        for raw_part in self.mail.walk():
            candidate = MailPart(raw_part)
            if not candidate.is_attachment():
                continue

            if check_regex:
                name_fits = candidate.has_matching_name(name)
            else:
                name_fits = candidate.has_equal_name(name)
            if not name_fits:
                continue

            file_name, file_payload = candidate.get_file()
            self.log.info('Found attachment: {}'.format(file_name))
            matches.append((file_name, file_payload))
            if find_first:
                break

        return matches
class MailPart:
    """
    This class is a wrapper for a Mail object's part and gives it more features.

    :param part: The mail part in a Mail object.
    :type part: any
    """

    def __init__(self, part):
        self.part = part

    def is_attachment(self):
        """
        Checks if the part is a valid mail attachment, i.e. it is not a
        multipart container and carries a Content-Disposition header.

        :returns: a truthy value (the Content-Disposition header) if it is an
                  attachment and a falsy value (False or None) if not.
        :rtype: bool or str or None
        """
        return self.part.get_content_maintype() != 'multipart' and self.part.get('Content-Disposition')

    def has_matching_name(self, name):
        """
        Checks if the given name matches the part's name.

        :param name: The regular expression the part's name has to match
                     (anchored at the start of the name, see ``re.match``).
        :type name: str
        :returns: the match object (truthy) if the part's name matches the
                  regular expression and None if it does not or if the part
                  has no file name.
        :rtype: re match object or None
        """
        file_name = self.part.get_filename()
        # A part can carry a Content-Disposition header without any file name
        # (e.g. 'inline'); re.match would raise a TypeError on None.
        if file_name is None:
            return None
        return re.match(name, file_name)

    def has_equal_name(self, name):
        """
        Checks if the given name is equal to the part's name.

        :param name: The name to look for.
        :type name: str
        :returns: True if it is equal to the given name.
        :rtype: bool
        """
        return self.part.get_filename() == name

    def get_file(self):
        """
        Gets the file including name and payload.

        :returns: the part's name and its decoded payload.
        :rtype: tuple
        """
        return self.part.get_filename(), self.part.get_payload(decode=True)