Files
stacosys/app/services/processor.py
T

418 lines
13 KiB
Python
Raw Normal View History

#!/usr/bin/env python
# -*- coding: utf-8 -*-
2016-11-08 13:51:26 +01:00
import os
import logging
import re
from datetime import datetime
from threading import Thread
from queue import Queue
2015-09-20 12:15:41 +02:00
from jinja2 import Environment
from jinja2 import FileSystemLoader
from app.models.site import Site
from app.models.comment import Comment
2015-05-24 19:40:46 +02:00
from app.models.reader import Reader
2015-09-11 20:23:53 +02:00
from app.models.report import Report
2015-05-16 18:12:18 +02:00
import requests
import json
import config
2015-09-27 19:40:40 +02:00
import PyRSS2Gen
import markdown
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# Work queue: enqueue() puts requests here and Processor.run() consumes them.
queue = Queue()
# The Processor thread; assigned by start() and returned by get_processor().
proc = None
# Jinja2 Environment used by get_template(); assigned by start().
env = None
class Processor(Thread):
    """Worker thread that consumes request messages from the module queue.

    Each message is a mapping with a ``'request'`` key naming the action and
    a ``'data'`` key holding the payload passed to the matching handler.
    """

    def stop(self):
        """Ask the worker loop to exit.

        The flag is only tested between messages and ``queue.get()`` blocks,
        so the loop ends after the next message has been dequeued.
        """
        logger.info("stop requested")
        self.is_running = False

    def run(self):
        """Dequeue messages and dispatch them until ``stop()`` is requested."""
        logger.info('processor thread started')
        self.is_running = True
        while self.is_running:
            try:
                msg = queue.get()
                request = msg['request']
                if request == 'new_comment':
                    new_comment(msg['data'])
                elif request == 'new_mail':
                    reply_comment_email(msg['data'])
                elif request == 'unsubscribe':
                    unsubscribe_reader(msg['data'])
                elif request == 'report':
                    report(msg['data'])
                elif request == 'late_accept':
                    late_accept_comment(msg['data'])
                elif request == 'late_reject':
                    late_reject_comment(msg['data'])
                else:
                    logger.info("throw unknown request " + str(msg))
            except Exception:
                # One failing message must not kill the worker, but
                # SystemExit / KeyboardInterrupt are deliberately not
                # swallowed (a bare ``except:`` would catch them too).
                logger.exception("processing failure")
def new_comment(data):
    """Store a freshly submitted comment and notify the site administrator.

    Args:
        data: mapping read with ``.get()`` for the keys ``token``, ``url``,
            ``author``, ``email``, ``site``, ``message`` and ``subscribe``.

    The comment is saved unpublished (``published=None``), an email rendered
    from the ``new_comment`` template is sent to the site's admin address,
    and the author is subscribed to the article when both ``subscribe`` and
    an email address were supplied.
    """
    logger.info('new comment received: %s' % data)

    token = data.get('token', '')
    url = data.get('url', '')
    author_name = data.get('author', '').strip()
    author_email = data.get('email', '').strip()
    author_site = data.get('site', '').strip()
    message = data.get('message', '')
    subscribe = data.get('subscribe', '')

    site = Site.select().where(Site.token == token).get()

    # Give a scheme to author sites entered without one.
    if author_site and not author_site.startswith('http'):
        author_site = 'http://' + author_site

    created = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

    # Persist the comment; it stays unpublished until it is approved.
    comment = Comment(
        site=site,
        url=url,
        author_name=author_name,
        author_site=author_site,
        author_email=author_email,
        content=message,
        created=created,
        published=None,
    )
    comment.save()

    # Plain-text summary embedded in the notification email.
    summary_lines = [
        'author: %s' % author_name,
        'email: %s' % author_email,
        'site: %s' % author_site,
        'date: %s' % created,
        'url: %s' % url,
        '',
        '%s' % message,
        '',
    ]
    template = get_template('new_comment')
    email_body = template.render(
        url="http://" + site.url + url,
        comment='\n'.join(summary_lines),
    )

    # The "[id:token]" tag in the subject is parsed back by
    # reply_comment_email() when the administrator answers.
    subject = '%s: [%d:%s]' % (site.name, comment.id, token)
    mail(site.admin_email, subject, email_body)

    if subscribe and author_email:
        subscribe_reader(author_email, token, url)

    logger.debug("new comment processed ")
