# Source code for jade.rosetta_jade.RunRosetta

#!/usr/bin/env python
# Jared Adolf-Bryfogle
# Classes for running Rosetta on a cluster using pre-defined sets of options stored in a JSON file.

# Used by itself or subclassed for benchmarking.
# Can also be used from other code by passing an argparse parser to RunRosetta.

import argparse
import glob
import os
import re
import sys
from collections import defaultdict

from jade.basic.general import get_platform
from jade.basic.path import *
from jade.basic.general import fix_input_args

from jade.rosetta_jade.SetupRosettaOptionsGeneral import SetupRosettaOptionsGeneral

# Fix up the command-line input at import time, before any parser is built,
# so that extra Rosetta options can be passed through.
# NOTE(review): the exact rewriting is implemented in
# jade.basic.general.fix_input_args and is not visible here -- confirm there.
fix_input_args()

def run_on_qsub(cmd, queue_dir, name, print_only = False, extra_opts = ""):
    """
    Write ``cmd`` to a queue script and submit that script through qsub.

    :param cmd: Full shell command to place in the queue script.
    :param queue_dir: Directory that receives the script; also passed to qsub via ``-d``.
    :param name: Job name (``-N``) and base name of the script file.
    :param print_only: If True, print the setup and the qsub command but do not submit.
    :param extra_opts: Extra qsub options, inserted verbatim after ``qsub``.
    """
    script_path = write_queue_file(cmd, queue_dir, name)

    # Template this is modelled on:
    # qsub -q dna -l nodes=10:ppn=11 -V -N $1 -d $qsub_output -v np=101 $benchmarks/$1
    pieces = [" qsub ", extra_opts, " -V -N ", name, " -d ", queue_dir, " ", script_path]
    qsub_cmd = "".join(pieces)

    # The print-only report starts with one extra blank line.
    if print_only:
        print("")

    print_full_cmd(cmd, script_path)
    print("\n\n")
    print(qsub_cmd)

    if not print_only:
        os.system(qsub_cmd)
def run_on_slurm(cmd, queue_dir, name, nodes = False, ntasks = None, print_only = False, extra_opts = ""):
    """
    Write ``cmd`` to a queue script and submit that script through sbatch.

    Standard output and error of the job go to ``<queue_dir>/<name>_%j.out``
    and ``<queue_dir>/<name>_%j.err``.

    :param cmd: Full shell command to place in the queue script.
    :param queue_dir: Directory that receives the script and the job's .out/.err files.
    :param name: Job name (``--job-name``) and base name of the script file.
    :param nodes: Value for ``--nodes``; ignored if falsy or if extra_opts already mentions ``--nodes``.
    :param ntasks: Value for ``--ntasks``; ignored if falsy or if extra_opts already mentions ``--ntasks``.
    :param print_only: If True, print the setup and the sbatch command but do not submit.
    :param extra_opts: Extra sbatch options, inserted verbatim after ``sbatch``.
    """
    script_path = write_queue_file(cmd, queue_dir, name)

    # Default to an unlimited walltime, but do not override a limit that the
    # caller requested explicitly (same policy as --nodes / --ntasks below).
    user_set_time = any(
        token in ("--time", "-t") or token.startswith("--time=")
        for token in extra_opts.split()
    )
    if not user_set_time:
        extra_opts = extra_opts + " " + "--time=0"

    slurm_cmd = "sbatch " + extra_opts + " --job-name=" + name + " "
    slurm_cmd = slurm_cmd + " -o " + queue_dir + "/" + name + "_%j.out"
    slurm_cmd = slurm_cmd + " -e " + queue_dir + "/" + name + "_%j.err"

    if nodes and "--nodes" not in extra_opts:
        slurm_cmd = slurm_cmd + " --nodes=" + str(nodes)

    if ntasks and "--ntasks" not in extra_opts:
        slurm_cmd = slurm_cmd + " --ntasks=" + str(ntasks)

    slurm_cmd = slurm_cmd + " " + script_path

    if print_only:
        print("Only Printing!")
        print_full_cmd(cmd, script_path)
        print("\n\n")
        print(slurm_cmd)
    else:
        print("Starting Slurm!!")
        print_full_cmd(cmd, script_path)
        # Show which sbatch executable is about to be used.
        os.system("which sbatch")
        os.system(slurm_cmd)

    # The original trailing message concatenated `outfilename`, a name that is
    # never defined in this function (NameError). Report the file this
    # function actually wrote instead.
    print("\nQueue script written to " + script_path)
