tb: axis_replay_buffer: Export send_packet

This function is useful for testing other AXI stream components as well.

Signed-off-by: Sean Anderson <seanga2@gmail.com>
This commit is contained in:
Sean Anderson 2023-01-09 21:04:23 -05:00
parent 6b2eb4d27c
commit 1f925d39ff
1 changed files with 29 additions and 11 deletions

View File

@ -10,6 +10,28 @@ from .util import ClockEnable, lookahead, timeout
BUF_SIZE = 54
async def send_packet(signals, packet, ratio=1):
for val, last in lookahead(packet):
if 'err' in signals:
if val is None:
signals['data'].value = 0
signals['err'].value = 1
else:
signals['data'].value = val
signals['err'].value = 0
else:
signals['data'].value = val
signals['valid'].value = 1
signals['last'].value = last
await RisingEdge(signals['clk'])
while True:
await FallingEdge(signals['clk'])
if signals['ready'].value:
break
signals['valid'].value = 0
if ratio != 1:
await ClockCycles(signals['clk'], ratio - 1, rising=False)
@timeout(30, 'us')
async def test_replay(buf, in_ratio, out_ratio):
buf.s_axis_valid.value = 0
@ -31,17 +53,13 @@ async def test_replay(buf, in_ratio, out_ratio):
async def send():
for packet in packets:
for val, last in lookahead(packet):
buf.s_axis_data.value = val
buf.s_axis_valid.value = 1
buf.s_axis_last.value = last
while True:
await FallingEdge(buf.clk)
if buf.s_axis_ready.value:
break
buf.s_axis_valid.value = 0
if in_ratio != 1:
await ClockCycles(buf.clk, in_ratio - 1, rising=False)
await send_packet({
'clk': buf.clk,
'data': buf.s_axis_data,
'valid': buf.s_axis_valid,
'last': buf.s_axis_last,
'ready': buf.s_axis_ready,
}, packet, in_ratio)
async def recv(packet):
async def handshake():