tb: Parametrize rx tests

In the recieve tests, the harness often has a choice of how fast to feed
data to the module. Up to this point, we have always used the same
strategy (typically random), even when multiple strategies were used
when writing the test. Add parametrization to test different strategies
in each test run. The timing decorator is taken from the cocotb source,
since we can't pass parameters to cocotb.test directly any more.

Signed-off-by: Sean Anderson <seanga2@gmail.com>
This commit is contained in:
Sean Anderson 2022-08-06 21:38:36 -04:00
parent 6ffb3481fe
commit 0c5f7fa905
3 changed files with 81 additions and 15 deletions

View File

@ -7,10 +7,11 @@ import itertools
import cocotb
from cocotb.clock import Clock
from cocotb.regression import TestFactory
from cocotb.triggers import ClockCycles, Edge, RisingEdge, FallingEdge, Timer
from cocotb.types import LogicArray
from .util import alist, classproperty, ReverseList
from .util import alist, classproperty, ReverseList, timeout
class Code(enum.Enum):
_0 = (0b11110, '0')
@ -194,14 +195,34 @@ async def pcs_recv_packet(pcs):
yield code.data
raise PrematureEndError()
async def pcs_send_codes(pcs, codes):
def one_valid():
return 1
def two_valid():
return 2
def rand_valid():
return random.randrange(3)
class saw_valid:
def __init__(self):
self.last = 0
# Lie for TestFactory
self.__qualname__ = self.__class__.__qualname__
def __call__(self):
self.last += 1
if self.last > 2:
self.last = 0
return self.last
async def pcs_send_codes(pcs, codes, valids):
await FallingEdge(pcs.rx_clk)
codes = list(codes)
bits = itertools.chain(*codes)
try:
while True:
#valid = 2
valid = random.randrange(3)
valid = valids()
pcs.pma_data_rx_valid.value = valid
if valid == 0:
data = 'XX'
@ -254,8 +275,8 @@ async def test_tx(pcs):
await cocotb.start(mii_send_packet(pcs, [0x5, None]))
assert [0x5, 0x5, None] == await alist(pcs_recv_packet(pcs))
@cocotb.test(timeout_time=10, timeout_unit='us')
async def test_rx(pcs):
@timeout(10, 'us')
async def test_rx(pcs, valids):
pcs.pma_data_rx.value = LogicArray('11')
pcs.pma_data_rx_valid.value = 2
pcs.link_status.value = 1
@ -276,7 +297,7 @@ async def test_rx(pcs):
(Code('J'), Code('K'), Code('I'), Code('I'), (1,1)),
# Packet spacing
*((*frame([0x55, 0x55]), (1,) * i) for i in range(10))
)))
), valids))
assert packet == await alist(mii_recv_packet(pcs))
@ -291,3 +312,7 @@ async def test_rx(pcs):
# Test packet spacing
for _ in range(10):
assert [0x5, 0x5] == await alist(mii_recv_packet(pcs))
rx_tests = TestFactory(test_rx)
rx_tests.add_option('valids', (one_valid, two_valid, rand_valid, saw_valid()))
rx_tests.generate_tests()

View File

@ -4,13 +4,30 @@ from statistics import NormalDist
import cocotb
from cocotb.binary import BinaryValue
from cocotb.clock import Clock
from cocotb.regression import TestFactory
from cocotb.triggers import RisingEdge, Timer
from .util import timeout
def print_list_at(l, i):
print(' ' * max(50 - i, 0), *l[max(i - 50, 0):i+50], sep='')
@cocotb.test(timeout_time=100, timeout_unit='us')
async def test_rx(pmd):
BITS = 1000
def random_delays(count):
# Target BER is 1e9 and the maximum jitter is 1.4ns
# This is just random jitter (not DDJ or DCJ) but it'll do
delay_dist = NormalDist(8000, 1400 / NormalDist().inv_cdf(1-2e-9))
return (int(delay) for delay in delay_dist.samples(count))
def mindelays(count):
return (7900,) * count
def maxdelays(count):
return (8100,) * count
@timeout(100, 'us')
async def test_rx(pmd, delays):
pmd.signal_detect.value = 0
await Timer(1)
await cocotb.start(Clock(pmd.rx_clk_125, 8, units='ns').start())
@ -18,22 +35,18 @@ async def test_rx(pmd):
await Timer(random.randrange(0, 8000), units='ps')
await cocotb.start(Clock(pmd.rx_clk_250, 4, units='ns').start())
ins = [random.randrange(2) for _ in range(1000)]
ins = [random.randrange(2) for _ in range(BITS)]
async def generate_bits():
# random phase
await Timer(random.randrange(0, 8000), units='ps')
pmd.signal_detect.value = 1
# Target BER is 1e9 and the maximum jitter is 1.4ns
# This is just random jitter (not DDJ or DCJ) but it'll do
delay_dist = NormalDist(8000, 1400 / NormalDist().inv_cdf(1-2e-9))
for i, delay in zip(ins, (int(delay) for delay in delay_dist.samples(len(ins)))):
for i, delay in zip(ins, delays(len(ins))):
pmd.rx.value = i
try:
pmd.delay.value = delay
except AttributeError:
pass
await Timer(delay, units='ps')
#await Timer(8100, units='ps')
pmd.signal_detect.value = 0
await cocotb.start(generate_bits())
@ -69,3 +82,7 @@ async def test_rx(pmd):
# There will be a few bits at the end not recorded because signal_detect
# isn't delayed like the data signals
assert best_corr > len(ins) - best_off - 10
rx_tests = TestFactory(test_rx)
rx_tests.add_option('delays', (random_delays, mindelays, maxdelays))
rx_tests.generate_tests()

View File

@ -1,6 +1,12 @@
# SPDX-License-Identifier: AGPL-3.0-Only
# Copyright (C) 2022 Sean Anderson <seanga2@gmail.com>
import functools
import cocotb
from cocotb.triggers import with_timeout
from cocotb.result import SimTimeoutError
async def alist(xs):
return [x async for x in xs]
@ -9,6 +15,24 @@ class classproperty(property):
def __get__(self, cls, owner):
return classmethod(self.fget).__get__(None, owner)()
class timeout:
def __init__(self, time, unit):
self.time = time
self.unit = unit
def __call__(self, f):
coro = cocotb.coroutine(f)
@functools.wraps(f)
async def wrapped(*args, **kwargs):
r = coro(*args, **kwargs)
try:
return await with_timeout(r, self.time, self.unit)
except SimTimeoutError:
r.kill()
raise
return wrapped
class ReverseList(list):
def __init__(self, iterable=None):
super().__init__(reversed(iterable) if iterable is not None else None)