2023-01-09 20:05:31 -06:00
# SPDX-License-Identifier: AGPL-3.0-Only
# Copyright (C) 2022 Sean Anderson <seanga2@gmail.com>
import enum
import random
import zlib
import cocotb
2023-01-13 23:02:53 -06:00
from cocotb.binary import BinaryValue
2023-01-09 20:05:31 -06:00
from cocotb.clock import Clock
from cocotb.regression import TestFactory
from cocotb.triggers import ClockCycles, Edge, FallingEdge, First, RisingEdge, Timer
2023-01-13 23:05:50 -06:00
from cocotb.utils import get_sim_time, get_sim_steps
2023-01-09 20:05:31 -06:00
from . import axis_replay_buffer
from .pcs_rx import mii_recv_packet
from .util import alist, lookahead, timeout
import os
# Tests declared with skip=skip_slow are skipped unless the RUN_SLOW
# environment variable is set to a non-empty value.
skip_slow = not os.environ.get('RUN_SLOW', False)
async def init(mac):
    """Preset the MAC inputs, pulse reset, and start an 8 ns clock.

    Returns after the first falling edge of the newly started clock.
    """
    # Written in exactly this order: clock undriven, reset asserted, then the
    # remaining inputs at their starting levels.
    presets = (
        ('clk', BinaryValue('Z')),
        ('rst', 1),
        ('mii_col', 0),
        ('mii_crs', 0),
        ('axis_valid', 0),
        ('axis_err', 0),
        ('short_backoff', 1),
        ('half_duplex', 1),
    )
    for signal, level in presets:
        getattr(mac, signal).value = level

    # Keep reset asserted across a Timer(1) delay before releasing it
    await Timer(1)
    mac.rst.value = 0

    clock = Clock(mac.clk, 8, units='ns')
    await cocotb.start(clock.start())
    await FallingEdge(mac.clk)
2023-01-09 20:05:31 -06:00
2023-01-11 16:23:10 -06:00
def send_packet(mac, packet, **kwargs):
    """Send `packet` through axis_replay_buffer.send_packet on the MAC inputs.

    The signal map handed to the helper uses the MAC's clock and its axis_*
    signals; any extra keyword arguments are forwarded unchanged.
    """
    # 'clk' maps to the bare clock; every other entry carries the axis_ prefix
    signals = {'clk': mac.clk}
    signals.update(
        (name, getattr(mac, 'axis_' + name))
        for name in ('data', 'err', 'valid', 'last', 'ready')
    )
    return axis_replay_buffer.send_packet(signals, packet, **kwargs)
2023-01-09 20:05:31 -06:00
class MACError(Exception):
    """Base class for problems found while decoding a transmitted frame."""


class FrameCheckError(MACError):
    """The CRC-32 computed over the received frame was not the expected one."""


class AlignmentError(MACError):
    """The nibble stream ended in the middle of a byte."""


class PaddingError(MACError):
    """The received frame, FCS included, was shorter than 64 bytes."""
async def nibbles_to_bytes(nibbles):
while True:
lo = await anext(nibbles)
except StopAsyncIteration:
hi = await anext(nibbles)
except StopAsyncIteration:
raise AlignmentError
yield hi << 4 | lo
# Byte value that skip_preamble() treats as the end of the preamble
SFD = 0xd5
async def skip_preamble(packet):
    """Yield the bytes of `packet` that come after the first SFD byte.

    Everything up to and including the first SFD is dropped. If no SFD is
    ever seen, nothing is yielded.
    """
    in_frame = False
    async for octet in packet:
        if not in_frame:
            # Still before the delimiter; pass data through once it is seen
            in_frame = octet == SFD
            continue
        yield octet
# CRC-32 value that check_fcs() requires over a complete frame, FCS included
FCS_GOOD = zlib.crc32(b'\0\0\0\0')
async def check_fcs(packet):
    """Gather `packet` with alist() and validate its FCS and length.

    Returns:
        The frame with its final four bytes removed.

    Raises:
        FrameCheckError: The CRC-32 over the whole frame is not FCS_GOOD.
        PaddingError: The CRC matched but the frame has fewer than 64 bytes.
    """
    frame = await alist(packet)

    # The CRC is examined first, so a short frame with a bad CRC is reported
    # as a FrameCheckError rather than a PaddingError.
    if zlib.crc32(bytes(frame)) != FCS_GOOD:
        raise FrameCheckError
    if len(frame) < 64:
        raise PaddingError

    return frame[:-4]
