Source code for master.lib.jobs

#!/usr/bin/env python
# encoding: utf-8

import json
import logging
import os
try:
    import Queue as Q  # ver. < 3.0
except ImportError:
	import queue as Q
PQ = Q.PriorityQueue
import threading
import time

from master import Master
from master.lib.amqp_man import AmqpManager
from master.models import *

# Runs at import time: configures the root logger at DEBUG level as a side
# effect of importing this module (basicConfig does nothing if the root
# logger already has handlers configured).
logging.basicConfig(level=logging.DEBUG)

class JobHandler(object):
    """A class to handle new jobs.

    A handler wraps a single job model and produces the JSON work items
    ("drops") that get published for that job.  It counts the drops it has
    produced so each one carries a unique, increasing ``idx``.
    """

    def __init__(self, job, queue_name):
        """Init the job handler.

        :param mongoengine.Document job: A Job model
        :param str queue_name: The name of the queue this job will drip into
        """
        self.job = job
        self.drip_count = 0
        self.queue_name = queue_name

    def __lt__(self, other):
        """Order handlers by the string form of their job id.

        Handlers are stored as ``(priority, handler)`` tuples inside a
        ``PriorityQueue``.  When two priorities are equal the tuple
        comparison falls through to the handlers themselves, which raises
        ``TypeError`` on Python 3 unless handlers define an ordering.

        :param other: The object being compared against
        :returns: True if this handler sorts before ``other``
        """
        if not isinstance(other, JobHandler):
            return NotImplemented
        return str(self.job.id) < str(other.job.id)

    def drip(self, drip_size):
        """Yield the items to be inserted into the queue for one drip.

        The ``drip_size`` is the total number of items in this drip; this
        job's share of it is ``drip_size * priority / 100``, rounded to the
        nearest integer.

        NOTE(review): when that share rounds to zero (e.g. a low priority
        combined with a small ``drip_size``) nothing is yielded at all.

        :param int drip_size: The total size of the drip
        :returns: A generator of JSON strings, one per drop
        """
        share = int(round(drip_size * self.job.priority / 100.0))
        for _ in range(share):
            yield self.drop()

    def drop(self):
        """Build the next work item for this job.

        Increments ``drip_count`` and returns a JSON object holding the job
        id, the new count as ``idx``, the image id and its credentials, the
        tool name, the job params and the job network.

        :returns: The work item serialized as a JSON string
        """
        self.drip_count += 1
        job = self.job
        image = job.image
        return json.dumps(dict(
            job=str(job.id),
            idx=self.drip_count,
            image=str(image.id),
            image_username=image.username,
            image_password=image.password,
            tool=str(job.task.tool.name),
            params=job.params,
            network=job.network,
        ))
