tb: axis_mii_buffer: Export recv_packet

Export recv_packet for use by other testbenches. This is mostly
straightforward, except we need the ability to manually specify when
last should be asserted (to handle replays).

Signed-off-by: Sean Anderson <seanga2@gmail.com>
This commit is contained in:
Sean Anderson 2023-02-28 22:04:01 -05:00
parent e44d381c20
commit 3f61f85a1f
1 changed files with 19 additions and 9 deletions

View File

@ -36,6 +36,18 @@ async def send_packet(signals, packet, ratio=1, last_extra=0):
if ratio != 1 and not last: if ratio != 1 and not last:
await ClockCycles(signals['clk'], ratio - 1, rising=False) await ClockCycles(signals['clk'], ratio - 1, rising=False)
async def recv_packet(signals, packet, last=None):
if last is None:
last = len(packet)
for i, val in enumerate(packet):
while not signals['valid'].value or not signals['ready'].value:
await RisingEdge(signals['clk'])
assert signals['data'].value == val
if 'last' in signals:
assert signals['last'].value == (i == last - 1)
await RisingEdge(signals['clk'])
@timeout(30, 'us') @timeout(30, 'us')
async def test_replay(buf, in_ratio, out_ratio): async def test_replay(buf, in_ratio, out_ratio):
buf.clk.value = BinaryValue('Z') buf.clk.value = BinaryValue('Z')
@ -69,16 +81,14 @@ async def test_replay(buf, in_ratio, out_ratio):
}, packet, in_ratio) }, packet, in_ratio)
async def recv(packet): async def recv(packet):
async def handshake():
while not buf.m_axis_valid.value or not buf.m_axis_ready.value:
await RisingEdge(buf.clk)
async def recv_len(length): async def recv_len(length):
for i, val in enumerate(packet[:length]): await recv_packet({
await handshake() 'clk': buf.clk,
assert buf.m_axis_data.value == val 'ready': buf.m_axis_ready,
assert buf.m_axis_last == (i == len(packet) - 1) 'valid': buf.m_axis_valid,
await RisingEdge(buf.clk) 'last': buf.m_axis_last,
'data': buf.m_axis_data,
}, packet[:length], last=len(packet))
async def restart(): async def restart():
await FallingEdge(buf.clk) await FallingEdge(buf.clk)