def get_option_strings(cmd):
    """
    Get the options of a command as a string to be printed or saved to a file.

    The command is split on single spaces and the first three tokens are
    skipped. Every later token that starts with '-' (and is not a lone '-')
    begins a new option; the tokens that follow are collected as its values.
    Each option is emitted once, on its own line, followed by a space and its
    space-joined values. Values of a repeated option are merged onto that line.

    Tokens may carry embedded newlines (the command can span several shell
    lines), so the result is re-split on newlines and only lines that start
    with '-' are kept.

    :param cmd: Full command string.
    :rtype: str
    :return: One option per line; "" if there are fewer than three tokens or no options.
    """
    tokens = cmd.split(" ")
    if len(tokens) < 3:
        return ""

    def _is_option(text):
        # startswith() is safe on empty strings, unlike text[0].
        return text.startswith("-") and text != "-"

    grouped = defaultdict(list)
    options = []
    seen = set()
    current_option = tokens[2]
    for token in tokens[3:]:
        if not token or token == "-":
            continue
        if _is_option(token):
            current_option = token
            # Record each option once so a repeated flag is not printed twice.
            if token not in seen:
                seen.add(token)
                options.append(token)
        else:
            grouped[current_option].append(token)

    lines = []
    for option in options:
        entry = option + " " + " ".join(grouped[option])
        lines.extend(entry.split("\n"))

    return "\n".join(line for line in lines if _is_option(line))
def write_queue_file(cmd, queue_dir, name):
    """
    Write ``cmd`` to ``<queue_dir>/<name>.sh`` behind a shebang line.

    The shebang uses the SHELL environment variable and falls back to
    /bin/sh when SHELL is not set.

    :param cmd: Shell command(s) to store in the script.
    :param queue_dir: Existing directory to write the script into.
    :param name: Base name of the script, without the .sh extension.
    :rtype: str
    :return: Path of the written script.
    """
    shell = os.environ.get('SHELL', '/bin/sh')
    script_path = queue_dir + "/" + name + ".sh"

    # The context manager closes the file even if the write fails.
    with open(script_path, 'w') as outfile:
        outfile.write("#!" + shell + "\n\n\n" + cmd)

    return script_path
