#!/usr/bin/env python
# encoding: utf-8
import logging
import os
import threading
import time
import pika
pika_logger = logging.getLogger('pika')
pika_logger.setLevel(logging.CRITICAL)
[docs]class AmqpQueueHandler(threading.Thread):
def __init__(self, callback, queue_name, channel, lock, no_ack, running):
super(AmqpQueueHandler, self).__init__()
self.lock = lock
self.channel = channel
self.queue_name = queue_name
self.callback = callback
self.no_ack = no_ack
self._running = running
self._log = logging.getLogger("AmqpMan").getChild(self.queue_name)
[docs] def run(self):
self._running.set()
self._log.debug("monitoring")
while self._running.is_set():
with self.lock:
method, props, body = self.channel.basic_get(
self.queue_name,
no_ack=self.no_ack
)
if method is None:
time.sleep(0.1)
continue
self._log.debug("recieved message")
self.callback(self.channel, method, props, body)
self._log.debug("finished")
[docs] def stop(self):
self._log.debug("stopping")
self._running.clear()
[docs]class AmqpManager(threading.Thread):
"""A class to manage jobs (starting/stopping/cancelling/etc)"""
AMQP_JOB_QUEUE = "jobs"
AMQP_JOB_RESULT_QUEUE = "job_results"
_INSTANCE = None
@classmethod
[docs] def instance(cls, host=None):
if cls._INSTANCE is None:
cls._INSTANCE = cls(host)
return cls._INSTANCE
def __init__(self, host=None):
"""init the job manager
"""
super(AmqpManager, self).__init__()
if host is None and "TALUS_AMQP_PORT_5672_TCP" in os.environ:
host = os.environ["TALUS_AMQP_PORT_5672_TCP"].replace("tcp://", "")
self._amqp_host = host
self._amqp_conn = None
self._amqp_channel = None
self._running = threading.Event()
self._amqp_connected = threading.Event()
self._amqp_consume_thread = None
self._log = logging.getLogger("AmqpMan")
self._cached_exchange_declares = []
self._cached_bind_queues = []
self._cached_queue_declares = []
self._cached_queue_consumes = []
self._queue_props = {}
self._queue_handlers = {}
self._amqp_lock = threading.Lock()
self._handlers_lock = threading.Lock()
[docs] def do_start(self):
if self._running.is_set():
return
self.start()
[docs] def run(self):
"""Run the job manager. Only one of these should ever be running at a time
:returns: TODO
"""
self._log.info("running")
self._running.set()
self._amqp_connect()
for exchange_name, type in self._cached_exchange_declares:
self.declare_exchange(exchange_name, type)
for queue_name, props in self._cached_queue_declares:
self.declare_queue(queue_name, **props)
for exchange_name, queue in self._cached_bind_queues:
self.bind_queue(exchange_name, queue)
for queue_name, callback, no_ack in self._cached_queue_consumes:
self.consume_queue(queue_name, callback, no_ack=no_ack)
self._amqp_ioloop()
self._log.info("finished")
[docs] def stop(self):
"""Stop the job manager
:returns: TODO
"""
self._log.info("stopping")
if self._amqp_channel is not None:
try:
self._amqp_channel.stop_consuming()
except RuntimeError as e:
pass
try:
self._amqp_conn.close()
except RuntimeError as e:
pass
self._running.clear()
[docs] def get_message_count(self, queue_name, **props):
""" Get the size of the amqp queue ``queue_name``. Note that the ``props``
kwargs must match the declaration properties of the queue. Getting the
queue size is done by redeclaring the queue with the same properties, with
the additional ``passive=True`` property set.
:queue_name: The name of the queue
"""
if len(props) is None and queue_name in self._queue_props:
props = self._queue_props[queue_name]
with self._amqp_lock:
method = self._amqp_channel.queue_declare(queue_name, passive=True, **props)
res = method.method.message_count
return res
[docs] def declare_exchange(self, name, type):
"""Declare an exchange named ``name`` and of type ``type``
"""
self._log.info("declaring exchange {}, type {}".format(name, type))
if self._amqp_channel is None:
self._cached_exchange_declares.append((name, type))
else:
self._amqp_channel.exchange_declare(
exchange=name,
type=type
)
[docs] def bind_queue(self, exchange, queue):
"""Bind the queue ``queue`` to the exchange ``exchange``
"""
self._log.info("binding queue {!r} to exchange {!r}".format(queue, exchange))
if self._amqp_channel is None:
self._cached_bind_queues.append((exchange, queue))
else:
self._amqp_channel.queue_bind( exchange=exchange,
queue=queue
)
[docs] def declare_queue(self, queue_name, **props):
"""Declare the queue ``queue_name`` with properties defined in
``**props`` kwargs. Popular properties to set:
* durable
* exclusive
* auto_delete
"""
self._log.info("declaring queue {}".format(queue_name))
self._queue_props[queue_name] = props
if self._amqp_channel is None:
self._cached_queue_declares.append((queue_name, props))
else:
self._amqp_channel.queue_declare(
queue_name,
**props
)
[docs] def consume_queue(self, queue_name, callback, no_ack=False):
"""Consume from the queue ``queue_name`` with callback ``callback``
"""
self._log.info("will consume from queue {}".format(queue_name))
if self._amqp_channel is None:
self._cached_queue_consumes.append((queue_name, callback, no_ack))
else:
with self._handlers_lock:
handler = AmqpQueueHandler(
callback,
queue_name,
self._amqp_channel,
self._amqp_lock,
no_ack,
self._running
)
self._queue_handlers[queue_name] = handler
handler.start()
[docs] def wait_for_ready(self, timeout=2**31):
"""Wait until the AMQP manager is connected and ready to go
"""
self._log.info("waiting until connected")
self._amqp_connected.wait(timeout)
# stupid, not sure if neccessary
time.sleep(3)
self._log.info("connected!")
[docs] def queue_msg(self, msg, queue_name, **props):
"""Queue the message ``msg`` in the queue ``queue_name``
:param str msg: The message to send (str or unicode)
:param str queue_name: The queue to put the message in
:param dict **props: Any additional props (exchange, etc)
"""
default_props = dict(
exchange = ""
)
default_props.update(props)
with self._amqp_lock:
self._amqp_channel.basic_publish(
routing_key=queue_name,
body=msg,
**default_props
)
[docs] def ack_method(self, method):
"""basic_ack the method
:method: The method to ack
"""
with self._amqp_lock:
self._amqp_channel.basic_ack(delivery_tag=method.delivery_tag)
# ---------------------------------------
# amqp related
# ---------------------------------------
def _amqp_ioloop(self):
"""
"""
while self._running.is_set():
time.sleep(0.1)
def _amqp_connect(self):
"""
"""
self._log.info("connecting to amqp: {}".format(self._amqp_host))
self._amqp_conn = pika.BlockingConnection(pika.URLParameters("amqp://guest:guest@" + self._amqp_host))
self._amqp_channel = self._amqp_conn.channel()
self._amqp_connected.set()
def _job_queue(self, channel, method, properties, body):
"""Called when an AMQP message is received
"""
print("received {}".format(body))