2015-05-16 20:55:55 +02:00
def reply_comment_email(data):
    """Handle the administrator's email reply to a new-comment notification.

    The subject must carry the ``[<comment id>:<site token>]`` tag that
    ``new_comment`` puts in its notification subject. A reply whose first
    ``text/plain`` part starts with "NO" (case-insensitive) deletes the
    comment; any other non-empty reply publishes it.

    Args:
        data: mapping with the keys ``'from'``, ``'subject'`` and ``'parts'``;
            each part is a mapping with ``'content-type'`` and ``'content'``.
    """
    from_email = data['from']
    subject = data['subject']

    # Body of the first text/plain part, or '' when there is none.
    message = next(
        (part['content'] for part in data['parts']
         if part['content-type'] == 'text/plain'),
        '')

    # Raw string: the pattern is unchanged but no longer relies on invalid
    # string escapes such as "\[" and "\d".
    m = re.search(r'\[(\d+):(\w+)\]', subject)
    if not m:
        logger.warning('ignore corrupted email. No token %s' % subject)
        return
    comment_id = int(m.group(1))
    token = m.group(2)

    # retrieve site and comment rows
    comment = Comment.select().where(Comment.id == comment_id).get()
    if comment.site.token != token:
        logger.warning('ignore corrupted email. Unknown token %d' % comment_id)
        return

    if not message:
        logger.warning('ignore empty email')
        return

    # safe logic: no answer or unknown answer is a go for publishing
    if message[:2].upper() == 'NO':
        # report event
        report_rejected(comment)

        logger.info('discard comment: %d' % comment_id)
        comment.delete_instance()
        email_body = get_template('drop_comment').render(original=message)
        mail(from_email, 'Re: ' + subject, email_body)
    else:
        # report event
        report_published(comment)

        # update Comment row
        comment.published = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        comment.save()
        logger.info('commit comment: %d' % comment_id)

        # rebuild RSS
        rss(token)

        # send approval confirmation email to admin
        email_body = get_template('approve_comment').render(original=message)
        mail(from_email, 'Re: ' + subject, email_body)

        # notify reader once comment is published; the address is read from
        # the "email:" line found in the reply body
        reader_email = get_email_metadata(message)
        if reader_email:
            notify_reader(from_email, reader_email, comment.site.token,
                          comment.site.url, comment.url)

        # notify subscribers every time a new comment is published
        notify_subscribed_readers(
            comment.site.token, comment.site.url, comment.url)
2015-09-20 12:33:42 +02:00
def late_reject_comment(id):
    """Reject the comment with the given id: record the event, delete the row."""
    lookup = Comment.select().where(Comment.id == id)
    rejected = lookup.get()

    # The report entry is written first, while the row still exists.
    report_rejected(rejected)
    rejected.delete_instance()

    logger.info('late reject comment: %s' % id)
2015-09-20 12:33:42 +02:00
def late_accept_comment(id):
    """Publish the comment with the given id and record the event."""
    lookup = Comment.select().where(Comment.id == id)
    accepted = lookup.get()

    report_published(accepted)

    # Stamp the publication time and persist it.
    published_at = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    accepted.published = published_at
    accepted.save()

    logger.info('late accept comment: %s' % id)
2015-09-20 12:33:42 +02:00
def get_email_metadata(message):
    """Return the reader email found on an ``email:`` line of *message*.

    Args:
        message: text of the email body sent back by the administrator.

    Returns:
        The address following the first ``email:`` marker, with surrounding
        whitespace removed, or ``""`` when no such line is found.
    """
    # Raw string avoids invalid-escape warnings for "\s" and "\.".
    m = re.search(r'email:\s(.+@.+\..+)', message)
    if not m:
        return ""
    # "." also matches "\r" and spaces, so a CRLF-terminated line would
    # otherwise leave a trailing "\r" on the address.
    return m.group(1).strip()
2015-05-24 19:40:46 +02:00
def subscribe_reader(email, token, url):
    """Subscribe *email* to the article *url* of the site owning *token*.

    Nothing is written when a matching Reader row already exists; otherwise
    a Reader row is saved and a "subscribed" report entry is recorded.
    """
    logger.info('subscribe reader %s to %s [%s]' % (email, url, token))

    existing = Reader.select().join(Site).where(
        Site.token == token,
        Reader.email == email,
        Reader.url == url,
    )
    if existing.count():
        logger.debug('reader %s is already recorded' % email)
        return

    site = Site.select().where(Site.token == token).get()
    reader = Reader(site=site, email=email, url=url)
    reader.save()

    # report event
    report_subscribed(reader)
