Skip to content

Commit

Permalink
viable release, pmu_buffer_server, pdc_client, pdc_server (mock)
Browse files Browse the repository at this point in the history
  • Loading branch information
micahpw committed Oct 28, 2024
1 parent 66895ac commit 393e200
Show file tree
Hide file tree
Showing 15 changed files with 1,284 additions and 807 deletions.
845 changes: 508 additions & 337 deletions Cargo.lock

Large diffs are not rendered by default.

16 changes: 3 additions & 13 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,24 +7,14 @@ edition = "2021"


[dependencies]
arrow = { version = "53.2.0", features = ["ipc", "csv"] }
arrow = { version = "53.2.0", features = ["ipc"] }
axum = "0.7.7"
bincode = "1.3.3"
bytes = "1.7.1"
chrono = "0.4.38"
circular-buffer = "0.1.8"
clap = { version = "4.0", features = ["derive"] }
crc = "3.2.1" # May be only needed for dev
env_logger = "0.11.5"
femme = "2.2.1"
log = "0.4.22"
rand = "0.8.5" #May be only needed for dev
serde = { version = "1.0.210", features = ["derive"] }
thiserror = "1.0.64"
tokio = { version = "1", features = ["full"] }
tower = "0.5.1"
tower-http = "0.6.1"
tracing-subscriber = "0.3.18"
uuid = { version = "1.10.0", features = ["v4"] }

[dev-dependencies]
criterion = { version = "0.5.1", features = ["html_reports"] }
reqwest = "0.12.8"
11 changes: 2 additions & 9 deletions src/arrow_utils.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,6 @@
#![allow(unused)]
#![allow(unused)]
use crate::frames::{ChannelDataType, ChannelInfo, ConfigurationFrame1and2_2011};
use arrow::array::{
ArrayRef, Float32Array, Float64Array, Int16Array, Int32Array, StringArray, UInt16Array,
};
use crate::frames::{ChannelDataType, ChannelInfo};
use arrow::array::{ArrayRef, Float32Array, Int16Array, UInt16Array};
use arrow::datatypes::{DataType, Field, Schema, TimeUnit};
use arrow::ipc::writer::FileWriter;
use arrow::record_batch::RecordBatch;
use bytes::Bytes;
use std::collections::HashMap;
use std::sync::Arc;

Expand Down
36 changes: 0 additions & 36 deletions src/client.rs

This file was deleted.

5 changes: 0 additions & 5 deletions src/errors.rs

This file was deleted.

73 changes: 54 additions & 19 deletions src/frame_parser.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,12 @@
#![allow(unused)]

use crate::frames::{
calculate_crc, ConfigurationFrame1and2_2011, DataFrame2011, HeaderFrame2011,
PMUConfigurationFrame2011, PMUDataFrame, PMUDataFrameFixedFreq2011, PMUDataFrameFloatFreq2011,
PMUFrameType, PrefixFrame2011,
calculate_crc, CommandFrame2011, ConfigurationFrame1and2_2011, DataFrame2011, HeaderFrame2011,
PMUConfigurationFrame2011, PMUDataFrameFixedFreq2011, PMUDataFrameFloatFreq2011, PMUFrameType,
PrefixFrame2011,
};

// Define constants
const PREFIX_SIZE: usize = 14; // Size of HeaderFrame2011 in bytes
const CONFIG_HEADER_SIZE: usize = 6;
const TAIL_SIZE: usize = 2; // Size of TailFrame2011 in bytes