def recv_packet(mac):
    """Return the check_fcs() coroutine for one frame sent by the MAC.

    mii_recv_packet() samples the MAC's mii_tx_ce / mii_txd / mii_tx_en
    signals; its output is paired into bytes, stripped of everything up to
    the SFD, and finally checked by check_fcs(). Awaiting the result yields
    the frame without its FCS, or raises one of the MACError subclasses.
    """
    nibbles = mii_recv_packet(mac, {
        'ce': mac.mii_tx_ce,
        'data': mac.mii_txd,
        'valid': mac.mii_tx_en,
    })
    return check_fcs(skip_preamble(nibbles_to_bytes(nibbles)))
async def expect_bad_fcs(mac):
    """Receive one frame and require that it fails the FCS check.

    Any other error raised by recv_packet() propagates unchanged.

    Raises:
        AssertionError: The frame was received without a FrameCheckError.
    """
    try:
        await recv_packet(mac)
    except FrameCheckError:
        # This is the outcome the caller is waiting for
        return
    raise AssertionError('frame was received intact; expected a bad FCS')
@enum.unique
class Status(enum.Enum):
    """Possible outcomes of a transmission, as returned by get_status()."""

    OK = 1
    GAVE_UP = 2
    LATE_COLLISION = 3
    UNDERFLOW = 4
async def get_status(mac):
    """Watch the MAC's status outputs and report how a transmission ended.

    transmit_ok, gave_up, late_collision and underflow are accumulated once
    per falling clock edge until at least one of them has been seen and
    mii_tx_en is low.

    Returns:
        The Status member for the output that was observed.

    Raises:
        AssertionError: The accumulated outputs do not sum to exactly one.
    """
    strobes = {
        Status.OK: mac.transmit_ok,
        Status.GAVE_UP: mac.gave_up,
        Status.LATE_COLLISION: mac.late_collision,
        Status.UNDERFLOW: mac.underflow,
    }
    seen = dict.fromkeys(strobes, 0)

    # Keep sampling while nothing has been reported yet, or while the MAC is
    # still driving mii_tx_en.
    while not any(seen.values()) or mac.mii_tx_en.value:
        for status, strobe in strobes.items():
            seen[status] += strobe.value
        await FallingEdge(mac.clk)

    assert sum(seen.values()) == 1
    return next(status for status, count in seen.items() if count)
2023-01-11 16:23:10 -06:00
async def start(mac, packet, **kwargs):
    """Start sending `packet` and monitoring the result in the background.

    Extra keyword arguments are passed on to send_packet().

    Returns:
        A (send, status) pair: the started send_packet() task followed by
        the started get_status() task.
    """
    return (
        await cocotb.start(send_packet(mac, packet, **kwargs)),
        await cocotb.start(get_status(mac)),
    )
2023-03-05 12:26:35 -06:00
# True when the installed cocotb's (major, minor) version is at least (1, 7);
# collide() adds it to its delay.
COCOTB_17 = tuple(int(part) for part in cocotb.__version__.split('.')[:2]) >= (1, 7)
2023-01-09 20:05:31 -06:00
async def collide(mac, ns, duration=16):
    """Pulse mii_col during a transmission.

    Waits on falling clock edges until mii_tx_en is high, then delays by
    `ns` - 4 nanoseconds (one more on cocotb >= 1.7) when that is positive,
    and finally holds mii_col high for `duration` nanoseconds.
    """
    # newer cocotbs order writes before the clock
    offset = ns + COCOTB_17

    # Wait for the transmission to begin
    while not mac.mii_tx_en.value:
        await FallingEdge(mac.clk)

    lead = offset - 4
    if lead > 0:
        await Timer(lead, 'ns')

    mac.mii_col.value = 1
    await Timer(duration, 'ns')
    mac.mii_col.value = 0
def randtime(min_bytes, max_bytes):
    """Pick a random time, in nanoseconds, between two byte counts.

    The result is an integer of at least min_bytes * BYTE_TIME_NS and
    strictly less than max_bytes * BYTE_TIME_NS.
    """
    earliest = min_bytes * BYTE_TIME_NS
    latest = max_bytes * BYTE_TIME_NS
    return random.randrange(earliest, latest)
async def restart(mac, ns):
    """Collide with the current transmission and expect a corrupted frame.

    collide() is started as a background task with offset `ns`, and the
    frame received meanwhile must fail its FCS check.
    """
    collision = collide(mac, ns)
    await cocotb.start(collision)
    await expect_bad_fcs(mac)
