From 0c5f7fa90509532bbe5dadb4625492d569f3dfc0 Mon Sep 17 00:00:00 2001 From: Sean Anderson Date: Sat, 6 Aug 2022 21:38:36 -0400 Subject: [PATCH] tb: Parametrize rx tests In the recieve tests, the harness often has a choice of how fast to feed data to the module. Up to this point, we have always used the same strategy (typically random), even when multiple strategies were used when writing the test. Add parametrization to test different strategies in each test run. The timing decorator is taken from the cocotb source, since we can't pass parameters to cocotb.test directly any more. Signed-off-by: Sean Anderson --- tb/pcs.py | 39 ++++++++++++++++++++++++++++++++------- tb/pmd.py | 33 +++++++++++++++++++++++++-------- tb/util.py | 24 ++++++++++++++++++++++++ 3 files changed, 81 insertions(+), 15 deletions(-) diff --git a/tb/pcs.py b/tb/pcs.py index 7e6c280..27b32c3 100644 --- a/tb/pcs.py +++ b/tb/pcs.py @@ -7,10 +7,11 @@ import itertools import cocotb from cocotb.clock import Clock +from cocotb.regression import TestFactory from cocotb.triggers import ClockCycles, Edge, RisingEdge, FallingEdge, Timer from cocotb.types import LogicArray -from .util import alist, classproperty, ReverseList +from .util import alist, classproperty, ReverseList, timeout class Code(enum.Enum): _0 = (0b11110, '0') @@ -194,14 +195,34 @@ async def pcs_recv_packet(pcs): yield code.data raise PrematureEndError() -async def pcs_send_codes(pcs, codes): +def one_valid(): + return 1 + +def two_valid(): + return 2 + +def rand_valid(): + return random.randrange(3) + +class saw_valid: + def __init__(self): + self.last = 0 + # Lie for TestFactory + self.__qualname__ = self.__class__.__qualname__ + + def __call__(self): + self.last += 1 + if self.last > 2: + self.last = 0 + return self.last + +async def pcs_send_codes(pcs, codes, valids): await FallingEdge(pcs.rx_clk) codes = list(codes) bits = itertools.chain(*codes) try: while True: - #valid = 2 - valid = random.randrange(3) + valid = valids() pcs.pma_data_rx_valid.value = valid if valid == 0: data = 'XX' @@ -254,8 +275,8 @@ async def test_tx(pcs): await cocotb.start(mii_send_packet(pcs, [0x5, None])) assert [0x5, 0x5, None] == await alist(pcs_recv_packet(pcs)) -@cocotb.test(timeout_time=10, timeout_unit='us') -async def test_rx(pcs): +@timeout(10, 'us') +async def test_rx(pcs, valids): pcs.pma_data_rx.value = LogicArray('11') pcs.pma_data_rx_valid.value = 2 pcs.link_status.value = 1 @@ -276,7 +297,7 @@ async def test_rx(pcs): (Code('J'), Code('K'), Code('I'), Code('I'), (1,1)), # Packet spacing *((*frame([0x55, 0x55]), (1,) * i) for i in range(10)) - ))) + ), valids)) assert packet == await alist(mii_recv_packet(pcs)) @@ -291,3 +312,7 @@ async def test_rx(pcs): # Test packet spacing for _ in range(10): assert [0x5, 0x5] == await alist(mii_recv_packet(pcs)) + +rx_tests = TestFactory(test_rx) +rx_tests.add_option('valids', (one_valid, two_valid, rand_valid, saw_valid())) +rx_tests.generate_tests() diff --git a/tb/pmd.py b/tb/pmd.py index a3a4baf..86a3d52 100644 --- a/tb/pmd.py +++ b/tb/pmd.py @@ -4,13 +4,30 @@ from statistics import NormalDist import cocotb from cocotb.binary import BinaryValue from cocotb.clock import Clock +from cocotb.regression import TestFactory from cocotb.triggers import RisingEdge, Timer +from .util import timeout + def print_list_at(l, i): print(' ' * max(50 - i, 0), *l[max(i - 50, 0):i+50], sep='') -@cocotb.test(timeout_time=100, timeout_unit='us') -async def test_rx(pmd): +BITS = 1000 + +def random_delays(count): + # Target BER is 1e9 and the maximum jitter is 1.4ns + # This is just random jitter (not DDJ or DCJ) but it'll do + delay_dist = NormalDist(8000, 1400 / NormalDist().inv_cdf(1-2e-9)) + return (int(delay) for delay in delay_dist.samples(count)) + +def mindelays(count): + return (7900,) * count + +def maxdelays(count): + return (8100,) * count + +@timeout(100, 'us') +async def test_rx(pmd, delays): pmd.signal_detect.value = 0 await Timer(1) await cocotb.start(Clock(pmd.rx_clk_125, 8, units='ns').start()) @@ -18,22 +35,18 @@ async def test_rx(pmd): await Timer(random.randrange(0, 8000), units='ps') await cocotb.start(Clock(pmd.rx_clk_250, 4, units='ns').start()) - ins = [random.randrange(2) for _ in range(1000)] + ins = [random.randrange(2) for _ in range(BITS)] async def generate_bits(): # random phase await Timer(random.randrange(0, 8000), units='ps') pmd.signal_detect.value = 1 - # Target BER is 1e9 and the maximum jitter is 1.4ns - # This is just random jitter (not DDJ or DCJ) but it'll do - delay_dist = NormalDist(8000, 1400 / NormalDist().inv_cdf(1-2e-9)) - for i, delay in zip(ins, (int(delay) for delay in delay_dist.samples(len(ins)))): + for i, delay in zip(ins, delays(len(ins))): pmd.rx.value = i try: pmd.delay.value = delay except AttributeError: pass await Timer(delay, units='ps') - #await Timer(8100, units='ps') pmd.signal_detect.value = 0 await cocotb.start(generate_bits()) @@ -69,3 +82,7 @@ async def test_rx(pmd): # There will be a few bits at the end not recorded because signal_detect # isn't delayed like the data signals assert best_corr > len(ins) - best_off - 10 + +rx_tests = TestFactory(test_rx) +rx_tests.add_option('delays', (random_delays, mindelays, maxdelays)) +rx_tests.generate_tests() diff --git a/tb/util.py b/tb/util.py index 5f04e54..c009afe 100644 --- a/tb/util.py +++ b/tb/util.py @@ -1,6 +1,12 @@ # SPDX-License-Identifier: AGPL-3.0-Only # Copyright (C) 2022 Sean Anderson +import functools + +import cocotb +from cocotb.triggers import with_timeout +from cocotb.result import SimTimeoutError + async def alist(xs): return [x async for x in xs] @@ -9,6 +15,24 @@ class classproperty(property): def __get__(self, cls, owner): return classmethod(self.fget).__get__(None, owner)() +class timeout: + def __init__(self, time, unit): + self.time = time + self.unit = unit + + def __call__(self, f): + coro = cocotb.coroutine(f) + + @functools.wraps(f) + async def wrapped(*args, **kwargs): + r = coro(*args, **kwargs) + try: + return await with_timeout(r, self.time, self.unit) + except SimTimeoutError: + r.kill() + raise + return wrapped + class ReverseList(list): def __init__(self, iterable=None): super().__init__(reversed(iterable) if iterable is not None else None)