Source code for wg

import queue
import time
import threading
from pacswg.timer import *

import numpy as np

[docs]def get_random_wait_time(rps): """get_random_wait_time generates random exponential inter-arrival times corresponding to Poisson process. :param rps: rps or requests per second is the target number of requests per second :type rps: float :return: a draw from the resulting exponential distribution for inter-arrival time :rtype: float """ scale = 1/rps return np.random.exponential(scale)
[docs]class WorkerThread(threading.Thread): def __init__(self, parent, sleep_time=2): super(WorkerThread, self).__init__() # if daemon is true this thread will die when the main thread dies self.daemon = True self.stop_signal = False self.parent = parent
[docs] def run(self): while not self.stop_signal: try: item = self.parent.q.get(timeout=1) # print('-', end='') if item is None: time.sleep(.01) continue else: res = self.parent.worker_func() self.parent.temp_stats.append(res) except queue.Empty: time.sleep(.01) continue
[docs]class WorkloadGenerator: """WorkloadGenerator is the class responsible for generating the desired workload using the delay function provided, to achieve the target requests per second. :return: an instance of the WorkloadGenerator class :rtype: object """ def __init__(self, worker_func, rps=10/60, delay_func=None, worker_thread_count=10, *args, **kwargs): """__init__ for WorkloadGenerator class. :param worker_func: the worker function that will be called by the worker threads, it shouldn't have any arguments and should return a dict. :type worker_func: function :param rps: desired requests per second to be achieved by the workload generator, defaults to 10/60 :type rps: float, optional :param delay_func: the function that generates a draw from inter-arrival time given rps as an argument, defaults to exponential distribution :type delay_func: function, optional :param worker_thread_count: number of worker threads, defaults to 10 :type worker_thread_count: int, optional """ super().__init__(*args, **kwargs) self.rps = rps self.worker_threads = None self.temp_stats = [] self.worker_thread_count = worker_thread_count self.worker_func = worker_func self.q = queue.Queue() self.fire_timer = TimerClass() self.prepare_test() if delay_func is None: self.delay_func = get_random_wait_time else: self.delay_func = delay_func
[docs] def get_stats(self): """get_stats gathers the values generated by calling the workload function throughout the test. :return: stats :rtype: array of dicts """ return self.temp_stats
[docs] def fire(self): """fire causes one of the worker threads to call worker_func once """ self.q.put(1)
[docs] def reset_stats(self): """reset_stats resets the info gathered from worker threads. :return: sucess :rtype: boolean """ self.temp_stats = [] return True
[docs] def set_rps(self, new_rps): """set_rps sets the number of requests per second that will be made by the workers. :param new_rps: the new rps :type new_rps: float :return: success :rtype: boolean """ if new_rps < 1 / 60 / 60: new_rps = 1 / 60 / 60 self.rps = new_rps return True
[docs] def prepare_test(self): """prepare_test resets the timer that will be used to time the requests. """ self.fire_timer.tic()
[docs] def fire_wait(self): """fire_wait fires a request, generates an inter-arrival delay using delay_finc, then waits for that amount of time. """ self.fire_timer.tic() self.fire() wait_time = self.delay_func(self.rps) - self.fire_timer.toc() if wait_time > 0: time.sleep(wait_time)
[docs] def stop_workers(self): """stop_workers stops all workers and waits until the threads are all shut down. :return: success :rtype: boolean """ if self.worker_threads is not None: for worker_thread in self.worker_threads: worker_thread.stop_signal = True # Wait for them to finish up for worker_thread in self.worker_threads: worker_thread.join() return True else: return True
def __del__(self): self.stop_workers()
[docs] def start_workers(self): """start_workers starts up the worker pool """ self.stop_workers() self.worker_threads = [] for i in range(self.worker_thread_count): worker_thread = WorkerThread(self) worker_thread.start() self.worker_threads.append(worker_thread)