OpenFPGA/openfpga_flow/scripts/run_fpga_task.py

440 lines
18 KiB
Python

# = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = =
# Script Name : run_fpga_task.py
# Description : This script designed to run openfpga_flow tasks,
# Opensfpga task are design to run opefpga_flow on each
# Combination of architecture, benchmark and script paramters
# Args : python3 run_fpga_task.py --help
# Author : Ganesh Gore
#Email : ganeshgore@utah.edu
# = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = =
import os
import sys
import shutil
import time
from datetime import timedelta
import shlex
import argparse
from configparser import ConfigParser, ExtendedInterpolation
import logging
import glob
import subprocess
import threading
import csv
from string import Template
import pprint
from importlib import util
from collections import OrderedDict
if util.find_spec("humanize"):
import humanize
if sys.version_info[0] < 3:
raise Exception("run_fpga_task script must be using Python 3")
# = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = =
# Configure logging system
# = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = =
logging.basicConfig(level=logging.INFO, stream=sys.stdout,
format='%(levelname)s (%(threadName)10s) - %(message)s')
logger = logging.getLogger('OpenFPGA_Task_logs')
# = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = =
# Read commandline arguments
# = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = =
parser = argparse.ArgumentParser()
parser.add_argument('tasks', nargs='+')
parser.add_argument('--maxthreads', type=int, default=2,
help="Number of fpga_flow threads to run default = 2," +
"Typically <= Number of processors on the system")
parser.add_argument('--config', help="Override default configuration")
parser.add_argument('--test_run', action="store_true",
help="Dummy run shows final generated VPR commands")
parser.add_argument('--debug', action="store_true",
help="Run script in debug mode")
parser.add_argument('--continue_on_fail', action="store_true",
help="Exit script with return code")
parser.add_argument('--show_thread_logs', action="store_true",
help="Skips logs from running thread")
args = parser.parse_args()
# = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = =
# Read script configuration file
# = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = =
task_script_dir = os.path.dirname(os.path.abspath(__file__))
script_env_vars = ({"PATH": {
"OPENFPGA_FLOW_PATH": task_script_dir,
"ARCH_PATH": os.path.join("${PATH:OPENFPGA_PATH}", "arch"),
"BENCH_PATH": os.path.join("${PATH:OPENFPGA_PATH}", "benchmarks"),
"TECH_PATH": os.path.join("${PATH:OPENFPGA_PATH}", "tech"),
"SPICENETLIST_PATH": os.path.join("${PATH:OPENFPGA_PATH}", "SpiceNetlists"),
"VERILOG_PATH": os.path.join("${PATH:OPENFPGA_PATH}", "VerilogNetlists"),
"OPENFPGA_PATH": os.path.abspath(os.path.join(task_script_dir, os.pardir,
os.pardir))}})
config = ConfigParser(interpolation=ExtendedInterpolation())
config.read_dict(script_env_vars)
config.read_file(open(os.path.join(task_script_dir, 'run_fpga_task.conf')))
gc = config["GENERAL CONFIGURATION"]
def main():
validate_command_line_arguments()
for eachtask in args.tasks:
logger.info("Currently running task %s" % eachtask)
eachtask = eachtask.replace("\\", "/").split("/")
job_run_list = generate_each_task_actions(eachtask)
eachtask = "_".join(eachtask)
if not args.test_run:
run_actions(job_run_list)
collect_results(job_run_list)
else:
pprint.pprint(job_run_list)
logger.info("Task execution completed")
exit(0)
# = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = =
# Subroutines starts here
# = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = =
def clean_up_and_exit(msg):
logger.error(msg)
logger.error("Exiting . . . . . .")
exit(1)
def validate_command_line_arguments():
if args.debug:
logger.info("Setting loggger in debug mode")
logger.setLevel(logging.DEBUG)
logger.info("Set up to run %d Parallel threads", args.maxthreads)
def generate_each_task_actions(taskname):
"""
This script generates all the scripts required for each benchmark
"""
# Check if task directory exists and consistent
curr_task_dir = os.path.join(gc["task_dir"], *(taskname))
if not os.path.isdir(curr_task_dir):
clean_up_and_exit("Task directory [%s] not found" % curr_task_dir)
os.chdir(curr_task_dir)
curr_task_conf_file = os.path.join(curr_task_dir, "config", "task.conf")
if not os.path.isfile(curr_task_conf_file):
clean_up_and_exit(
"Missing configuration file for task %s" % curr_task_dir)
# Create run directory for current task run ./runxxx
run_dirs = [int(os.path.basename(x)[-3:]) for x in glob.glob('run*[0-9]')]
curr_run_dir = "run%03d" % (max(run_dirs+[0, ])+1)
try:
os.mkdir(curr_run_dir)
if os.path.islink('latest') or os.path.exists('latest'):
os.remove("latest")
os.symlink(curr_run_dir, "latest")
logger.info('Created "%s" directory for current task run' %
curr_run_dir)
except:
logger.exception("")
logger.error("Failed to create new run directory in task directory")
os.chdir(curr_run_dir)
# Read task configuration file and check consistency
task_conf = ConfigParser(allow_no_value=True,
interpolation=ExtendedInterpolation())
task_conf.read_dict(script_env_vars)
task_conf.read_file(open(curr_task_conf_file))
required_sec = ["GENERAL", "BENCHMARKS", "ARCHITECTURES"]
missing_section = list(set(required_sec)-set(task_conf.sections()))
if missing_section:
clean_up_and_exit("Missing sections %s" % " ".join(missing_section) +
" in task configuration file")
# Declare varibles to access sections
TaskFileSections = task_conf.sections()
SynthSection = task_conf["SYNTHESIS_PARAM"]
GeneralSection = task_conf["GENERAL"]
# Check if specified architecture files exist
# TODO Store it as a dictionary and take reference from the key
archfile_list = []
for _, arch_file in task_conf["ARCHITECTURES"].items():
arch_full_path = arch_file
if os.path.isfile(arch_full_path):
archfile_list.append(arch_full_path)
else:
clean_up_and_exit("Architecture file not found: " +
"%s " % arch_file)
if not len(archfile_list) == len(list(set(archfile_list))):
clean_up_and_exit("Found duplicate architectures in config file")
# Get Flow information
logger.info('Running "%s" flow' %
GeneralSection.get("fpga_flow", fallback="yosys_vpr"))
# Check if specified benchmark files exist
benchmark_list = []
for bech_name, each_benchmark in task_conf["BENCHMARKS"].items():
# Declare varible to store paramteres for current benchmark
CurrBenchPara = {}
# Parse benchmark file
bench_files = []
for eachpath in each_benchmark.split(","):
files = glob.glob(eachpath)
if not len(files):
clean_up_and_exit(("No files added benchmark %s" % bech_name) +
" with path %s " % (eachpath))
bench_files += files
# Read provided benchmark configurations
# Common configurations
ys_for_task_common = SynthSection.get("bench_yosys_common")
chan_width_common = SynthSection.get("bench_chan_width_common")
# Individual benchmark configuration
CurrBenchPara["files"] = bench_files
CurrBenchPara["top_module"] = SynthSection.get(bech_name+"_top",
fallback="top")
CurrBenchPara["ys_script"] = SynthSection.get(bech_name+"_yosys",
fallback=ys_for_task_common)
CurrBenchPara["chan_width"] = SynthSection.get(bech_name+"_chan_width",
fallback=chan_width_common)
if GeneralSection.get("fpga_flow") == "vpr_blif":
# Check if activity file exist
if not SynthSection.get(bech_name+"_act"):
clean_up_and_exit("Missing argument %s" % (bech_name+"_act") +
"for vpr_blif flow")
CurrBenchPara["activity_file"] = SynthSection.get(bech_name+"_act")
# Check if base verilog file exists
if not SynthSection.get(bech_name+"_verilog"):
clean_up_and_exit("Missing argument %s for vpr_blif flow" %
(bech_name+"_verilog"))
CurrBenchPara["verilog_file"] = SynthSection.get(
bech_name+"_verilog")
# Add script parameter list in current benchmark
ScriptSections = [x for x in TaskFileSections if "SCRIPT_PARAM" in x]
script_para_list = {}
for eachset in ScriptSections:
command = []
for key, values in task_conf[eachset].items():
command += ["--"+key, values] if values else ["--"+key]
# Set label for Sript Parameters
set_lbl = eachset.replace("SCRIPT_PARAM", "")
set_lbl = set_lbl[1:] if set_lbl else "Common"
script_para_list[set_lbl] = command
CurrBenchPara["script_params"] = script_para_list
benchmark_list.append(CurrBenchPara)
# Create OpenFPGA flow run commnad for each combination of
# architecture, benchmark and parameters
# Create run_job object [arch, bench, run_dir, commnad]
flow_run_cmd_list = []
for indx, arch in enumerate(archfile_list):
for bench in benchmark_list:
for lbl, param in bench["script_params"].items():
flow_run_dir = get_flow_rundir(arch, bench["top_module"], lbl)
command = create_run_command(
curr_job_dir=flow_run_dir,
archfile=arch,
benchmark_obj=bench,
param=param,
task_conf=task_conf)
flow_run_cmd_list.append({
"arch": arch,
"bench": bench,
"name": "%02d_arch%s_%s" % (indx, bench["top_module"], lbl),
"run_dir": flow_run_dir,
"commands": command,
"finished" : False,
"status": False})
logger.info('Found %d Architectures %d Benchmarks & %d Script Parameters' %
(len(archfile_list), len(benchmark_list), len(ScriptSections)))
logger.info('Created total %d jobs' % len(flow_run_cmd_list))
return flow_run_cmd_list
def get_flow_rundir(arch, top_module, flow_params=None):
path = [
os.path.basename(arch).replace(".xml", ""),
top_module,
flow_params if flow_params else "common"
]
return os.path.abspath(os.path.join(*path))
def create_run_command(curr_job_dir, archfile, benchmark_obj, param, task_conf):
"""
Create_run_script function accepts run directory, architecture list and
fpga_flow configuration file and prepare final executable fpga_flow script
TODO : Replace this section after convert fpga_flow to python script
Config file creation and bechnmark list can be skipped
"""
# = = = = = = = = = File/Directory Consitancy Check = = = = = = = = = =
if not os.path.isdir(gc["misc_dir"]):
clean_up_and_exit("Miscellaneous directory does not exist")
# = = = = = = = = = = = = Create execution folder = = = = = = = = = = = =
if os.path.isdir(curr_job_dir):
question = "One the result directory already exist.\n"
question += "%s\n" % curr_job_dir
reply = str(input(question+' (y/n): ')).lower().strip()
if reply[:1] in ['y', 'yes']:
shutil.rmtree(curr_job_dir)
else:
logger.info("Result directory removal denied by the user")
exit()
os.makedirs(curr_job_dir)
# Make execution command to run Open FPGA flow
task_gc = task_conf["GENERAL"]
command = [archfile] + benchmark_obj["files"]
command += ["--top_module", benchmark_obj["top_module"]]
command += ["--run_dir", curr_job_dir]
if task_gc.get("fpga_flow"):
command += ["--fpga_flow", task_gc.get("fpga_flow")]
if benchmark_obj.get("activity_file"):
command += ["--activity_file", benchmark_obj.get("activity_file")]
if benchmark_obj.get("verilog_file"):
command += ["--base_verilog", benchmark_obj.get("verilog_file")]
if benchmark_obj.get("ys_script"):
command += ["--yosys_tmpl", benchmark_obj["ys_script"]]
if task_gc.getboolean("power_analysis"):
command += ["--power"]
command += ["--power_tech", task_gc.get("power_tech_file")]
if task_gc.getboolean("spice_output"):
command += ["--vpr_fpga_spice"]
if task_gc.getboolean("verilog_output"):
command += ["--vpr_fpga_verilog"]
command += ["--vpr_fpga_verilog_dir", "."]
command += ["--vpr_fpga_x2p_rename_illegal_port"]
# Add other paramters to pass
command += param
if args.debug:
command += ["--debug"]
return command
def strip_child_logger_info(line):
try:
logtype, message = line.split(" - ", 1)
lognumb = {"CRITICAL": 50, "ERROR": 40, "WARNING": 30,
"INFO": 20, "DEBUG": 10, "NOTSET": 0}
logger.log(lognumb[logtype.strip().upper()], message)
except:
logger.info(line)
def run_single_script(s, eachJob, job_list):
logger.debug('Added job in pool')
with s:
logger.debug("Running OpenFPGA flow with " +
" ".join(eachJob["commands"]))
name = threading.currentThread().getName()
eachJob["starttime"] = time.time()
try:
logfile = "%s_out.log" % name
with open(logfile, 'w+') as output:
output.write("* "*20 + '\n')
output.write("RunDirectory : %s\n" % os.getcwd())
command = [gc["python_path"], gc["script_default"]] + \
eachJob["commands"]
output.write(" ".join(command) + '\n')
output.write("* "*20 + '\n')
process = subprocess.Popen(command,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
universal_newlines=True)
for line in process.stdout:
if args.show_thread_logs:
strip_child_logger_info(line[:-1])
sys.stdout.buffer.flush()
output.write(line)
process.wait()
if process.returncode:
raise subprocess.CalledProcessError(0, command)
eachJob["status"] = True
except:
logger.exception("Failed to execute openfpga flow - " +
eachJob["name"])
if not args.continue_on_fail:
clean_up_and_exit("Faile to run task %s exiting" % name)
eachJob["endtime"] = time.time()
timediff = timedelta(seconds=(eachJob["endtime"]-eachJob["starttime"]))
timestr = humanize.naturaldelta(timediff) if "humanize" in sys.modules \
else str(timediff)
logger.info("%s Finished with returncode %d, Time Taken %s " %
(name, process.returncode, timestr))
eachJob["finished"] = True
no_of_finished_job = sum([ not eachJ["finished"] for eachJ in job_list])
logger.info("***** %d runs pending *****" % (no_of_finished_job))
def run_actions(job_list):
thread_sema = threading.Semaphore(args.maxthreads)
thred_list = []
for index, eachjob in enumerate(job_list):
JobID = 'Job_%02d' % (index+1)
logger.info("Running %s = %s" % (JobID, eachjob["name"]))
t = threading.Thread(target=run_single_script,
name=JobID, args=(thread_sema, eachjob, job_list))
t.start()
thred_list.append(t)
for eachthread in thred_list:
eachthread.join()
def collect_results(job_run_list):
task_result = []
for run in job_run_list:
if not run["status"]:
logger.warning("Skipping %s run", run["name"])
continue
# Check if any result file exist
if not glob.glob(os.path.join(run["run_dir"], "*.result")):
logger.info("No result files found for %s" % run["name"])
# Read and merge result file
vpr_res = ConfigParser(allow_no_value=True,
interpolation=ExtendedInterpolation())
vpr_res.read_file(
open(os.path.join(run["run_dir"], "vpr_stat.result")))
result = OrderedDict()
result["name"] = run["name"]
result["TotalRunTime"] = int(run["endtime"]-run["starttime"])
result.update(vpr_res["RESULTS"])
task_result.append(result)
colnames = []
for eachLbl in task_result:
colnames.extend(eachLbl.keys())
if len(task_result):
with open("task_result.csv", 'w', newline='') as csvfile:
writer = csv.DictWriter(
csvfile, extrasaction='ignore', fieldnames=list(set(colnames)))
writer.writeheader()
for eachResult in task_result:
writer.writerow(eachResult)
if __name__ == "__main__":
main()