def compare(actual, expected):
    """Check that `actual` begins with the contents of `expected`.

    Only the first len(expected) items of `actual` are examined, so anything
    after them (presumably padding appended to short frames) is ignored.
    Items are compared by value, so `expected` may be any iterable such as a
    list, tuple or range.

    Raises:
        AssertionError: The leading items of `actual` differ from `expected`,
            or `actual` is shorter than `expected`.
    """
    wanted = list(expected)
    got = list(actual[:len(wanted)])
    if got != wanted:
        raise AssertionError(f'packet mismatch: expected {wanted}, got {got}')
async def ok(mac, packet, status, ns=None):
    """Expect the current transmission to deliver `packet` successfully.

    When `ns` is given, a collide() task with that offset is started first.
    The received frame must begin with `packet`, and the `status` task must
    finish with Status.OK.
    """
    if ns is not None:
        await cocotb.start(collide(mac, ns))

    received = await recv_packet(mac)
    compare(received, packet)

    outcome = await status.join()
    assert outcome == Status.OK
# Send every entry of `packets` with the given `ratio` and check that each one
# is received intact with Status.OK. From the second packet on, the time
# spent waiting for mii_tx_en must match the expected gap exactly.
# NOTE(review): the elements of `packets` are not visible in this excerpt.
@timeout(50, 'us')
async def test_send(mac, ratio):
await init(mac)
packets = (
async def send_packets():
for packet in packets:
2023-01-11 16:23:10 -06:00
await send_packet(mac, packet, ratio=ratio)
2023-01-09 20:05:31 -06:00
await cocotb.start(send_packets())
for i, packet in enumerate(packets):
status = await cocotb.start(get_status(mac))
recv = await cocotb.start(recv_packet(mac))
# Measure the IPG to ensure throughput
2023-01-13 23:05:50 -06:00
start = get_sim_time('step')
2023-01-09 20:05:31 -06:00
while not mac.mii_tx_en.value:
await RisingEdge(mac.clk)
# The first IPG may not be exact
if i:
# Expected wait is 12 * 80 - 4 ns, converted to simulator steps
2023-01-13 23:05:50 -06:00
assert get_sim_time('step') - start == get_sim_steps(12 * 80 - 4, 'ns')
2023-01-09 20:05:31 -06:00
compare(await recv.join(), packet)
assert await status.join() == Status.OK
# Parametrize test_send over two values of `ratio`: 1 and BIT_TIME_NS
send_tests = TestFactory(test_send)
send_tests.add_option('ratio', (1, BIT_TIME_NS))
# Check that Status.UNDERFLOW is reported for a packet sent with ratio=30 and
# for packets containing a None entry, and check what is reported when such a
# packet also suffers a collision.
@cocotb.test(timeout_time=100, timeout_unit='us')
async def test_underflow(mac):
await init(mac)
# Helper: the frame on the wire must have a bad FCS, the sender must finish,
# and the reported status must be UNDERFLOW
async def underflow(mac, send, status):
await expect_bad_fcs(mac)
await send.join()
assert await status.join() == Status.UNDERFLOW
2023-01-11 16:23:10 -06:00
send, status = await start(mac, range(32), ratio=30)
2023-01-09 20:05:31 -06:00
await underflow(mac, send, status)
2023-01-11 16:26:20 -06:00
#from math import floor
# Solution to (IPG + PREAMBLE + x) * BIT_TIME = ratio * x
# end = floor(200 / (ratio - 10))
for x in (56, 58, 60):
# Error on last byte
send, status = await start(mac, [*range(x), None])
await underflow(mac, send, status)
2023-01-11 16:31:42 -06:00
# Error on last byte valid for one cycle
send, status = await start(mac, [*range(x), None],
last_extra=(12 + 8 + x) * 10 - x + 5)
await underflow(mac, send, status)
2023-01-11 16:26:20 -06:00
# Error with more to come
send, status = await start(mac, [*range(x), None, 1])
await underflow(mac, send, status)
# Underflow with collision
2023-01-11 16:27:35 -06:00
send, status = await start(mac, [*range(x), None], last_extra=(12 + 8 + x) * 10)
2023-01-11 16:26:20 -06:00
# Collide one nanosecond before (8 + x) byte times have elapsed
await restart(mac, (8 + x) * BYTE_TIME_NS - 1)
if x <= 56:
await underflow(mac, send, status)
await send.join()
assert await status.join() == Status.LATE_COLLISION
2023-01-09 20:05:31 -06:00
# Force 15 and then 16 consecutive collisions on a 32-byte packet, bounding
# the time between successive corrupted frames. The run with 16 collisions
# must end with Status.GAVE_UP. Skipped unless slow tests are enabled.
@cocotb.test(timeout_time=1250, timeout_unit='us', skip=skip_slow)
async def test_backoff(mac):
await init(mac)
packet = list(range(32))
for collisions in (15, 16):
send, status = await start(mac, packet)
# Simulation time at which the previous corrupted frame finished
then = None
for n in range(collisions):
await restart(mac, 0)
2023-01-13 23:05:50 -06:00
now = get_sim_time('step')
2023-01-09 20:05:31 -06:00
if then is not None:
# Allowed spacing: 8 + 4 + 12 byte times plus 2 ** min(n, 10) byte times
2023-01-13 23:05:50 -06:00
assert now - then <= \
get_sim_steps((8 + 4 + 12 + 2 ** min(n, 10)) * BYTE_TIME_NS, 'ns')
2023-01-09 20:05:31 -06:00
then = now
if collisions == 16:
assert await status.join() == Status.GAVE_UP
await ok(mac, packet, status)
# Drive mii_crs through several on/off patterns while a packet is pending and
# check at each step whether mii_tx_en has been asserted. The transmission
# must eventually complete with Status.OK.
@cocotb.test(timeout_time=10, timeout_unit='us')
async def test_defer(mac):
await init(mac)
# Skip to the end of IPG_LATE
mac.mii_crs.value = 1
await Timer(13 * BYTE_TIME_NS, 'ns')
packet = list(range(32))
send, status = await start(mac, packet)
# Ensure IPG_EARLY works
await Timer(13 * BYTE_TIME_NS, 'ns')
assert not mac.mii_tx_en.value
# Test the early 2/3s; not exact since the last 1/3 takes the slack
mac.mii_crs.value = 0
await Timer(7 * BYTE_TIME_NS, 'ns')
mac.mii_crs.value = 1
await Timer(6 * BYTE_TIME_NS, 'ns')
assert not mac.mii_tx_en.value
# And the late
mac.mii_crs.value = 0
await Timer(8 * BYTE_TIME_NS, 'ns')
mac.mii_crs.value = 1
await Timer(5 * BYTE_TIME_NS, 'ns')
# Transmission must have started even though mii_crs is high again
assert mac.mii_tx_en.value
assert await status.join() == Status.OK
# Collide at various offsets into 32-byte and 256-byte packets. Offsets below
# 64 byte times go through restart() and the packet is expected to arrive
# afterwards with Status.OK; offsets checked through late() must end with
# Status.LATE_COLLISION.
@cocotb.test(timeout_time=150, timeout_unit='us')
async def test_collision(mac):
await init(mac)
# Helper: start `packet`, collide at `ns`, and expect LATE_COLLISION
async def late(mac, packet, ns):
send, status = await start(mac, packet)
await restart(mac, ns)
await send.join()
assert await status.join() == Status.LATE_COLLISION
packet = list(range(32))
send, status = await start(mac, packet)
await restart(mac, randtime(0, 8))
await restart(mac, randtime(8, 40))
await restart(mac, randtime(40, 64))
await restart(mac, 64 * BYTE_TIME_NS - 1)
# A collision at 72 byte times must not prevent an OK status
await ok(mac, packet, status, 72 * BYTE_TIME_NS)
await late(mac, packet, 64 * BYTE_TIME_NS)
await late(mac, packet, randtime(64, 72))
await late(mac, packet, (8 + 64) * BYTE_TIME_NS - 1)
# Same checks with a 256-byte packet
packet = list(range(256))
send, status = await start(mac, packet)
await restart(mac, randtime(8, 40))
await restart(mac, randtime(40, 64))
await restart(mac, 64 * BYTE_TIME_NS - 1)
await ok(mac, packet, status, (8 + 256 + 4) * BYTE_TIME_NS)
await late(mac, packet, 64 * BYTE_TIME_NS)
await late(mac, packet, randtime(64, 256))
await late(mac, packet, 72 * BYTE_TIME_NS - 1)
2023-01-13 23:08:38 -06:00
# With half_duplex cleared, an asserted mii_crs must not delay transmission
# and a mii_col pulse must not stop the packet from arriving with Status.OK.
@cocotb.test(timeout_time=10, timeout_unit='us')
async def test_full_duplex(mac):
await init(mac)
mac.half_duplex.value = 0
# Skip to the end of IPG_LATE
await Timer(13 * BYTE_TIME_NS, 'ns')
mac.mii_crs.value = 1
packet = list(range(32))
send, status = await start(mac, packet)
# ok() also starts a collision 16 byte times into the transmission
recv = await cocotb.start(ok(mac, packet, status, 16 * BYTE_TIME_NS))
# No deferral
await Timer(13 * BYTE_TIME_NS, 'ns')
assert mac.mii_tx_en.value
# No collision
await recv