slave package

Submodules

slave.amqp_man module

class slave.amqp_man.AmqpManager(host=None)[source]

Bases: threading.Thread

A class to manage jobs (starting/stopping/cancelling/etc)

AMQP_JOB_QUEUE = 'jobs'
AMQP_JOB_RESULT_QUEUE = 'job_results'
ack_method(method)[source]

Acknowledge method with a basic_ack.

Parameters:
  • method – The method to acknowledge
bind_queue(exchange, queue)[source]

Bind the queue given by queue to the exchange given by exchange.

consume_queue(queue_name, callback, no_ack=False)[source]

Consume messages from the queue queue_name, handling each one with callback.

declare_exchange(name, type)[source]

Declare an exchange called name with the exchange type type.

declare_queue(queue_name, **props)[source]

Declare the queue queue_name with the properties given in the **props keyword arguments. Commonly set properties:

  • durable
  • exclusive
  • auto_delete
do_start()[source]
get_message_count(queue_name, **props)[source]

Get the number of messages in the AMQP queue queue_name. The size is obtained by redeclaring the queue with its original properties plus passive=True, so the props keyword arguments must match the properties the queue was declared with.

Parameters:
  • queue_name – The name of the queue
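
The passive redeclaration described above can be sketched as follows. This is a minimal illustration rather than the module's actual code: it assumes a pika-style channel whose queue_declare returns a frame exposing method.message_count. The free function get_message_count and the FakeChannel class are hypothetical stand-ins so that the example runs without a broker.

```python
from types import SimpleNamespace


def get_message_count(channel, queue_name, **props):
    """Return the number of messages waiting in queue_name.

    Assumption: channel follows the pika convention, where a passive
    declare only reports on an existing queue instead of creating it.
    """
    frame = channel.queue_declare(queue=queue_name, passive=True, **props)
    return frame.method.message_count


class FakeChannel:
    """Hypothetical in-memory stand-in for a broker channel (illustration only)."""

    def __init__(self, queues):
        # queues maps queue name -> (declaration props, message count)
        self._queues = queues

    def queue_declare(self, queue, passive=False, **props):
        declared_props, count = self._queues[queue]
        if props != declared_props:
            # Mirrors the documented rule that props must match the declaration.
            raise ValueError("props do not match the original declaration")
        return SimpleNamespace(method=SimpleNamespace(message_count=count))


JOB_PROPS = {"durable": True, "exclusive": False, "auto_delete": False}
channel = FakeChannel({"jobs": (JOB_PROPS, 3)})
count = get_message_count(channel, "jobs", **JOB_PROPS)
```

Passing the same property dictionary to both the declaration and the count call (as with JOB_PROPS here) is one way to keep the two in step.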
classmethod instance(host=None)[source]
queue_msg(msg, queue_name, **props)[source]

Queue the message msg in the queue queue_name

Parameters:
  • msg (str) – The message to send (str or unicode)
  • queue_name (str) – The queue to put the message in
  • **props (dict) – Any additional props (exchange, etc.)
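
As a rough sketch of how a message might be routed to a named queue, the example below publishes through a pika-style basic_publish call. It is an assumption that the module works this way; the free function queue_msg, the RecordingChannel class, and the handling of the exchange keyword are all hypothetical. The one AMQP fact relied on is that the default exchange (the empty string) routes a message to the queue whose name equals the routing key.

```python
class RecordingChannel:
    """Hypothetical stand-in that records publishes instead of sending them."""

    def __init__(self):
        self.published = []

    def basic_publish(self, exchange, routing_key, body):
        self.published.append((exchange, routing_key, body))


def queue_msg(channel, msg, queue_name, **props):
    """Publish msg so that it lands in queue_name.

    Assumption: an optional "exchange" entry in props overrides the
    default exchange; any other props are ignored in this sketch.
    """
    exchange = props.get("exchange", "")
    channel.basic_publish(exchange=exchange, routing_key=queue_name, body=msg)


channel = RecordingChannel()
queue_msg(channel, "run job 42", "jobs")
queue_msg(channel, "pause", "slaves", exchange="broadcast")
```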

run()[source]

Run the job manager. Only one of these should ever be running at a time.

Returns: TODO

stop()[source]

Stop the job manager.

Returns: TODO

wait_for_ready(timeout=2147483648)[source]

Wait until the AMQP manager is connected and ready to go
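
One common way to implement this kind of readiness wait on a threading.Thread subclass is a threading.Event that run() sets once the connection is up. The sketch below shows that pattern only; the class ReadyThread and its internals are hypothetical, and the real connection step is replaced by a comment.

```python
import threading


class ReadyThread(threading.Thread):
    """Hypothetical worker showing the start / wait_for_ready / stop pattern."""

    def __init__(self):
        super().__init__(daemon=True)
        self._ready = threading.Event()
        self._stop_requested = threading.Event()

    def run(self):
        # A real manager would open its AMQP connection here.
        self._ready.set()
        # Block until stop() is called, then let the thread exit.
        self._stop_requested.wait()

    def wait_for_ready(self, timeout=None):
        """Return True once the thread is ready, False if the timeout expires."""
        return self._ready.wait(timeout)

    def stop(self):
        self._stop_requested.set()


worker = ReadyThread()
worker.start()
ready = worker.wait_for_ready(timeout=5)
worker.stop()
worker.join(timeout=5)
```

Callers that queue messages immediately after start() would typically check the return value of wait_for_ready first, since the thread may not have connected yet.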

class slave.amqp_man.AmqpQueueHandler(callback, queue_name, channel, lock, no_ack, running)[source]

Bases: threading.Thread

run()[source]
stop()[source]

slave.models module

slave.models.do_connect(host)[source]

Module contents

class slave.GuestComms[source]

Bases: twisted.protocols.basic.LineReceiver

Communicates with the guest hosts as they start running

connectionMade()[source]
rawDataReceived(data)[source]
class slave.GuestCommsFactory[source]

Bases: twisted.internet.protocol.Factory

buildProtocol(addr)[source]
class slave.Slave(amqp_host, max_vms)[source]

Bases: threading.Thread

The slave handler

AMQP_BROADCAST_XCHG = 'broadcast'
AMQP_JOB_PROPS = {'exclusive': False, 'auto_delete': False, 'durable': True}
AMQP_JOB_QUEUE = 'jobs'
AMQP_JOB_STATUS_QUEUE = 'job_status'
AMQP_SLAVE_PROPS = {'exclusive': False, 'auto_delete': False, 'durable': True}
AMQP_SLAVE_QUEUE = 'slaves'
AMQP_SLAVE_STATUS_PROPS = {'exclusive': False, 'auto_delete': False, 'durable': True}
AMQP_SLAVE_STATUS_QUEUE = 'slave_status'
cancel_job(job)[source]

Cancel the job with the given job id.

Parameters:
  • job – The id of the job to cancel
handle_guest_comms(data)[source]
classmethod instance(amqp_host=None, max_vms=None)[source]
run()[source]
stop()[source]

Stop the slave

slave.main(amqp_host, max_vms)[source]