Source code for airflow.contrib.hooks.imap_hook

# -*- coding: utf-8 -*-
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import email
import imaplib
import os
import re

from airflow import LoggingMixin
from airflow.hooks.base_hook import BaseHook


[docs]class ImapHook(BaseHook): """ This hook connects to a mail server by using the imap protocol. :param imap_conn_id: The connection id that contains the information used to authenticate the client. The default value is 'imap_default'. :type imap_conn_id: str """ def __init__(self, imap_conn_id='imap_default'): super(ImapHook, self).__init__(imap_conn_id) self.conn = self.get_connection(imap_conn_id) self.mail_client = imaplib.IMAP4_SSL(self.conn.host) def __enter__(self): self.mail_client.login(self.conn.login, self.conn.password) return self def __exit__(self, exc_type, exc_val, exc_tb): self.mail_client.logout()
[docs] def has_mail_attachment(self, name, mail_folder='INBOX', check_regex=False): """ Checks the mail folder for mails containing attachments with the given name. :param name: The name of the attachment that will be searched for. :type name: str :param mail_folder: The mail folder where to look at. The default value is 'INBOX'. :type mail_folder: str :param check_regex: Checks the name for a regular expression. The default value is False. :type check_regex: bool :returns: True if there is an attachment with the given name and False if not. :rtype: bool """ mail_attachments = self._retrieve_mails_attachments_by_name(name, mail_folder, check_regex, latest_only=True) return len(mail_attachments) > 0
[docs] def retrieve_mail_attachments(self, name, mail_folder='INBOX', check_regex=False, latest_only=False): """ Retrieves mail's attachments in the mail folder by its name. :param name: The name of the attachment that will be downloaded. :type name: str :param mail_folder: The mail folder where to look at. The default value is 'INBOX'. :type mail_folder: str :param check_regex: Checks the name for a regular expression. The default value is False. :type check_regex: bool :param latest_only: If set to True it will only retrieve the first matched attachment. The default value is False. :type latest_only: bool :returns: a list of tuple each containing the attachment filename and its payload. :rtype: a list of tuple """ mail_attachments = self._retrieve_mails_attachments_by_name(name, mail_folder, check_regex, latest_only) return mail_attachments
[docs] def download_mail_attachments(self, name, local_output_directory, mail_folder='INBOX', check_regex=False, latest_only=False): """ Downloads mail's attachments in the mail folder by its name to the local directory. :param name: The name of the attachment that will be downloaded. :type name: str :param local_output_directory: The output directory on the local machine where the files will be downloaded to. :type local_output_directory: str :param mail_folder: The mail folder where to look at. The default value is 'INBOX'. :type mail_folder: str :param check_regex: Checks the name for a regular expression. The default value is False. :type check_regex: bool :param latest_only: If set to True it will only download the first matched attachment. The default value is False. :type latest_only: bool """ mail_attachments = self._retrieve_mails_attachments_by_name(name, mail_folder, check_regex, latest_only) self._create_files(mail_attachments, local_output_directory)
def _retrieve_mails_attachments_by_name(self, name, mail_folder, check_regex, latest_only): all_matching_attachments = [] self.mail_client.select(mail_folder) for mail_id in self._list_mail_ids_desc(): response_mail_body = self._fetch_mail_body(mail_id) matching_attachments = self._check_mail_body(response_mail_body, name, check_regex, latest_only) if matching_attachments: all_matching_attachments.extend(matching_attachments) if latest_only: break self.mail_client.close() return all_matching_attachments def _list_mail_ids_desc(self): result, data = self.mail_client.search(None, 'All') mail_ids = data[0].split() return reversed(mail_ids) def _fetch_mail_body(self, mail_id): result, data = self.mail_client.fetch(mail_id, '(RFC822)') mail_body = data[0][1] # The mail body is always in this specific location mail_body_str = mail_body.decode('utf-8') return mail_body_str def _check_mail_body(self, response_mail_body, name, check_regex, latest_only): mail = Mail(response_mail_body) if mail.has_attachments(): return mail.get_attachments_by_name(name, check_regex, find_first=latest_only) def _create_files(self, mail_attachments, local_output_directory): for name, payload in mail_attachments: if self._is_symlink(name): self.log.error('Can not create file because it is a symlink!') elif self._is_escaping_current_directory(name): self.log.error('Can not create file because it is escaping the current directory!') else: self._create_file(name, payload, local_output_directory) def _is_symlink(self, name): return os.path.islink(name) def _is_escaping_current_directory(self, name): return '../' in name def _correct_path(self, name, local_output_directory): return local_output_directory + name if local_output_directory.endswith('/') \ else local_output_directory + '/' + name def _create_file(self, name, payload, local_output_directory): file_path = self._correct_path(name, local_output_directory) with open(file_path, 'wb') as file: file.write(payload)
class Mail(LoggingMixin): """ This class simplifies working with mails returned by the imaplib client. :param mail_body: The mail body of a mail received from imaplib client. :type mail_body: str """ def __init__(self, mail_body): super(Mail, self).__init__() self.mail = email.message_from_string(mail_body) def has_attachments(self): """ Checks the mail for a attachments. :returns: True if it has attachments and False if not. :rtype: bool """ return self.mail.get_content_maintype() == 'multipart' def get_attachments_by_name(self, name, check_regex, find_first=False): """ Gets all attachments by name for the mail. :param name: The name of the attachment to look for. :type name: str :param check_regex: Checks the name for a regular expression. :type check_regex: bool :param find_first: If set to True it will only find the first match and then quit. The default value is False. :type find_first: bool :returns: a list of tuples each containing name and payload where the attachments name matches the given name. :rtype: list of tuple """ attachments = [] for part in self.mail.walk(): mail_part = MailPart(part) if mail_part.is_attachment(): found_attachment = mail_part.has_matching_name(name) if check_regex \ else mail_part.has_equal_name(name) if found_attachment: file_name, file_payload = mail_part.get_file() self.log.info('Found attachment: {}'.format(file_name)) attachments.append((file_name, file_payload)) if find_first: break return attachments class MailPart: """ This class is a wrapper for a Mail object's part and gives it more features. :param part: The mail part in a Mail object. :type part: any """ def __init__(self, part): self.part = part def is_attachment(self): """ Checks if the part is a valid mail attachment. :returns: True if it is an attachment and False if not. :rtype: bool """ return self.part.get_content_maintype() != 'multipart' and self.part.get('Content-Disposition') def has_matching_name(self, name): """ Checks if the given name matches the part's name. :param name: The name to look for. :type name: str :returns: True if it matches the name (including regular expression). :rtype: tuple """ return re.match(name, self.part.get_filename()) def has_equal_name(self, name): """ Checks if the given name is equal to the part's name. :param name: The name to look for. :type name: str :returns: True if it is equal to the given name. :rtype: bool """ return self.part.get_filename() == name def get_file(self): """ Gets the file including name and payload. :returns: the part's name and payload. :rtype: tuple """ return self.part.get_filename(), self.part.get_payload(decode=True)