rabbitmq util

pull/6/head
Yax 6 years ago
parent c86b5b6e87
commit e95f59bb87

@ -7,56 +7,44 @@ from threading import Thread
import logging
import json
from core import processor
from util import rabbit
logger = logging.getLogger(__name__)
class MailConsumer(rabbit.Consumer):
def process_message(chan, method, properties, body):
def process(self, channel, method, properties, body):
try:
topic = method.routing_key
data = json.loads(body)
try:
topic = method.routing_key
data = json.loads(body)
if topic == 'mail.message':
if "STACOSYS" in data['subject']:
logger.info('new message => {}'.format(data))
processor.enqueue({'request': 'new_mail', 'data': data})
if topic == 'mail.message':
if "STACOSYS" in data['subject']:
logger.info('new message => {}'.format(data))
processor.enqueue({'request': 'new_mail', 'data': data})
else:
logger.info('ignore message => {}'.format(data))
else:
logger.info('ignore message => {}'.format(data))
else:
logger.warn('unsupported message [topic={}]'.format(topic))
except:
logger.exception('cannot process message')
class MessageConsumer(Thread):
def run(self):
credentials = pika.PlainCredentials(
config.rabbitmq['username'], config.rabbitmq['password'])
connection = pika.BlockingConnection(pika.ConnectionParameters(host=config.rabbitmq['host'], port=config.rabbitmq[
'port'], credentials=credentials, virtual_host=config.rabbitmq['vhost']))
logger.warn('unsupported message [topic={}]'.format(topic))
except:
logger.exception('cannot process message')
channel = connection.channel()
channel.exchange_declare(exchange=config.rabbitmq['exchange'],
exchange_type='topic')
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
channel.queue_bind(exchange=config.rabbitmq['exchange'],
queue=queue_name,
routing_key='mail.message')
channel.basic_consume(process_message,
queue=queue_name,
no_ack=True)
channel.start_consuming()
def start():
logger.info('start rmqclient')
#c = MessageConsumer()
#c.start()
def stop(self):
self.loop = False
credentials = pika.PlainCredentials(config.rabbitmq['username'], config.rabbitmq['password'])
parameters = pika.ConnectionParameters(
host=config.rabbitmq['host'],
port=config.rabbitmq['port'],
credentials=credentials,
virtual_host=config.rabbitmq['vhost']
)
def start():
logger.info('start rmqclient')
c = MessageConsumer()
connection = rabbit.Connection(parameters)
c = MailConsumer(connection, config.rabbitmq['exchange'], 'mail.message')
c.start()
#print('exit rmqclient ' + str(c))

@ -0,0 +1,83 @@
#!/usr/bin/env python
# -*- coding: utf-8 - *-
import logging
import pika
import time
from threading import Thread
logger = logging.getLogger(__name__)
EXCHANGE_TYPE = "topic"
CONNECT_DELAY = 3
class Connection:
def __init__(self, connection_parameters):
self._connection_parameters = connection_parameters
def open(self):
self._connection = None
while True:
try:
self._connection = pika.BlockingConnection(
self._connection_parameters)
break
except:
time.sleep(CONNECT_DELAY)
logger.warn("rabbitmq connection failure. try again...")
def close(self):
self._connection.close()
self._connection = None
def get(self):
return self._connection
class Consumer(Thread):
_connection = None
_channel = None
_queue_name = None
def __init__(self, connection, exchange_name, routing_key):
Thread.__init__(self)
self._connection = connection
self._exchange_name = exchange_name
self._routing_key = routing_key
def configure(self):
self._connection = None
self._channel = None
while True:
try:
self._channel = self._connection.channel()
self._channel.exchange_declare(
exchange=self._exchange_name, exchange_type=EXCHANGE_TYPE
)
result = self._channel.queue_declare(exclusive=True)
self._queue_name = result.method.queue
self._channel.queue_bind(
exchange=self._exchange_name,
queue=self._queue_name,
routing_key=self._routing_key,
)
break
except:
time.sleep(CONNECT_DELAY)
logger.warn("connection failure. try again...")
def run(self):
self.configure()
self._channel.basic_consume(
self.process, queue=self._queue_name, no_ack=True)
self._channel.start_consuming()
def process(self, channel, method, properties, body):
raise NotImplemented
Loading…
Cancel
Save