ethernet/tb/hub.py

102 lines
3.4 KiB
Python

# SPDX-License-Identifier: AGPL-3.0-Only
# Copyright (C) 2022 Sean Anderson <seanga2@gmail.com>
import itertools
import cocotb
from cocotb.binary import BinaryValue
from cocotb.clock import Clock
from cocotb.triggers import ClockCycles, Combine, FallingEdge, Join, RisingEdge, Timer
from cocotb.types import LogicArray
from .descramble import scramble
from .mdio_regs import BMSR, BMSR_LSTATUS, VCR, VCR_LTEST, wb_xfer
from .nrzi_encode import nrzi_encode
from .nrzi_decode import nrzi_decode
from .pcs_tx import as_nibbles, mii_send_packet, pcs_recv_packet
from .pcs_rx import frame, mii_recv_packet
from .scramble import descramble
from .util import BIT, alist
@cocotb.test(timeout_time=5, timeout_unit='us')
async def test_hub(hub):
hub.clk_125.value = BinaryValue('Z')
hub.clk_250.value = BinaryValue('Z')
hub.signal_detect.value = 0
hub.wb_cyc.value = 1
await Timer(1)
await cocotb.start(Clock(hub.clk_125, 8, units='ns').start())
await cocotb.start(Clock(hub.clk_250, 4, units='ns').start())
wb = {
'clk': hub.clk_125,
'cyc': hub.wb_cyc,
'stb': hub.wb_stb,
'we': hub.wb_we,
'addr': hub.wb_addr,
'data_write': hub.wb_data_write,
'data_read': hub.wb_data_read,
'ack': hub.wb_ack,
'err': hub.wb_err,
}
# Enable fast link stabilization for testing
for i in range(4):
await wb_xfer(wb, BIT(i + 5) + VCR, VCR_LTEST, delay=2)
packet = list(as_nibbles((0x55, *b"Hello world!")))
packet_bits = list(itertools.chain.from_iterable(frame(packet)))
itertools.chain(itertools.repeat(1, 120), packet_bits, itertools.repeat(1))
async def send_rx(i, bits):
hub.signal_detect[i].value = 1
for bit in nrzi_encode(scramble(bits)):
hub.indicate_data[i].value = bit
await Timer(8, units='ns')
hub.signal_detect[i].value = 0
hub.indicate_data[i].value = BinaryValue('X')
async def recv_tx(i, packets):
async def bits():
await ClockCycles(hub.clk_125, 1)
while True:
await RisingEdge(hub.clk_125)
yield hub.request_data[i].value
data = descramble(nrzi_decode(bits()))
for expected, valid in packets:
actual = await alist(pcs_recv_packet(None, data))
if valid:
assert actual == expected
else:
assert actual != expected
await cocotb.start(send_rx(0, itertools.chain(
itertools.repeat(1, 120),
packet_bits,
itertools.repeat(1, 120),
packet_bits,
itertools.repeat(1),
)))
await cocotb.start(send_rx(1, itertools.chain(
itertools.repeat(1, 300),
packet_bits,
itertools.repeat(1),
)))
await cocotb.start(send_rx(2, itertools.repeat(1)))
await cocotb.start(send_rx(3, itertools.repeat(1)))
receivers = [
await cocotb.start(recv_tx(0, ((packet, False),))),
await cocotb.start(recv_tx(1, ((packet, True), (packet, False)))),
await cocotb.start(recv_tx(2, ((packet, True), (packet, False)))),
await cocotb.start(recv_tx(3, ((packet, True), (packet, False)))),
]
await Combine(*(Join(t) for t in receivers))
for i in range(4):
assert not await wb_xfer(wb, BIT(i + 5) + BMSR, delay=2) & BMSR_LSTATUS
assert await wb_xfer(wb, BIT(i + 5) + BMSR, delay=2) & BMSR_LSTATUS