#!/usr/bin/env python3 # -*- coding: utf-8 -*- # # Copyright 2020 The SkyWater PDK Authors. # # Use of this source code is governed by the Apache 2.0 # license that can be found in the LICENSE file or at # https://www.apache.org/licenses/LICENSE-2.0 # # SPDX-License-Identifier: Apache-2.0 ''' VCD waveform to wawedrom script/SVG conversion script. ''' from __future__ import print_function import os import sys import argparse import pathlib import wavedrom import re from contextlib import contextmanager wavedrom_template ="""\ {{ signal: [ {signals} ]}}""" signal_template = " {{ name: \"{name}\", {fill}wave: '{wave}' }}" def eprint(*args, **kwargs): ''' Print to stderr ''' print(*args, file=sys.stderr, **kwargs) @contextmanager def file_or_stdout(file): ''' Open file or stdout if file is None ''' if file is None: yield sys.stdout else: with file.open('w') as out_file: yield out_file def readVCD (file): ''' Parses VCD file. Args: file - path to a VCD file [pathlib.Path] Returns: vcd - dictionary containing vcd sections [dict] ''' eprint() eprint(file.name) assert file.is_file(), file vcd = {} with file.open('r') as f: currtag = 'body' for line in f: # regular line if not line.startswith('$'): vcd[currtag] = vcd.setdefault(currtag, '') + line continue # tag, other than end if not line.startswith('$end'): currtag = line.partition(' ')[0].lstrip('$').rstrip() vcd[currtag] = vcd.setdefault(currtag, '') + line.partition(' ')[2].rpartition('$')[0] # line ends with end tag if not vcd[currtag].endswith('\n'): vcd[currtag] += '\n' if line.split()[-1]=='$end': currtag = 'body' vcd[currtag] = '' if 'var' not in vcd: raise SyntaxError("No variables recorded in VCD file") if 'dumpvars' not in vcd: print ("Warning: intial variable states undefined") var['dumpvars'] = '' return vcd def reduce_clock_sequences (wave) : ''' Remove clock seqnces longer than 2 cycles not accompanied by other signals changes Parameters: wave - dictionary 'signal'->['list of states'] [dict] ''' for v in wave: sig = wave[v] # analized signal other = [wave[i] for i in wave if i!=v] # list of other signals other = [''.join(s) for s in zip(*other)] # list of concatenated states other = [len(s.replace('.','')) for s in other] # list of state changes count sig = [s if o==0 else ' ' for s,o in zip(sig,other)] # keep only when no changes in other sig = "".join(sig) cuts = [] for m in re.finditer("(10){2,}",sig): cuts.append( (m.start()+1, m.end()-1) ) # area to be reduced, leave 1..0 cuts.reverse() for cut in cuts: for v,w in wave.items(): # reduce cuts from all signals wave[v] = w[ :cut[0]] + w[cut[1]: ] return wave def parsetowavedrom (file, savetofile = False, reduce_clock = False): ''' Reads and simplifies VCD waveform Generates wavedrom notation. Args: file - path to a VCD file [pathlib.Path] ''' varsubst = {} # var substitution reg = [] # list of signals wire = [] # list of signals (wire class) wave = {} # waveform event = [] # event timings vcd = readVCD (file) # parse vars for line in vcd['var'].split('\n'): line = line.strip().split() if len(line)<4: if len(line): print (f"Warning: malformed var definition {' '.join(line)}") continue if line[1]!='1': print (f"Warning: bus in vars (unsupported) {' '.join(line)}") if line[0]=='reg': reg.append(line[3]) varsubst[line[2]] = line[3] if line[0]=='wire': wire.append(line[3]) varsubst[line[2]] = line[3] # set initial states event.append(0) #default for v in reg+wire: wave[v] = ['x'] #defined for line in vcd['dumpvars'].split('\n'): if len(line)>=2: wave[ varsubst[line[1]] ] = [line[0]] # parse wave body for line in vcd['body'].split('\n'): #timestamp line if line.startswith('#'): line = line.strip().lstrip('#') if not line.isnumeric(): raise SyntaxError("Invalid VCD timestamp") event.append(int(line)) for v in wave.keys(): wave[v].append('.') # state change line else : if len(line)>=2: wave [ varsubst[line[1]] ][-1] = line[0] if reduce_clock: wave = reduce_clock_sequences(wave) signals = [] for v in wave.keys(): fill = ' ' * (max( [len(s) for s in wave.keys()] ) - len(v)) wavestr = ''.join(wave[v]) signals.append( signal_template.format( name = v, wave = wavestr, fill = fill ) ) signals = ',\n'.join(signals) wavedrom = wavedrom_template.format ( signals = signals ) outfile = file.with_suffix(".wdr.json") if savetofile else None with file_or_stdout(outfile) as f: f.write(wavedrom) return wavedrom def quoted_strings_wavedrom (wdr) : ''' Convert wavedrom script to more restrictive version of JSON with quoted keywords Parameters: wdr - wavedrom script [str] ''' wdr = wdr.replace(' signal:',' "signal":') wdr = wdr.replace(' name:',' "name":') wdr = wdr.replace(' wave:',' "wave":') wdr = wdr.replace("'",'"') return wdr def main(): ''' Converts VCD waveform to wavedrom format''' output_txt = 'output:\n stdout or [vcdname].wdr.json file and/or [vcdname].svg file' allcellpath = '../../../libraries/*/latest/cells/*/*.vcd' parser = argparse.ArgumentParser( description = main.__doc__, epilog = output_txt, formatter_class=argparse.RawDescriptionHelpFormatter) parser.add_argument( "--all_libs", help="process all in "+allcellpath, action="store_true") parser.add_argument( "-w", "--wavedrom", help="generate wavedrom .wdr.json file", action="store_true") parser.add_argument( "-s", "--savesvg", help="generate .svg image", action="store_true") parser.add_argument( "-r", "--reduceclk", help="reduce clock sequences", action="store_true") parser.add_argument( "infile", help="VCD waveform file", type=pathlib.Path, nargs="*") args = parser.parse_args() if args.all_libs: path = pathlib.Path(allcellpath).expanduser() parts = path.parts[1:] if path.is_absolute() else path.parts paths = pathlib.Path(path.root).glob(str(pathlib.Path("").joinpath(*parts))) args.infile = list(paths) infile = [d.resolve() for d in args.infile if d.is_file()] errors = 0 for f in infile: try: wdr = parsetowavedrom(f, args.wavedrom, args.reduceclk) if args.savesvg: svg = wavedrom.render( quoted_strings_wavedrom(wdr) ) outfile = f.with_suffix(".svg") svg.saveas(outfile) except KeyboardInterrupt: sys.exit(1) except (SyntaxError, AssertionError, FileNotFoundError, ChildProcessError) as ex: eprint (f'{type(ex).__name__}: {", ".join(ex.args)}') errors +=1 eprint (f'\n{len(infile)} files processed, {errors} errors.') return 0 if errors else 1 if __name__ == "__main__": sys.exit(main())