diff --git a/tb/pcs.py b/tb/pcs.py index 7e6c280..27b32c3 100644 --- a/tb/pcs.py +++ b/tb/pcs.py @@ -7,10 +7,11 @@ import itertools import cocotb from cocotb.clock import Clock +from cocotb.regression import TestFactory from cocotb.triggers import ClockCycles, Edge, RisingEdge, FallingEdge, Timer from cocotb.types import LogicArray -from .util import alist, classproperty, ReverseList +from .util import alist, classproperty, ReverseList, timeout class Code(enum.Enum): _0 = (0b11110, '0') @@ -194,14 +195,34 @@ async def pcs_recv_packet(pcs): yield code.data raise PrematureEndError() -async def pcs_send_codes(pcs, codes): +def one_valid(): + return 1 + +def two_valid(): + return 2 + +def rand_valid(): + return random.randrange(3) + +class saw_valid: + def __init__(self): + self.last = 0 + # Lie for TestFactory + self.__qualname__ = self.__class__.__qualname__ + + def __call__(self): + self.last += 1 + if self.last > 2: + self.last = 0 + return self.last + +async def pcs_send_codes(pcs, codes, valids): await FallingEdge(pcs.rx_clk) codes = list(codes) bits = itertools.chain(*codes) try: while True: - #valid = 2 - valid = random.randrange(3) + valid = valids() pcs.pma_data_rx_valid.value = valid if valid == 0: data = 'XX' @@ -254,8 +275,8 @@ async def test_tx(pcs): await cocotb.start(mii_send_packet(pcs, [0x5, None])) assert [0x5, 0x5, None] == await alist(pcs_recv_packet(pcs)) -@cocotb.test(timeout_time=10, timeout_unit='us') -async def test_rx(pcs): +@timeout(10, 'us') +async def test_rx(pcs, valids): pcs.pma_data_rx.value = LogicArray('11') pcs.pma_data_rx_valid.value = 2 pcs.link_status.value = 1 @@ -276,7 +297,7 @@ async def test_rx(pcs): (Code('J'), Code('K'), Code('I'), Code('I'), (1,1)), # Packet spacing *((*frame([0x55, 0x55]), (1,) * i) for i in range(10)) - ))) + ), valids)) assert packet == await alist(mii_recv_packet(pcs)) @@ -291,3 +312,7 @@ async def test_rx(pcs): # Test packet spacing for _ in range(10): assert [0x5, 0x5] == await alist(mii_recv_packet(pcs)) + +rx_tests = TestFactory(test_rx) +rx_tests.add_option('valids', (one_valid, two_valid, rand_valid, saw_valid())) +rx_tests.generate_tests() diff --git a/tb/pmd.py b/tb/pmd.py index a3a4baf..86a3d52 100644 --- a/tb/pmd.py +++ b/tb/pmd.py @@ -4,13 +4,30 @@ from statistics import NormalDist import cocotb from cocotb.binary import BinaryValue from cocotb.clock import Clock +from cocotb.regression import TestFactory from cocotb.triggers import RisingEdge, Timer +from .util import timeout + def print_list_at(l, i): print(' ' * max(50 - i, 0), *l[max(i - 50, 0):i+50], sep='') -@cocotb.test(timeout_time=100, timeout_unit='us') -async def test_rx(pmd): +BITS = 1000 + +def random_delays(count): + # Target BER is 1e9 and the maximum jitter is 1.4ns + # This is just random jitter (not DDJ or DCJ) but it'll do + delay_dist = NormalDist(8000, 1400 / NormalDist().inv_cdf(1-2e-9)) + return (int(delay) for delay in delay_dist.samples(count)) + +def mindelays(count): + return (7900,) * count + +def maxdelays(count): + return (8100,) * count + +@timeout(100, 'us') +async def test_rx(pmd, delays): pmd.signal_detect.value = 0 await Timer(1) await cocotb.start(Clock(pmd.rx_clk_125, 8, units='ns').start()) @@ -18,22 +35,18 @@ async def test_rx(pmd): await Timer(random.randrange(0, 8000), units='ps') await cocotb.start(Clock(pmd.rx_clk_250, 4, units='ns').start()) - ins = [random.randrange(2) for _ in range(1000)] + ins = [random.randrange(2) for _ in range(BITS)] async def generate_bits(): # random phase await Timer(random.randrange(0, 8000), units='ps') pmd.signal_detect.value = 1 - # Target BER is 1e9 and the maximum jitter is 1.4ns - # This is just random jitter (not DDJ or DCJ) but it'll do - delay_dist = NormalDist(8000, 1400 / NormalDist().inv_cdf(1-2e-9)) - for i, delay in zip(ins, (int(delay) for delay in delay_dist.samples(len(ins)))): + for i, delay in zip(ins, delays(len(ins))): pmd.rx.value = i try: pmd.delay.value = delay except AttributeError: pass await Timer(delay, units='ps') - #await Timer(8100, units='ps') pmd.signal_detect.value = 0 await cocotb.start(generate_bits()) @@ -69,3 +82,7 @@ async def test_rx(pmd): # There will be a few bits at the end not recorded because signal_detect # isn't delayed like the data signals assert best_corr > len(ins) - best_off - 10 + +rx_tests = TestFactory(test_rx) +rx_tests.add_option('delays', (random_delays, mindelays, maxdelays)) +rx_tests.generate_tests() diff --git a/tb/util.py b/tb/util.py index 5f04e54..c009afe 100644 --- a/tb/util.py +++ b/tb/util.py @@ -1,6 +1,12 @@ # SPDX-License-Identifier: AGPL-3.0-Only # Copyright (C) 2022 Sean Anderson +import functools + +import cocotb +from cocotb.triggers import with_timeout +from cocotb.result import SimTimeoutError + async def alist(xs): return [x async for x in xs] @@ -9,6 +15,24 @@ class classproperty(property): def __get__(self, cls, owner): return classmethod(self.fget).__get__(None, owner)() +class timeout: + def __init__(self, time, unit): + self.time = time + self.unit = unit + + def __call__(self, f): + coro = cocotb.coroutine(f) + + @functools.wraps(f) + async def wrapped(*args, **kwargs): + r = coro(*args, **kwargs) + try: + return await with_timeout(r, self.time, self.unit) + except SimTimeoutError: + r.kill() + raise + return wrapped + class ReverseList(list): def __init__(self, iterable=None): super().__init__(reversed(iterable) if iterable is not None else None)