Skip to content

Commit

Permalink
gateware.iostream.IOStreamer: fix incorrect sampling of DDR inputs
Browse files Browse the repository at this point in the history
This also adds a testcase to check the correct sampling time.
  • Loading branch information
purdeaandrei authored and whitequark committed Aug 24, 2024
1 parent 81653bb commit 579ff96
Show file tree
Hide file tree
Showing 2 changed files with 144 additions and 2 deletions.
42 changes: 40 additions & 2 deletions software/glasgow/gateware/iostream.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,45 @@ def _map_ioshape(direction, ioshape, fn): # actually filter+map
})


class SimulatableDDRBuffer(io.DDRBuffer):
    """DDR I/O buffer that can also be elaborated on a simulation port.

    For any port that is not an ``io.SimulationPort`` this behaves exactly like
    ``io.DDRBuffer`` (elaboration is delegated to the base class). For a simulation port it
    builds a behavioral model out of a plain ``io.Buffer`` and flip-flops:

    * Input: the pin is sampled on the rising edge (``i_ff``) and on the falling edge
      (``i_negedge_ff``) of ``i_domain``; both samples are registered once more on the next
      rising edge and presented together as ``i[0]`` and ``i[1]``.
    * Output: ``o[0]``, ``o[1]`` and ``oe`` are registered on the rising edge of ``o_domain``;
      ``o[1]`` is additionally registered on the falling edge. The pin is driven from the
      ``o[0]`` register while the clock is high and from the ``o[1]`` register while it is low.
    """

    def elaborate(self, platform):
        """Return the platform buffer, or the simulation model for a simulation port."""
        # Use the public ``port`` accessor, like the rest of this method, instead of reaching
        # into the private ``_port`` attribute of the upstream base class.
        if not isinstance(self.port, io.SimulationPort):
            return super().elaborate(platform)

        # At the time of writing Amaranth DDRBuffer doesn't allow for simulation, this implements
        # ICE40 semantics for simulation.
        m = Module()

        m.submodules.io_buffer = io_buffer = io.Buffer(self.direction, self.port)

        # NOTE: the local variable names below are kept as-is on purpose; Amaranth derives
        # signal names from them.
        if self.direction is not io.Direction.Output:
            # Local domain clocked by the inverted input clock, i.e. active on falling edges.
            m.domains.i_domain_negedge = ClockDomain("i_domain_negedge", local=True)
            m.d.comb += ClockSignal("i_domain_negedge").eq(~ClockSignal(self.i_domain))
            i_ff = Signal(len(self.port), reset_less=True)
            i_negedge_ff = Signal(len(self.port), reset_less=True)
            i_final_ff = Signal(data.ArrayLayout(len(self.port), 2), reset_less=True)
            # Capture the pin on each clock edge...
            m.d[self.i_domain] += i_ff.eq(io_buffer.i)
            m.d["i_domain_negedge"] += i_negedge_ff.eq(io_buffer.i)
            # ...then re-register both halves on the rising edge so that `i` changes at once.
            m.d[self.i_domain] += i_final_ff.eq(Cat(i_ff, i_negedge_ff))
            m.d.comb += self.i.eq(i_final_ff)

        if self.direction is not io.Direction.Input:
            # Local domain clocked by the inverted output clock, i.e. active on falling edges.
            m.domains.o_domain_negedge = ClockDomain("o_domain_negedge", local=True)
            m.d.comb += ClockSignal("o_domain_negedge").eq(~ClockSignal(self.o_domain))
            o_ff = Signal(len(self.port), reset_less=True)
            o_negedge_ff = Signal(len(self.port), reset_less=True)
            oe_ff = Signal(reset_less=True)
            m.d[self.o_domain] += o_ff.eq(self.o[0])
            # `o[1]` is captured on the rising edge first and moved to the falling-edge
            # register half a cycle later.
            o1_ff = Signal(len(self.port), reset_less=True)
            m.d[self.o_domain] += o1_ff.eq(self.o[1])
            m.d["o_domain_negedge"] += o_negedge_ff.eq(o1_ff)
            m.d[self.o_domain] += oe_ff.eq(self.oe)
            # The clock level selects which register drives the pin. This mux is combinational
            # on the clock, so the simulated pin may glitch briefly around clock edges.
            m.d.comb += io_buffer.o.eq(Mux(ClockSignal(self.o_domain), o_ff, o_negedge_ff))
            m.d.comb += io_buffer.oe.eq(oe_ff)

        return m


