config and status endpoints. data filtering query params, reconnect #17

Open · wants to merge 6 commits into main
1 change: 1 addition & 0 deletions .gitignore
@@ -1 +1,2 @@
/target
/logs
13 changes: 11 additions & 2 deletions Cargo.toml
@@ -2,19 +2,28 @@

[package]
name = "pmu"
version = "0.1.0"
version = "0.0.1"
edition = "2021"


[dependencies]
arrow = { version = "53.2.0", features = ["ipc"] }
axum = "0.7.7"
bytes = "1.7.1"
chrono = { version = "0.4", features = ["serde"] }
clap = { version = "4.0", features = ["derive"] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
tokio = { version = "1", features = ["full"] }
tower = "0.5.1"
tower-http = "0.6.1"
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
tracing-appender = "0.2"

[dev-dependencies]
criterion = { version = "0.5.1", features = ["html_reports"] }
reqwest = "0.12.8"
reqwest = { version = "0.12.8", features = ["json"] }
tokio = { version = "1.0", features = ["full"] }
chrono = { version = "0.4", features = ["serde"] }
arrow = { version = "53.2.0", features = ["ipc"] }
26 changes: 26 additions & 0 deletions Dockerfile
@@ -0,0 +1,26 @@
FROM rust:1.75 AS builder

RUN mkdir /app
WORKDIR /app
COPY . /app

RUN cargo build --release

FROM debian:bookworm-slim AS app

RUN apt-get update && apt-get install -y libssl-dev && rm -rf /var/lib/apt/lists/*

WORKDIR /usr/local/bin
COPY --from=builder /app/target/release/pmu .
COPY tests/test_data ./tests/test_data

# Environment variables
ENV PDC_HOST=localhost \
PDC_PORT=8123 \
PDC_IDCODE=8080 \
SERVER_PORT=7734 \
BUFFER_DURATION_SECS=120

EXPOSE 7734

CMD ["pmu", "server"]
68 changes: 67 additions & 1 deletion README.md
@@ -18,7 +18,50 @@ Running the CLI using
cargo run --help
```

## How to ...
## Running With Docker-Compose

### Using the mock-pdc server (all platforms)

*All data will be identical with the exception of timestamps*

First build the containers
```console
docker-compose build
```

Then run
```console
docker-compose up
```

### Using openPDC (x86/64)
*Data will be randomized and simulate a single PDC/PMU*

Assuming you have built the containers using the instructions above, you can download and run the open-pdc server and pmu buffer with the command below.

```console
docker-compose -f docker-compose-openpdc.yml up --no-attach open-pdc
```

### Login to OpenPDC

You can log in to the local [openPDC](http://127.0.0.1:8280) instance and update the configuration. The default username and password are:

**USER**
```
.\admin
```

**PASSWORD**
```
admin
```

You can also install the latest release of openPDC, which includes more features, [here](https://github.com/GridProtectionAlliance/openPDC/releases/tag/v2.9.148).


## Running the Application with Cargo


### ... start the Mock PDC server

@@ -75,3 +118,26 @@ s = requests.get(url, timeout=10)
df = pd.read_feather(io.BytesIO(requests.get(url, timeout=10).content))
df.head()
```


## Building the Application Binary

```console
cargo build --release
```

This will build the application binary in ./target/release. On Windows, the executable will be ./target/release/pmu.exe.

You can add the executable to your path and run the same commands as above, replacing **cargo run** with **pmu**:
```console
cargo run server
```
vs
```console
pmu server
```
or
```console
./target/release/pmu mock-pdc
```

48 changes: 48 additions & 0 deletions docker-compose-openpdc.yml
@@ -0,0 +1,48 @@
---
services:
open-pdc:
image: gridprotectionalliance/openpdc:v2.9.148
container_name: openPDC
networks:
- pmu-network

ports:
- "8280:8280"
- "8900:8900"

pmu-server:
build: .
container_name: pmu-server
volumes:
- ./logs:/usr/local/bin/logs
command:
[
"pmu",
"server",
"--pdc-ip",
"open-pdc",
"--pdc-port",
"8900",
"--pdc-idcode",
"235",
"--http-port",
"7734",
"--duration",
"120",
]
ports:
- "7734:7734"
networks:
- pmu-network
restart: on-failure:3
deploy:
restart_policy:
condition: on-failure
delay: 15s
max_attempts: 3
window: 120s


networks:
pmu-network:
driver: bridge
47 changes: 47 additions & 0 deletions docker-compose.yml
@@ -0,0 +1,47 @@
---
services:
mock-pdc:
build: .
container_name: mock-pdc
command: ["pmu", "mock-pdc", "--ip", "mock-pdc", "--port", "8123"]
ports:
- "8123:8123"
volumes:
- ./tests/test_data:/app/tests/test_data
networks:
- pmu-network
#healthcheck:
# test: ["CMD", "nc", "-z", "localhost", "8123"]
# interval: 5s
# timeout: 3s
# retries: 10

pmu-server:
build: .
container_name: pmu-server
command:
[
"pmu",
"server",
"--pdc-ip",
"mock-pdc",
"--pdc-port",
"8123",
"--pdc-idcode",
"8080",
"--http-port",
"7734",
"--duration",
"120",
]
ports:
- "7734:7734"
networks:
- pmu-network
#depends_on:
# mock-pdc:
#condition: service_healthy

networks:
pmu-network:
driver: bridge
2 changes: 1 addition & 1 deletion src/frame_parser.rs
@@ -227,7 +227,7 @@ pub fn parse_config_frame_1and2(buffer: &[u8]) -> Result<ConfigurationFrame1and2
};

// determine the next length of bytes to read populate the chnam field.
let chnam_bytes_len = 16 * (phnmr + annmr + 16 * dgnmr) as usize;
let chnam_bytes_len: usize = (16 * (phnmr + annmr + 16 * dgnmr)) as usize;
// read from offset to chname_bytes_len into a vec<u8> variable.
let chnam = buffer[offset..offset + chnam_bytes_len].to_vec();
offset += chnam_bytes_len as usize;
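As a worked illustration of the length computation in this hunk (the channel counts below are made up; the factor of 16 names per digital word follows the code above):

```rust
// Illustrative check of the CHNAM length formula with hypothetical counts:
// every phasor and analog channel has one 16-byte name, and each digital
// status word carries 16 names of 16 bytes each.
fn main() {
    let (phnmr, annmr, dgnmr): (u16, u16, u16) = (2, 1, 1);
    let chnam_bytes_len: usize = (16 * (phnmr + annmr + 16 * dgnmr)) as usize;
    assert_eq!(chnam_bytes_len, 304); // 16 * (2 + 1 + 16)
}
```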
116 changes: 113 additions & 3 deletions src/frames.rs
@@ -1,4 +1,6 @@
#![allow(unused)]
use serde::ser::{SerializeStruct, Serializer};
use serde::{self, Deserialize, Serialize};
use std::collections::HashMap;
// GOAL: Turn Sequence of Bytes in TCP packets into IEEE C37.118.2 formatted structs.
// Define structures common to all frames
@@ -24,7 +26,7 @@ pub fn calculate_crc(buffer: &[u8]) -> u16 {
crc
}

#[derive(Debug, Clone)]
#[derive(Debug, Clone, Serialize)]
pub struct PrefixFrame2011 {
pub sync: u16, // Leading byte = AA hex,
// second byte: Frame type and version
@@ -74,7 +76,7 @@ impl PrefixFrame2011 {
}
}

#[derive(Debug)]
#[derive(Debug, Serialize)]
pub struct HeaderFrame2011 {
pub prefix: PrefixFrame2011,
pub data_source: [u8; 32], // Data source identifier 32 byte ASCII
@@ -271,7 +273,7 @@ pub struct ChannelInfo {
pub size: usize, // Size in bytes
}

#[derive(Debug, Clone)]
#[derive(Debug, Clone, Serialize)]
pub struct ConfigurationFrame1and2_2011 {
pub prefix: PrefixFrame2011,
pub time_base: u32, // Resolution of
@@ -483,4 +485,112 @@ impl PMUConfigurationFrame2011 {

channel_names
}
pub fn get_phasor_columns(&self) -> Vec<String> {
let mut channel_names = Vec::new();
let station_name = String::from_utf8_lossy(&self.stn).trim().to_string();

for chunk in self.chnam.chunks(16).take(self.phnmr as usize) {
let channel = String::from_utf8_lossy(chunk).trim().to_string();
let full_name = format!("{}_{}_{}", station_name, self.idcode, channel);
channel_names.push(full_name);
}

channel_names
}

pub fn get_analog_columns(&self) -> Vec<String> {
let mut channel_names = Vec::new();
let station_name = String::from_utf8_lossy(&self.stn).trim().to_string();

for chunk in self
.chnam
.chunks(16)
.skip(self.phnmr as usize)
.take(self.annmr as usize)
{
let channel = String::from_utf8_lossy(chunk).trim().to_string();
let full_name = format!("{}_{}_{}", station_name, self.idcode, channel);
channel_names.push(full_name);
}

channel_names
}

pub fn get_digital_columns(&self) -> Vec<String> {
let mut channel_names = Vec::new();
let station_name = String::from_utf8_lossy(&self.stn).trim().to_string();

for chunk in self
.chnam
.chunks(16)
.skip((self.phnmr + self.annmr) as usize)
.take(self.dgnmr as usize)
{
let channel = String::from_utf8_lossy(chunk).trim().to_string();
let full_name = format!("{}_{}_{}", station_name, self.idcode, channel);
channel_names.push(full_name);
}

channel_names
}
}
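The three column helpers above all slice the same CHNAM buffer: fixed 16-byte ASCII names, phasor names first, then analog, then digital. A standalone sketch of that skip/take pattern, using made-up channel names rather than data from this project:

```rust
// Standalone illustration of the CHNAM slicing convention used above.
// The names are hypothetical; each is padded to the fixed 16-byte width.
fn main() {
    let mut chnam: Vec<u8> = Vec::new();
    for name in ["VA", "VB", "ANALOG1"] {
        chnam.extend(format!("{:<16}", name).into_bytes());
    }
    let (phnmr, annmr) = (2usize, 1usize);

    let phasors: Vec<String> = chnam
        .chunks(16)
        .take(phnmr)
        .map(|c| String::from_utf8_lossy(c).trim().to_string())
        .collect();
    let analogs: Vec<String> = chnam
        .chunks(16)
        .skip(phnmr)
        .take(annmr)
        .map(|c| String::from_utf8_lossy(c).trim().to_string())
        .collect();

    assert_eq!(phasors, ["VA", "VB"]);
    assert_eq!(analogs, ["ANALOG1"]);
}
```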

// Implement custom serialization
impl Serialize for PMUConfigurationFrame2011 {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
// Calculate number of fields we'll serialize
let num_fields = 14; // Must match the number of serialize_field calls below

let mut state = serializer.serialize_struct("PMUConfigurationFrame2011", num_fields)?;

// Convert station name from [u8; 16] to String, trimming nulls and whitespace
let stn_str = String::from_utf8_lossy(&self.stn)
.trim_end_matches(char::from(0))
.trim()
.to_string();

state.serialize_field("stn", &stn_str)?;
state.serialize_field("idcode", &self.idcode)?;
state.serialize_field("format", &self.format)?;
state.serialize_field("phnmr", &self.phnmr)?;
state.serialize_field("annmr", &self.annmr)?;
state.serialize_field("dgnmr", &self.dgnmr)?;

// Use get_column_names() instead of raw chnam
let channel_names = self.get_column_names();
state.serialize_field("channels", &channel_names)?;

// You might want to add more meaningful representations of these units
state.serialize_field("phunit", &self.phunit)?;
state.serialize_field("anunit", &self.anunit)?;
state.serialize_field("digunit", &self.digunit)?;
state.serialize_field("fnom", &self.fnom)?;
state.serialize_field("cfgcnt", &self.cfgcnt)?;

// Add some computed properties that might be useful
state.serialize_field("is_polar", &self.is_phasor_polar())?;

// Add format flags as readable booleans
let format_flags = FormatFlags {
freq_dfreq_float: self.format & 0x0008 != 0,
analog_float: self.format & 0x0004 != 0,
phasor_float: self.format & 0x0002 != 0,
phasor_polar: self.format & 0x0001 != 0,
};
state.serialize_field("format_flags", &format_flags)?;

state.end()
}
}

// Helper structure for format flags
#[derive(Serialize)]
struct FormatFlags {
freq_dfreq_float: bool,
analog_float: bool,
phasor_float: bool,
phasor_polar: bool,
}
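A minimal usage sketch of the custom serializer, relying only on the serde_json dependency added in Cargo.toml; the variable `cfg` is hypothetical and assumed to come from the existing configuration-frame parsing code rather than being constructed here.

```rust
// Minimal usage sketch: serialize a parsed configuration frame to JSON.
// `cfg` is assumed to be produced by the existing config-frame parser.
fn config_to_json(cfg: &PMUConfigurationFrame2011) -> serde_json::Result<String> {
    // The custom Serialize impl above emits the station name as a string,
    // resolved channel names, and decoded format flags.
    serde_json::to_string_pretty(cfg)
}
```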