Skip to content

Commit

Permalink
Use samllvec to reduce heap use, use inline functions and improvments
Browse files Browse the repository at this point in the history
  • Loading branch information
rafaeling committed Dec 6, 2024
1 parent bfcb1e4 commit 2dfb84e
Show file tree
Hide file tree
Showing 7 changed files with 174 additions and 100 deletions.
1 change: 1 addition & 0 deletions databroker/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ lazy_static = "1.4.0"
thiserror = "1.0.47"
futures = { version = "0.3.28" }
async-trait = "0.1.82"
smallvec = "1.13.2"

# VISS
axum = { version = "0.6.20", optional = true, features = ["ws"] }
Expand Down
141 changes: 108 additions & 33 deletions databroker/src/broker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
use crate::permissions::{PermissionError, Permissions};
pub use crate::types;

use smallvec::SmallVec;

use crate::query;
pub use crate::types::{ChangeType, DataType, DataValue, EntryType};

Expand All @@ -22,7 +24,7 @@ use tokio_stream::wrappers::BroadcastStream;
use tokio_stream::wrappers::ReceiverStream;
use tokio_stream::{Stream, StreamExt};

use std::collections::{HashMap, HashSet};
use std::collections::HashMap;
use std::convert::TryFrom;
use std::sync::atomic::{AtomicI32, Ordering};
use std::sync::Arc;
Expand Down Expand Up @@ -112,6 +114,10 @@ pub enum Field {
ActuatorTarget,
MetadataUnit,
}
#[derive(Debug, Clone)]
pub struct StackVecField {
pub svec: SmallVec<[Field; 3]>,
}

