# ethernet/tb/mii_elastic_buffer.py

# SPDX-License-Identifier: AGPL-3.0-Only
# Copyright (C) 2023 Sean Anderson <seanga2@gmail.com>
import cocotb
from cocotb.binary import BinaryValue
from cocotb.clock import Clock
from cocotb.regression import TestFactory
from cocotb.triggers import ClockCycles, FallingEdge, RisingEdge, Timer
from .pcs_rx import mii_recv_packet
from .pcs_tx import mii_send_packet
from .util import alist, ClockEnable, lookahead, timeout
@cocotb.test(timeout_time=50, timeout_unit='us')
async def test_elastic(buf):
    """Check the elastic buffer against several RX clock-enable ratios.

    The TX clock enable is always started with a ratio of 5, while the RX
    clock enable is restarted with a different ratio for each phase
    (presumably "one enable pulse every N clocks" -- see ``ClockEnable`` in
    ``.util`` to confirm):

    1. RX ratio 5: each packet must be received exactly as it was sent.
    2. RX ratios 2 and 12: at least one underflow (ratio 2) or overflow
       (ratio 12) must be observed, and every received non-``None`` nibble
       must differ from the previous non-``None`` nibble.
    3. RX ratios 4 and 6: the RX clock enable is stopped before the packet
       is sent and restarted only once ``rx_dv`` is seen (plus five more
       clock cycles for ratio 6); the packet must still be received intact.

    Args:
        buf: cocotb handle for the DUT.  The test drives ``clk``, ``tx_ce``,
            ``tx_en`` and ``rx_ce``, passes ``tx_er``/``txd`` and
            ``rx_er``/``rxd``/``rx_dv`` to the packet helpers, and samples
            ``underflow``/``overflow``.
    """
    # Put every input the test drives into a known state before the clock
    # starts running.
    buf.clk.value = BinaryValue('Z')
    buf.tx_ce.value = 0
    buf.tx_en.value = 0
    buf.rx_ce.value = 0

    await Timer(1)
    await cocotb.start(Clock(buf.clk, 8, units='ns').start())
    # The two clock-enable generators are started one falling edge apart.
    await FallingEdge(buf.clk)
    await cocotb.start(ClockEnable(buf.clk, buf.tx_ce, 5))
    await FallingEdge(buf.clk)
    # Keep the RX task handle: later phases kill and restart it.
    rx_ce = await cocotb.start(ClockEnable(buf.clk, buf.rx_ce, 5))

    underflows = 0
    overflows = 0

    async def count_excursions():
        """Accumulate the underflow/overflow outputs on every rising edge."""
        nonlocal underflows, overflows
        while True:
            await RisingEdge(buf.clk)
            underflows += buf.underflow.value
            overflows += buf.overflow.value

    await cocotb.start(count_excursions())

    # Signal maps handed to the project's MII send/receive helpers.
    in_signals = {
        'ce': buf.tx_ce,
        'enable': buf.tx_en,
        'err': buf.tx_er,
        'data': buf.txd,
    }
    out_signals = {
        'ce': buf.rx_ce,
        'err': buf.rx_er,
        'data': buf.rxd,
        'valid': buf.rx_dv,
    }

    # Phase 1: equal TX and RX ratios, so packets must pass through unchanged.
    # NOTE(review): the None entry presumably marks an errored nibble for the
    # packet helpers -- confirm against mii_send_packet/mii_recv_packet.
    for packet in (list(range(10)), [0, 1, 2, None, 4, 5]):
        await cocotb.start(mii_send_packet(buf, packet, in_signals))
        assert packet == await alist(mii_recv_packet(buf, out_signals))

    # Phase 2: mismatched ratios, which must be reported as excursions.
    packet = list(range(10))
    for ratio in (2, 12):
        rx_ce.kill()
        # Restart the RX clock enable only once the TX clock enable is high.
        while not buf.tx_ce.value:
            await RisingEdge(buf.clk)
        rx_ce = await cocotb.start(ClockEnable(buf.clk, buf.rx_ce, ratio))
        # Only excursions seen while this packet is in flight should count.
        underflows = 0
        overflows = 0

        await cocotb.start(mii_send_packet(buf, packet, in_signals))
        outs = await alist(mii_recv_packet(buf, out_signals))

        # TX runs at a ratio of 5: a larger RX ratio must overflow and a
        # smaller one must underflow.
        if ratio > 5:
            assert overflows
        else:
            assert underflows

        # Data that does make it through must never repeat a nibble.
        last = None
        for nibble in outs:
            if nibble is not None:
                assert nibble != last
                # NOTE(review): nesting inferred -- confirm that `last` is
                # only meant to track non-None nibbles.
                last = nibble

    # Phase 3: RX is stopped before the packet is sent and resumed late.
    packet = list(range(5))
    for ratio in (4, 6):
        # Wait for a CE before shutting it off; we need to output at least
        # one idle
        while not buf.rx_ce.value:
            await FallingEdge(buf.clk)
        await FallingEdge(buf.clk)
        rx_ce.kill()
        # The counters are reset here but are not checked in this phase.
        underflows = 0
        overflows = 0

        await cocotb.start(mii_send_packet(buf, packet, in_signals))
        # Set up a worst-case scenario
        while not buf.rx_dv.value:
            await FallingEdge(buf.clk)
        if ratio == 6:
            # rising=False: wait for five falling edges of the clock.
            await ClockCycles(buf.clk, 5, False)
        rx_ce = await cocotb.start(ClockEnable(buf.clk, buf.rx_ce, ratio))

        # And make sure everything works out
        assert packet == await alist(mii_recv_packet(buf, out_signals))