Source code for master

#!/usr/bin/env python
# encoding: utf-8

"""
"""

from __future__ import absolute_import

import glob
import json
import logging
import netifaces
import os
import pymongo
import signal
import sys
import threading

import master.models
from master.models import *
from master.lib.mongo_oplog_watcher import OplogWatcher, OplogPrinter
from master.lib.amqp_man import AmqpManager
import master.watchers

logging.basicConfig(level=logging.DEBUG)

def _signal_handler(signum, frame):
	"""Shut down the running Master worker

	:signum: Signal number (e.g. signal.SIGINT, etc)
	:frame: Python frame (I think)
	:returns: None

	"""
	print("handling signal")
	Master.instance().stop()

def _install_sig_handlers():
	"""Install signal handlers
	"""
	print("installing signal handlers")
	signal.signal(signal.SIGINT, _signal_handler)
	signal.signal(signal.SIGTERM, _signal_handler)

[docs]class TalusDBWatcher(OplogWatcher): """A class to watch the mongodb for changes""" def __init__(self, parent_log=None, *args, **kwargs): """docstring for TalusDBWatcher constructor :db: database name, default to "talus" :collection: name of the collection to filter on, default to ``None`` """ threading.Thread.__init__(self) OplogWatcher.__init__(self, *args, **kwargs) # { <mod_name>: [watchers], ... } self._watchers = {} if parent_log is None: self._log = logging.getLogger("DB-WATCH") else: self._log = parent_log.getChild("DB-WATCH")
[docs] def run(self): self._log.info("running") super(TalusDBWatcher, self).run() for collection,watchers in self._watchers.iteritems(): for watcher in watchers: watcher.stop()
[docs] def stop(self): """Stop the database watcher :returns: TODO """ self._log.info("stopping") self._running.clear()
[docs] def add_watcher(self, collection, watcher): self._watchers.setdefault(collection, []).append(watcher)
[docs] def insert(self, ns, ts, id, obj, raw, **kwargs): """Handle new insertions into the database :ns: TODO :ts: TODO :id: TODO :obj: TODO :raw: TODO :**kwargs: TODO :returns: TODO """ self._log.info("watched insert in {}: {}".format(ns, id)) self._log.debug("received insert: {}".format(obj)) if ns in self._watchers: for watcher in self._watchers[ns]: watcher.insert(id, obj)
[docs] def update(self, ns, ts, id, mod, raw, **kwargs): """Handle new updates in the database :ns: TODO :ts: TODO :id: TODO :mod: TODO :raw: TODO :**kwargs: TODO :returns: TODO """ self._log.info("update for {}:{}".format(ns, id)) self._log.debug("modification: {}".format(mod)) if ns in self._watchers: for watcher in self._watchers[ns]: watcher.update(id, mod)
[docs] def delete(self, ns, ts, id, raw, **kwargs): """Handle new deletions in the database :ns: TODO :ts: TODO :id: TODO :raw: TODO :**kwargs: TODO :returns: TODO """ self._log.info("watched delete in {}: {}".format(ns, id)) if ns in self._watchers: for watcher in self._watchers[ns]: watcher.delete(id)
[docs]class Master(object): """The master class watches the database for changes, queues and handles amqp messages, and handles VM image conversions""" AMQP_BROADCAST_XCHG = "broadcast" AMQP_SLAVE_QUEUE = "slaves" AMQP_SLAVE_STATUS_QUEUE = "slave_status" # ------------------------- # class methods # ------------------------- _instance = None @classmethod
[docs] def instance(cls): """Return the singleton instance of the Master class :returns: TODO """ if cls._instance is None: cls._instance = cls() return cls._instance
def __init__(self): """docstring for Master constructor""" super(Master, self).__init__() self._log = logging.getLogger(self.__class__.__name__) # this will be set when the docker container is linked to talus_db self._db_conn_info = os.environ["TALUS_DB_PORT_27017_TCP"].replace("tcp://", "") self._running = threading.Event() self._watcher = None self._amqp_man = AmqpManager.instance() # TODO need a better way than just eth0 self._ip = netifaces.ifaddresses('eth0')[2][0]['addr'] self._log.info("ready") # ------------------------- # public functions # -------------------------
[docs] def run(self): """Run the master daemon :returns: TODO """ self._log.info("running") self._running.set() self._start_watcher() self._log.info("started watcher") self._amqp_man.do_start() self._amqp_listen_for_slaves() # stupid GIL while self._watcher.is_alive(): self._watcher.join(2**32) self._shutdown_singletons() self._log.info("done running")
[docs] def stop(self): """Stop the master service from running :returns: TODO """ self._log.info("stopping") self._running.clear() self._watcher.stop()
[docs] def handle_signal(self, sig, frame): """TODO: Docstring for handle_signal. :sig: TODO :frame: TODO :returns: TODO """ self.stop() # ------------------------- # private functions # -------------------------
def _amqp_listen_for_slaves(self): """Setup amqp queues to listen/respond to slaves """ self._amqp_man.declare_exchange( self.AMQP_BROADCAST_XCHG, "fanout" ) self._amqp_man.declare_queue(self.AMQP_SLAVE_QUEUE, durable = True, auto_delete = False, exclusive = False ) self._amqp_man.declare_queue(self.AMQP_SLAVE_STATUS_QUEUE, durable = True, auto_delete = False, exclusive = False ) self._amqp_man.consume_queue(self.AMQP_SLAVE_STATUS_QUEUE, self._on_slave_status) def _on_slave_status(self, channel, method, props, body): """Slaves will respond to commands/queries via this queue. Slaves will also send an initial connection message via this queue in order to get configuration details and report basic stats """ self._amqp_man.ack_method(method) data = json.loads(body) switch = dict( new = self._handle_slave_new, status = self._handle_slave_status, heartbeat = self._handle_slave_heartbeat, ) if "type" not in data or data["type"] not in switch: self._log.warn("recieved slave data is in the wrong format") else: switch[data["type"]](data) def _handle_slave_status(self, data): """Handle slave status messages""" if "uuid" not in data: self._log.warn("got a slave status message that does not include a uuid") self._log.debug(data) return uuid = data["uuid"] self._log.info("got slave status update message") slaves = Slave.objects(uuid=uuid) if len(slaves) == 0: self._log.warn("got a slave status message that does not specify its uuid!") return slave = slaves[0] if "running_vms" in data: slave.running_vms = data["running_vms"] if "total_jobs_run" in data: slave.total_jobs_run = data["total_jobs_run"] if "vms" in data: slave.vms = data["vms"] slave.save() def _handle_slave_new(self, data): """Handle new slave messages""" self._log.info("handling new slave message: {}".format(data)) # these must be unique Slave.objects( ip=data["ip"], hostname=data["hostname"] ).delete() slave = Slave() slave.ip = data["ip"] slave.hostname = data["hostname"] slave.uuid = data["uuid"] slave.save() self._amqp_man.queue_msg( json.dumps(dict( type = "config", db = self._ip, code = dict( loc = "http://{}/code_cache".format(self._ip), # TODO put these in a config file username = "talus_job", password = "Monkeys eat bananas and poop all day." ), image_url = "http://{}/images/".format(self._ip) )), self.AMQP_SLAVE_QUEUE + "_" + slave.uuid ) def _handle_slave_heartbeat(self, data): """Handle slave heartbeats""" self._log.info("handling slave heartbeat: {}".format(data)) def _shutdown_singletons(self): self._log.info("shutting down singletons") AmqpManager.instance().stop() def _start_watcher(self): """Create and start the DB watcher :returns: TODO """ self._watcher = TalusDBWatcher( parent_log = self._log, connection = pymongo.MongoClient(self._db_conn_info.split(":")[0], 27017) ) self._watcher.start() # import all of the DB watchers defined in master/watchers/ for filename in glob.glob(os.path.join(os.path.dirname(__file__), "watchers", "*.py")): if os.path.basename(filename) == "__init__.py": continue mod_name = os.path.basename(filename).replace(".py", "") mod_base = __import__("master.watchers", globals(), locals(), fromlist=[mod_name]) mod = getattr(mod_base, mod_name) for name in dir(mod): item = getattr(mod, name) if type(item) is not type: continue if item != master.watchers.WatcherBase and issubclass(item, master.watchers.WatcherBase): watcher = getattr(mod, name)(self._log) self._watcher.add_watcher(watcher.collection, watcher)
[docs]def main(): _install_sig_handlers() master.models.do_connect() m = Master.instance() m.run()
if __name__ == "__main__": main()