Source code for slave.vm.bootstrap

#!/usr/bin/env python
# encoding: utf-8

from __future__ import absolute_import

import imp
import json
import logging
import os
import requests
import re
from requests.auth import HTTPBasicAuth
import select
import socket
import shutil
import struct
import sys
import threading
import time

DEV = len(sys.argv) > 1 and sys.argv[1] == "dev"

log_file = os.path.join(
	os.path.dirname(__file__),
	os.path.splitext(os.path.basename(__file__))[0] + ".log"
)
if DEV:
	logging.basicConfig(level=logging.DEBUG)
else:
	logging.basicConfig(filename=log_file, level=logging.DEBUG)
logging.getLogger("requests").setLevel(logging.CRITICAL)

class HostComms(threading.Thread):
	def __init__(self, recv_callback, job_id, job_idx, tool, dev=False):
		super(HostComms, self).__init__()

		self._log = logging.getLogger("HostComms")
		self._dev = dev

		self._my_ip = socket.gethostbyname(socket.gethostname())
		self._host_ip = self._my_ip.rsplit(".", 1)[0] + ".1"
		self._host_port = 55555

		self._job_id = job_id
		self._job_idx = job_idx
		self._tool = tool

		self._running = threading.Event()
		self._running.clear()

		self._send_recv_lock = threading.Lock()

		self._recv_callback = recv_callback

		self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
	def run(self):
		self._log.info("running")
		self._running.set()

		if not self._dev:
			self._sock.connect((self._host_ip, self._host_port))

		while self._running.is_set():
			if not self._dev:
				reads, _, _ = select.select([self._sock], [], [], 0.1)
				if len(reads) > 0:
					data = ""
					with self._send_recv_lock:
						while True:
							recvd = self._sock.recv(0x1000)
							if len(recvd) == 0:
								break
							data += recvd
					self._recv_callback(data)
			time.sleep(0.1)

		self._log.info("finished")
	def stop(self):
		self._log.info("stopping")
		self._running.clear()
	def send_msg(self, type, data):
		data = json.dumps({
			"job": self._job_id,
			"idx": self._job_idx,
			"tool": self._tool,
			"type": type,
			"data": data
		})
		data_len = struct.pack(">L", len(data))

		if not self._dev:
			try:
				with self._send_recv_lock:
					self._sock.send(data_len + data)
			except:
				# yes, just silently fail I think???
				pass
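
# Illustrative sketch only (not part of the original module, never called here):
# the host side is expected to undo the framing produced by HostComms.send_msg,
# i.e. a 4-byte big-endian length prefix followed by that many bytes of JSON.
# The function name and socket handling below are assumptions for illustration.
def _example_decode_host_msg(sock):
	"""Read one length-prefixed JSON message from ``sock`` and return the decoded dict."""
	raw_len = sock.recv(4)
	if len(raw_len) < 4:
		return None
	msg_len = struct.unpack(">L", raw_len)[0]
	data = ""
	while len(data) < msg_len:
		chunk = sock.recv(msg_len - len(data))
		if not chunk:
			break
		data += chunk
	return json.loads(data)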
class TalusCodeImporter(object):
	"""This class dynamically imports tools and components from the talus git
	repository.

	This class *should* conform to PEP 302
	(https://www.python.org/dev/peps/pep-0302/).
	"""

	def __init__(self, loc, username, password, parent_log=None):
		"""Create a new talus code importer that will fetch code from the
		specified ``loc``, using ``username`` and ``password``.

		:param str loc: The repo location (e.g. ``https://...`` or ``ssh://...``)
		:param str username: The username to fetch the code with
		:param str password: The password to fetch the code with
		"""
		if parent_log is None:
			parent_log = logging.getLogger("BOOT")
		self._log = parent_log.getChild("importer")

		self.loc = loc
		if self.loc.endswith("/"):
			self.loc = self.loc[:-1]

		self.username = username
		self.password = password

		self._code_dir = os.path.join(os.path.dirname(__file__), "TALUS_CODE")
		if not os.path.exists(self._code_dir):
			os.makedirs(self._code_dir)
		sys.path.insert(0, self._code_dir)

		self.cache = {}
		dir_check = lambda x: x.endswith("/")
		self.cache["tools"] = filter(dir_check, self._git_show("talus/tools")["items"])
		self.cache["components"] = filter(dir_check, self._git_show("talus/components")["items"])
		self.cache["lib"] = filter(dir_check, self._git_show("talus/lib")["items"])

		tools = []
	def find_module(self, abs_name, path=None):
		"""Normally, a finder object would return a loader that can load the
		module. In our case, we're going to be sneaky and just download the
		files, return ``None``, and let the normal sys.path-type loading take
		place. This approach is cleaner and less error prone.

		:param str abs_name: The absolute name of the module to be imported
		"""
		# git ls-files for performance
		# git.ls_files(
		if not abs_name.split(".")[0] == "talus":
			return None

		self.download_module(abs_name)

		# THIS IS IMPORTANT, YES WE WANT TO RETURN NONE!!!
		# see the comment in the docstring
		return None
	def download_module(self, abs_name):
		"""Download the module found at ``abs_name`` from the talus git repository.

		:param str abs_name: The absolute module name of the module to be downloaded
		"""
		path = abs_name.replace(".", "/")

		info = self._git_show(path)
		if info is None:
			info_test = self._git_show(path + ".py")
			if info_test is None:
				raise ImportError(abs_name)
			info = info_test

		self._log.info("loading module {} from git".format(abs_name))

		if info["type"] == "listing":
			return self._download_folder(abs_name, info)
		elif info["type"] == "file":
			return self._download_file(abs_name, info)

		return None
	def _download_folder(self, abs_name, info):
		"""Download the module (folder/__init__.py) from git. The module folder
		is only recursively downloaded if it is a subfolder of the
		tools/components/lib folder.
		"""
		# if we're loading the root directory of a tool/component/library,
		# recursively download everything in git in that folder
		match = re.match(r'^talus\.(tools|components|lib)\.[a-zA-Z_0-9]+$', abs_name)
		recurse = (match is not None)

		self._download(info=info, recurse=recurse)

	def _download_file(self, abs_name, info):
		"""Download the single file from git"""
		path = os.path.join(self._code_dir, info["filename"])
		with open(path, "wb") as f:
			f.write(info["contents"])

	def _download(self, path=None, info=None, recurse=False):
		"""Download files/folders (maybe recursively) into ``self._code_dir``.

		If the path is a directory, the directory's immediate children will be
		downloaded. If ``recurse`` is ``True``, then all children will be
		downloaded recursively.

		:param str path: The talus-code-repo relative path to download
		:param dict info: The maybe-already-obtained info about the path
		:param bool recurse: If it should be recursively downloaded
		"""
		if path is None and info is None:
			raise Exception("either path or info must be provided")

		if path is not None and info is None:
			info = self._git_show(path)

		base_path = os.path.join(self._code_dir, info["filename"])
		#self._log.info("downloading to {}".format(base_path))

		if info["type"] == "listing":
			if not os.path.exists(base_path):
				os.makedirs(base_path)
			for item in info["items"]:
				if item.endswith("/"):
					if recurse:
						self._download("{}/{}".format(info["filename"], item), recurse=recurse)
				else:
					self._download("{}/{}".format(info["filename"], item))

		elif info["type"] == "file":
			with open(base_path, "wb") as f:
				f.write(info["contents"])

	def _git_show(self, path, ref="HEAD"):
		"""Return the json object returned from the /code_cache on the web server.

		:param str path: The talus-code-relative path to get information about (file or directory)
		:param str ref: The reference with which to look up the code (can be a branch, commit, etc)
		"""
		res = requests.get(
			"/".join([self.loc, ref, path]),
			auth=HTTPBasicAuth(self.username, self.password)
		)
		if res.status_code // 100 != 2:
			return None

		return json.loads(res.text)
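
# The importer above only relies on a handful of fields in the /code_cache
# responses. Based solely on that usage, a response is assumed to look roughly
# like one of the two shapes below (the real server format may carry more
# fields; "mytool" is a placeholder name):
#
#   {"type": "listing", "filename": "talus/tools/mytool",
#    "items": ["__init__.py", "subdir/"]}
#
#   {"type": "file", "filename": "talus/tools/mytool/__init__.py",
#    "contents": "..."}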
class TalusBootstrap(object):
	"""The main class that will bootstrap the job and get things running"""

	def __init__(self, config_path, dev=False):
		"""
		:param str config_path: The path to the config file containing json information about the job
		"""
		self._log = logging.getLogger("BOOT")

		if not os.path.exists(config_path):
			self._log.error("ERROR, config path {} not found!".format(config_path))
			sys.exit(1)

		with open(config_path, "r") as f:
			self._config = json.loads(f.read())

		self._job_id = self._config["id"]
		self._idx = self._config["idx"]
		self._tool = self._config["tool"]
		self._params = self._config["params"]

		self._num_progresses = 0

		self.dev = dev
		self._host_comms = HostComms(self._on_host_msg_received, self._job_id, self._idx, self._tool, dev=dev)
	def run(self):
		self._log.debug("running bootstrap")

		self._host_comms.start()
		self._install_code_importer()

		talus_mod = __import__("talus", globals(), locals(), fromlist=["job"])
		job_mod = getattr(talus_mod, "job")
		Job = getattr(job_mod, "Job")

		try:
			job = Job(
				id=self._job_id,
				idx=self._idx,
				tool=self._tool,
				params=self._params,
				progress_callback=self._on_progress,
				results_callback=self._on_result,
			)
			job.run()
		except Exception as e:
			self._log.exception("Job had an error!")

		if self._num_progresses == 0:
			self._log.info("progress was never called, but job finished running, inc progress by 1")
			self._on_progress(1)

		self._host_comms.stop()
		self._log.debug("finished")

		self._host_comms.send_msg("finished", {})

		self._shutdown()
	def _shutdown(self):
		"""Shut down the VM"""
		os.system("shutdown -t 0 -r -f")
		os.system("shutdown now")

	def _on_host_msg_received(self, data):
		"""Handle the data received from the host

		:param str data: The raw data (probably in json format)
		"""
		self._log.info("received a message from the host: {}".format(data))
		data = json.loads(data)

	def _on_progress(self, num):
		"""Increment the progress count for this job by ``num``

		:param int num: The number to increment the progress count of this job by
		"""
		self._num_progresses += num
		self._log.debug("progress incrementing by {}".format(num))
		self._host_comms.send_msg("progress", num)

	def _on_result(self, result_data):
		"""Append this to the results for this job

		:param object result_data: Any python object to be stored with this job's results (str, dict, a number, etc)
		"""
		self._log.debug("sending result")
		self._host_comms.send_msg("result", result_data)

	def _install_code_importer(self):
		"""Install the sys.meta_path finder/loader to automatically load modules
		from the talus git repo.
		"""
		self._log.debug("installing talus code importer")
		code = self._config["code"]
		self._code_importer = TalusCodeImporter(
			code["loc"],
			code["username"],
			code["password"],
			parent_log=self._log
		)
		sys.meta_path = [self._code_importer]
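
# For reference: TalusBootstrap only reads the config.json fields accessed
# above, so a minimal config is assumed to look something like the following
# (all values are placeholders; the real file may carry additional fields):
#
#   {
#       "id": "<job id>",
#       "idx": 0,
#       "tool": "<tool name>",
#       "params": {"<param>": "<value>"},
#       "code": {
#           "loc": "https://<web-server>/code_cache",
#           "username": "<user>",
#           "password": "<pass>"
#       }
#   }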
def main(dev=False):
	config_path = os.path.join(os.path.dirname(__file__), "config.json")
	bootstrap = TalusBootstrap(config_path, dev=dev)
	bootstrap.run()
if __name__ == "__main__":
	dev = False
	if len(sys.argv) > 1 and sys.argv[1] == "dev":
		dev = True
	main(dev=dev)