sonarlint

pull/6/head
Yax 3 years ago
parent 89c1b09c57
commit f2c668425e

@ -31,14 +31,14 @@ class Mailbox(object):
self.imap.login(self.login, self.password) self.imap.login(self.login, self.password)
return self return self
def __exit__(self, type, value, traceback): def __exit__(self, _type, value, traceback):
self.imap.close() self.imap.close()
self.imap.logout() self.imap.logout()
def get_count(self): def get_count(self):
self.imap.select("Inbox") self.imap.select("Inbox")
_, data = self.imap.search(None, "ALL") _, data = self.imap.search(None, "ALL")
return sum(1 for num in data[0].split()) return sum(1 for _ in data[0].split())
def fetch_raw_message(self, num): def fetch_raw_message(self, num):
self.imap.select("Inbox") self.imap.select("Inbox")
@ -56,43 +56,26 @@ class Mailbox(object):
if part.is_multipart(): if part.is_multipart():
continue continue
content_disposition = part.get("Content-Disposition", None) if _is_part_attachment(part):
if content_disposition: attachments.append(_get_attachment(part))
# we have attachment
r = filename_re.findall(content_disposition)
if r:
filename = sorted(r[0])[1]
else:
filename = "undefined"
content = base64.b64encode(part.get_payload(decode=True))
content = content.decode()
attachments.append(
Attachment(
filename=email_nonascii_to_uft8(filename),
content=content,
content_type=part.get_content_type(),
)
)
else: else:
try: try:
content = to_plain_text_content(part) content = _to_plain_text_content(part)
except Exception:
logging.exception("cannot extract content from mail part")
parts.append( parts.append(
Part(content=content, content_type=part.get_content_type()) Part(content=content, content_type=part.get_content_type())
) )
if part.get_content_type() == "text/plain": if part.get_content_type() == "text/plain":
plain_text_content = content plain_text_content = content
except Exception:
logging.exception("cannot extract content from mail part")
return Email( return Email(
id=num, id=num,
encoding="UTF-8", encoding="UTF-8",
date=parse_date(raw_msg["Date"]).strftime("%Y-%m-%d %H:%M:%S"), date=_parse_date(raw_msg["Date"]).strftime("%Y-%m-%d %H:%M:%S"),
from_addr=raw_msg["From"], from_addr=raw_msg["From"],
to_addr=raw_msg["To"], to_addr=raw_msg["To"],
subject=email_nonascii_to_uft8(raw_msg["Subject"]), subject=_email_non_ascii_to_uft8(raw_msg["Subject"]),
parts=parts, parts=parts,
attachments=attachments, attachments=attachments,
plain_text_content=plain_text_content, plain_text_content=plain_text_content,
@ -118,26 +101,22 @@ class Mailbox(object):
self.logger.debug("Message %s\n%s\n" % (num, data[0][1])) self.logger.debug("Message %s\n%s\n" % (num, data[0][1]))
def parse_date(v): def _parse_date(v):
if v is None: if v is None:
return datetime.datetime.now() return datetime.datetime.now()
tt = email.utils.parsedate_tz(v) tt = email.utils.parsedate_tz(v)
if tt is None: if tt is None:
return datetime.datetime.now() return datetime.datetime.now()
timestamp = email.utils.mktime_tz(tt) timestamp = email.utils.mktime_tz(tt)
date = datetime.datetime.fromtimestamp(timestamp) date = datetime.datetime.fromtimestamp(timestamp)
return date return date
def to_utf8(string, charset): def _to_utf8(string, charset):
return string.decode(charset).encode("UTF-8").decode("UTF-8") return string.decode(charset).encode("UTF-8").decode("UTF-8")
def email_nonascii_to_uft8(string): def _email_non_ascii_to_uft8(string):
# RFC 1342 is a recommendation that provides a way to represent non ASCII # RFC 1342 is a recommendation that provides a way to represent non ASCII
# characters inside e-mail in a way that wont confuse e-mail servers # characters inside e-mail in a way that wont confuse e-mail servers
subject = "" subject = ""
@ -147,16 +126,36 @@ def email_nonascii_to_uft8(string):
v = v.decode() v = v.decode()
subject = subject + v subject = subject + v
else: else:
subject = subject + to_utf8(v, charset) subject = subject + _to_utf8(v, charset)
return subject return subject
def to_plain_text_content(part: Message) -> str: def _to_plain_text_content(part: Message) -> str:
content = part.get_payload(decode=True) content = part.get_payload(decode=True)
charset = part.get_param("charset", None) charset = part.get_param("charset", None)
if charset: if charset:
content = to_utf8(content, charset) content = _to_utf8(content, charset)
elif type(content) == bytes: elif type(content) == bytes:
content = content.decode("utf8") content = content.decode("utf8")
# RFC 3676: remove automatic word-wrapping # RFC 3676: remove automatic word-wrapping
return content.replace(" \r\n", " ") return content.replace(" \r\n", " ")
def _is_part_attachment(part):
return part.get("Content-Disposition", None)
def _get_attachment(part) -> Attachment:
content_disposition = part.get("Content-Disposition", None)
r = filename_re.findall(content_disposition)
if r:
filename = sorted(r[0])[1]
else:
filename = "undefined"
content = base64.b64encode(part.get_payload(decode=True))
content = content.decode()
return Attachment(
filename=_email_non_ascii_to_uft8(filename),
content=content,
content_type=part.get_content_type(),
)

@ -4,18 +4,20 @@ from datetime import datetime
from stacosys.model.comment import Comment from stacosys.model.comment import Comment
TIME_FORMAT = "%Y-%m-%d %H:%M:%S"
def find_comment_by_id(id): def find_comment_by_id(id):
return Comment.get_by_id(id) return Comment.get_by_id(id)
def notify_comment(comment: Comment): def notify_comment(comment: Comment):
comment.notified = datetime.now().strftime("%Y-%m-%d %H:%M:%S") comment.notified = datetime.now().strftime(TIME_FORMAT)
comment.save() comment.save()
def publish_comment(comment: Comment): def publish_comment(comment: Comment):
comment.published = datetime.now().strftime("%Y-%m-%d %H:%M:%S") comment.published = datetime.now().strftime(TIME_FORMAT)
comment.save() comment.save()

Loading…
Cancel
Save