class RunRosetta(object):
    """
    Base class for running Rosetta through python, locally or through a job manager.

    Mainly used for benchmarking experiments.
    Derive this class and override methods to set up a benchmark.

    Common methods to override for more benchmarking control:
        _add_args()
        _get_make_mpi_tracer_dir()
        _get_make_out_path()
        _get_out_prefix()
        _get_output_string()
        run()
    """

    def __init__(self, program = None, parser = None, db_mode = False, json_run=None):
        """
        Build the argument parser, parse the command line and load the JSON option sets.

        :param program: Rosetta program to run. If not given, a --program option is added.
        :param parser: Existing argparse parser to add the options to; a new one is created if None.
        :param db_mode: If True, add the relational-database options and handle them.
        :param json_run: JSON run file; overrides --json_run when given.
        """
        self.base_options = None
        self.extra_options = None
        self.program = program
        self.db_mode = db_mode

        json_pattern = os.path.join(get_rosetta_json_run_path(), "*.json")
        self.jsons = [os.path.basename(d) for d in glob.glob(json_pattern)]

        self._add_args(parser)
        self._parse_args()
        self._setup_base_options()

        if json_run:
            self.options.json_run = json_run

        if self.options.json_run:
            print("Setting JSON RUN")
            extra_options = SetupRosettaOptionsGeneral(self.options.json_run)
            self._set_extra_options(extra_options)
        else:
            self._set_extra_options(self.base_options)

        self._resolve_options()

    def _add_args(self, parser = None):
        """
        Add arguments to the given argument parser, or create a new one.
        """
        if not parser:
            # The text is a description; passed positionally it would become
            # `prog` and show up as the program name in the usage line.
            self.parser = argparse.ArgumentParser(
                description="This program runs Rosetta MPI locally or on a cluster using slurm or qsub.  "
                            "Relative paths are accepted.")
        else:
            self.parser = parser

        common_options = self.parser.add_argument_group("Common Options")

        common_options.add_argument("-s",
                                    help = "Path to a pdb file")

        common_options.add_argument("-l",
                                    help = "Path to a list of pdb files")

        common_options.add_argument("--np",
                                    default = 101,
                                    help = "Number of processors to use for MPI.  "
                                           "Default = 101")

        common_options.add_argument("--nstruct",
                                    default = 1,
                                    help = "The number of structures/parallel runs.  Can also set this in any JSON file.")

        common_options.add_argument("--job_name",
                                    default = "rosetta_run",
                                    help = "Set the job name used for mpi_tracer_to_file dir and queue.  "
                                           "Default = 'rosetta_run'. "
                                           "(Benchmarking: Override any set in json_base.)",)

        common_options.add_argument("--outdir", "-o",
                                    default = "decoys",
                                    help = "Outpath.  "
                                           "Default = 'pwd/decoys' ")

        common_options.add_argument("--json_run",
                                    help = "JSON file for specific Rosetta run.  Not required.  Pre-Configured JSONS include: "+repr(self.jsons),
                                    )

        common_options.add_argument("--extra_options",
                                    help = "Extra Rosetta options.  "
                                           "Specify in quotes!")

        common_options.add_argument("--script_vars",
                                    help = "Any script vars for XML scripts."
                                           "Specify as you would in Rosetta. like: glycosylation=137A,136A",
                                    nargs = '*')

        common_options.add_argument("--jd3",
                                    help = "Is this app JD3?  Must build with extras=mpi,serialization.",
                                    default = False,
                                    action = "store_true")

        if not self.program:
            common_options.add_argument("--program",
                                        help = "Define the Rosetta program to use if not set in json_run")

        debug_options = self.parser.add_argument_group("Testing and Debugging")

        debug_options.add_argument("--print_only",
                                   help = "Do not actually run anything.  Just print setup for review.",
                                   default = False,
                                   action = "store_true")

        debug_options.add_argument("--local_test",
                                   default = False,
                                   help = "Is this a local test?  Will change nstruct to 1 and run on 2 processors",
                                   action = "store_true")

        debug_options.add_argument("--one_file_mpi",
                                   help = "Output all MPI std::out to a single file instead of splitting it. ",
                                   default = False,
                                   action = "store_true")

        special_options = self.parser.add_argument_group("Special Options for controlling execution")

        special_options.add_argument("--job_manager",
                                     default = "slurm",
                                     help = "Job Manager to launch job. (Or none if local or local_test)"
                                            "Default = 'slurm ' ",
                                     choices = ["slurm", "qsub", "local", "local_test"])

        special_options.add_argument("--job_manager_opts",
                                     help = "Extra options for the job manager, such as queue or processor requests"
                                            "Remove double dashes. Exclusive is on by default. Specify like: -p imperial exclusive.",
                                     default = [],
                                     nargs = "*")

        special_options.add_argument("--json_base",
                                     default = get_rosetta_json_run_path()+"/common_flags.json",
                                     help = "JSON file for setting up base paths/etc. for the cluster."
                                            "Default = 'database/rosetta/jsons/common_flags.json' ")

        special_options.add_argument("--compiler",
                                     default = "gcc",
                                     help = "Set the compiler used.  Will set clang automatically for macos. "
                                            "Default = 'gcc' ",
                                     choices = ["gcc", "clang"])

        special_options.add_argument("--mpiexec",
                                     help = "Specify a particular path (or type of) MPI exec. Default is srun (due to vax). If local or local test, will use mpiexex",
                                     default = "srun")

        special_options.add_argument("--machine_file",
                                     help = "Optional machine file for passing to MPI")

        if self.db_mode:
            db_group = self.parser.add_argument_group("Relational Databases",
                                                      "Options for Rosetta Database input and output.  Use for features or for inputting and output structures as databases")

            db_group.add_argument("--db_mode",
                                  help = "Set the mode for Rosetta to use if using a database.  "
                                         "Features will be output to a database.  "
                                         "If not sqlite3, must build Rosetta with extras.  "
                                         "If any post-processing is required, such as combining sqlite3 dbs, will do this.  "
                                         "Default DB mode for features is sqlite3. ",
                                  choices = ["sqlite3", "mysql", "postgres"])

            db_group.add_argument("--db_name",
                                  help = "In or Out database name",
                                  default = "features.db")

            db_group.add_argument("--db_batch",
                                  help = "Batch of structures.",
                                  default = "feat")

            db_group.add_argument("--db_in",
                                  help = "Use an input database",
                                  default = False,
                                  action = "store_true")

            db_group.add_argument("--db_out",
                                  help = "Use an output database",
                                  default = False,
                                  action = "store_true")

    def _parse_args(self):
        """
        Parse the command line into self.options and pick up the program and local_test settings.
        """
        self.options = self.parser.parse_args()

        if hasattr(self.options, 'program'):
            # May still be None here; _resolve_options falls back to the JSON files.
            self.program = self.options.program
        elif not self.program:
            sys.exit("Rosetta Program to run must be specified.")

        if self.options.local_test:
            self.options.job_manager = "local_test"

    def _setup_base_options(self):
        """
        Setup the base JSON file that gives settings on the cluster [and] project.
        """
        if not self.options.json_base:
            sys.exit("No Base Json Given.  This is required for general cluster settings.")

        self.base_options = SetupRosettaOptionsGeneral(self.options.json_base)

    def _set_json_run(self, json_run):
        """
        Switch to a different JSON run file and re-resolve the options.
        """
        print("JSON Run Set: "+json_run)
        self.options.json_run = json_run
        self._set_extra_options(SetupRosettaOptionsGeneral(json_run))

    def _set_extra_options(self, extra_options):
        """
        Set extra options (derived SetupRosettaOptionsGeneral or baseclass) for benchmarking runs.

        Exits if extra_options is not a SetupRosettaOptionsGeneral instance.

        :param extra_options: SetupRosettaOptionsGeneral
        """
        # Validate before storing, and say why we exit instead of leaving silently.
        if not isinstance(extra_options, SetupRosettaOptionsGeneral):
            sys.exit("extra_options must be a SetupRosettaOptionsGeneral instance.")

        self.extra_options = extra_options
        self._resolve_options()

    def _get_extra_rosetta_options_string(self):
        """
        Get the extra Rosetta options given on the command line, or "" if none were given.
        """
        if not self.options.extra_options:
            return ""
        return self.options.extra_options

    def _resolve_options(self):
        """
        Resolve options from base, extra, and cmd-line settings.

        Precedence is cmd-line, then extra (JSON run) options, then base options.
        An option is left untouched when none of the sources provides a value.
        """
        def _from_json(getter_name):
            # First truthy value from the extra options, then the base options.
            for source in (self.extra_options, self.base_options):
                value = getattr(source, getter_name)()
                if value:
                    return value
            return None

        # nstruct
        # NOTE(review): --nstruct defaults to 1, so this fallback is only
        # reached if the option was cleared after parsing.
        if not self.options.nstruct:
            nstruct = _from_json("get_nstruct")
            if nstruct:
                self.options.nstruct = str(nstruct)

        # machine file
        if not self.options.machine_file:
            machine_file = _from_json("get_machine_file")
            if machine_file:
                self.options.machine_file = machine_file

        # Job manager options. These used to be written to options.machine_file
        # by mistake, so JSON-provided job manager options were never applied.
        if not self.options.job_manager_opts:
            manager_opts = _from_json("_get_job_manager_opts")
            if manager_opts:
                # _get_job_manager_opts() iterates over a list of options.
                # NOTE(review): assumes a JSON string value is whitespace
                # separated -- confirm against SetupRosettaOptionsGeneral.
                if not isinstance(manager_opts, (list, tuple)):
                    manager_opts = str(manager_opts).split()
                self.options.job_manager_opts = list(manager_opts)

        # program
        if hasattr(self.options, "program") and self.options.program:
            self.program = self.options.program
        else:
            program = _from_json("get_program")
            if program:
                self.program = program

        if self.db_mode:
            if not self.options.db_mode:
                db_mode = _from_json("get_db_mode")
                if db_mode:
                    self.options.db_mode = db_mode

            # Literal test for ".db"; the former regex ".db" matched any
            # character followed by "db" (e.g. "my_db").
            if self.options.db_name and ".db" not in self.options.db_name:
                self.options.db_name = self.options.db_name + ".db"

    def _get_root(self):
        """
        Get the root directory: extra options, then base options, then the current directory.
        """
        return (self.extra_options._get_root()
                or self.base_options._get_root()
                or os.getcwd())

    def _get_job_manager_opts(self):
        """
        Get the job manager options as one string.

        Options that contain a '-' anywhere are passed through unchanged;
        all others are prefixed with '--'.
        """
        # NOTE(review): the '-' test is a containment test, so hyphenated long
        # options such as cpus-per-task=4 are NOT prefixed. Kept as is.
        opts = [opt if "-" in opt else "--" + opt
                for opt in self.options.job_manager_opts]
        return " ".join(opts)

    def _get_job_name(self, *args, **kwargs):
        """
        Get the job name.
        job_name -> exp_name.  Pass extra args to add arguments separated by '_'
        """
        if self.options.job_name:
            return "_".join([self.options.job_name] + list(args))
        elif self.extra_options:
            return "_".join([self.options.exp_name] + list(args))
        else:
            return self.options.exp_name+"."+os.path.basename(self._get_make_out_path())

    def _get_program(self):
        """
        Get the set program as <program>.<mode>.<platform><compiler>release
        """
        if get_platform() == 'macos':
            self.options.compiler = 'clang'

        if not self.program and not self.options.json_run:
            sys.exit("Please set a JSON run file.")
        elif not self.program:
            sys.exit("Please specify a program in the JSON run file")

        mode = ".mpiserialization." if self.options.jd3 else ".mpi."
        return self.program + mode + get_platform() + self.options.compiler + "release"

    def _get_make_queue_dir(self, *args, **kwargs):
        """
        Get and make the queue dir where qsub/slurm scripts will go.
        """
        log_path = self.base_options._get_make_log_root_dir() + "/queue"
        if not os.path.exists(log_path):
            os.makedirs(log_path)
        return log_path

    def _set_outdir(self, outdir):
        """
        Set the output directory option.
        """
        self.options.outdir = outdir

    ### Methods to override for specific Benchmarks ###

    def _get_make_mpi_tracer_dir(self, *args, **kwargs):
        """
        Get and make the dir to which the MPI output of each process will go.
        ONLY for MPI TRACER LOGs
        """
        tracer_root = os.path.join(self.base_options._get_make_log_root_dir(), "mpi_tracer_logs")
        log_path = tracer_root + "/" + self._get_job_name()
        # makedirs also creates the intermediate mpi_tracer_logs directory.
        if not os.path.exists(log_path):
            os.makedirs(log_path)
        return log_path

    def _get_out_prefix(self, *args, **kwargs):
        """
        Get the output prefix. None here; override to set -out:prefix.
        """
        return None

    def _get_make_out_path(self, *args, **kwargs):
        """
        Get and make the dir to which decoys will go.
        root/decoys
        """
        out_path = self.base_options._get_root() + "/" + self.options.outdir
        if not os.path.exists(out_path):
            os.makedirs(out_path)
        return out_path

    def _get_output_string(self, *args, **kwargs):
        """
        Get the full output string for MPI: the program followed by all Rosetta options.
        """
        s = self._get_program()

        if self._get_out_prefix():
            s = s + " -out:prefix "+self._get_out_prefix()

        # The out path is owned by --outdir, so refuse a competing setting.
        if "out:path:all" in self._get_extra_rosetta_options_string():
            sys.exit( "Please use --outdir script option instead of out:path:all" )

        #Nstruct
        s = s + " -nstruct " + str(self.options.nstruct)

        #Outpath
        s = s + " -out:path:all " + self._get_make_out_path(*args, **kwargs)
        s = s + self.base_options.get_base_rosetta_flag_string()

        #Log Dir:
        if not self.options.one_file_mpi:
            tracer_dir = self._get_make_mpi_tracer_dir(*args, **kwargs)
            s = s + " -mpi_tracer_to_file "+ tracer_dir+"/tracer_logs_"

        #For these benchmarks, there are only a single root directory.
        if self.options.json_run:
            s = s + self.extra_options.get_base_rosetta_flag_string(self.base_options._get_root())

        #DB Mode
        if self.db_mode:
            if self.options.db_in:
                s = s + " -in:use_database"
                if not self.options.db_mode:
                    sys.exit("Please select the database mode you wish to use.")

            if self.options.db_out:
                s = s + " -out:use_database"
                if not self.options.db_mode:
                    sys.exit("Please select the database mode you wish to use. ")

            # _get_full_cmd merges the per-process databases for sqlite3.
            if self.options.db_mode == "sqlite3" and 'separate_db_per_mpi_process' not in s:
                s = s + " -separate_db_per_mpi_process"

            if self.options.db_mode:
                s = s + " -inout:dbms:mode "+self.options.db_mode

            if self.options.db_name:
                s = s + " -inout:dbms:database_name " +self.options.db_name

            if "features" in self.base_options.get_xml_script() + self.extra_options.get_xml_script():
                print("Checking Features")
                if self.options.db_name:
                    s = s +" -parser:script_vars name="+self.options.db_name
                if self.options.db_batch:
                    s = s +" -parser:script_vars batch="+self.options.db_batch

        #Input decoys
        if self.options.l:
            #This works for me - makes it easier to make a list in the dir and run it.
            # Change this to be smarter or add an option if this is a problem.
            list_dir = os.path.dirname(self.options.l)
            if list_dir != "":
                s = s+ " -in:path "+list_dir
            s = s+ " -l "+self.options.l

        if self.options.s:
            s = s+ " -s "+self.options.s

        #Extra Rosetta options:
        s = s + " "+self._get_extra_rosetta_options_string()

        if self.options.script_vars:
            s = s + " -parser:script_vars "+" ".join(self.options.script_vars)

        return s

    def _get_full_cmd(self, *args, **kwargs):
        """
        Get the full command line.

        Changes into the root dir, launches the program through the MPI
        executable and, for sqlite3 db mode, appends the commands that merge
        the per-process databases.

        :param args:
        :param kwargs:
        :rtype: str
        """
        cmd_string = self._get_output_string(*args, **kwargs)

        # Local runs always use plain mpiexec.
        mpiexec = "mpiexec"
        if self.options.mpiexec and not self.local_run():
            mpiexec = self.options.mpiexec

        cmd = "cd "+ self._get_root() + " \n" + mpiexec
        if self.options.job_manager == "slurm":
            # No -np here; run() hands the task count to sbatch as --ntasks.
            cmd = cmd + " "
        else:
            cmd = cmd + " -np " + str(self.options.np)

        if self.options.machine_file:
            cmd = cmd + " --machine_file "+self.options.machine_file+" "+ cmd_string
        else:
            cmd = cmd + " "+ cmd_string

        if self.db_mode and self.options.db_mode == "sqlite3":
            cmd = cmd + "\n"
            cmd = cmd + "cd "+self.options.outdir+"\n"
            cmd = cmd + "bash "+ get_rosetta_features_root()+"/sample_sources/merge.sh "+self.options.db_name + " "+self.options.db_name+"_*\n"
            cmd = cmd + "cd -"

        return cmd

    def local_run(self, *args, **kwargs):
        """
        Get if we are running locally

        :rtype: bool
        """
        if self.options.job_manager in ("local", "local_test"):
            print("Local Run!")
            return True
        return False

    def run(self, *args, **kwargs):
        """
        Create the output directories and run, submit or print the full command.

        Extra arguments are forwarded to the overridable path/name helpers.
        With --print_only nothing is executed or submitted.
        """
        log_dir = self._get_make_mpi_tracer_dir(*args, **kwargs)
        outpath = self._get_make_out_path(*args, **kwargs)
        queue_dir = self._get_make_queue_dir(*args, **kwargs)

        print("\nLogDir: "+log_dir)
        print("QueueDir: "+queue_dir)
        print("OutPath: "+outpath+"\n\n")

        # Forward the extra arguments like every other helper call in here.
        cmd = self._get_full_cmd(*args, **kwargs)

        if self.options.job_manager == "local":
            print(cmd + "\n")
            if not self.options.print_only:
                os.system(cmd)

        elif self.options.job_manager == "local_test" or self.options.local_test:
            self.options.np = 2
            self.options.nstruct = 1
            self.options.split_mpi_output = False

            # Rebuild the command with the reduced settings.
            cmd = self._get_full_cmd(*args, **kwargs)
            print_full_cmd(cmd)
            print(cmd + "\n")
            # Honour --print_only here as well.
            if not self.options.print_only:
                os.system(cmd)

        elif self.options.job_manager == "qsub":
            run_on_qsub(cmd, queue_dir, self._get_job_name(*args, **kwargs),
                        self.options.print_only, self._get_job_manager_opts())

        elif self.options.job_manager == "slurm":
            run_on_slurm(cmd, queue_dir, self._get_job_name(*args, **kwargs),
                         ntasks=self.options.np,
                         print_only=self.options.print_only,
                         extra_opts=self._get_job_manager_opts())


if __name__ == "__main__":
    run_rosetta = RunRosetta()
    run_rosetta.run()