diff --git a/app/interface/rmqclient.py b/app/interface/rmqclient.py index b97cb58..1edc8d2 100644 --- a/app/interface/rmqclient.py +++ b/app/interface/rmqclient.py @@ -7,56 +7,44 @@ from threading import Thread import logging import json from core import processor +from util import rabbit logger = logging.getLogger(__name__) +class MailConsumer(rabbit.Consumer): -def process_message(chan, method, properties, body): + def process(self, channel, method, properties, body): + try: + topic = method.routing_key + data = json.loads(body) - try: - topic = method.routing_key - data = json.loads(body) - - if topic == 'mail.message': - if "STACOSYS" in data['subject']: - logger.info('new message => {}'.format(data)) - processor.enqueue({'request': 'new_mail', 'data': data}) + if topic == 'mail.message': + if "STACOSYS" in data['subject']: + logger.info('new message => {}'.format(data)) + processor.enqueue({'request': 'new_mail', 'data': data}) + else: + logger.info('ignore message => {}'.format(data)) else: - logger.info('ignore message => {}'.format(data)) - else: - logger.warn('unsupported message [topic={}]'.format(topic)) - except: - logger.exception('cannot process message') - - -class MessageConsumer(Thread): - - def run(self): - - credentials = pika.PlainCredentials( - config.rabbitmq['username'], config.rabbitmq['password']) - connection = pika.BlockingConnection(pika.ConnectionParameters(host=config.rabbitmq['host'], port=config.rabbitmq[ - 'port'], credentials=credentials, virtual_host=config.rabbitmq['vhost'])) + logger.warn('unsupported message [topic={}]'.format(topic)) + except: + logger.exception('cannot process message') - channel = connection.channel() - channel.exchange_declare(exchange=config.rabbitmq['exchange'], - exchange_type='topic') - result = channel.queue_declare(exclusive=True) - queue_name = result.method.queue - channel.queue_bind(exchange=config.rabbitmq['exchange'], - queue=queue_name, - routing_key='mail.message') - channel.basic_consume(process_message, - queue=queue_name, - no_ack=True) - channel.start_consuming() +def start(): + logger.info('start rmqclient') + #c = MessageConsumer() + #c.start() - def stop(self): - self.loop = False + credentials = pika.PlainCredentials(config.rabbitmq['username'], config.rabbitmq['password']) + parameters = pika.ConnectionParameters( + host=config.rabbitmq['host'], + port=config.rabbitmq['port'], + credentials=credentials, + virtual_host=config.rabbitmq['vhost'] + ) -def start(): - logger.info('start rmqclient') - c = MessageConsumer() + connection = rabbit.Connection(parameters) + c = MailConsumer(connection, config.rabbitmq['exchange'], 'mail.message') c.start() + #print('exit rmqclient ' + str(c)) \ No newline at end of file diff --git a/app/util/rabbit.py b/app/util/rabbit.py new file mode 100644 index 0000000..d353d8f --- /dev/null +++ b/app/util/rabbit.py @@ -0,0 +1,83 @@ +#!/usr/bin/env python +# -*- coding: utf-8 - *- + +import logging +import pika +import time +from threading import Thread + +logger = logging.getLogger(__name__) + +EXCHANGE_TYPE = "topic" +CONNECT_DELAY = 3 + +class Connection: + + def __init__(self, connection_parameters): + self._connection_parameters = connection_parameters + + def open(self): + + self._connection = None + while True: + try: + self._connection = pika.BlockingConnection( + self._connection_parameters) + break + except: + time.sleep(CONNECT_DELAY) + logger.warn("rabbitmq connection failure. try again...") + + def close(self): + self._connection.close() + self._connection = None + + def get(self): + return self._connection + + +class Consumer(Thread): + + _connection = None + _channel = None + _queue_name = None + + def __init__(self, connection, exchange_name, routing_key): + Thread.__init__(self) + self._connection = connection + self._exchange_name = exchange_name + self._routing_key = routing_key + + def configure(self): + + self._connection = None + self._channel = None + while True: + try: + + self._channel = self._connection.channel() + self._channel.exchange_declare( + exchange=self._exchange_name, exchange_type=EXCHANGE_TYPE + ) + + result = self._channel.queue_declare(exclusive=True) + self._queue_name = result.method.queue + self._channel.queue_bind( + exchange=self._exchange_name, + queue=self._queue_name, + routing_key=self._routing_key, + ) + break + except: + time.sleep(CONNECT_DELAY) + logger.warn("connection failure. try again...") + + def run(self): + + self.configure() + self._channel.basic_consume( + self.process, queue=self._queue_name, no_ack=True) + self._channel.start_consuming() + + def process(self, channel, method, properties, body): + raise NotImplemented