From 1f925d39fff4db4cc8ae78d825acb90f357f14e5 Mon Sep 17 00:00:00 2001 From: Sean Anderson Date: Mon, 9 Jan 2023 21:04:23 -0500 Subject: [PATCH] tb: axis_replay_buffer: Export send_packet This function is useful for testing other AXI stream components as well. Signed-off-by: Sean Anderson --- tb/axis_replay_buffer.py | 40 +++++++++++++++++++++++++++++----------- 1 file changed, 29 insertions(+), 11 deletions(-) diff --git a/tb/axis_replay_buffer.py b/tb/axis_replay_buffer.py index 63e0ba8..7002e9e 100644 --- a/tb/axis_replay_buffer.py +++ b/tb/axis_replay_buffer.py @@ -10,6 +10,28 @@ from .util import ClockEnable, lookahead, timeout BUF_SIZE = 54 +async def send_packet(signals, packet, ratio=1): + for val, last in lookahead(packet): + if 'err' in signals: + if val is None: + signals['data'].value = 0 + signals['err'].value = 1 + else: + signals['data'].value = val + signals['err'].value = 0 + else: + signals['data'].value = val + signals['valid'].value = 1 + signals['last'].value = last + await RisingEdge(signals['clk']) + while True: + await FallingEdge(signals['clk']) + if signals['ready'].value: + break + signals['valid'].value = 0 + if ratio != 1: + await ClockCycles(signals['clk'], ratio - 1, rising=False) + @timeout(30, 'us') async def test_replay(buf, in_ratio, out_ratio): buf.s_axis_valid.value = 0 @@ -31,17 +53,13 @@ async def test_replay(buf, in_ratio, out_ratio): async def send(): for packet in packets: - for val, last in lookahead(packet): - buf.s_axis_data.value = val - buf.s_axis_valid.value = 1 - buf.s_axis_last.value = last - while True: - await FallingEdge(buf.clk) - if buf.s_axis_ready.value: - break - buf.s_axis_valid.value = 0 - if in_ratio != 1: - await ClockCycles(buf.clk, in_ratio - 1, rising=False) + await send_packet({ + 'clk': buf.clk, + 'data': buf.s_axis_data, + 'valid': buf.s_axis_valid, + 'last': buf.s_axis_last, + 'ready': buf.s_axis_ready, + }, packet, in_ratio) async def recv(packet): async def handshake():