274 lines
7.3 KiB
Python
274 lines
7.3 KiB
Python
# SPDX-License-Identifier: AGPL-3.0-Only
|
|
# Copyright (C) 2022 Sean Anderson <seanga2@gmail.com>
|
|
|
|
import enum
|
|
import itertools
|
|
|
|
import cocotb
|
|
from cocotb.clock import Clock
|
|
from cocotb.regression import TestFactory
|
|
from cocotb.triggers import ClockCycles, Edge, RisingEdge, FallingEdge, Timer
|
|
from cocotb.types import LogicArray
|
|
|
|
from .util import *
|
|
|
|
class Code(enum.Enum):
|
|
_0 = (0b11110, '0')
|
|
_1 = (0b01001, '1')
|
|
_2 = (0b10100, '2')
|
|
_3 = (0b10101, '3')
|
|
_4 = (0b01010, '4')
|
|
_5 = (0b01011, '5')
|
|
_6 = (0b01110, '6')
|
|
_7 = (0b01111, '7')
|
|
_8 = (0b10010, '8')
|
|
_9 = (0b10011, '9')
|
|
_A = (0b10110, 'A')
|
|
_B = (0b10111, 'B')
|
|
_C = (0b11010, 'C')
|
|
_D = (0b11011, 'D')
|
|
_E = (0b11100, 'E')
|
|
_F = (0b11101, 'F')
|
|
_I = (0b11111, 'I')
|
|
_J = (0b11000, 'J')
|
|
_K = (0b10001, 'K')
|
|
_T = (0b01101, 'T')
|
|
_R = (0b00111, 'R')
|
|
_H = (0b00100, 'H')
|
|
_V0 = (0b00000, 'V')
|
|
_V1 = (0b00001, 'V')
|
|
_V2 = (0b00010, 'V')
|
|
_V3 = (0b00011, 'V')
|
|
_V4 = (0b00101, 'V')
|
|
_V5 = (0b00110, 'V')
|
|
_V6 = (0b01000, 'V')
|
|
_V7 = (0b01100, 'V')
|
|
_V8 = (0b10000, 'V')
|
|
_V9 = (0b11001, 'V')
|
|
|
|
@classmethod
|
|
def _missing_(cls, value):
|
|
return cls.__members__['_' + value]
|
|
|
|
@classmethod
|
|
def decode(cls, bits):
|
|
value = 0
|
|
for bit in bits:
|
|
value = (value << 1) | bit
|
|
return cls(value)
|
|
|
|
@classproperty
|
|
def encode(cls):
|
|
if not hasattr(cls, '_encode'):
|
|
cls._encode = { data: cls(f"{data:X}") for data in range(16) }
|
|
return cls._encode
|
|
|
|
def __new__(cls, code, name):
|
|
self = object.__new__(cls)
|
|
self._value_ = code
|
|
return self
|
|
|
|
def __init__(self, code, name):
|
|
self._name_ = name
|
|
try:
|
|
self.data = int(name, 16)
|
|
except ValueError:
|
|
self.data = None
|
|
|
|
def __hash__(self):
|
|
return hash(self.value)
|
|
|
|
def __int__(self):
|
|
if self.data is None:
|
|
raise ValueError
|
|
return self.data
|
|
|
|
def __index__(self):
|
|
return self._value_
|
|
|
|
def __repr__(self):
|
|
return f"{self.__class__.__name__}({self._value_:#07b}, '{self.name}')"
|
|
|
|
def __str__(self):
|
|
return f"/{self._name_}/"
|
|
|
|
def __iter__(self):
|
|
code = self.value
|
|
for _ in range(5):
|
|
yield (code & 0x10) >> 4
|
|
code <<= 1
|
|
|
|
def as_nibbles(data):
|
|
for byte in data:
|
|
yield byte >> 4
|
|
yield byte & 0xf
|
|
|
|
def as_codes(nibbles):
|
|
for nibble in nibbles:
|
|
if nibble is None:
|
|
yield Code('H')
|
|
else:
|
|
yield Code.encode[nibble]
|
|
|
|
def frame(data):
|
|
return itertools.chain(
|
|
(Code('J'), Code('K')),
|
|
# Chop off the SSD
|
|
as_codes(data[2:]),
|
|
(Code('T'), Code('R')),
|
|
)
|
|
|
|
async def mii_send_packet(pcs, nibbles):
|
|
await FallingEdge(pcs.tx_ce)
|
|
for nibble in nibbles:
|
|
pcs.tx_en.value = 1
|
|
pcs.tx_er.value = 0
|
|
if nibble is None:
|
|
pcs.tx_er.value = 1
|
|
else:
|
|
pcs.txd.value = nibble
|
|
await FallingEdge(pcs.tx_ce)
|
|
|
|
pcs.tx_en.value = 0
|
|
pcs.tx_er.value = 0
|
|
pcs.txd.value = LogicArray("XXXX")
|
|
await FallingEdge(pcs.tx_ce)
|
|
|
|
async def mii_recv_packet(pcs):
|
|
while not (pcs.rx_ce.value and pcs.rx_dv.value):
|
|
await RisingEdge(pcs.rx_clk)
|
|
|
|
while pcs.rx_dv.value:
|
|
if pcs.rx_ce.value:
|
|
if pcs.rx_er.value:
|
|
yield None
|
|
else:
|
|
yield pcs.rxd.value
|
|
await RisingEdge(pcs.rx_clk)
|
|
|
|
class PCSError(Exception):
|
|
pass
|
|
|
|
class BadSSD(PCSError):
|
|
pass
|
|
|
|
class PrematureEnd(PCSError):
|
|
pass
|
|
|
|
async def pcs_recv_packet(pcs):
|
|
rx_bits = ReverseList([1] * 10)
|
|
|
|
async def read_bit():
|
|
await RisingEdge(pcs.tx_clk)
|
|
rx_bits.append(pcs.pma_data_tx.value)
|
|
|
|
async def read_code():
|
|
for _ in range(5):
|
|
await read_bit()
|
|
|
|
async def bad_ssd():
|
|
while not all(rx_bits[9:0]):
|
|
await read_bit()
|
|
raise BadSSDError()
|
|
|
|
while all(rx_bits[9:2]) or rx_bits[0]:
|
|
await read_bit()
|
|
|
|
if Code.decode(rx_bits[9:5]) != Code('I') or \
|
|
Code.decode(rx_bits[4:0]) != Code('J'):
|
|
await bad_ssd()
|
|
|
|
await read_code()
|
|
if Code.decode(rx_bits[4:0]) != Code('K'):
|
|
await bad_ssd()
|
|
|
|
yield 0x5
|
|
await read_code()
|
|
|
|
yield 0x5
|
|
while any(rx_bits[9:0]):
|
|
await read_code()
|
|
code = Code.decode(rx_bits[9:5])
|
|
if code == Code('T') and Code.decode(rx_bits[4:0]) == Code('R'):
|
|
return
|
|
yield code.data
|
|
raise PrematureEndError()
|
|
|
|
async def pcs_send_codes(pcs, codes, valids):
|
|
await send_recovered_bits(pcs.rx_clk, pcs.pma_data_rx, pcs.pma_data_rx_valid,
|
|
itertools.chain(*codes), valids)
|
|
|
|
@cocotb.test(timeout_time=10, timeout_unit='us')
|
|
async def test_tx(pcs):
|
|
async def tx_ce():
|
|
pcs.tx_ce.value = 1
|
|
while True:
|
|
await ClockCycles(pcs.tx_clk, 1, False)
|
|
pcs.tx_ce.value = 0
|
|
await ClockCycles(pcs.tx_clk, 4, False)
|
|
pcs.tx_ce.value = 1
|
|
|
|
pcs.tx_en.value = 0
|
|
pcs.tx_er.value = 0
|
|
pcs.txd.value = LogicArray("XXXX")
|
|
pcs.link_status.value = 1
|
|
await cocotb.start(tx_ce())
|
|
await Timer(1)
|
|
await cocotb.start(Clock(pcs.tx_clk, 8, units='ns').start())
|
|
await FallingEdge(pcs.tx_ce)
|
|
|
|
# Test that all bytes can be transmitted
|
|
packet = list(as_nibbles((0x55, 0x01, 0x23, 0x45, 0x67, 0x89, 0xAB, 0xCD, 0xEF)))
|
|
# And ensure errors are propagated
|
|
packet.insert(10, None)
|
|
await cocotb.start(mii_send_packet(pcs, packet))
|
|
assert packet == await alist(pcs_recv_packet(pcs))
|
|
|
|
# Test start errors
|
|
await cocotb.start(mii_send_packet(pcs, [None]))
|
|
assert [0x5, 0x5, None] == await alist(pcs_recv_packet(pcs))
|
|
await cocotb.start(mii_send_packet(pcs, [0x5, None]))
|
|
assert [0x5, 0x5, None] == await alist(pcs_recv_packet(pcs))
|
|
|
|
@timeout(10, 'us')
|
|
async def test_rx(pcs, valids):
|
|
pcs.pma_data_rx.value = LogicArray('11')
|
|
pcs.pma_data_rx_valid.value = 2
|
|
pcs.link_status.value = 1
|
|
await Timer(1)
|
|
await cocotb.start(Clock(pcs.rx_clk, 8, units='ns').start())
|
|
|
|
packet = list(as_nibbles((0x55, 0x01, 0x23, 0x45, 0x67, 0x89, 0xAB, 0xCD, 0xEF)))
|
|
# And test errors too
|
|
packet.insert(10, None)
|
|
|
|
await cocotb.start(pcs_send_codes(pcs, itertools.chain(
|
|
frame(packet),
|
|
# Bad SSDs
|
|
(Code('C'), Code('I'), Code('I')),
|
|
(Code('J'), Code('I'), Code('I')),
|
|
(Code('J'), Code('H'), Code('I'), Code('I')),
|
|
# Premature end, plus two clocks since we don't have instant turnaround
|
|
(Code('J'), Code('K'), Code('I'), Code('I'), (1,1)),
|
|
# Packet spacing
|
|
*((*frame([0x55, 0x55]), (1,) * i) for i in range(10))
|
|
), valids))
|
|
|
|
assert packet == await alist(mii_recv_packet(pcs))
|
|
|
|
for _ in range(3):
|
|
while not (pcs.receiving.value and pcs.rx_er.value and pcs.rx_ce.value):
|
|
await RisingEdge(pcs.rx_clk)
|
|
assert pcs.rxd.value == 0xE
|
|
await FallingEdge(pcs.receiving)
|
|
|
|
assert [0x5, 0x5, None] == await alist(mii_recv_packet(pcs))
|
|
|
|
# Test packet spacing
|
|
for _ in range(10):
|
|
assert [0x5, 0x5] == await alist(mii_recv_packet(pcs))
|
|
|
|
rx_tests = TestFactory(test_rx)
|
|
rx_tests.add_option('valids', (one_valid, two_valid, rand_valid, saw_valid()))
|
|
rx_tests.generate_tests()
|