ethernet/tb/mdio.py

182 lines
4.9 KiB
Python

# SPDX-License-Identifier: AGPL-3.0-Only
# Copyright (C) 2022 Sean Anderson <seanga2@gmail.com>
import random
import cocotb
from cocotb.clock import Clock
from cocotb.triggers import ClockCycles, Edge, FallingEdge, First, RisingEdge, Timer
from cocotb.types import LogicArray
from .util import ClockEnable
def to_bits(val, width):
    """Yield the low ``width`` bits of ``val``, most-significant bit first.

    Bits of ``val`` above position ``width - 1`` are ignored. Nothing is
    yielded when ``width`` is zero or negative.
    """
    for shift in reversed(range(width)):
        yield (val >> shift) & 1
def frame(phyad, regad, data=None, *, st=0b01, op=None, preamble_bits=32):
    """Generate the bits of one MDIO frame in transmission order.

    The sequence is: ``preamble_bits`` ones, the 2-bit ``st`` field, the
    2-bit ``op`` field, the 5-bit ``phyad`` and the 5-bit ``regad``. When
    ``data`` is given, the bits 1, 0 and then the 16 bits of ``data``
    follow; when ``data`` is None the generator stops after ``regad``.

    If ``op`` is None it defaults to 0b10 for a frame without data and to
    0b01 for a frame with data. Every field is emitted most-significant
    bit first.
    """
    if op is None:
        if data is None:
            op = 0b10
        else:
            op = 0b01

    yield from (1 for _ in range(preamble_bits))

    header = ((st, 2), (op, 2), (phyad, 5), (regad, 5))
    for field, width in header:
        yield from to_bits(field, width)

    if data is not None:
        # The two bits that sit between the address and the data.
        yield 1
        yield 0
        yield from to_bits(data, 16)
# Ratio passed to ClockEnable() in setup() together with mdio.clk and mdio.ce
# (presumably the number of clk cycles per ce pulse -- confirm in .util).
# Should be 50, but reduced to simulate faster
MDIO_RATIO = 3
async def mdio_read(mdio, phyad, regad, **kwargs):
    """Send a data-less MDIO frame on ``mdi`` and sample the reply on ``mdo``.

    ``kwargs`` are forwarded to frame() (``st``, ``op``, ``preamble_bits``).

    Returns the integer built from 16 sampled ``mdo`` bits (first sampled
    bit is the most significant), or None when ``mdo_valid`` was low on the
    third sampling cycle after the request. ``mdo_valid`` is asserted to stay
    consistent with that outcome for the 16 cycles that follow.
    """
    # Shift the request out, one bit per rising edge of the clock enable.
    for level in frame(phyad, regad, **kwargs):
        await RisingEdge(mdio.ce)
        mdio.mdi.value = level
    await FallingEdge(mdio.ce)
    # Stop driving a defined level once the request has been sent.
    mdio.mdi.value = LogicArray('X')

    result = 0
    for cycle in range(19):
        await RisingEdge(mdio.ce)
        await RisingEdge(mdio.clk)

        # The first two cycles carry nothing that is checked here.
        if cycle < 2:
            continue

        valid = mdio.mdo_valid.value
        if cycle == 2:
            # Decide here whether a reply is coming at all.
            if not valid:
                result = None
            continue

        if result is None:
            assert not valid
            continue
        assert valid

        result = (result << 1) | mdio.mdo.value
    return result
async def mdio_write(mdio, phyad, regad, data, **kwargs):
    """Send an MDIO frame carrying ``data`` on ``mdi``.

    One bit from frame() is driven after each rising edge of ``mdio.ce``.
    After the falling edge of ``ce`` that follows the last bit, ``mdi`` is
    set to 'X'. ``kwargs`` are forwarded to frame().
    """
    for level in frame(phyad, regad, data, **kwargs):
        await RisingEdge(mdio.ce)
        mdio.mdi.value = level
    await FallingEdge(mdio.ce)
    mdio.mdi.value = LogicArray('X')
async def wb_read(mdio, addr, data):
    """Answer one bus read with ``data``, checking the request on the way.

    Waits (polling on falling ``clk`` edges) until ``cyc`` and ``stb`` are
    both set, asserts that ``we`` is clear and that ``addr`` matches, then
    presents ``data`` on ``data_read`` with ``ack`` high across one rising
    ``clk`` edge. Afterwards ``ack`` is dropped, ``data_read`` goes back to
    all-'X', and ``stb`` is asserted to be low at the next falling edge.
    """
    while True:
        if mdio.cyc.value and mdio.stb.value:
            break
        await FallingEdge(mdio.clk)

    # The request must be a read of the expected register.
    assert not mdio.we.value
    assert mdio.addr.value == addr

    mdio.data_read.value = data
    mdio.ack.value = 1
    await RisingEdge(mdio.clk)

    mdio.ack.value = 0
    mdio.data_read.value = LogicArray('X' * 16)
    await FallingEdge(mdio.clk)
    assert not mdio.stb.value
