Source code for jade.basic.threading.Threader

import subprocess
import multiprocessing
import time
import atexit
import os
import sys
#from __future__ import print_function

#Need to do more with thread safety
[docs]class Threads: """ Class for managing threads, killing them on program exit. """ def __init__(self): self.thread_classes = [] self.total_allowed_threads = multiprocessing.cpu_count() - 1 #atexit.register(self.kill_all); Disable default use of this for app-specific uses.
[docs] def set_allowed_threads(self, n):
self.total_allowed_threads = n
[docs] def kill_all(self): for th in self.thread_classes: if th.sub_p and th.sub_p.poll() != 0: th.sub_p.kill() if th.multi_p:
th.multi_p.terminate()
[docs] def append(self, thread):
self.thread_classes.append(thread) def __len__(self): return len(self.thread_classes)
[docs] def n_alive(self): i = 0 for th in self.thread_classes: if th.sub_p and th.sub_p.poll() == None: i+=1 if th.multi_p and th.multi_p.exitcode == None: i+=1
return i
[docs] def is_alive(self, pid): th = self.thread_classes[pid] if th.sub_p and th.sub_p.poll() == None: return True elif th.multi_p and th.multi_p.exitcode == None: return True else:
return False
[docs] def get_exitcode(self, pid): th = self.thread_classes[pid] if th.sub_p: return th.sub_p.poll() if th.multi_p:
return th.multi_p.exitcode
[docs] def new_thread_allowed(self): if self.n_alive() == self.total_allowed_threads: return False else:
return True global threads threads = Threads()
[docs]class Threader(object): """ Class for starting 2 new threads. One that runs a system process and one that waits and prints info to std::out or whatever you currently have set as std::out. Use print interval to set the wait time between prints. Useful for GUI subprocessing. """ def __init__(self, print_interval = 0): self.print_interval = print_interval self.sub_p = None; #A subprocess.Popen object self.multi_p = None; #A multiprocessing.Process object.
[docs] def run_system_command(self, command): """ Run a system command using Popen. Prints out at end. Probably should remove this. :param command: :return: """ def start(): ##You absolutely NEED to use PIPE for standard out or you will hang. p = subprocess.Popen(command, shell = True, stdout=subprocess.PIPE,stderr=subprocess.PIPE) wait_p = multiprocessing.Process(target=print_loop, args=(p, self.print_interval)) wait_p.start() self.sub_p = p; self.multi_p = wait_p threads.append(self) print("Running Thread ID: "+repr(len(threads)+1)+" "+command) print("Total Running Threads: "+repr(threads.n_alive())) return p, wait_p if threads.new_thread_allowed(): return start() else: print "Too many threads running. Waiting to start next..." while not threads.new_thread_allowed(): pass
return start()
[docs] def run_functions(self, functions): """ Run a bunch of lambda functions together with multiprocessing :param functions: :return: """ def run(fs): print "Running Thread ID: "+repr(len(threads)+1) print("Total Running Threads: "+repr(threads.n_alive())) for f in fs: f() print "Thread ID "+repr(len(threads)+1)+" done!" if threads.new_thread_allowed(): p = multiprocessing.Process(target=run, args = (functions,)) else: while not threads.new_thread_allowed(): pass p = multiprocessing.Process(target=run, args = (functions,)) p.start() self.multi_p = p
threads.append(self) print repr(threads.n_alive())+" still running..."
[docs]def test_function(i, extra = ""): print extra+" "+repr(i)+"\n" time.sleep(3+i)
print extra+" "+repr(i)+"done\n" if __name__ == "__main__": functions = [] functions.append(lambda: test_function(1, "one: ")) functions.append(lambda: os.system("echo This works baby!; echo $PATH")) f2 = [] f2.append(lambda: test_function(3, "three: ")) f3 = [lambda: test_function(5, "five: ")] threader = Threader() threader.run_functions(functions) threader.run_functions(f2) threader.run_functions(f3)