#!/usr/bin/env python3 import os import sys import argparse import tempfile import tarfile import glob import urllib.request import math import subprocess import shutil TATUM_ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) tests = { 'basic': [os.path.join(TATUM_ROOT, 'test', 'basic')], 'mcnc20': ["http://www.eecg.utoronto.ca/~kmurray/tatum/golden_results/mcnc20_tatum_golden.tar.gz"], #'vtr': [ #], #'titan_other': [ #], #'titan': [ #], #'tau15_unit_delay': [ #], } def parse_args(): parser = argparse.ArgumentParser() parser.add_argument("test_names", help="Test to run", nargs='+', choices=tests.keys()) parser.add_argument("--tatum_test_exec", help="Path to tatum test executable (Default: from path)", default=None) parser.add_argument("--tatum_nworkers", help="How many parallel workers each tatum invokation should use. (Default: tatum default)", type=int, default=None) parser.add_argument("--tatum_nserial", help="How serial runs tatum should perform per test. (Default: %(default)s)", type=int, default=3) parser.add_argument("--tatum_nparallel", help="How parallel runs tatum should perform per test. (Default: %(default)s)", type=int, default=6) parser.add_argument("--debug", help="Show directory path and don't clean-up", default=False, action='store_true') return parser.parse_args() def main(): args = parse_args() if args.tatum_test_exec: args.tatum_test_exec = os.path.abspath(args.tatum_test_exec) else: args.tatum_test_exec = "tatum_test" #From path num_failures = 0 for test_name in args.test_names: for test_url in tests[test_name]: work_dir = tempfile.mkdtemp(prefix="tatum_reg_test") try: test_files = download_extract_test(args, work_dir, test_url) num_failures += run_test(args, work_dir, test_name, test_files) finally: if not args.debug: shutil.rmtree(work_dir) else: print(work_dir) sys.exit(num_failures) def run_test(args, work_dir, test_name, test_files): num_failed = 0 os.chdir(work_dir) print("Running {} ({} tests)".format(test_name, len(test_files))) for test_file in test_files: num_failed += run_single_test(args, work_dir, test_file) return num_failed; def run_single_test(args, work_dir, test_file): num_failed = 0 benchmark_name, ext = os.path.splitext(os.path.basename(test_file)) log = benchmark_name + '.log' cmd = [] cmd += [args.tatum_test_exec] cmd += ['--verify', "1"] cmd += ['--num_serial', str(args.tatum_nserial)] cmd += ['--num_parallel', str(args.tatum_nparallel)] if args.tatum_nworkers: cmd += ['--num_workers', str(args.tatum_nworkers)] cmd += [test_file] print(" {}".format(test_file), end='') if args.debug: print() print(" cmd: {} log: {}".format(' '.join(cmd), log)) with open(log, 'w') as outfile: retcode = subprocess.call(cmd, stdout=outfile, stderr=outfile) if retcode == 0: print(" PASSED") else: num_failed += 1 print(" FAILED") #Print log if failed with open(log, 'r') as log_file: for line in log_file: print("\t{}".format(line), end='') return num_failed def download_extract_test(args, work_dir, test_url): test_files = [] if '.tar' in test_url: #A tar file of benchmark files benchmark_tar = os.path.join(work_dir, os.path.basename(test_url)) get_url(test_url, benchmark_tar) with tarfile.TarFile.open(benchmark_tar, mode="r|*") as tar_file: tar_file.extractall(path=work_dir) test_files += glob.glob("{}/*.tatum".format(work_dir)) test_files += glob.glob("{}/*/*.tatum".format(work_dir)) else: #A directory of benchmark files test_files += glob.glob("{}/*.tatum".format(test_url)) test_files += glob.glob("{}/*/*.tatum".format(test_url)) return test_files def get_url(url, filename): if '://' in url: download_url(url, filename) else: shutl.copytree(url, filename) def download_url(url, filename): """ Downloads the specifed url to filename """ print("Downloading {} to {}".format(url, filename)) urllib.request.urlretrieve(url, filename, reporthook=download_progress_callback) def download_progress_callback(block_num, block_size, expected_size): """ Callback for urllib.urlretrieve which prints a dot for every percent of a file downloaded """ total_blocks = int(math.ceil(expected_size / block_size)) progress_increment = int(math.ceil(total_blocks / 100)) if block_num % progress_increment == 0: print(".", end='', flush=False) if block_num*block_size >= expected_size: print("") if __name__ == "__main__": main()