from string import Template import sys import os import re import glob import time import threading from datetime import timedelta import argparse import subprocess import logging from configparser import ConfigParser, ExtendedInterpolation # = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = # Configure logging system # = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = FILE_LOG_FORMAT = '%(levelname)s (%(threadName)10s) - %(message)s' logging.basicConfig(level=logging.INFO, stream=sys.stdout, format='%(levelname)s (%(threadName)10s) - %(message)s') logger = logging.getLogger('Modelsim_run_log') # = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = # Parse commandline arguments # = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = parser = argparse.ArgumentParser() parser.add_argument('files', nargs='+', help="Pass SimulationDeckInfo generated by OpenFPGA flow" + " or pass taskname ") parser.add_argument('--maxthreads', type=int, default=2, help="Number of fpga_flow threads to run default = 2," + "Typically <= Number of processors on the system") parser.add_argument('--debug', action="store_true", help="Run script in debug mode") parser.add_argument('--modelsim_proc_tmpl', type=str, help="Modelsim proc template file") parser.add_argument('--modelsim_runsim_tmpl', type=str, help="Modelsim runsim template file") parser.add_argument('--run_sim', action="store_true", help="Execute generated script in formality") parser.add_argument('--modelsim_proj_dir', help="Provide modelsim project directory") parser.add_argument('--modelsim_proj_name', help="Provide modelsim project name") parser.add_argument('--modelsim_ini', type=str, default="/uusoc/facility/cad_tools/Mentor/modelsim10.7b/modeltech/modelsim.ini", help="Skip any confirmation") parser.add_argument('--skip_prompt', action='store_true', help='Skip any confirmation') parser.add_argument('--ini_filename', type=str, default="simulation_deck_info.ini", help='default INI filename in in fun dir') args = parser.parse_args() # = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = # Read script configuration file # = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = task_script_dir = os.path.dirname(os.path.abspath(__file__)) script_env_vars = ({"PATH": { "OPENFPGA_FLOW_PATH": task_script_dir, "ARCH_PATH": os.path.join("${PATH:OPENFPGA_PATH}", "arch"), "BENCH_PATH": os.path.join("${PATH:OPENFPGA_PATH}", "benchmarks"), "TECH_PATH": os.path.join("${PATH:OPENFPGA_PATH}", "tech"), "SPICENETLIST_PATH": os.path.join("${PATH:OPENFPGA_PATH}", "SpiceNetlists"), "VERILOG_PATH": os.path.join("${PATH:OPENFPGA_PATH}", "VerilogNetlists"), "OPENFPGA_PATH": os.path.abspath(os.path.join(task_script_dir, os.pardir, os.pardir))}}) config = ConfigParser(interpolation=ExtendedInterpolation()) config.read_dict(script_env_vars) config.read_file(open(os.path.join(task_script_dir, 'run_fpga_task.conf'))) gc = config["GENERAL CONFIGURATION"] # = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = # Load default templates for modelsim # = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = task_script_dir = os.path.dirname(os.path.abspath(__file__)) if not args.modelsim_proc_tmpl: args.modelsim_proc_tmpl = os.path.join(task_script_dir, os.pardir, "misc", "modelsim_proc.tcl") if not args.modelsim_runsim_tmpl: args.modelsim_runsim_tmpl = os.path.join(task_script_dir, os.pardir, "misc", "modelsim_runsim.tcl") args.modelsim_proc_tmpl = os.path.abspath(args.modelsim_proc_tmpl) args.modelsim_runsim_tmpl = os.path.abspath(args.modelsim_runsim_tmpl) def main(): if os.path.isfile(args.files[0]): create_tcl_script(args.files) else: # Check if task directory exists and consistent taskname = args.files[0] task_run = "latest" if len(args.files) > 1: task_run = f"run{int(args.files[1]):03}" temp_dir = os.path.join(gc["task_dir"], taskname) if not os.path.isdir(temp_dir): clean_up_and_exit("Task directory [%s] not found" % temp_dir) temp_dir = os.path.join(gc["task_dir"], taskname, task_run) if not os.path.isdir(temp_dir): clean_up_and_exit("Task run directory [%s] not found" % temp_dir) # = = = = = = = Create a current script log file handler = = = = logfile_path = os.path.join(gc["task_dir"], taskname, task_run, "modelsim_run.log") logfilefh = logging.FileHandler(logfile_path, "w") logfilefh.setFormatter(logging.Formatter(FILE_LOG_FORMAT)) logger.addHandler(logfilefh) logger.info("Created log file at %s" % logfile_path) # = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = # = = = = Read Task log file and extract run directory = = = logfile = os.path.join(gc["task_dir"], taskname, task_run, "*_out.log") logfiles = glob.glob(logfile) if not len(logfiles): clean_up_and_exit("No successful run found in [%s]" % temp_dir) task_ini_files = [] for eachfile in logfiles: with open(eachfile) as fp: run_dir = [re.findall(r'^INFO.*Run directory : (.*)$', line) for line in open(eachfile)] run_dir = filter(bool, run_dir) for each_run in run_dir: INIfile = os.path.join(each_run[0], args.ini_filename) if os.path.isfile(INIfile): task_ini_files.append(INIfile) logger.info(f"Found {len(task_ini_files)} INI files") create_tcl_script(task_ini_files) def clean_up_and_exit(msg): logger.error(msg) logger.error("Exiting . . . . . .") exit(1) def create_tcl_script(files): runsim_files = [] for eachFile in files: eachFile = os.path.abspath(eachFile) pDir = os.path.dirname(eachFile) os.chdir(pDir) config = ConfigParser() config.read(eachFile) config = config["SIMULATION_DECK"] # Resolve project Modelsim project path if not args.modelsim_proj_dir: args.modelsim_run_dir = os.path.dirname(os.path.abspath(eachFile)) args.modelsim_proj_dir = os.path.join( args.modelsim_run_dir, "MMSIM2") logger.info(f"Modelsim project dir not provide " + f"using default {args.modelsim_proj_dir} directory") if not args.skip_prompt: input("Press Enter to continue, Ctrl+C to abort") args.modelsim_proj_dir = os.path.abspath(args.modelsim_proj_dir) config["MODELSIM_PROJ_DIR"] = args.modelsim_proj_dir if not os.path.exists(args.modelsim_proj_dir): os.makedirs(args.modelsim_proj_dir) # Resolve Modelsim Project name if not args.modelsim_proj_name: args.modelsim_proj_name = config["BENCHMARK"] + "_MMSIM" logger.info(f"Modelsim project name not provide " + f"using default {args.modelsim_proj_name} directory") if not args.skip_prompt: input("Press Enter to continue, Ctrl+C to abort") config["MODELSIM_PROJ_NAME"] = args.modelsim_proj_name config["MODELSIM_INI"] = args.modelsim_ini # Modify the variables in config file here config["TOP_TB"] = os.path.splitext(config["TOP_TB"])[0] # pass # Write final template file # Write runsim file tmpl = Template(open(args.modelsim_runsim_tmpl, encoding='utf-8').read()) runsim_filename = os.path.join(args.modelsim_proj_dir, "%s_runsim.tcl" % config['BENCHMARK']) logger.info(f"Creating tcl script at : {runsim_filename}") with open(runsim_filename, 'w', encoding='utf-8') as tclout: tclout.write(tmpl.substitute(config)) # Write proc file proc_filename = os.path.join(args.modelsim_proj_dir, "%s_autocheck_proc.tcl" % config['BENCHMARK']) logger.info(f"Creating tcl script at : {proc_filename}") with open(proc_filename, 'w', encoding='utf-8') as tclout: tclout.write(open(args.modelsim_proc_tmpl, encoding='utf-8').read()) runsim_files.append({ "modelsim_run_dir": args.modelsim_run_dir, "runsim_filename": runsim_filename, "status" :False, "finished" : True }) # Execute modelsim if args.run_sim: thread_sema = threading.Semaphore(args.maxthreads) logger.info("Launching %d parallel threads" % args.maxthreads) thread_list = [] for thread_no, eachjob in enumerate(runsim_files): t = threading.Thread(target=run_modelsim_thread, name=f"Thread_{thread_no:d}", args=(thread_sema, eachjob, runsim_files)) t.start() thread_list.append(t) for eachthread in thread_list: eachthread.join() exit() else: logger.info("Created runsim and proc files") logger.info(f"runsim_filename {runsim_filename}") logger.info(f"proc_filename {proc_filename}") from pprint import pprint pprint(runsim_files) def run_modelsim_thread(s, eachJob, job_list): os.chdir(eachJob["modelsim_run_dir"]) with s: thread_name = threading.currentThread().getName() eachJob["starttime"] = time.time() try: logfile = "%s_modelsim.log" % thread_name with open(logfile, 'w+') as output: output.write("* "*20 + '\n') output.write("RunDirectory : %s\n" % os.getcwd()) command = ["vsim", "-c", "-do", eachJob["runsim_filename"]] output.write(" ".join(command) + '\n') output.write("* "*20 + '\n') logger.info("Running modelsim with [%s]" % " ".join(command)) process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, universal_newlines=True) for line in process.stdout: if "Errors" in line: logger.info(line.strip()) sys.stdout.buffer.flush() output.write(line) process.wait() if process.returncode: raise subprocess.CalledProcessError(0, " ".join(command)) eachJob["status"] = True except: logger.exception("Failed to execute openfpga flow - " + eachJob["name"]) if not args.continue_on_fail: os._exit(1) eachJob["endtime"] = time.time() timediff = timedelta(seconds=(eachJob["endtime"]-eachJob["starttime"])) timestr = humanize.naturaldelta(timediff) if "humanize" in sys.modules \ else str(timediff) logger.info("%s Finished with returncode %d, Time Taken %s " % (thread_name, process.returncode, timestr)) eachJob["finished"] = True no_of_finished_job = sum([not eachJ["finished"] for eachJ in job_list]) logger.info("***** %d runs pending *****" % (no_of_finished_job)) if __name__ == "__main__": if args.debug: logger.info("Setting loggger in debug mode") logger.setLevel(logging.DEBUG) main()