rabbit connection

pull/6/head
Yax 6 years ago
parent e95f59bb87
commit 646508b65e

@ -16,6 +16,7 @@ from models.comment import Comment
from helpers.hashing import md5 from helpers.hashing import md5
import json import json
from conf import config from conf import config
from util import rabbit
import PyRSS2Gen import PyRSS2Gen
import markdown import markdown
import pika import pika
@ -416,12 +417,16 @@ def rss(token, onstart=False):
def get_rabbitmq_connection(): def get_rabbitmq_connection():
credentials = pika.PlainCredentials( credentials = pika.PlainCredentials(
config.rabbitmq['username'], config.rabbitmq['password']) config.rabbitmq['username'], config.rabbitmq['password'])
connection = pika.BlockingConnection(pika.ConnectionParameters(host=config.rabbitmq['host'], port=config.rabbitmq[ parameters = pika.ConnectionParameters(
'port'], credentials=credentials, virtual_host=config.rabbitmq['vhost'])) host=config.rabbitmq['host'],
return connection port=config.rabbitmq['port'],
credentials=credentials,
virtual_host=config.rabbitmq['vhost']
)
return rabbit.Connection(parameters)
def mail(to_email, subject, message): def mail(to_email, subject, message):
@ -430,23 +435,25 @@ def mail(to_email, subject, message):
'subject': subject, 'subject': subject,
'content': message 'content': message
} }
connection = get_rabbitmq_connection() connector = get_rabbitmq_connection()
connection = connector.open()
channel = connection.channel() channel = connection.channel()
channel.basic_publish(exchange=config.rabbitmq['exchange'], channel.basic_publish(exchange=config.rabbitmq['exchange'],
routing_key='mail.command.send', routing_key='mail.command.send',
body=json.dumps(body, indent=False, sort_keys=False)) body=json.dumps(body, indent=False, sort_keys=False))
connection.close() connector.close()
logger.debug('Email for %s posted' % to_email) logger.debug('Email for %s posted' % to_email)
def send_delete_command(content): def send_delete_command(content):
connection = get_rabbitmq_connection() connector = get_rabbitmq_connection()
connection = connector.open()
channel = connection.channel() channel = connection.channel()
channel.basic_publish(exchange=config.rabbitmq['exchange'], channel.basic_publish(exchange=config.rabbitmq['exchange'],
routing_key='mail.command.delete', routing_key='mail.command.delete',
body=json.dumps(content, indent=False, sort_keys=False)) body=json.dumps(content, indent=False, sort_keys=False))
connection.close() connector.close()
logger.debug('Email accepted. Delete request sent for %s' % content) logger.debug('Email accepted. Delete request sent for %s' % content)

@ -11,6 +11,7 @@ from util import rabbit
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
class MailConsumer(rabbit.Consumer): class MailConsumer(rabbit.Consumer):
def process(self, channel, method, properties, body): def process(self, channel, method, properties, body):
@ -31,20 +32,18 @@ class MailConsumer(rabbit.Consumer):
def start(): def start():
logger.info('start rmqclient')
#c = MessageConsumer()
#c.start()
credentials = pika.PlainCredentials(config.rabbitmq['username'], config.rabbitmq['password']) logger.info('start rmqclient')
credentials = pika.PlainCredentials(
config.rabbitmq['username'], config.rabbitmq['password'])
parameters = pika.ConnectionParameters( parameters = pika.ConnectionParameters(
host=config.rabbitmq['host'], host=config.rabbitmq['host'],
port=config.rabbitmq['port'], port=config.rabbitmq['port'],
credentials=credentials, credentials=credentials,
virtual_host=config.rabbitmq['vhost'] virtual_host=config.rabbitmq['vhost']
) )
connection = rabbit.Connection(parameters) connection = rabbit.Connection(parameters)
c = MailConsumer(connection, config.rabbitmq['exchange'], 'mail.message') c = MailConsumer(connection, config.rabbitmq['exchange'], 'mail.message')
c.start() c.start()
#print('exit rmqclient ' + str(c))

@ -26,7 +26,8 @@ class Connection:
break break
except: except:
time.sleep(CONNECT_DELAY) time.sleep(CONNECT_DELAY)
logger.warn("rabbitmq connection failure. try again...") logger.exception('rabbitmq connection failure. try again...')
return self._connection
def close(self): def close(self):
self._connection.close() self._connection.close()
@ -38,24 +39,23 @@ class Connection:
class Consumer(Thread): class Consumer(Thread):
_connection = None _connector = None
_channel = None _channel = None
_queue_name = None _queue_name = None
def __init__(self, connection, exchange_name, routing_key): def __init__(self, connector, exchange_name, routing_key):
Thread.__init__(self) Thread.__init__(self)
self._connection = connection self._connector = connector
self._exchange_name = exchange_name self._exchange_name = exchange_name
self._routing_key = routing_key self._routing_key = routing_key
def configure(self): def configure(self, connection):
self._connection = None
self._channel = None self._channel = None
while True: while True:
try: try:
self._channel = self._connection.channel() self._channel = connection.channel()
self._channel.exchange_declare( self._channel.exchange_declare(
exchange=self._exchange_name, exchange_type=EXCHANGE_TYPE exchange=self._exchange_name, exchange_type=EXCHANGE_TYPE
) )
@ -69,12 +69,13 @@ class Consumer(Thread):
) )
break break
except: except:
logger.exception('configuration failure. try again...')
time.sleep(CONNECT_DELAY) time.sleep(CONNECT_DELAY)
logger.warn("connection failure. try again...")
def run(self): def run(self):
self.configure() self._connector.open()
self.configure(self._connector.get())
self._channel.basic_consume( self._channel.basic_consume(
self.process, queue=self._queue_name, no_ack=True) self.process, queue=self._queue_name, no_ack=True)
self._channel.start_consuming() self._channel.start_consuming()

@ -1,3 +1,3 @@
#!/bin/sh #!/bin/sh
python app/stacosys.py "$@" python app/run.py "$@"

Loading…
Cancel
Save