async def wb_write(mdio, addr, data):
    """Acknowledge one bus write and check what was written.

    Waits (polling on falling ``clk`` edges) until ``cyc`` and ``stb`` are
    both set, raises ``ack`` across one rising ``clk`` edge, and then asserts
    that ``we`` is set and that ``addr`` and ``data_write`` match the
    expected values. ``ack`` is dropped again and ``stb`` is asserted to be
    low at the next falling edge.
    """
    while True:
        if mdio.cyc.value and mdio.stb.value:
            break
        await FallingEdge(mdio.clk)

    mdio.ack.value = 1
    await RisingEdge(mdio.clk)

    # Unlike wb_read(), the request is checked after the acknowledging edge.
    assert mdio.we.value
    assert mdio.addr.value == addr
    assert mdio.data_write.value == data

    mdio.ack.value = 0
    await FallingEdge(mdio.clk)
    assert not mdio.stb.value
async def setup(mdio):
    """Initialise the driven inputs and start the clock-enable and clock.

    ``mdi``, ``ack`` and ``err`` start at 0 and ``data_read`` at all-'X'.
    The ClockEnable task is started first, then the 8 ns clock after a
    Timer(1) delay.
    """
    mdio.mdi.value = 0
    mdio.ack.value = 0
    mdio.err.value = 0
    mdio.data_read.value = LogicArray('X' * 16)

    enable_gen = ClockEnable(mdio.clk, mdio.ce, MDIO_RATIO)
    await cocotb.start(enable_gen)

    # NOTE(review): presumably lets the enable generator and the initial
    # values settle before the first clock edge -- confirm.
    await Timer(1)

    clock = Clock(mdio.clk, 8, units='ns')
    await cocotb.start(clock.start())
@cocotb.test(timeout_time=50, timeout_unit='us')
async def test_mdio(mdio):
    """Check MDIO reads and writes against the bus side of the design.

    Registers 0-15 are each read once and written once, in independently
    shuffled order, using random 16-bit data. The MDIO side runs as a
    background task while this coroutine answers the resulting bus
    transactions with wb_read() and wb_write(), which also check them.
    """
    await setup(mdio)

    # randrange() excludes its stop value, so the bound must be 0x10000 for
    # the all-ones word 0xFFFF to be a possible test value.
    reads = [(i, random.randrange(0x10000)) for i in range(16)]
    writes = [(i, random.randrange(0x10000)) for i in range(16)]
    random.shuffle(reads)
    random.shuffle(writes)

    async def rw_mdio():
        # MDIO side: each read must return what the bus side supplies.
        for (rd_reg, rd_data), (wr_reg, wr_data) in zip(reads, writes):
            assert await mdio_read(mdio, 0, rd_reg) == rd_data
            await mdio_write(mdio, 0, wr_reg, wr_data)

    await cocotb.start(rw_mdio())

    # Bus side: serve the transactions in the same order as the MDIO side.
    for (rd_reg, rd_data), (wr_reg, wr_data) in zip(reads, writes):
        await wb_read(mdio, rd_reg, rd_data)
        await wb_write(mdio, wr_reg, wr_data)
@cocotb.test(timeout_time=20, timeout_unit='us')
async def test_badmdio(mdio):
    """Check that certain MDIO frames cause no bus transaction.

    A background task fails the test on any rising edge of ``cyc`` or
    ``stb``. A read (which must return None) and a write are then sent for
    each of: a 31-bit preamble, ``st`` of 0b00, ``op`` of 0b00, PHY address
    1, and PHY address 0x10.
    """
    await setup(mdio)

    async def nowb():
        await First(RisingEdge(mdio.cyc), RisingEdge(mdio.stb))
        assert False, "Unexpected wishbone transaction"

    await cocotb.start(nowb())

    # Force mdi low to ensure we get exactly 31 bits in the preamble
    async def rw_mdio(phyad, **kwargs):
        mdio.mdi.value = 0
        await FallingEdge(mdio.clk)
        assert await mdio_read(mdio, phyad, 0, **kwargs) is None

        mdio.mdi.value = 0
        await FallingEdge(mdio.clk)
        await mdio_write(mdio, phyad, 0, 0, **kwargs)

    # (phyad, frame() overrides) for every frame variant under test.
    cases = (
        (0, {'preamble_bits': 31}),
        (0, {'st': 0b00}),
        (0, {'op': 0b00}),
        (1, {}),
        (0x10, {}),
    )
    for phyad, overrides in cases:
        await rw_mdio(phyad, **overrides)
@cocotb.test(timeout_time=20, timeout_unit='us')
async def test_badwb(mdio):
    """Check MDIO behaviour when the bus side does not acknowledge.

    The background responder first lets two falling edges of ``stb`` pass
    without responding, then answers the next two requests by pulsing
    ``err`` instead of ``ack``. Meanwhile two read/write pairs are sent over
    MDIO, and both reads must return None.
    """
    await setup(mdio)

    async def bad_resp():
        # No ack
        await ClockCycles(mdio.stb, 2, False)

        # Error response
        for _ in range(2):
            while True:
                if mdio.cyc.value and mdio.stb.value:
                    break
                await FallingEdge(mdio.clk)

            mdio.err.value = 1
            await RisingEdge(mdio.clk)
            mdio.err.value = 0

            await FallingEdge(mdio.clk)
            assert not mdio.stb.value

    await cocotb.start(bad_resp())

    for _ in range(2):
        readback = await mdio_read(mdio, 0, 0)
        assert readback is None
        await mdio_write(mdio, 0, 0, 0)