use rabbitmq

pull/6/head
Yax 7 years ago
parent 64d836d7f9
commit 9d096e86cb

@ -1,6 +1,3 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# TODO move to JSON config
zmq = {'pub_port': 7701, 'sub_port':7702}

@ -24,16 +24,16 @@ json_schema = """
"rss": {
"$ref": "#/definitions/RSS"
},
"zmq": {
"$ref": "#/definitions/Zmq"
"rabbitmq": {
"$ref": "#/definitions/Rabbitmq"
}
},
"required": [
"general",
"http",
"rabbitmq",
"rss",
"security",
"zmq"
"security"
],
"title": "stacosys"
},
@ -79,6 +79,43 @@ json_schema = """
],
"title": "http"
},
"Rabbitmq": {
"type": "object",
"additionalProperties": false,
"properties": {
"active": {
"type": "boolean"
},
"host": {
"type": "string"
},
"port": {
"type": "integer"
},
"username": {
"type": "string"
},
"password": {
"type": "string"
},
"vhost": {
"type": "string"
},
"exchange": {
"type": "string"
}
},
"required": [
"active",
"exchange",
"host",
"password",
"port",
"username",
"vhost"
],
"title": "rabbitmq"
},
"RSS": {
"type": "object",
"additionalProperties": false,
@ -116,27 +153,6 @@ json_schema = """
"secret"
],
"title": "security"
},
"Zmq": {
"type": "object",
"additionalProperties": false,
"properties": {
"host": {
"type": "string"
},
"pub_port": {
"type": "integer"
},
"sub_port": {
"type": "integer"
}
},
"required": [
"host",
"pub_port",
"sub_port"
],
"title": "zmq"
}
}
}

@ -25,7 +25,7 @@ import processor
from interface import api
from interface import form
from interface import report
from interface import zclient
from interface import rmqclient
# configure logging
def configure_logging(level):
@ -50,7 +50,7 @@ logger = logging.getLogger(__name__)
database.setup()
# start broker client
zclient.start()
rmqclient.start()
# start processor
template_path = os.path.abspath(os.path.join(current_path, '../templates'))

@ -18,7 +18,7 @@ import json
from conf import config
import PyRSS2Gen
import markdown
import zmq
import pika
logger = logging.getLogger(__name__)
queue = Queue()
@ -26,11 +26,6 @@ proc = None
env = None
context = zmq.Context()
zpub = context.socket(zmq.PUB)
zpub.connect('tcp://127.0.0.1:{}'.format(config.zmq['sub_port']))
class Processor(Thread):
def stop(self):
@ -142,7 +137,13 @@ def reply_comment_email(data):
token = m.group(2)
# retrieve site and comment rows
comment = Comment.select().where(Comment.id == comment_id).get()
try:
comment = Comment.select().where(Comment.id == comment_id).get()
except:
logger.warn('unknown comment %d' % comment_id)
send_delete_command(data)
return
if comment.site.token != token:
logger.warn('ignore corrupted email. Unknown token %d' % comment_id)
return
@ -152,7 +153,7 @@ def reply_comment_email(data):
return
# accept email: request to delete
send_deletion_order(data)
send_delete_command(data)
# safe logic: no answer or unknown answer is a go for publishing
if message[:2].upper() == 'NO':
@ -394,26 +395,37 @@ def rss(token, onstart=False):
rss.write_xml(open(config.rss['file'], 'w'), encoding='utf-8')
def get_rmq_channel():
credentials = pika.PlainCredentials(
config.rabbitmq['username'], config.rabbitmq['password'])
connection = pika.BlockingConnection(pika.ConnectionParameters(host=config.rabbitmq['host'], port=config.rabbitmq[
'port'], credentials=credentials, virtual_host=config.rabbitmq['vhost']))
channel = connection.channel()
return channel
def mail(to_email, subject, message):
zmsg = {
'topic': 'email:send',
body = {
'to': to_email,
'subject': subject,
'content': message
}
# TODO test broker failure and find alternative
zpub.send_string(json.dumps(zmsg, indent=False, sort_keys=False))
channel = get_rmq_channel()
channel.basic_publish(exchange=config.rabbitmq['exchange'],
routing_key='mail.command.send',
body=json.dumps(body, indent=False, sort_keys=False))
logger.debug('Email for %s posted' % to_email)
#logger.warn('Cannot post email for %s' % to_email)
def send_deletion_order(zmsg):
zmsg['topic'] = 'email:delete'
zpub.send_string(json.dumps(zmsg, indent=False, sort_keys=False))
logger.debug('Email accepted. Deletion request sent for %s' % zmsg)
def send_delete_command(content):
channel = get_rmq_channel()
channel.basic_publish(exchange=config.rabbitmq['exchange'],
routing_key='mail.command.delete',
body=json.dumps(content, indent=False, sort_keys=False))
logger.debug('Email accepted. Delete request sent for %s' % content)
def get_template(name):

@ -0,0 +1,55 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import pika
from conf import config
from threading import Thread
import logging
import json
from core import processor
logger = logging.getLogger(__name__)
def process_message(chan, method, properties, body):
topic = method.routing_key
data = json.loads(body)
if topic == 'mail.message':
logger.info('new message => {}'.format(data))
processor.enqueue({'request': 'new_mail', 'data': data})
else:
logger.warn('unsupported message [topic={}]'.format(topic))
class MessageConsumer(Thread):
def run(self):
credentials = pika.PlainCredentials(
config.rabbitmq['username'], config.rabbitmq['password'])
connection = pika.BlockingConnection(pika.ConnectionParameters(host=config.rabbitmq['host'], port=config.rabbitmq[
'port'], credentials=credentials, virtual_host=config.rabbitmq['vhost']))
channel = connection.channel()
channel.exchange_declare(exchange=config.rabbitmq['exchange'],
exchange_type='topic')
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
channel.queue_bind(exchange=config.rabbitmq['exchange'],
queue=queue_name,
routing_key='mail.message')
channel.basic_consume(process_message,
queue=queue_name,
no_ack=True)
channel.start_consuming()
def stop(self):
self.loop = False
def start():
logger.info('start rmqclient')
c = MessageConsumer()
c.start()

@ -1,44 +0,0 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import zmq
from conf import config
from threading import Thread
import logging
import json
from core import processor
logger = logging.getLogger(__name__)
context = zmq.Context()
def process(message):
data = json.loads(message)
if data['topic'] == 'email:mail':
logger.info('newmail => {}'.format(data))
processor.enqueue({'request': 'new_mail', 'data': data})
class Consumer(Thread):
def run(self):
zsub = context.socket(zmq.SUB)
zsub.connect('tcp://127.0.0.1:{}'.format(config.zmq['pub_port']))
zsub.setsockopt_string(zmq.SUBSCRIBE, '')
self.loop = True
while self.loop:
message = zsub.recv()
try:
process(message)
except:
logger.exception('cannot process broker message')
def stop(self):
self.loop = False
def start():
logger.info('start zclient')
c = Consumer()
c.start()

@ -28,7 +28,7 @@ def stacosys_server(config_pathname):
config.http = conf['http']
config.security = conf['security']
config.rss = conf['rss']
config.zmq = conf['zmq']
config.rabbitmq = conf['rabbitmq']
# start application
from core import app

@ -1,6 +1,6 @@
{
"general" : {
"debug": true,
"debug": false,
"lang": "fr",
"db_url": "sqlite:///db.sqlite"
},
@ -18,9 +18,13 @@
"proto": "http",
"file": "comments.xml"
},
"zmq": {
"rabbitmq": {
"active": true,
"host": "127.0.0.1",
"pub_port": 7701,
"sub_port": 7702
"port": 5672,
"username": "techuser",
"password": "tech",
"vhost": "devhub",
"exchange": "hub.topic"
}
}

@ -12,8 +12,8 @@ Markdown==2.6.11
MarkupSafe==1.0
od==1.0
peewee==2.10.2
pika==0.11.2
PyRSS2Gen==1.1
pyzmq==16.0.3
sigtools==2.0.1
six==1.11.0
Werkzeug==0.14.1

Loading…
Cancel
Save