diff --git a/app/core/processor.py b/app/core/processor.py index a2c98ab..e089f8b 100644 --- a/app/core/processor.py +++ b/app/core/processor.py @@ -16,6 +16,7 @@ from models.comment import Comment from helpers.hashing import md5 import json from conf import config +from util import rabbit import PyRSS2Gen import markdown import pika @@ -416,12 +417,16 @@ def rss(token, onstart=False): def get_rabbitmq_connection(): + credentials = pika.PlainCredentials( config.rabbitmq['username'], config.rabbitmq['password']) - connection = pika.BlockingConnection(pika.ConnectionParameters(host=config.rabbitmq['host'], port=config.rabbitmq[ - 'port'], credentials=credentials, virtual_host=config.rabbitmq['vhost'])) - return connection - + parameters = pika.ConnectionParameters( + host=config.rabbitmq['host'], + port=config.rabbitmq['port'], + credentials=credentials, + virtual_host=config.rabbitmq['vhost'] + ) + return rabbit.Connection(parameters) def mail(to_email, subject, message): @@ -430,23 +435,25 @@ def mail(to_email, subject, message): 'subject': subject, 'content': message } - connection = get_rabbitmq_connection() + connector = get_rabbitmq_connection() + connection = connector.open() channel = connection.channel() channel.basic_publish(exchange=config.rabbitmq['exchange'], routing_key='mail.command.send', body=json.dumps(body, indent=False, sort_keys=False)) - connection.close() + connector.close() logger.debug('Email for %s posted' % to_email) def send_delete_command(content): - connection = get_rabbitmq_connection() + connector = get_rabbitmq_connection() + connection = connector.open() channel = connection.channel() channel.basic_publish(exchange=config.rabbitmq['exchange'], routing_key='mail.command.delete', body=json.dumps(content, indent=False, sort_keys=False)) - connection.close() + connector.close() logger.debug('Email accepted. Delete request sent for %s' % content) diff --git a/app/interface/rmqclient.py b/app/interface/rmqclient.py index 1edc8d2..51830fb 100644 --- a/app/interface/rmqclient.py +++ b/app/interface/rmqclient.py @@ -11,6 +11,7 @@ from util import rabbit logger = logging.getLogger(__name__) + class MailConsumer(rabbit.Consumer): def process(self, channel, method, properties, body): @@ -31,20 +32,18 @@ class MailConsumer(rabbit.Consumer): def start(): - logger.info('start rmqclient') - #c = MessageConsumer() - #c.start() - credentials = pika.PlainCredentials(config.rabbitmq['username'], config.rabbitmq['password']) + logger.info('start rmqclient') + credentials = pika.PlainCredentials( + config.rabbitmq['username'], config.rabbitmq['password']) parameters = pika.ConnectionParameters( - host=config.rabbitmq['host'], - port=config.rabbitmq['port'], - credentials=credentials, + host=config.rabbitmq['host'], + port=config.rabbitmq['port'], + credentials=credentials, virtual_host=config.rabbitmq['vhost'] ) connection = rabbit.Connection(parameters) c = MailConsumer(connection, config.rabbitmq['exchange'], 'mail.message') c.start() - #print('exit rmqclient ' + str(c)) \ No newline at end of file diff --git a/app/stacosys.py b/app/run.py similarity index 100% rename from app/stacosys.py rename to app/run.py diff --git a/app/util/rabbit.py b/app/util/rabbit.py index d353d8f..042e272 100644 --- a/app/util/rabbit.py +++ b/app/util/rabbit.py @@ -26,7 +26,8 @@ class Connection: break except: time.sleep(CONNECT_DELAY) - logger.warn("rabbitmq connection failure. try again...") + logger.exception('rabbitmq connection failure. try again...') + return self._connection def close(self): self._connection.close() @@ -38,24 +39,23 @@ class Connection: class Consumer(Thread): - _connection = None + _connector = None _channel = None _queue_name = None - def __init__(self, connection, exchange_name, routing_key): + def __init__(self, connector, exchange_name, routing_key): Thread.__init__(self) - self._connection = connection + self._connector = connector self._exchange_name = exchange_name self._routing_key = routing_key - def configure(self): + def configure(self, connection): - self._connection = None self._channel = None while True: try: - self._channel = self._connection.channel() + self._channel = connection.channel() self._channel.exchange_declare( exchange=self._exchange_name, exchange_type=EXCHANGE_TYPE ) @@ -69,12 +69,13 @@ class Consumer(Thread): ) break except: + logger.exception('configuration failure. try again...') time.sleep(CONNECT_DELAY) - logger.warn("connection failure. try again...") def run(self): - self.configure() + self._connector.open() + self.configure(self._connector.get()) self._channel.basic_consume( self.process, queue=self._queue_name, no_ack=True) self._channel.start_consuming() diff --git a/run.sh b/run.sh index 3300b92..a3b0364 100755 --- a/run.sh +++ b/run.sh @@ -1,3 +1,3 @@ #!/bin/sh -python app/stacosys.py "$@" +python app/run.py "$@"