class IOStreamer(wiring.Component):
"""I/O buffer to stream adapter.
Expand Down Expand Up @@ -87,8 +126,7 @@ def elaborate(self, platform):
if self._ratio == 1:
buffer_cls, latency = io.FFBuffer, 1
if self._ratio == 2:
# FIXME: should this be 2 or 3? the latency differs between i[0] and i[1]
buffer_cls, latency = io.DDRBuffer, 3
buffer_cls, latency = SimulatableDDRBuffer, 2

if isinstance(self._ports, io.PortLike):
m.submodules.buffer = buffer = buffer_cls("io", self._ports)
Expand Down
104 changes: 104 additions & 0 deletions software/tests/gateware/test_iostream.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,110 @@ async def main_testbench(ctx):
with sim.write_vcd("test.vcd", fs_per_delta=1):
sim.run()

def test_ddr_input_sampled_correctly(self):
    """Check that DDR inputs are sampled at the moment the output signals change.

    The test is latency-agnostic: it does not assume how many cycles the IOStreamer needs,
    it only compares what was sampled against what was on the pins when `clk_out` rose.
    """
    CLK_PERIOD = 1e-6
    # Time to wait after an apparent `clk_out` edge before trusting the pin level.
    DELAY_TO_AVOID_GLITCHES = CLK_PERIOD/10

    ports = PortGroup()
    ports.clk_out = io.SimulationPort("o", 1)
    ports.data_in = io.SimulationPort("i", 8)

    dut = IOStreamer({
        "clk_out": ("o", 1),
        "data_in": ("i", 8),
    }, ports, ratio=2, meta_layout=4)

    expected_pairs = []  # (phase 0, phase 1) values seen on the pins
    sampled_pairs = []   # (i[0], i[1]) values delivered on i_stream

    def format_pairs(pairs):
        # Render a list of sample pairs for the assertion message.
        return ", ".join(f"(0x{s0:02x}, 0x{s1:02x})" for s0, s1 in pairs)

    async def input_generator_tb(ctx):
        """
        Drive `data_in` with a down-counting value that changes halfway between consecutive
        clock edges (rising and falling), so that it is unambiguous which value each DDR edge
        should capture. Which of these values end up sampled depends on the latency of the
        dut, which this test does not assume.
        """
        value = 0xff
        while True:
            ctx.set(ports.data_in.i, value)
            await ctx.tick()
            await ctx.delay(CLK_PERIOD/4)
            value = (value - 1) & 0xff
            ctx.set(ports.data_in.i, value)
            await ctx.delay(CLK_PERIOD/2)  # half a clock cycle
            value = (value - 1) & 0xff

    async def save_expected_sample_values_tb(ctx):
        """
        Record what the IOStreamer is expected to sample. A rising edge on `clk_out` marks a
        sampling point, so the value on `data_in` at that moment, and the value half a clock
        later (the other DDR edge), are appended to the expected list for comparison with
        what arrives on `i_stream`.

        The simulation model of the DDR buffer can glitch, so an edge only counts if the pin
        is still high DELAY_TO_AVOID_GLITCHES after it was seen.
        """
        clk_pin = ports.clk_out.o[0]
        while True:
            # Wait for a rising edge that is still high after the settling delay.
            while True:
                await ctx.posedge(clk_pin)
                await ctx.delay(DELAY_TO_AVOID_GLITCHES)
                if ctx.get(clk_pin) != 0:
                    break

            first_phase = ctx.get(ports.data_in.i)
            await ctx.delay(CLK_PERIOD / 2)
            second_phase = ctx.get(ports.data_in.i)

            expected_pairs.append((first_phase, second_phase))

    async def i_stream_consumer_tb(ctx):
        """
        Collect every sample delivered on `i_stream`.
        """
        while True:
            sample = await stream_get(ctx, dut.i_stream)
            lanes = sample.port.data_in.i
            sampled_pairs.append((lanes[0], lanes[1]))

    async def main_testbench(ctx):
        """
        Produce the `o_stream` stimulus and orchestrate the test: after sending the stimulus,
        wait a number of clock cycles for any input latency to pass, then check both the
        number of received samples and their values.
        """
        await ctx.tick()

        # Odd transfers raise `clk_out` for the first half-cycle and request an input sample.
        for index in range(8):
            phase = index % 2
            await stream_put(ctx, dut.o_stream, {
                "meta": index,
                "i_en": phase,
                "port": {"clk_out": {"o": (phase, 0)}},
            })

        await stream_put(ctx, dut.o_stream, {
            "meta": 0,
            "i_en": 0,
            "port": {"clk_out": {"o": (0, 0)}},
        })

        for _ in range(20):
            await ctx.tick()

        # The count must be checked too: if IOStreamer never generated clock edges, both
        # lists would be empty and the comparison below would pass vacuously.
        assert len(sampled_pairs) == 4

        assert sampled_pairs == expected_pairs, (
            "Expected [" + format_pairs(expected_pairs) +
            "] Got [" + format_pairs(sampled_pairs) + "]")

    sim = Simulator(dut)
    sim.add_clock(CLK_PERIOD)
    sim.add_testbench(main_testbench)
    sim.add_testbench(i_stream_consumer_tb, background=True)
    sim.add_testbench(input_generator_tb, background=True)
    sim.add_testbench(save_expected_sample_values_tb, background=True)
    with sim.write_vcd("test.vcd", fs_per_delta=1):
        sim.run()

def test_basic(self):
ports = PortGroup()
ports.data = port = io.SimulationPort("io", 1)
Expand Down

0 comments on commit 579ff96

Please sign in to comment.