2015-05-24 19:40:46 +02:00
def unsubscribe_reader(data):
    """Remove every Reader row matching the token, url and email in *data*.

    Args:
        data: mapping read with ``.get()`` for ``token``, ``url`` and
            ``email`` (each defaults to ``''``).
    """
    token = data.get('token', '')
    url = data.get('url', '')
    email = data.get('email', '')
    logger.info('unsubscribe reader %s from %s (%s)' % (email, url, token))

    matching = Reader.select().join(Site).where(
        Site.token == token,
        Reader.email == email,
        Reader.url == url,
    )
    for reader in matching:
        # Record the event before the row disappears.
        report_unsubscribed(reader)
        reader.delete_instance()
2015-09-11 21:21:35 +02:00
def notify_subscribed_readers(token, site_url, url):
    """Email every reader subscribed to an article about a new comment.

    Args:
        token: token of the site the article belongs to.
        site_url: site host, prefixed with ``http://`` to build the link.
        url: article path, appended to *site_url*.
    """
    # Function-scope import keeps this block self-contained.
    from urllib.parse import urlencode

    logger.info('notify subscribers for %s (%s)' % (url, token))
    article_url = "http://" + site_url + url
    subscribers = Reader.select().join(Site).where(Site.token == token,
                                                   Reader.url == url)
    for reader in subscribers:
        to_email = reader.email
        logger.info('notify reader %s' % to_email)

        # Percent-encode the query values: a raw "+", "&" or "#" in the
        # address or the article path would otherwise corrupt the link.
        query = urlencode([('email', to_email),
                           ('token', token),
                           ('url', reader.url)])
        unsubscribe_url = '%s/unsubscribe?%s' % (config.ROOT_URL, query)

        email_body = get_template('notify_subscriber').render(
            article_url=article_url,
            unsubscribe_url=unsubscribe_url)
        subject = get_template('notify_message').render()
        mail(to_email, subject, email_body)
2015-09-11 21:21:35 +02:00
def notify_reader(from_email, to_email, token, site_url, url):
    """Tell a comment's author that the comment has been published.

    ``from_email`` and ``token`` are accepted but not used by this function.
    """
    logger.info('notify reader: email %s about URL %s' % (to_email, url))

    body_template = get_template('notify_reader')
    email_body = body_template.render(article_url="http://" + site_url + url)

    subject_template = get_template('notify_message')
    subject = subject_template.render()

    mail(to_email, subject, email_body)
2015-09-06 18:58:07 +02:00
def report_rejected(comment):
    """Save a Report row flagging *comment* as rejected."""
    Report(
        site=comment.site,
        url=comment.url,
        name=comment.author_name,
        email=comment.author_email,
        rejected=True,
    ).save()
2015-09-06 18:58:07 +02:00
def report_published(comment):
    """Save a Report row flagging *comment* as published."""
    Report(
        site=comment.site,
        url=comment.url,
        name=comment.author_name,
        email=comment.author_email,
        published=True,
    ).save()
2015-09-06 18:58:07 +02:00
2015-10-09 20:45:44 +02:00
def report_subscribed(reader):
    """Save a Report row flagging *reader* as newly subscribed."""
    Report(
        site=reader.site,
        url=reader.url,
        name='',
        email=reader.email,
        subscribed=True,
    ).save()
2015-09-06 18:58:07 +02:00
2015-10-09 20:45:44 +02:00
def report_unsubscribed(reader):
    """Save a Report row flagging *reader* as unsubscribed."""
    Report(
        site=reader.site,
        url=reader.url,
        name='',
        email=reader.email,
        unsubscribed=True,
    ).save()
2015-09-06 18:58:07 +02:00
def _report_entries(site, token, flag):
    """Return url/name/email dicts for the site's Report rows where *flag* is set."""
    rows = Report.select().join(Site).where(Site.token == token, flag)
    return [{'url': "http://" + site.url + row.url,
             'name': row.name, 'email': row.email}
            for row in rows]