#[derive(Default)]
pub struct Database {
Expand Down Expand Up @@ -142,7 +148,7 @@ pub struct QueryField {
pub struct ChangeNotification {
pub id: i32,
pub update: EntryUpdate,
pub fields: HashSet<Field>,
pub fields: StackVecField,
}

#[derive(Debug, Default, Clone)]
Expand Down Expand Up @@ -201,7 +207,7 @@ pub struct QuerySubscription {
}

pub struct ChangeSubscription {
entries: HashMap<i32, HashSet<Field>>,
entries: HashMap<i32, StackVecField>,
sender: broadcast::Sender<EntryUpdates>,
permissions: Permissions,
}
Expand Down Expand Up @@ -233,7 +239,63 @@ pub struct EntryUpdate {
pub unit: Option<String>,
}

impl StackVecField {
#[inline]
pub fn new() -> Self {
Self {
svec: SmallVec::new(),
}
}

#[inline]
pub fn push(&mut self, value: Field) {
self.svec.push(value);
}

#[inline]
pub fn contains(&self, element: &Field) -> bool {
self.svec.contains(element)
}

#[inline]
pub fn extend_from_stack(&mut self, other: &StackVecField) {
self.svec.extend(other.svec.iter().cloned());
}

#[inline]
pub fn with_elements(elements: SmallVec<[Field; 3]>) -> Self {
Self { svec: elements }
}

#[inline]
pub fn are_disjoint(&self, other: &StackVecField) -> bool {
for item in &self.svec {
if other.svec.contains(item) {
return false; // Found a common element
}
}
true // No common elements found
}

#[inline]
pub fn is_empty(&self) -> bool {
self.svec.is_empty()
}

#[inline]
pub fn iter(&self) -> impl Iterator<Item = &Field> {
self.svec.iter()
}
}

impl Default for StackVecField {
fn default() -> Self {
Self::new()
}
}

impl Entry {
#[inline]
pub fn diff(&self, mut update: EntryUpdate) -> EntryUpdate {
if let Some(datapoint) = &update.datapoint {
if self.metadata.change_type != ChangeType::Continuous {
Expand Down Expand Up @@ -710,16 +772,17 @@ impl Entry {
self.lag_datapoint = self.datapoint.clone();
}

pub fn apply(&mut self, update: EntryUpdate) -> HashSet<Field> {
let mut changed = HashSet::new();
#[inline]
pub fn apply(&mut self, update: EntryUpdate) -> StackVecField {
let mut changed = StackVecField::new();
if let Some(datapoint) = update.datapoint {
self.lag_datapoint = self.datapoint.clone();
self.datapoint = datapoint;
changed.insert(Field::Datapoint);
changed.push(Field::Datapoint);
}
if let Some(actuator_target) = update.actuator_target {
self.actuator_target = actuator_target;
changed.insert(Field::ActuatorTarget);
changed.push(Field::ActuatorTarget);
}

if let Some(updated_allowed) = update.allowed {
Expand Down Expand Up @@ -755,10 +818,23 @@ impl Subscriptions {

pub async fn notify(
&self,
changed: Option<&HashMap<i32, HashSet<Field>>>,
changed: Option<&HashMap<i32, StackVecField>>,
db: &Database,
) -> Result<Option<HashMap<String, ()>>, NotificationError> {
let mut error = None;

for sub in &self.change_subscriptions {
match sub.notify(changed, db).await {
Ok(_) => {}
Err(err) => error = Some(err),
}
}

//Leave method here if error is none and query_subscription is empty
if error.is_none() && self.query_subscriptions.is_empty() {
return Ok(None);
}

let mut lag_updates: HashMap<String, ()> = HashMap::new();
for sub in &self.query_subscriptions {
match sub.notify(changed, db).await {
Expand All @@ -774,13 +850,6 @@ impl Subscriptions {
}
}

for sub in &self.change_subscriptions {
match sub.notify(changed, db).await {
Ok(_) => {}
Err(err) => error = Some(err),
}
}

match error {
Some(err) => Err(err),
None => {
Expand Down Expand Up @@ -837,7 +906,7 @@ impl Subscriptions {
impl ChangeSubscription {
async fn notify(
&self,
changed: Option<&HashMap<i32, HashSet<Field>>>,
changed: Option<&HashMap<i32, StackVecField>>,
db: &Database,
) -> Result<(), NotificationError> {
let db_read = db.authorized_read_access(&self.permissions);
Expand All @@ -846,7 +915,7 @@ impl ChangeSubscription {
let mut matches = false;
for (id, changed_fields) in changed {
if let Some(fields) = self.entries.get(id) {
if !fields.is_disjoint(changed_fields) {
if !fields.are_disjoint(changed_fields) {
matches = true;
break;
}
Expand All @@ -858,25 +927,25 @@ impl ChangeSubscription {
let mut notifications = EntryUpdates::default();
for (id, changed_fields) in changed {
if let Some(fields) = self.entries.get(id) {
if !fields.is_disjoint(changed_fields) {
if !fields.are_disjoint(changed_fields) {
match db_read.get_entry_by_id(*id) {
Ok(entry) => {
let mut update = EntryUpdate::default();
let mut notify_fields = HashSet::new();
let mut notify_fields = StackVecField::new();
// TODO: Perhaps make path optional
update.path = Some(entry.metadata.path.clone());
if changed_fields.contains(&Field::Datapoint)
&& fields.contains(&Field::Datapoint)
{
update.datapoint = Some(entry.datapoint.clone());
notify_fields.insert(Field::Datapoint);
notify_fields.push(Field::Datapoint);
}
if changed_fields.contains(&Field::ActuatorTarget)
&& fields.contains(&Field::ActuatorTarget)
{
update.actuator_target =
Some(entry.actuator_target.clone());
notify_fields.insert(Field::ActuatorTarget);
notify_fields.push(Field::ActuatorTarget);
}
// fill unit field always
update.unit.clone_from(&entry.metadata.unit);
Expand Down Expand Up @@ -922,16 +991,16 @@ impl ChangeSubscription {
match db_read.get_entry_by_id(*id) {
Ok(entry) => {
let mut update = EntryUpdate::default();
let mut notify_fields = HashSet::new();
let mut notify_fields = StackVecField::new();
// TODO: Perhaps make path optional
update.path = Some(entry.metadata.path.clone());
if fields.contains(&Field::Datapoint) {
update.datapoint = Some(entry.datapoint.clone());
notify_fields.insert(Field::Datapoint);
notify_fields.push(Field::Datapoint);
}
if fields.contains(&Field::ActuatorTarget) {
update.actuator_target = Some(entry.actuator_target.clone());
notify_fields.insert(Field::ActuatorTarget);
notify_fields.push(Field::ActuatorTarget);
}
notifications.updates.push(ChangeNotification {
id: *id,
Expand Down Expand Up @@ -989,7 +1058,7 @@ impl QuerySubscription {
}
fn check_if_changes_match(
query: &CompiledQuery,
changed_origin: Option<&HashMap<i32, HashSet<Field>>>,
changed_origin: Option<&HashMap<i32, StackVecField>>,
db: &DatabaseReadAccess,
) -> bool {
match changed_origin {
Expand Down Expand Up @@ -1039,7 +1108,7 @@ impl QuerySubscription {
}
fn generate_input(
&self,
changed: Option<&HashMap<i32, HashSet<Field>>>,
changed: Option<&HashMap<i32, StackVecField>>,
db: &DatabaseReadAccess,
) -> Option<impl ExecutionInput> {
let id_used_in_query = QuerySubscription::check_if_changes_match(&self.query, changed, db);
Expand All @@ -1055,7 +1124,7 @@ impl QuerySubscription {

async fn notify(
&self,
changed: Option<&HashMap<i32, HashSet<Field>>>,
changed: Option<&HashMap<i32, StackVecField>>,
db: &Database,
) -> Result<Option<impl query::ExecutionInput>, NotificationError> {
let db_read = db.authorized_read_access(&self.permissions);
Expand Down Expand Up @@ -1208,7 +1277,7 @@ impl<'a, 'b> DatabaseWriteAccess<'a, 'b> {
&mut self,
path: &str,
update: EntryUpdate,
) -> Result<HashSet<Field>, UpdateError> {
) -> Result<StackVecField, UpdateError> {
match self.db.path_to_id.get(path) {
Some(id) => self.update(*id, update),
None => Err(UpdateError::NotFound),
Expand All @@ -1228,7 +1297,7 @@ impl<'a, 'b> DatabaseWriteAccess<'a, 'b> {
}
}

pub fn update(&mut self, id: i32, update: EntryUpdate) -> Result<HashSet<Field>, UpdateError> {
pub fn update(&mut self, id: i32, update: EntryUpdate) -> Result<StackVecField, UpdateError> {
match self.db.entries.get_mut(&id) {
Some(entry) => {
if update.path.is_some()
Expand Down Expand Up @@ -1569,7 +1638,7 @@ impl<'a, 'b> AuthorizedAccess<'a, 'b> {

let cleanup_needed = {
let changed = {
let mut changed = HashMap::<i32, HashSet<Field>>::new();
let mut changed = HashMap::<i32, StackVecField>::new();
for (id, update) in updates {
debug!("setting id {} to {:?}", id, update);
match db_write.update(id, update) {
Expand Down Expand Up @@ -1631,7 +1700,7 @@ impl<'a, 'b> AuthorizedAccess<'a, 'b> {

pub async fn subscribe(
&self,
valid_entries: HashMap<i32, HashSet<Field>>,
valid_entries: HashMap<i32, StackVecField>,
buffer_size: Option<usize>,
) -> Result<impl Stream<Item = EntryUpdates>, SubscriptionError> {
if valid_entries.is_empty() {
Expand Down Expand Up @@ -4262,7 +4331,10 @@ pub mod tests {

let mut stream = broker
.subscribe(
HashMap::from([(id1, HashSet::from([Field::Datapoint]))]),
HashMap::from([(
id1,
StackVecField::with_elements(smallvec::smallvec![Field::Datapoint]),
)]),
buffer_size,
)
.await
Expand Down Expand Up @@ -4371,7 +4443,10 @@ pub mod tests {

match broker
.subscribe(
HashMap::from([(id1, HashSet::from([Field::Datapoint]))]),
HashMap::from([(
id1,
StackVecField::with_elements(smallvec::smallvec![Field::Datapoint]),
)]),
// 1001 is just outside valid range 0-1000
Some(1001),
)
Expand Down
Loading

0 comments on commit 2dfb84e

Please sign in to comment.