Files
stacosys/stacosys/core/imap.py
T

163 lines
4.9 KiB
Python
Executable File
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import base64
import datetime
import email
import imaplib
import logging
import re
from email.message import Message
from stacosys.model.email import Attachment, Email, Part
# Extracts the ``filename`` parameter of a Content-Disposition header value.
# Group 1 captures the quoted form, group 2 the bare (unquoted) form; exactly
# one of the two is non-empty for each match.
# The quoted form stops at the first closing quote: a greedy ``.+`` would run
# to the LAST quote in the header and swallow any following quoted parameter
# (e.g. ``filename="a.txt"; size="12"``).
filename_re = re.compile('filename="([^"]+)"|filename=([^;\n\r"\']+)', re.I | re.S)
class Mailbox(object):
    """IMAP client for the "Inbox" folder, usable as a context manager.

    Entering the context opens the connection (over SSL when ``ssl`` is
    truthy) and logs in; leaving it closes the selected mailbox and logs out.
    Every operation selects "Inbox" before talking to the server.
    """

    def __init__(self, host, port, ssl, login, password):
        """Store the connection settings; the connection is opened on enter."""
        self.logger = logging.getLogger(__name__)
        self.host = host
        self.port = port
        self.ssl = ssl
        self.login = login
        self.password = password

    def __enter__(self):
        """Connect to the server, authenticate and return this mailbox."""
        if self.ssl:
            self.imap = imaplib.IMAP4_SSL(self.host, self.port)
        else:
            self.imap = imaplib.IMAP4(self.host, self.port)
        self.imap.login(self.login, self.password)
        return self

    def __exit__(self, type, value, traceback):
        """Close the selected mailbox and always log out.

        Exceptions raised inside the ``with`` body are not suppressed.
        """
        try:
            self.imap.close()
        except imaplib.IMAP4.error:
            # CLOSE is rejected when no mailbox was ever selected; that must
            # not prevent the logout below.
            self.logger.debug(
                "IMAP close failed, logging out anyway", exc_info=True
            )
        finally:
            self.imap.logout()

    def get_count(self):
        """Return the number of messages found in the inbox."""
        self.imap.select("Inbox")
        _, data = self.imap.search(None, "ALL")
        return len(data[0].split())

    def fetch_raw_message(self, num):
        """Fetch message ``num`` and return it parsed as an email message."""
        self.imap.select("Inbox")
        _, data = self.imap.fetch(str(num), "(RFC822)")
        return email.message_from_bytes(data[0][1])

    def fetch_message(self, num):
        """Fetch message ``num`` and convert it to an ``Email`` model.

        Non-multipart parts carrying a Content-Disposition header become
        base64-encoded ``Attachment`` objects; the others become ``Part``
        objects. ``plain_text_content`` is the content of the last
        ``text/plain`` part, or a placeholder when there is none. A part
        whose content cannot be extracted is logged and skipped.
        """
        raw_msg = self.fetch_raw_message(num)

        parts = []
        attachments = []
        plain_text_content = "no plain-text part"

        for part in raw_msg.walk():
            if part.is_multipart():
                continue

            content_type = part.get_content_type()
            content_disposition = part.get("Content-Disposition", None)
            if content_disposition:
                # we have attachment
                found = filename_re.findall(content_disposition)
                if found:
                    # Each match is a (quoted, bare) pair with exactly one
                    # non-empty member; sorting puts the empty one first.
                    filename = sorted(found[0])[1]
                else:
                    filename = "undefined"
                encoded = base64.b64encode(part.get_payload(decode=True))
                attachments.append(
                    Attachment(
                        filename=email_nonascii_to_uft8(filename),
                        content=encoded.decode(),
                        content_type=content_type,
                    )
                )
                continue

            try:
                content = to_plain_text_content(part)
            except Exception:
                # Skip the part: `content` would otherwise be unbound or
                # still hold the previous part's text.
                self.logger.exception("cannot extract content from mail part")
                continue

            parts.append(Part(content=content, content_type=content_type))
            if content_type == "text/plain":
                plain_text_content = content

        return Email(
            id=num,
            encoding="UTF-8",
            date=parse_date(raw_msg["Date"]).strftime("%Y-%m-%d %H:%M:%S"),
            from_addr=raw_msg["From"],
            to_addr=raw_msg["To"],
            subject=email_nonascii_to_uft8(raw_msg["Subject"]),
            parts=parts,
            attachments=attachments,
            plain_text_content=plain_text_content,
        )

    def delete_message(self, num):
        """Flag message ``num`` as deleted and expunge the inbox."""
        self.imap.select("Inbox")
        self.imap.store(str(num), "+FLAGS", r"\Deleted")
        self.imap.expunge()

    def delete_all(self):
        """Flag every message in the inbox as deleted, then expunge once."""
        self.imap.select("Inbox")
        _, data = self.imap.search(None, "ALL")
        for num in data[0].split():
            self.imap.store(num, "+FLAGS", r"\Deleted")
        self.imap.expunge()

    def print_msgs(self):
        """Log the raw content of every inbox message, newest number first."""
        self.imap.select("Inbox")
        _, data = self.imap.search(None, "ALL")
        for num in reversed(data[0].split()):
            _, msg_data = self.imap.fetch(num, "(RFC822)")
            self.logger.debug("Message %s\n%s\n", num, msg_data[0][1])
def parse_date(v):
    """Convert an email ``Date`` header value into a naive local datetime.

    Falls back to the current local time when the value is ``None`` or
    cannot be parsed.
    """
    parsed = None if v is None else email.utils.parsedate_tz(v)
    if parsed is None:
        return datetime.datetime.now()
    return datetime.datetime.fromtimestamp(email.utils.mktime_tz(parsed))
def to_utf8(string, charset):
    """Decode the bytes ``string`` using ``charset`` and return the text.

    The decoded text is round-tripped through UTF-8 before being returned.
    """
    decoded = string.decode(charset)
    as_utf8 = decoded.encode("UTF-8")
    return as_utf8.decode("UTF-8")
def email_nonascii_to_uft8(string):
    """Decode a possibly RFC 1342-encoded header value into plain text.

    RFC 1342 is a recommendation that provides a way to represent non ASCII
    characters inside e-mail in a way that won't confuse e-mail servers.

    Returns an empty string when ``string`` is ``None`` (header absent).
    Chunks whose charset is unknown, or whose bytes are invalid for the
    declared charset, are decoded as UTF-8 with replacement characters
    instead of raising.
    """
    if string is None:
        return ""

    chunks = []
    for v, charset in email.header.decode_header(string):
        if charset is None:
            if isinstance(v, bytes):
                v = v.decode("utf-8", errors="replace")
            chunks.append(v)
            continue
        try:
            chunks.append(to_utf8(v, charset))
        except (LookupError, UnicodeDecodeError):
            # Unknown charset label or undecodable bytes: keep what can be
            # read rather than failing the whole header.
            chunks.append(v.decode("utf-8", errors="replace"))
    return "".join(chunks)
def to_plain_text_content(part: Message) -> str:
    """Return the decoded payload of ``part`` as text.

    The payload is decoded with the part's ``charset`` parameter when one is
    declared, otherwise as UTF-8 when it is bytes. Every space followed by
    CRLF is then collapsed into a single space.
    """
    payload = part.get_payload(decode=True)
    declared_charset = part.get_param("charset", None)
    if declared_charset:
        text = to_utf8(payload, declared_charset)
    else:
        text = payload.decode("utf8") if type(payload) == bytes else payload
    # RFC 3676: remove automatic word-wrapping
    return text.replace(" \r\n", " ")