Skip to content

Commit

Permalink
arrow frame creation and corresponding testing.
Browse files Browse the repository at this point in the history
  • Loading branch information
micahpw committed Oct 26, 2024
1 parent 71d002e commit 66895ac
Show file tree
Hide file tree
Showing 8 changed files with 1,261 additions and 53 deletions.
614 changes: 613 additions & 1 deletion Cargo.lock

Large diffs are not rendered by default.

7 changes: 5 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,23 @@ edition = "2021"


[dependencies]
arrow = { version = "53.2.0", features = ["ipc", "csv"] }
axum = "0.7.7"
bincode = "1.3.3"
bytes = "1.7.1"
chrono = "0.4.38"
circular-buffer = "0.1.8"
clap = { version = "4.0", features = ["derive"] }
crc = "3.2.1" # May be only needed for dev
crc = "3.2.1" # May be only needed for dev
env_logger = "0.11.5"
femme = "2.2.1"
log = "0.4.22"
rand = "0.8.5" #May be only needed for dev
rand = "0.8.5" #May be only needed for dev
serde = { version = "1.0.210", features = ["derive"] }
thiserror = "1.0.64"
tokio = { version = "1", features = ["full"] }
tower-http = "0.6.1"
tracing-subscriber = "0.3.18"
uuid = { version = "1.10.0", features = ["v4"] }

[dev-dependencies]
Expand Down
197 changes: 197 additions & 0 deletions src/arrow_utils.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,197 @@
#![allow(unused)]
#![allow(unused)]
use crate::frames::{ChannelDataType, ChannelInfo, ConfigurationFrame1and2_2011};
use arrow::array::{
ArrayRef, Float32Array, Float64Array, Int16Array, Int32Array, StringArray, UInt16Array,
};
use arrow::datatypes::{DataType, Field, Schema, TimeUnit};
use arrow::ipc::writer::FileWriter;
use arrow::record_batch::RecordBatch;
use bytes::Bytes;
use std::collections::HashMap;
use std::sync::Arc;

pub fn build_arrow_schema(channel_map: &HashMap<String, ChannelInfo>) -> Schema {
let mut fields = vec![Field::new(
"timestamp",
DataType::Timestamp(TimeUnit::Microsecond, None),
false,
)];

for (name, info) in channel_map {
match info.data_type {
ChannelDataType::PhasorFloat => {
fields.push(Field::new(
&format!("{}_magnitude", name),
DataType::Float32,
false,
));
fields.push(Field::new(
&format!("{}_angle", name),
DataType::Float32,
false,
));
}
ChannelDataType::PhasorFixed => {
fields.push(Field::new(&format!("{}_X", name), DataType::Int16, false));
fields.push(Field::new(&format!("{}_Y", name), DataType::Int16, false));
}
ChannelDataType::AnalogFloat
| ChannelDataType::FreqFloat
| ChannelDataType::DfreqFloat => {
fields.push(Field::new(name, DataType::Float32, false));
}
ChannelDataType::AnalogFixed
| ChannelDataType::FreqFixed
| ChannelDataType::DfreqFixed => {
fields.push(Field::new(name, DataType::Int16, false));
}
ChannelDataType::Digital => {
fields.push(Field::new(name, DataType::UInt16, false));
}
}
}

Schema::new(fields)
}

fn extract_float32_values(
buffer: &[u8],
frame_size: usize,
channel_info: &ChannelInfo,
) -> Float32Array {
let mut values = Vec::new();
for frame in buffer.chunks(frame_size) {
if frame.len() < frame_size {
break;
}
let data_start = channel_info.offset;
let data_end = data_start + channel_info.size;
if data_end <= frame.len() {
let data = &frame[data_start..data_end];
let value = f32::from_be_bytes(data.try_into().unwrap());
values.push(value);
}
}
Float32Array::from(values)
}

