Skip to content

Commit

Permalink
example/rust/streaming_data.rs
Browse files Browse the repository at this point in the history
revised python equivalent also
  • Loading branch information
2bndy5 committed Jan 30, 2025
1 parent 2d9b0bf commit afebdc8
Show file tree
Hide file tree
Showing 3 changed files with 251 additions and 8 deletions.
16 changes: 8 additions & 8 deletions examples/python/streaming_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,12 @@ def make_payloads(size: int = 32) -> list[bytes]:
# prefix payload with a sequential letter to indicate which
# payloads were lost (if any)
buff = bytes([i + (65 if 0 <= i < 26 else 71)])
for j in range(size - 1):
char = bool(j >= (size - 1) / 2 + abs((size - 1) / 2 - i))
char |= bool(j < (size - 1) / 2 - abs((size - 1) / 2 - i))
max_len = size - 1
half_size = int(max_len / 2)
abs_diff = abs(half_size - i)
for j in range(max_len):
char = bool(j >= half_size + abs_diff)
char |= bool(j < half_size - abs_diff)
buff += bytes([char + 48])
stream.append(buff)
return stream
Expand Down Expand Up @@ -80,11 +83,9 @@ def tx(self, count: int = 1, size: int = 32):
self.radio.as_tx() # ensures the nRF24L01 is in TX mode
for cnt in range(count): # transmit the same payloads this many times
self.radio.flush_tx() # clear the TX FIFO so we can use all 3 levels
# NOTE the write_only parameter does not initiate sending
buf_iter = 0 # iterator of payloads for the while loop
failures = 0 # keep track of manual retries
start_timer = time.monotonic() * 1000 # start timer
for buf_index in range(size): # cycle through all payloads in stream
for buf_iter in range(size): # cycle through all payloads in stream
while not self.radio.write(stream[buf_iter]):
# upload to TX FIFO failed because TX FIFO is full.
# check for transmission errors
Expand All @@ -101,10 +102,9 @@ def tx(self, count: int = 1, size: int = 32):
print(
"Make sure slave() node is listening. Quitting master_fifo()"
)
buf_iter = size + 1 # be sure to exit the while loop
cnt = count # be sure to exit the outer for loop
self.radio.flush_tx() # discard all payloads in TX FIFO
break
buf_iter += 1
# wait for radio to finish transmitting everything in the TX FIFO
while self.radio.get_fifo_state(True) != FifoState.Empty and failures < 99:
# get_fifo_state() also update()s the StatusFlags
Expand Down
4 changes: 4 additions & 0 deletions examples/rust/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,7 @@ linux = ["dep:linux-embedded-hal", "rf24-rs/std"]
[[bin]]
name = "getting-started"
path = "src/main.rs"

[[bin]]
name = "streaming-data"
path = "src/bin/streaming_data.rs"
239 changes: 239 additions & 0 deletions examples/rust/src/bin/streaming_data.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,239 @@
#![no_std]

use core::time::Duration;
use std::{io::Write, string::ToString};

use anyhow::{anyhow, Result};
use rf24::{
radio::{prelude::*, RF24},
FifoState, PaLevel, StatusFlags,
};
#[cfg(feature = "linux")]
use rf24_rs_examples::linux::{
print, println, BoardHardware, CdevPin as DigitalOutImpl, Delay as DelayImpl,
SpidevDevice as SpiImpl,
};
#[cfg(feature = "linux")]
extern crate std;

/// The length of the stream and the length of each payload within the stream.
const SIZE: usize = 32;

/// A struct to drive our example app
struct App {
/// Any platform-specific functionality is abstracted into this object.
#[allow(dead_code, reason = "keep board's peripheral objects alive")]
board: BoardHardware,
/// Our instantiated RF24 object.
radio: RF24<SpiImpl, DigitalOutImpl, DelayImpl>,
}