class JobManager(threading.Thread):
    """A class to manage jobs (starting/stopping/cancelling/etc).

    The manager keeps one priority queue of ``(priority, JobHandler)``
    entries per AMQP queue name and, while running, periodically tops those
    AMQP queues up with work items produced by the handlers.
    """

    AMQP_JOB_QUEUE = "jobs"
    AMQP_JOB_STATUS_QUEUE = "job_status"
    AMQP_JOB_PROPS = dict(
        durable=True,
        auto_delete=False,
        exclusive=False,
    )

    def __init__(self, drip_size=10):
        """Init the job manager.

        :param int drip_size: The number of jobs to be added to the queue
            at once
        """
        super(JobManager, self).__init__()

        self._drip_size = drip_size
        self._running = threading.Event()
        # guards both _job_amqp_queues and _job_handlers
        self._job_queue_lock = threading.Lock()
        self._amqp_man = AmqpManager.instance()
        self._log = logging.getLogger("JobMan")

        # each job can potentially specify their own queue, this
        # will be a dict of Q.PriorityQueue()s
        self._job_amqp_queues = {}

        # dict of {<jobid>: JobHandler}
        self._job_handlers = {}

    def run(self):
        """Run the job manager. Only one of these should ever be running
        at a time.

        Declares the job and job-status queues, registers
        ``_on_job_status`` as the consumer of the status queue, starts the
        AMQP manager and waits for it to be ready, re-creates handlers for
        jobs already marked as running, and then monitors the queues every
        0.2 seconds until ``stop`` is called.

        :returns: None
        """
        self._log.info("running")
        self._running.set()

        amqp = self._amqp_man
        amqp.declare_queue(self.AMQP_JOB_QUEUE, **self.AMQP_JOB_PROPS)
        amqp.declare_queue(self.AMQP_JOB_STATUS_QUEUE, **self.AMQP_JOB_PROPS)
        amqp.consume_queue(self.AMQP_JOB_STATUS_QUEUE, self._on_job_status)
        amqp.do_start()
        amqp.wait_for_ready()

        self._log.info("beginning main loop")
        self._create_handlers_for_existing()

        while self._running.is_set():
            self._monitor_queues()
            time.sleep(0.2)

        self._log.info("finished")

    def stop(self):
        """Stop the job manager.

        Clears the running flag; the loop in ``run`` exits on its next
        iteration.

        :returns: None
        """
        self._log.info("stopping")
        self._running.clear()

    def run_job(self, job):
        """Start tracking ``job`` so that its work items get dripped.

        The job's priority is clamped via ``_safe_priority`` and saved, a
        ``JobHandler`` is created for it, and the handler is added to the
        priority queue of the job's AMQP queue (``AMQP_JOB_QUEUE`` when the
        job does not name one).

        :param job: The job model to run
        :returns: None
        """
        self._log.info("running job: {}".format(job.id))
        job.priority = self._safe_priority(job.priority)
        job.save()

        queue_name = job.queue
        if queue_name is None or queue_name == "":
            queue_name = self.AMQP_JOB_QUEUE

        handler = JobHandler(job, queue_name)
        with self._job_queue_lock:
            self._job_handlers[str(job.id)] = handler
            pending = self._job_amqp_queues.setdefault(queue_name, PQ())
            pending.put((job.priority, handler))

    def stop_job(self, job):
        """This is intended to be called once a job has been completed
        (not cancelled, but completed).

        Removes the job from the manager, broadcasts a cancel message for
        it and saves its status as ``finished``.  Logs a warning and does
        nothing else when the job is not currently tracked.

        :param job: The job model that completed
        :returns: None
        """
        self._log.info("stopping job: {}".format(job.id))
        if self._retire_job(job, "finished"):
            self._log.info("stopped job: {}".format(job.id))

    def cancel_job(self, job):
        """Cancel the job ``job``.

        Removes the job from the manager, broadcasts a cancel message for
        it and saves its status as ``cancelled``.  Logs a warning and does
        nothing else when the job is not currently tracked.

        :param job: The job object to cancel
        :returns: None
        """
        # TODO forcefully cancel the job (notify all slaves via amqp that
        # this job.id needs to be forcefully cancelled
        self._log.info("cancelling job: {}".format(job.id))
        if self._retire_job(job, "cancelled"):
            self._log.info("cancelled job: {}".format(job.id))

    # ---------------------------------------

    def _forget_job(self, job):
        """Drop ``job``'s handler and its pending priority-queue entries.

        Only entries belonging to ``job`` are removed; entries of other
        jobs sharing the same queue are kept.

        :param job: The job model to stop tracking
        :returns: True if the job was tracked, False otherwise
        """
        with self._job_queue_lock:
            handler = self._job_handlers.pop(str(job.id), None)
            if handler is None:
                return False

            pending = self._job_amqp_queues.get(handler.queue_name)
            if pending is not None:
                # this class only ever iterates these entries (it never
                # pops from the queue), so a filtered list is sufficient
                pending.queue[:] = [
                    (priority, entry)
                    for priority, entry in pending.queue
                    if entry.job.id != job.id
                ]
        return True

    def _retire_job(self, job, status_name):
        """Shared tail of ``stop_job`` and ``cancel_job``.

        :param job: The job model being retired
        :param str status_name: The status name to store on the job
        :returns: True if the job was tracked and has been retired
        """
        if not self._forget_job(job):
            self._log.warning("error, job {} not in job handlers".format(job.id))
            return False

        AmqpManager.instance().queue_msg(
            json.dumps(dict(
                type="cancel",
                job=str(job.id)
            )),
            "",
            exchange=Master.AMQP_BROADCAST_XCHG
        )

        job.reload()
        job.status = {"name": status_name}
        job.save()
        return True

    def _create_handlers_for_existing(self):
        """Call ``run_job`` for every job whose status name is "running"."""
        self._log.info("creating job handlers for existing running jobs in the database")
        for job in Job.objects(status__name="running"):
            self.run_job(job)

    # ---------------------------------------
    # job amqp related
    # ---------------------------------------

    def _on_job_status(self, channel, method, properties, body):
        """Should be called when an AMQP_JOB_STATUS_QUEUE message is
        received - intended to be for job progress... maybe more?

        The message is acknowledged before it is decoded, so a malformed
        message is logged and dropped rather than redelivered.

        :param channel: Unused
        :param method: Passed to the AMQP manager to ack the message
        :param properties: Unused
        :param body: The JSON-encoded status message
        :returns: None
        """
        self._log.info("received job status: {}".format(body))
        # just ack it immediately
        self._amqp_man.ack_method(method)

        try:
            data = json.loads(body)
        except (TypeError, ValueError):
            self._log.warning("could not decode job status: {}".format(body))
            return

        switch = dict(
            progress=self._handle_job_progress,
            result=self._handle_job_result,
        )
        status_type = data.get("type") if isinstance(data, dict) else None
        if status_type not in switch:
            self._log.warning("unknown job status type! {}".format(data))
            return

        switch[status_type](data)

    def _handle_job_progress(self, data):
        """Handling job progress.

        Increments the stored progress of job ``data["job"]`` by
        ``data["amt"]`` and stops the job once its progress has reached its
        limit.

        :param dict data: The decoded status message
        :returns: None
        """
        self._log.debug("handling job progress: {}".format(data))
        job_id = data["job"]
        Job.objects(id=job_id).update_one(inc__progress=data["amt"])

        handler = self._job_handlers.get(job_id)
        if handler is None:
            self._log.warning("job {} not in current list of job handlers".format(job_id))
            return

        job = handler.job
        job.reload()
        if job.progress >= job.limit:
            self._log.debug("job {} finished ({}/{})".format(job.id, job.progress, job.limit))
            self.stop_job(job)

    def _handle_job_result(self, data):
        """Handling job result.

        Stores a ``Result`` for job ``data["job"]``.  Result data that is
        not a dict is wrapped as ``{"data": <value>}`` first.  Results for
        jobs that cannot be found are logged and discarded.

        :param dict data: The decoded status message
        :returns: None
        """
        self._log.debug("handling job result: {}".format(data))

        if not isinstance(data["data"], dict):
            data["data"] = {"data": data["data"]}

        jobs = Job.objects(id=data["job"])
        if len(jobs) == 0:
            self._log.warning("received result for a non-existent job!")
            # nothing to attach the result to
            return
        job = jobs[0]

        result = Result()
        result.job = job
        result.type = data["type"]
        result.tool = data["tool"]
        result.data = data["data"]
        result.save()

    def _monitor_queues(self):
        """Drip-feed the queue based on the current state of the job
        priority queue.

        Every AMQP queue that has pending handlers and currently holds
        fewer than ``drip_size`` messages gets another drip.  For every
        queue without pending handlers the call backs off for half a
        second; that sleep happens after the lock has been released so
        that ``run_job``/``stop_job``/``cancel_job`` are not blocked by it.

        :returns: None
        """
        idle_queues = 0
        with self._job_queue_lock:
            for queue_name, job_queue in self._job_amqp_queues.items():
                if job_queue.qsize() == 0:
                    idle_queues += 1
                    continue

                num_msgs = self._amqp_man.get_message_count(queue_name)
                if num_msgs < self._drip_size:
                    self._log.debug("queue has {}/{} messages, dripping some more".format(num_msgs, self._drip_size))
                    self._do_drip(queue_name, job_queue)

        if idle_queues:
            time.sleep(0.5 * idle_queues)

    def _safe_priority(self, priority):
        """Coerce ``priority`` into an int between 0 and 100.

        :param priority: The requested priority
        :returns: 50 when ``priority`` is not an int, otherwise
            ``priority`` clamped to the range 0..100
        """
        if not isinstance(priority, int):
            return 50
        return max(0, min(100, priority))

    def _do_drip(self, queue_name, job_queue):
        """Drip more items into the job queue.

        :param str queue_name: The name of the queue to add more job items
            into
        :param job_queue: The priority queue of ``(priority, handler)``
            entries to work
        :returns: None
        """
        for _priority, handler in job_queue.queue:
            for drop in handler.drip(self._drip_size):
                self._amqp_man.queue_msg(drop, queue_name)