#[derive(Debug)]
pub enum ParseError {
Expand All @@ -27,6 +24,7 @@ pub enum Frame {
Prefix(PrefixFrame2011),
Configuration(ConfigurationFrame1and2_2011),
Data(DataFrame2011),
Command(CommandFrame2011),
}

pub fn parse_header(buffer: &[u8]) -> Result<HeaderFrame2011, ParseError> {
Expand All @@ -35,7 +33,25 @@ pub fn parse_header(buffer: &[u8]) -> Result<HeaderFrame2011, ParseError> {

pub fn parse_command_frame(buffer: &[u8]) -> Result<Frame, ParseError> {
// TODO/skip
todo!("Implement command frame parsing")
let prefix_slice: &[u8; PREFIX_SIZE] = buffer[..PREFIX_SIZE].try_into().unwrap();
let prefix = PrefixFrame2011::from_hex(prefix_slice).map_err(|_| ParseError::InvalidHeader)?;

let command = u16::from_be_bytes([buffer[PREFIX_SIZE], buffer[PREFIX_SIZE + 1]]);

let extframe = if buffer.len() > PREFIX_SIZE + 4 {
Some(buffer[PREFIX_SIZE + 2..buffer.len() - 2].to_vec())
} else {
None
};

let chk = u16::from_be_bytes([buffer[buffer.len() - 2], buffer[buffer.len() - 1]]);

Ok(Frame::Command(CommandFrame2011 {
prefix,
command,
extframe,
chk,
}))
}

pub fn parse_data_frames(
Expand Down Expand Up @@ -146,9 +162,6 @@ pub fn parse_data_frames(
}

pub fn parse_config_frame_1and2(buffer: &[u8]) -> Result<ConfigurationFrame1and2_2011, ParseError> {
// Unsure about return type right now. Needs to be some sort
// of nested structure.

// get the header frame struct using the parse_header_frame function

let prefix_slice: &[u8; PREFIX_SIZE] = buffer[..PREFIX_SIZE].try_into().unwrap();
Expand Down Expand Up @@ -275,30 +288,39 @@ pub fn parse_config_frame_3(buffer: &[u8]) -> Result<Frame, ParseError> {

pub fn parse_frame(
buffer: &[u8],
config: ConfigurationFrame1and2_2011,
config: Option<ConfigurationFrame1and2_2011>,
) -> Result<Frame, ParseError> {
// read first two bytes as the sync variable.
// read second byte and convert to binary
// check bits 3-0 to get version number of IEEE standard.
// if bits 3-0 == 0001, use IEEEstandard 2005
// if bits 3-0 == 0010, use IEEE standard from 2011
// if bits 3-0 do not equal 0010, throw ParseError:VersionNotSupported
println!("Reading Frame Prefix");
let sync = u16::from_be_bytes([buffer[0], buffer[1]]);
if sync >> 8 != 0xAA {
println!("Invalid Sync value");
return Err(ParseError::InvalidHeader);
}
println!("Reading version");
let version = buffer[1] & 0b1111;

match version {
1 => {
// Use IEEE standard 2005
return Err(ParseError::VersionNotSupported);
//
println!("Standard 2005 not supported");
//return Err(ParseError::VersionNotSupported);
}
2 => {
println!("Standard 2011 detected");
// Use IEEE standard 2011
// No error
}
_ => return Err(ParseError::VersionNotSupported),
_ => {
println!("Unknown version detected");
return Err(ParseError::VersionNotSupported);
}
}

// Next, get framesize variable at bytes 3-4
Expand All @@ -309,10 +331,11 @@ pub fn parse_frame(
if framesize as usize != buffer.len() {
return Err(ParseError::InvalidFrameSize);
}

println!("Calculating CRC");
let calculated_crc = calculate_crc(&buffer[..buffer.len() - 2]);
let frame_crc = u16::from_be_bytes([buffer[buffer.len() - 2], buffer[buffer.len() - 1]]);
if calculated_crc != frame_crc {
println!("Invalid CRC");
return Err(ParseError::InvalidCRC);
}

Expand All @@ -322,22 +345,34 @@ pub fn parse_frame(
// if bits 6-4 equal 010 or 011 -> parse_config_frame_1and2(buffer, framesize)
// If bits 6-4 equal 101 -> parse_config_frame_3(buffer, framesize)
// if bits 6-4 equal 100 -> parse_command_frame(buffer, framesize)
println!("Determining Frame Type");
let frame_type = (buffer[1] >> 4) & 0b111;
match frame_type {
0b000 => {
let data_frame = parse_data_frames(buffer, &config)?;
Ok(Frame::Data(data_frame))
}
0b000 => match config {
Some(config) => {
let data_frame = parse_data_frames(buffer, &config)?;
Ok(Frame::Data(data_frame))
}
None => {
println!("Configuration Frame required to parse data frame.");
Err(ParseError::InsufficientData)
}
},
0b001 => {
println!("Parsing Header");
let header = parse_header(buffer)?;
Ok(Frame::Header(header))
}
0b010 | 0b011 => {
println!("parsing config frame");
let config = parse_config_frame_1and2(buffer)?;
Ok(Frame::Configuration(config))
}
0b101 => parse_config_frame_3(buffer),
0b100 => parse_command_frame(buffer),
0b100 => {
println!("Parsing command frame");
parse_command_frame(buffer)
}
_ => Err(ParseError::InvalidFrameSize),
}
}
6 changes: 3 additions & 3 deletions src/frames.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ pub fn calculate_crc(buffer: &[u8]) -> u16 {
crc
}

#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct PrefixFrame2011 {
pub sync: u16, // Leading byte = AA hex,
// second byte: Frame type and version
Expand Down Expand Up @@ -271,7 +271,7 @@ pub struct ChannelInfo {
pub size: usize, // Size in bytes
}

#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct ConfigurationFrame1and2_2011 {
pub prefix: PrefixFrame2011,
pub time_base: u32, // Resolution of
Expand Down Expand Up @@ -421,7 +421,7 @@ impl ConfigurationFrame1and2_2011 {
}
// This struct is repeated NUM_PMU times.
// For parsing entire configuration frame, need to take into account num_pmu.
#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct PMUConfigurationFrame2011 {
pub stn: [u8; 16], // Station Name 16 bytes ASCII
pub idcode: u16, // Data source ID number, identifies source of each data block.
Expand Down
36 changes: 0 additions & 36 deletions src/handshake.rs

This file was deleted.

5 changes: 3 additions & 2 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
// everything public in this file can be used in testing with pmu::...?
pub mod arrow_utils;
pub mod frame_buffer;
pub mod frame_parser;
pub mod frames;
//pub mod pdc_buffer_server;
pub mod arrow_utils;
pub mod pdc_buffer_server;
pub mod pdc_client;
pub mod pdc_server;
Loading

0 comments on commit 393e200

Please sign in to comment.