impl App {
pub fn new() -> Result<Self> {
// instantiate a hardware peripherals on the board
let mut board = BoardHardware::default()?;

// instantiate radio object using board's hardware
let radio = RF24::new(
board.default_ce_pin()?,
BoardHardware::default_spi_device()?,
DelayImpl,
);
Ok(Self { board, radio })
}

/// Setup the radio for this example.
///
/// This will initialize and configure the [`App::radio`] object.
pub fn setup(&mut self, radio_number: u8) -> Result<()> {
// initialize the radio hardware
self.radio.init().map_err(|e| anyhow!("{e:?}"))?;

// defaults to PaLevel::Max. Use PaLevel::Low for PA/LNA testing
self.radio
.set_pa_level(PaLevel::Low)
.map_err(|e| anyhow!("{e:?}"))?;

// we'll be using a 32-byte payload lengths
self.radio
.set_payload_length(SIZE as u8)
.map_err(|e| anyhow!("{e:?}"))?;

let address = [b"1Node", b"2Node"];
self.radio
.open_rx_pipe(1, address[radio_number as usize])
.map_err(|e| anyhow!("{e:?}"))?;
self.radio
.open_tx_pipe(address[1 - radio_number as usize])
.map_err(|e| anyhow!("{e:?}"))?;
Ok(())
}

/// return a list of payloads
fn make_payloads() -> [[u8; SIZE]; SIZE] {
// we'll use `size` for the number of payloads in the list and the
// payloads' length
let stream = [[0u8; SIZE]; SIZE];
static MAX_LEN: i8 = SIZE as i8 - 1;
static HALF_LEN: i8 = MAX_LEN / 2;
for i in 0..SIZE as i8 {
// prefix payload with a sequential letter to indicate which
// payloads were lost (if any)
let mut buff = stream[i as usize];
buff[0] = (i as u8) + if i < 26 { 65 } else { 71 };
let abs_diff = (HALF_LEN - i).abs_diff(0) as i8;
for j in 0..MAX_LEN {
let c = j >= (HALF_LEN + abs_diff) || j < (HALF_LEN - abs_diff);
buff[j as usize + 1] = c as u8 + 48;
}
}
stream
}

/// The TX role.
///
/// Uses the [`App::radio`] as a transmitter.
pub fn tx(&mut self, count: u8) -> Result<()> {
// put radio into TX mode
self.radio.as_tx().map_err(|e| anyhow!("{e:?}"))?;
let mut flags = StatusFlags::default();
for cnt in 0..count {
let stream = Self::make_payloads();
self.radio.flush_tx().map_err(|e| anyhow!("{e:?}"))?;
let mut failures = 0u8;
let start = std::time::Instant::now();
for (buff_index, buf) in stream.iter().enumerate().take(SIZE) {
while !self
.radio
.write(buf, false, true)
.map_err(|e| anyhow!("{e:?}"))?
{
// upload to TX FIFO failed because TX FIFO is full.
// check for transmission errors
self.radio.update().map_err(|e| anyhow!("{e:?}"))?;
self.radio.get_status_flags(&mut flags);
if flags.tx_df() {
// transmission failed
failures += 1; // increment manual retry count
// rewrite() resets the tx_df flag and reuses top level of TX FIFO
self.radio.rewrite().map_err(|e| anyhow!("{e:?}"))?;
if failures > 99 {
break;
}
}
if failures > 99 && buff_index < 7 && cnt < 2 {
// we need to prevent an infinite loop
println!("Make sure other node is listening. Aborting stream");
break;
}
}
if failures > 99 {
// escape for loop when too many transmissions fail
break;
}
}
// wait for radio to finish transmitting everything in the TX FIFO
while failures < 99
&& self
.radio
.get_fifo_state(true)
.map_err(|e| anyhow!("{e:?}"))?
!= FifoState::Empty
{
self.radio.get_status_flags(&mut flags);
if flags.tx_df() {
failures += 1;
self.radio.rewrite().map_err(|e| anyhow!("{e:?}"))?;
}
}
let end = std::time::Instant::now();
println!(
"Transmission took {} ms with {} failures detected",
end.saturating_duration_since(start).as_millis(),
failures,
);
}
Ok(())
}

/// The RX role.
///
/// Uses the [`App::radio`] as a receiver.
pub fn rx(&mut self, timeout: u8) -> Result<()> {
let _end = Duration::from_secs(timeout as u64);
// put radio into active RX mode
self.radio.as_rx().map_err(|e| anyhow!("{e:?}"))?;
let mut count = 0u16;
let mut end_time =
std::time::Instant::now() + std::time::Duration::from_secs(timeout as u64);
while std::time::Instant::now() < end_time {
if self.radio.available().map_err(|e| anyhow!("{e:?}"))? {
let mut buf = [0u8; SIZE];
count += 1;
self.radio
.read(&mut buf, None)
.map_err(|e| anyhow!("{e:?}"))?;
// print pipe number and payload length and payload
println!("{} - {count}", std::string::String::from_utf8_lossy(&buf));
// reset timeout
end_time =
std::time::Instant::now() + std::time::Duration::from_secs(timeout as u64);
}
}

// It is highly recommended to keep the radio idling in an inactive TX mode
self.radio.as_tx().map_err(|e| anyhow!("{e:?}"))?;
Ok(())
}

pub fn set_role(&mut self) -> Result<bool> {
let prompt = "*** Enter 'R' for receiver role.\n\
*** Enter 'T' for transmitter role.\n\
*** Enter 'Q' to quit example.";
println!("{prompt}");
let mut input = std::string::String::new();
std::io::stdin().read_line(&mut input)?;
let mut inputs = input.trim().split(' ');
let role = inputs
.next()
.map(|v| v.to_uppercase())
.unwrap_or("?".to_string());
if role.starts_with('T') {
let count = inputs
.next()
.and_then(|v| v.parse::<u8>().ok())
.unwrap_or(5);
self.tx(count)?;
return Ok(true);
} else if role.starts_with('R') {
let timeout = inputs
.next()
.and_then(|v| v.parse::<u8>().ok())
.unwrap_or(6);
self.rx(timeout)?;
return Ok(true);
} else if role.starts_with('Q') {
self.radio.power_down().map_err(|e| anyhow!("{e:?}"))?;
return Ok(false);
}
println!("{role} is an unrecognized input. Please try again.");
Ok(true)
}
}

fn main() -> Result<()> {
let mut app = App::new()?;
let mut input = std::string::String::new();
print!("Which radio is this? Enter '0' or '1'. Defaults to '0' ");
std::io::stdout().flush()?;
std::io::stdin().read_line(&mut input)?;
let radio_number = input
.trim()
.chars()
.next()
.map(|c| if c == '1' { 1 } else { 0 })
.unwrap_or_default();
app.setup(radio_number)?;
while app.set_role()? {}
Ok(())
}

0 comments on commit afebdc8

Please sign in to comment.