More pythonic singleton with module. Apply pylint recommandations

pull/9/head
Yax 2 years ago
parent f231ed1cbb
commit b1c64d2cc8

@ -1,15 +1,15 @@
all: black test typehint lint all: black test typehint lint
black: black:
isort --multi-line 3 --profile black stacosys/ poetry run isort --multi-line 3 --profile black stacosys/
black stacosys/ poetry run black stacosys/
test: test:
pytest poetry run pytest
typehint: typehint:
mypy --ignore-missing-imports stacosys/ poetry run mypy --ignore-missing-imports stacosys/
lint: lint:
pylint stacosys/ poetry run pylint stacosys/

@ -6,7 +6,8 @@ db_sqlite_file = db.sqlite
[site] [site]
name = "My blog" name = "My blog"
url = http://blog.mydomain.com proto = https
url = https://blog.mydomain.com
admin_email = admin@mydomain.com admin_email = admin@mydomain.com
redirect = /redirect redirect = /redirect
@ -15,7 +16,6 @@ host = 127.0.0.1
port = 8100 port = 8100
[rss] [rss]
proto = https
file = comments.xml file = comments.xml
[smtp] [smtp]

File diff suppressed because it is too large Load Diff

@ -1,66 +0,0 @@
#!/usr/bin/python
# -*- coding: UTF-8 -*-
from datetime import datetime
import markdown
import PyRSS2Gen
from stacosys.model.comment import Comment
class Rss:
def __init__(
self,
lang,
rss_file,
rss_proto,
site_name,
site_url,
):
self._lang = lang
self._rss_file = rss_file
self._rss_proto = rss_proto
self._site_name = site_name
self._site_url = site_url
def generate(self):
md = markdown.Markdown()
items = []
for row in (
Comment.select()
.where(Comment.published)
.order_by(-Comment.published)
.limit(10)
):
item_link = "%s://%s%s" % (
self._rss_proto,
self._site_url,
row.url,
)
items.append(
PyRSS2Gen.RSSItem(
title="%s - %s://%s%s"
% (
self._rss_proto,
row.author_name,
self._site_url,
row.url,
),
link=item_link,
description=md.convert(row.content),
guid=PyRSS2Gen.Guid("%s/%d" % (item_link, row.id)),
pubDate=row.published,
)
)
rss_title = 'Commentaires du site "%s"' % self._site_name
rss = PyRSS2Gen.RSS2(
title=rss_title,
link="%s://%s" % (self._rss_proto, self._site_url),
description=rss_title,
lastBuildDate=datetime.now(),
items=items,
)
rss.write_xml(open(self._rss_file, "w"), encoding="utf-8")

