Source code for master.watchers.result

#!/usr/bin/env python
# encoding: utf-8

import bson
import glob
import os
import sys
import uuid

import master.models
from master.watchers import WatcherBase
from master.lib.amqp_man import AmqpManager
from master import Master
from master.watchers.result_processors import ResultProcessorBase

[docs]class ResultWatcher(WatcherBase): collection = "talus.result" def __init__(self, *args, **kwargs): WatcherBase.__init__(self, *args, **kwargs) self._processors = [] for filename in glob.glob(os.path.join(os.path.dirname(__file__), "result_processors", "*.py")): filename_ = os.path.basename(filename) if filename_ == "__init__.py": continue mod_name = filename_.replace(".py", "") mod = __import__( "master.watchers.result_processors.{}".format(mod_name), globals(), locals() ) for item_name in dir(mod): item = getattr(mod, item_name) # we only care about classes if type(item) is not type: continue if issubclass(item, ResultProcessorBase): self._processors.append(item())
[docs] def insert(self, id_, obj): self._log.debug("handling insert") results = master.models.Result.objects(id=id_) if len(results) == 0: self._log.warn("WTF? couldn't find Result object that was just inserted") return result = results[0] for processor in self._processors: try: can_process = processor.can_process(result) except NotImplemented as e: self._log.error("Result processor class '{}' does not implement the can_process function!".format(processor.__class__.__name__)) continue if can_process: processor.process(result) try: result.reload() except Exception as e: self._log.info("error reloading result document, probably deleted?? TODO verify this is OK", exc_info=True) # if it's been deleted, then just return, as no other processors should be able to process it return
[docs] def update(self, id, mod): pass
[docs] def delete(self, id): pass