fn extract_int16_values(
buffer: &[u8],
frame_size: usize,
channel_info: &ChannelInfo,
) -> Int16Array {
let mut values = Vec::new();
for frame in buffer.chunks(frame_size) {
if frame.len() < frame_size {
break;
}
let data_start = channel_info.offset;
let data_end = data_start + channel_info.size;
if data_end <= frame.len() {
let data = &frame[data_start..data_end];
let value = i16::from_be_bytes(data.try_into().unwrap());
values.push(value);
}
}
Int16Array::from(values)
}

fn extract_uint16_values(
buffer: &[u8],
frame_size: usize,
channel_info: &ChannelInfo,
) -> UInt16Array {
let mut values = Vec::new();
for frame in buffer.chunks(frame_size) {
if frame.len() < frame_size {
break;
}
let data_start = channel_info.offset;
let data_end = data_start + channel_info.size;
if data_end <= frame.len() {
let data = &frame[data_start..data_end];
let value = u16::from_be_bytes(data.try_into().unwrap());
values.push(value);
}
}
UInt16Array::from(values)
}

pub fn extract_channel_values(
buffer: &[u8],
frame_size: usize,
channel_info: &ChannelInfo,
) -> Vec<ArrayRef> {
match channel_info.data_type {
ChannelDataType::PhasorFloat => {
let mut values = Vec::new();
for frame in buffer.chunks(frame_size) {
if frame.len() < frame_size {
break;
}
let data_start = channel_info.offset;
let data_end = data_start + channel_info.size;
if data_end <= frame.len() {
let data = &frame[data_start..data_end];
let magnitude = f32::from_be_bytes(data[..4].try_into().unwrap());
let angle = f32::from_be_bytes(data[4..].try_into().unwrap());
values.push((magnitude, angle));
}
}
vec![
Arc::new(Float32Array::from(
values.iter().map(|(m, _)| *m).collect::<Vec<_>>(),
)),
Arc::new(Float32Array::from(
values.iter().map(|(_, a)| *a).collect::<Vec<_>>(),
)),
]
}
ChannelDataType::PhasorFixed => {
let mut values = Vec::new();
for frame in buffer.chunks(frame_size) {
if frame.len() < frame_size {
break;
}
let data_start = channel_info.offset;
let data_end = data_start + channel_info.size;
if data_end <= frame.len() {
let data = &frame[data_start..data_end];
let magnitude = i16::from_be_bytes(data[..2].try_into().unwrap());
let angle = i16::from_be_bytes(data[2..].try_into().unwrap());
values.push((magnitude, angle));
}
}
vec![
Arc::new(Int16Array::from(
values.iter().map(|(m, _)| *m).collect::<Vec<_>>(),
)),
Arc::new(Int16Array::from(
values.iter().map(|(_, a)| *a).collect::<Vec<_>>(),
)),
]
}
ChannelDataType::AnalogFloat | ChannelDataType::FreqFloat | ChannelDataType::DfreqFloat => {
vec![Arc::new(extract_float32_values(
buffer,
frame_size,
channel_info,
))]
}
ChannelDataType::AnalogFixed | ChannelDataType::FreqFixed | ChannelDataType::DfreqFixed => {
vec![Arc::new(extract_int16_values(
buffer,
frame_size,
channel_info,
))]
}
ChannelDataType::Digital => {
vec![Arc::new(extract_uint16_values(
buffer,
frame_size,
channel_info,
))]
}
}
}
126 changes: 126 additions & 0 deletions src/frames.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#![allow(unused)]
use std::collections::HashMap;
// GOAL: Turn Sequence of Bytes in TCP packets into IEEE C37.118.2 formatted structs.
// Define structures common to all frames