@ -7,6 +7,8 @@ from flask import abort, redirect, request
from stacosys.db import dao from stacosys.db import dao
from stacosys.interface import app from stacosys.interface import app
from stacosys.service import config, mailer
from stacosys.service.configuration import ConfigParameter
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -46,7 +48,7 @@ def new_form_comment():
# send notification e-mail asynchronously # send notification e-mail asynchronously
submit_new_comment(comment) submit_new_comment(comment)
return redirect(app.config.get("SITE_REDIRECT"), code=302) return redirect(config.get(ConfigParameter.SITE_REDIRECT), code=302)
def check_form_data(posted_comment): def check_form_data(posted_comment):
@ -57,7 +59,7 @@ def check_form_data(posted_comment):
@background.task @background.task
def submit_new_comment(comment): def submit_new_comment(comment):
site_url = app.config.get("SITE_URL") site_url = config.get(ConfigParameter.SITE_URL)
comment_list = ( comment_list = (
f"Web admin interface: {site_url}/web/admin", f"Web admin interface: {site_url}/web/admin",
"", "",
@ -72,8 +74,9 @@ def submit_new_comment(comment):
email_body = "\n".join(comment_list) email_body = "\n".join(comment_list)
# send email to notify admin # send email to notify admin
subject = "STACOSYS " + app.config.get("SITE_NAME") site_name = config.get(ConfigParameter.SITE_NAME)
if app.config.get("MAILER").send(subject, email_body): subject = f"STACOSYS {site_name}"
if mailer.send(subject, email_body):
logger.debug("new comment processed") logger.debug("new comment processed")
# save notification datetime # save notification datetime

@ -8,6 +8,8 @@ from flask import flash, redirect, render_template, request, session
from stacosys.db import dao from stacosys.db import dao
from stacosys.interface import app from stacosys.interface import app
from stacosys.service import config, rss
from stacosys.service.configuration import ConfigParameter
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -23,8 +25,8 @@ def index():
def is_login_ok(username, password): def is_login_ok(username, password):
hashed = hashlib.sha256(password.encode()).hexdigest().upper() hashed = hashlib.sha256(password.encode()).hexdigest().upper()
return ( return (
app.config.get("WEB_USERNAME") == username config.get(ConfigParameter.WEB_USERNAME) == username
and app.config.get("WEB_PASSWORD") == hashed and config.get(ConfigParameter.WEB_PASSWORD) == hashed
) )
@ -40,7 +42,7 @@ def login():
flash("Identifiant ou mot de passe incorrect") flash("Identifiant ou mot de passe incorrect")
return redirect("/web/login") return redirect("/web/login")
# GET # GET
return render_template("login_" + app.config.get("LANG", "fr") + ".html") return render_template("login_" + config.get(ConfigParameter.LANG) + ".html")
@app.route("/web/logout", methods=["GET"]) @app.route("/web/logout", methods=["GET"])
@ -51,16 +53,19 @@ def logout():
@app.route("/web/admin", methods=["GET"]) @app.route("/web/admin", methods=["GET"])
def admin_homepage(): def admin_homepage():
if not ("user" in session and session["user"] == app.config.get("WEB_USERNAME")): if not (
"user" in session
and session["user"] == config.get(ConfigParameter.WEB_USERNAME)
):
# TODO localization # TODO localization
flash("Vous avez été déconnecté.") flash("Vous avez été déconnecté.")
return redirect("/web/login") return redirect("/web/login")
comments = dao.find_not_published_comments() comments = dao.find_not_published_comments()
return render_template( return render_template(
"admin_" + app.config.get("LANG", "fr") + ".html", "admin_" + config.get(ConfigParameter.LANG) + ".html",
comments=comments, comments=comments,
baseurl=app.config.get("SITE_URL"), baseurl=config.get(ConfigParameter.SITE_URL),
) )
@ -72,7 +77,7 @@ def admin_action():
flash("Commentaire introuvable") flash("Commentaire introuvable")
elif request.form.get("action") == "APPROVE": elif request.form.get("action") == "APPROVE":
dao.publish_comment(comment) dao.publish_comment(comment)
app.config.get("RSS").generate() rss.generate()
# TODO localization # TODO localization
flash("Commentaire publié") flash("Commentaire publié")
else: else:

@ -6,12 +6,11 @@ import logging
import os import os
import sys import sys
from stacosys.conf.config import Config, ConfigParameter
from stacosys.core.mailer import Mailer
from stacosys.core.rss import Rss
from stacosys.db import database from stacosys.db import database
from stacosys.interface import api, app, form from stacosys.interface import api, app, form
from stacosys.interface.web import admin from stacosys.interface.web import admin
from stacosys.service import config, mailer, rss
from stacosys.service.configuration import ConfigParameter
# configure logging # configure logging
@ -37,16 +36,16 @@ def stacosys_server(config_pathname):
logger.error("Configuration file '%s' not found.", config_pathname) logger.error("Configuration file '%s' not found.", config_pathname)
sys.exit(1) sys.exit(1)
# load config # load and check config
conf = Config.load(config_pathname) config.load(config_pathname)
is_config_ok, erreur_config = conf.check() is_config_ok, erreur_config = config.check()
if not is_config_ok: if not is_config_ok:
logger.error("Configuration incorrecte '%s'", erreur_config) logger.error("Configuration incorrecte '%s'", erreur_config)
sys.exit(1) sys.exit(1)
logger.info(conf) logger.info(config)
# check database file exists (prevents from creating a fresh db) # check database file exists (prevents from creating a fresh db)
db_pathname = conf.get(ConfigParameter.DB_SQLITE_FILE) db_pathname = config.get(ConfigParameter.DB_SQLITE_FILE)
if not db_pathname or not os.path.isfile(db_pathname): if not db_pathname or not os.path.isfile(db_pathname):
logger.error("Database file '%s' not found.", db_pathname) logger.error("Database file '%s' not found.", db_pathname)
sys.exit(1) sys.exit(1)
@ -57,39 +56,29 @@ def stacosys_server(config_pathname):
logger.info("Start Stacosys application") logger.info("Start Stacosys application")
# generate RSS # generate RSS
rss = Rss( rss.configure(
conf.get(ConfigParameter.LANG), config.get(ConfigParameter.RSS_FILE),
conf.get(ConfigParameter.RSS_FILE), config.get(ConfigParameter.SITE_PROTO),
conf.get(ConfigParameter.RSS_PROTO), config.get(ConfigParameter.SITE_NAME),
conf.get(ConfigParameter.SITE_NAME), config.get(ConfigParameter.SITE_URL),
conf.get(ConfigParameter.SITE_URL),
) )
rss.generate() rss.generate()
# configure mailer # configure mailer
mailer = Mailer( mailer.configure_smtp(
conf.get(ConfigParameter.SMTP_HOST), config.get(ConfigParameter.SMTP_HOST),
conf.get_int(ConfigParameter.SMTP_PORT), config.get_int(ConfigParameter.SMTP_PORT),
conf.get(ConfigParameter.SMTP_LOGIN), config.get(ConfigParameter.SMTP_LOGIN),
conf.get(ConfigParameter.SMTP_PASSWORD), config.get(ConfigParameter.SMTP_PASSWORD),
conf.get(ConfigParameter.SITE_ADMIN_EMAIL),
) )
mailer.configure_destination(config.get(ConfigParameter.SITE_ADMIN_EMAIL))
# inject config parameters into flask
app.config.update(LANG=conf.get(ConfigParameter.LANG))
app.config.update(SITE_NAME=conf.get(ConfigParameter.SITE_NAME))
app.config.update(SITE_URL=conf.get(ConfigParameter.SITE_URL))
app.config.update(SITE_REDIRECT=conf.get(ConfigParameter.SITE_REDIRECT))
app.config.update(WEB_USERNAME=conf.get(ConfigParameter.WEB_USERNAME))
app.config.update(WEB_PASSWORD=conf.get(ConfigParameter.WEB_PASSWORD))
app.config.update(MAILER=mailer)
app.config.update(RSS=rss)
logger.info("start interfaces %s %s %s", api, form, admin) logger.info("start interfaces %s %s %s", api, form, admin)
# start Flask # start Flask
app.run( app.run(
host=conf.get(ConfigParameter.HTTP_HOST), host=config.get(ConfigParameter.HTTP_HOST),
port=conf.get_int(ConfigParameter.HTTP_PORT), port=config.get_int(ConfigParameter.HTTP_PORT),
debug=False, debug=False,
use_reloader=False, use_reloader=False,
) )

@ -0,0 +1,10 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from .configuration import Config
from .mail import Mailer
from .rssfeed import Rss
config = Config()
mailer = Mailer()
rss = Rss()

@ -12,7 +12,6 @@ class ConfigParameter(Enum):
HTTP_HOST = "http.host" HTTP_HOST = "http.host"
HTTP_PORT = "http.port" HTTP_PORT = "http.port"
RSS_PROTO = "rss.proto"
RSS_FILE = "rss.file" RSS_FILE = "rss.file"
SMTP_HOST = "smtp.host" SMTP_HOST = "smtp.host"
@ -20,6 +19,7 @@ class ConfigParameter(Enum):
SMTP_LOGIN = "smtp.login" SMTP_LOGIN = "smtp.login"
SMTP_PASSWORD = "smtp.password" SMTP_PASSWORD = "smtp.password"
SITE_PROTO = "site.proto"
SITE_NAME = "site.name" SITE_NAME = "site.name"
SITE_URL = "site.url" SITE_URL = "site.url"
SITE_ADMIN_EMAIL = "site.admin_email" SITE_ADMIN_EMAIL = "site.admin_email"
@ -30,14 +30,16 @@ class ConfigParameter(Enum):
class Config: class Config:
def __init__(self):
self._cfg = configparser.ConfigParser()
@classmethod _cfg = configparser.ConfigParser()
def load(cls, config_pathname):
config = cls() # def __new__(cls):
config._cfg.read(config_pathname) # if not hasattr(cls, "instance"):
return config # cls.instance = super(Config, cls).__new__(cls)
# return cls.instance
def load(self, config_pathname):
self._cfg.read(config_pathname)
def _split_key(self, key: ConfigParameter): def _split_key(self, key: ConfigParameter):
section, param = str(key.value).split(".") section, param = str(key.value).split(".")
@ -50,12 +52,12 @@ class Config:
section, param = self._split_key(key) section, param = self._split_key(key)
return self._cfg.has_option(section, param) return self._cfg.has_option(section, param)
def get(self, key: ConfigParameter): def get(self, key: ConfigParameter) -> str:
section, param = self._split_key(key) section, param = self._split_key(key)
return ( return (
self._cfg.get(section, param) self._cfg.get(section, param)
if self._cfg.has_option(section, param) if self._cfg.has_option(section, param)
else None else ""
) )
def put(self, key: ConfigParameter, value): def put(self, key: ConfigParameter, value):
@ -64,11 +66,11 @@ class Config:
self._cfg.add_section(section) self._cfg.add_section(section)
self._cfg.set(section, param, str(value)) self._cfg.set(section, param, str(value))
def get_int(self, key: ConfigParameter): def get_int(self, key: ConfigParameter) -> int:
value = self.get(key) value = self.get(key)
return int(value) if value else 0 return int(value) if value else 0
def get_bool(self, key: ConfigParameter): def get_bool(self, key: ConfigParameter) -> bool:
value = self.get(key) value = self.get(key)
assert value in ( assert value in (
"yes", "yes",
@ -85,8 +87,8 @@ class Config:
return (True, None) return (True, None)
def __repr__(self): def __repr__(self):
d = dict() dict_repr = {}
for section in self._cfg.sections(): for section in self._cfg.sections():
for option in self._cfg.options(section): for option in self._cfg.options(section):
d[".".join([section, option])] = self._cfg.get(section, option) dict_repr[".".join([section, option])] = self._cfg.get(section, option)
return str(d) return str(dict_repr)

@ -10,21 +10,29 @@ logger = logging.getLogger(__name__)
class Mailer: class Mailer:
def __init__( def __init__(self) -> None:
self._smtp_host: str = ""
self._smtp_port: int = 0
self._smtp_login: str = ""
self._smtp_password: str = ""
self._site_admin_email: str = ""
def configure_smtp(
self, self,
smtp_host, smtp_host,
smtp_port, smtp_port,
smtp_login, smtp_login,
smtp_password, smtp_password,
site_admin_email, ) -> None:
):
self._smtp_host = smtp_host self._smtp_host = smtp_host
self._smtp_port = smtp_port self._smtp_port = smtp_port
self._smtp_login = smtp_login self._smtp_login = smtp_login
self._smtp_password = smtp_password self._smtp_password = smtp_password
def configure_destination(self, site_admin_email) -> None:
self._site_admin_email = site_admin_email self._site_admin_email = site_admin_email
def send(self, subject, message): def send(self, subject, message) -> bool:
sender = self._smtp_login sender = self._smtp_login
receivers = [self._site_admin_email] receivers = [self._site_admin_email]
@ -34,7 +42,7 @@ class Mailer:
msg["From"] = sender msg["From"] = sender
context = ssl.create_default_context() context = ssl.create_default_context()
# TODO catch SMTP failure # TODO catch SMTP failure
with smtplib.SMTP_SSL( with smtplib.SMTP_SSL(
self._smtp_host, self._smtp_port, context=context self._smtp_host, self._smtp_port, context=context
) as server: ) as server:

@ -0,0 +1,63 @@
#!/usr/bin/python
# -*- coding: UTF-8 -*-
from datetime import datetime
import markdown
import PyRSS2Gen
from stacosys.model.comment import Comment
class Rss:
def __init__(self) -> None:
self._rss_file: str = ""
self._site_proto: str = ""
self._site_name: str = ""
self._site_url: str = ""
def configure(
self,
rss_file,
site_proto,
site_name,
site_url,
) -> None:
self._rss_file = rss_file
self._site_proto = site_proto
self._site_name = site_name
self._site_url = site_url
def generate(self) -> None:
markdownizer = markdown.Markdown()
items = []
for row in (
Comment.select()
.where(Comment.published)
.order_by(-Comment.published)
.limit(10)
):
item_link = f"{self._site_proto}://{self._site_url}{row.url}"
items.append(
PyRSS2Gen.RSSItem(
title=f"{self._site_proto}://{self._site_url}{row.url} - {row.author_name}",
link=item_link,
description=markdownizer.convert(row.content),
guid=PyRSS2Gen.Guid(f"{item_link}{row.id}"),
pubDate=row.published,
)
)
rss_title = f"Commentaires du site {self._site_name}"
rss = PyRSS2Gen.RSS2(
title=rss_title,
link=f"{self._site_proto}://{self._site_url}",
description=rss_title,
lastBuildDate=datetime.now(),
items=items,
)
# TODO technical debt: replace pyRss2Gen
# TODO validate feed (https://validator.w3.org/feed/check.cgi)
# pylint: disable=consider-using-with
rss.write_xml(open(self._rss_file, "w", encoding="utf-8"), encoding="utf-8")

@ -25,7 +25,6 @@ def client():
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
database.setup(":memory:") database.setup(":memory:")
init_test_db() init_test_db()
app.config.update(SITE_TOKEN="ETC")
logger.info(f"start interface {api}") logger.info(f"start interface {api}")
return app.test_client() return app.test_client()

@ -3,7 +3,8 @@
import unittest import unittest
from stacosys.conf.config import Config, ConfigParameter from stacosys.service import config
from stacosys.service.configuration import ConfigParameter
EXPECTED_DB_SQLITE_FILE = "db.sqlite" EXPECTED_DB_SQLITE_FILE = "db.sqlite"
EXPECTED_HTTP_PORT = 8080 EXPECTED_HTTP_PORT = 8080
@ -11,31 +12,30 @@ EXPECTED_LANG = "fr"
class ConfigTestCase(unittest.TestCase): class ConfigTestCase(unittest.TestCase):
def setUp(self): def setUp(self):
self.conf = Config() config.put(ConfigParameter.DB_SQLITE_FILE, EXPECTED_DB_SQLITE_FILE)
self.conf.put(ConfigParameter.DB_SQLITE_FILE, EXPECTED_DB_SQLITE_FILE) config.put(ConfigParameter.HTTP_PORT, EXPECTED_HTTP_PORT)
self.conf.put(ConfigParameter.HTTP_PORT, EXPECTED_HTTP_PORT)
def test_exists(self): def test_exists(self):
self.assertTrue(self.conf.exists(ConfigParameter.DB_SQLITE_FILE)) self.assertTrue(config.exists(ConfigParameter.DB_SQLITE_FILE))
def test_get(self): def test_get(self):
self.assertEqual( self.assertEqual(
self.conf.get(ConfigParameter.DB_SQLITE_FILE), EXPECTED_DB_SQLITE_FILE config.get(ConfigParameter.DB_SQLITE_FILE), EXPECTED_DB_SQLITE_FILE
) )
self.assertIsNone(self.conf.get(ConfigParameter.HTTP_HOST)) self.assertEqual(config.get(ConfigParameter.HTTP_HOST), "")
self.assertEqual( self.assertEqual(
self.conf.get(ConfigParameter.HTTP_PORT), str(EXPECTED_HTTP_PORT) config.get(ConfigParameter.HTTP_PORT), str(EXPECTED_HTTP_PORT)
) )
self.assertEqual(self.conf.get_int(ConfigParameter.HTTP_PORT), 8080) self.assertEqual(config.get_int(ConfigParameter.HTTP_PORT), 8080)
try: try:
self.conf.get_bool(ConfigParameter.DB_SQLITE_FILE) config.get_bool(ConfigParameter.DB_SQLITE_FILE)
self.assertTrue(False) self.assertTrue(False)
except AssertionError: except AssertionError:
pass pass
def test_put(self): def test_put(self):
self.assertFalse(self.conf.exists(ConfigParameter.LANG)) self.assertFalse(config.exists(ConfigParameter.LANG))
self.conf.put(ConfigParameter.LANG, EXPECTED_LANG) config.put(ConfigParameter.LANG, EXPECTED_LANG)
self.assertTrue(self.conf.exists(ConfigParameter.LANG)) self.assertTrue(config.exists(ConfigParameter.LANG))
self.assertEqual(self.conf.get(ConfigParameter.LANG), EXPECTED_LANG) self.assertEqual(config.get(ConfigParameter.LANG), EXPECTED_LANG)

@ -13,8 +13,7 @@ from stacosys.interface import form
@pytest.fixture @pytest.fixture
def client(): def client():
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
database.setup(":memory:") database.setup(":memory:")
app.config.update(SITE_REDIRECT="/redirect")
logger.info(f"start interface {form}") logger.info(f"start interface {form}")
return app.test_client() return app.test_client()

Loading…
Cancel
Save