def report(token):
    """Email the activity report of one site to its administrator.

    The report lists the comments still awaiting a decision plus the
    published / rejected / subscribed / unsubscribed events recorded in the
    Report table, then clears the reported events.

    Args:
        token: token identifying the site to report on.
    """
    site = Site.select().where(Site.token == token).get()

    # Comments that have not been published yet.
    pending = Comment.select().join(Site).where(
        Site.token == token, Comment.published.is_null(True))
    standbys = [{'url': "http://" + site.url + row.url,
                 'created': row.created.strftime('%d/%m/%y %H:%M'),
                 'name': row.author_name, 'content': row.content,
                 'id': row.id}
                for row in pending]

    published = _report_entries(site, token, Report.published)
    rejected = _report_entries(site, token, Report.rejected)
    subscribed = _report_entries(site, token, Report.subscribed)
    # Fixed: this list used to filter on Report.subscribed, so it repeated
    # the subscriptions instead of listing the unsubscriptions.
    unsubscribed = _report_entries(site, token, Report.unsubscribed)

    email_body = get_template('report').render(secret=config.SECRET,
                                               root_url=config.ROOT_URL,
                                               standbys=standbys,
                                               published=published,
                                               rejected=rejected,
                                               subscribed=subscribed,
                                               unsubscribed=unsubscribed)
    subject = get_template('report_message').render(site=site.name)

    mail(site.admin_email, subject, email_body)

    # Clear only the events of the site just reported; an unfiltered delete
    # would also drop events of other sites that were never reported.
    Report.delete().where(Report.site == site).execute()
2015-09-06 18:58:07 +02:00
2016-11-08 13:51:26 +01:00
def rss(token, onstart=False):
    """Write the RSS feed of the latest published comments of a site.

    Args:
        token: token identifying the site.
        onstart: when true, do nothing if ``config.RSS_FILE`` already exists.
    """
    if onstart and os.path.isfile(config.RSS_FILE):
        return

    site = Site.select().where(Site.token == token).get()
    rss_title = get_template('rss_title_message').render(site=site.name)
    md = markdown.Markdown()

    # Ten most recently published comments, newest first.
    items = []
    for row in Comment.select().join(Site).where(
            Site.token == token, Comment.published).order_by(
                -Comment.published).limit(10):
        item_link = "http://%s%s" % (site.url, row.url)
        items.append(PyRSS2Gen.RSSItem(
            title='%s - http://%s%s' % (row.author_name, site.url, row.url),
            link=item_link,
            description=md.convert(row.content),
            guid=PyRSS2Gen.Guid('%s/%d' % (item_link, row.id)),
            pubDate=row.published
        ))

    feed = PyRSS2Gen.RSS2(
        title=rss_title,
        link="http://" + site.url,
        description="Commentaires du site '%s'" % site.name,
        lastBuildDate=datetime.now(),
        items=items)

    # Close the file deterministically, and open it as UTF-8 so the bytes on
    # disk match the encoding declared in the XML header.
    with open(config.RSS_FILE, "w", encoding="utf-8") as rss_file:
        feed.write_xml(rss_file, encoding="utf-8")
2015-05-16 18:12:18 +02:00
def mail(to_email, subject, message):
    """POST an email as JSON to the mail service at ``config.MAIL_URL``.

    Args:
        to_email: recipient address.
        subject: email subject.
        message: email body.

    A 200 or 201 response is logged at debug level; any other status is
    logged as a warning. Nothing is returned.
    """
    headers = {'Content-Type': 'application/json; charset=utf-8'}
    msg = {
        'to': to_email,
        'subject': subject,
        'content': message
    }
    r = requests.post(config.MAIL_URL, data=json.dumps(msg), headers=headers)
    if r.status_code in (200, 201):
        logger.debug('Email for %s posted' % to_email)
    else:
        # Logger.warn is a deprecated alias; use warning().
        logger.warning('Cannot post email for %s' % to_email)
def get_template(name):
    """Return the template ``<config.LANG>/<name>.tpl`` from the module environment."""
    template_path = '/'.join([config.LANG, name + '.tpl'])
    return env.get_template(template_path)
def enqueue(something):
    """Put *something* on the module queue read by the processor thread."""
    queue.put(something)
def get_processor():
    """Return the module-level processor (``None`` until ``start()`` has run)."""
    return proc
def start(template_dir):
    """Initialise templating, build missing RSS output and start the worker.

    Args:
        template_dir: directory the Jinja2 templates are loaded from.
    """
    global proc, env

    # Jinja2 environment shared by get_template().
    logger.info("load templates from directory %s" % template_dir)
    template_loader = FileSystemLoader(template_dir)
    env = Environment(loader=template_loader)

    # Ask for an RSS build per site; rss() returns early when the file exists.
    for site in Site.select():
        rss(site.token, onstart=True)

    # Launch the thread that consumes the request queue.
    proc = Processor()
    proc.start()