Expand Down Expand Up @@ -251,6 +252,25 @@ impl<T> PMUDataFrame<T> {
pub type PMUDataFrameFixedFreq2011 = PMUDataFrame<i16>;
pub type PMUDataFrameFloatFreq2011 = PMUDataFrame<f32>;

#[derive(Debug, Clone)]
pub enum ChannelDataType {
PhasorFloat, // 8 bytes (magnitude + angle as f32)
PhasorFixed, // 4 bytes (magnitude + angle as i16)
AnalogFloat, // 4 bytes (f32)
AnalogFixed, // 2 bytes (i16)
Digital, // 2 bytes (u16)
FreqFloat, // 4 bytes (f32)
FreqFixed, // 2 bytes (i16)
DfreqFloat, // 4 bytes (f32)
DfreqFixed, // 2 bytes (i16)
}
#[derive(Debug, Clone)]
pub struct ChannelInfo {
pub data_type: ChannelDataType,
pub offset: usize, // Offset from start of PMU data section
pub size: usize, // Size in bytes
}

#[derive(Debug)]
pub struct ConfigurationFrame1and2_2011 {
pub prefix: PrefixFrame2011,
Expand Down Expand Up @@ -292,6 +312,112 @@ impl ConfigurationFrame1and2_2011 {

total_size
}
pub fn get_channel_map(&self) -> HashMap<String, ChannelInfo> {
let mut channel_map = HashMap::new();
let mut current_offset = 2; // Start after STAT
let prefix_offset = 14;

for pmu_config in &self.pmu_configs {
let station_name = String::from_utf8_lossy(&pmu_config.stn).trim().to_string();
let channel_names = pmu_config.get_column_names();
let id_code = pmu_config.idcode;
// Add frequency and DFREQ channels
let freq_type = if pmu_config.format & 0x0008 != 0 {
ChannelDataType::FreqFloat
} else {
ChannelDataType::FreqFixed
};
let dfreq_type = if pmu_config.format & 0x0008 != 0 {
ChannelDataType::DfreqFloat
} else {
ChannelDataType::DfreqFixed
};

// Add phasor channels
let phasor_type = if pmu_config.format & 0x0002 != 0 {
ChannelDataType::PhasorFloat
} else {
ChannelDataType::PhasorFixed
};

let phasor_size = pmu_config.phasor_size();
for name in channel_names.iter().take(pmu_config.phnmr as usize) {
channel_map.insert(
name.clone(),
ChannelInfo {
data_type: phasor_type.clone(),
offset: current_offset + prefix_offset,
size: phasor_size,
},
);
current_offset += phasor_size;
}

let freq_size = pmu_config.freq_dfreq_size();
channel_map.insert(
format!("{}_{}_FREQ", station_name, id_code),
ChannelInfo {
data_type: freq_type,
offset: current_offset + prefix_offset,
size: freq_size,
},
);
current_offset += freq_size;

channel_map.insert(
format!("{}_{}_DFREQ", station_name, id_code),
ChannelInfo {
data_type: dfreq_type,
offset: current_offset + prefix_offset,
size: freq_size,
},
);
current_offset += freq_size;

// Add analog channels
let analog_type = if pmu_config.format & 0x0004 != 0 {
ChannelDataType::AnalogFloat
} else {
ChannelDataType::AnalogFixed
};

let analog_size = pmu_config.analog_size();
for name in channel_names
.iter()
.skip(pmu_config.phnmr as usize) // skip the freq/dfreq values and the number of phasors
.take(pmu_config.annmr as usize)
{
channel_map.insert(
name.clone(),
ChannelInfo {
data_type: analog_type.clone(),
offset: current_offset + prefix_offset,
size: analog_size,
},
);
current_offset += analog_size;
}

// Add digital channels
for name in channel_names
.iter()
.skip(pmu_config.phnmr as usize + pmu_config.annmr as usize)
.take(pmu_config.dgnmr as usize)
{
channel_map.insert(
name.clone(),
ChannelInfo {
data_type: ChannelDataType::Digital,
offset: current_offset + prefix_offset,
size: 2,
},
);
current_offset += 2;
}
}

channel_map
}
}
// This struct is repeated NUM_PMU times.
// For parsing entire configuration frame, need to take into account num_pmu.
Expand Down
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,5 @@ pub mod frame_buffer;
pub mod frame_parser;
pub mod frames;
//pub mod pdc_buffer_server;
pub mod arrow_utils;
pub mod pdc_client;
Loading

0 comments on commit 66895ac

Please sign in to comment.