Skip to content

Commit

Permalink
refactor: Correlations (#1115)
Browse files Browse the repository at this point in the history
Co-authored-by: Nikhil Sinha <[email protected]>
  • Loading branch information
de-sh and nikhilsinhaparseable authored Jan 28, 2025
1 parent d58ed54 commit 12c507c
Show file tree
Hide file tree
Showing 3 changed files with 184 additions and 225 deletions.
257 changes: 151 additions & 106 deletions src/correlation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,21 +16,25 @@
*
*/

use std::collections::HashSet;
use std::collections::{HashMap, HashSet};

use actix_web::{http::header::ContentType, Error};
use chrono::Utc;
use datafusion::error::DataFusionError;
use http::StatusCode;
use itertools::Itertools;
use once_cell::sync::Lazy;
use relative_path::RelativePathBuf;
use serde::{Deserialize, Serialize};
use serde_json::Error as SerdeError;
use tokio::sync::RwLock;
use tracing::{error, trace, warn};
use tracing::error;

use crate::{
handlers::http::rbac::RBACError,
handlers::http::{
rbac::RBACError,
users::{CORRELATION_DIR, USERS_ROOT_DIR},
},
option::CONFIG,
query::QUERY_SESSION,
rbac::{map::SessionKey, Users},
Expand All @@ -39,167 +43,208 @@ use crate::{
utils::{get_hash, user_auth_for_query},
};

pub static CORRELATIONS: Lazy<Correlation> = Lazy::new(Correlation::default);
pub static CORRELATIONS: Lazy<Correlations> = Lazy::new(Correlations::default);

#[derive(Debug, Default)]
pub struct Correlation(RwLock<Vec<CorrelationConfig>>);
type CorrelationMap = HashMap<CorrelationId, CorrelationConfig>;

impl Correlation {
//load correlations from storage
#[derive(Debug, Default, derive_more::Deref)]
pub struct Correlations(RwLock<CorrelationMap>);

impl Correlations {
// Load correlations from storage
pub async fn load(&self) -> anyhow::Result<()> {
let store = CONFIG.storage().get_object_store();
let all_correlations = store.get_all_correlations().await.unwrap_or_default();

let correlations: Vec<CorrelationConfig> = all_correlations
.into_iter()
.flat_map(|(_, correlations_bytes)| correlations_bytes)
.filter_map(|correlation| {
serde_json::from_slice(&correlation)
.inspect_err(|e| {
error!("Unable to load correlation: {e}");
})
.ok()
})
.collect();
for correlations_bytes in all_correlations.values().flatten() {
let correlation = match serde_json::from_slice::<CorrelationConfig>(correlations_bytes)
{
Ok(c) => c,
Err(e) => {
error!("Unable to load correlation file : {e}");
continue;
}
};

self.write()
.await
.insert(correlation.id.to_owned(), correlation);
}

let mut s = self.0.write().await;
s.extend(correlations);
Ok(())
}

pub async fn list_correlations_for_user(
pub async fn list_correlations(
&self,
session_key: &SessionKey,
user_id: &str,
) -> Result<Vec<CorrelationConfig>, CorrelationError> {
let correlations = self.0.read().await.iter().cloned().collect_vec();

let mut user_correlations = vec![];
let permissions = Users.get_permissions(session_key);

for c in correlations {
let tables = &c
for correlation in self.read().await.values() {
let tables = &correlation
.table_configs
.iter()
.map(|t| t.table_name.clone())
.collect_vec();
if user_auth_for_query(&permissions, tables).is_ok() && c.user_id == user_id {
user_correlations.push(c);
if user_auth_for_query(&permissions, tables).is_ok() {
user_correlations.push(correlation.clone());
}
}

Ok(user_correlations)
}

pub async fn get_correlation(
&self,
correlation_id: &str,
user_id: &str,
) -> Result<CorrelationConfig, CorrelationError> {
let read = self.0.read().await;
let correlation = read
.iter()
.find(|c| c.id == correlation_id && c.user_id == user_id)
.cloned();

correlation.ok_or_else(|| {
CorrelationError::AnyhowError(anyhow::Error::msg(format!(
"Unable to find correlation with ID- {correlation_id}"
)))
})
self.read()
.await
.get(correlation_id)
.cloned()
.ok_or_else(|| {
CorrelationError::AnyhowError(anyhow::Error::msg(format!(
"Unable to find correlation with ID- {correlation_id}"
)))
})
}

pub async fn update(&self, correlation: &CorrelationConfig) -> Result<(), CorrelationError> {
// save to memory
let mut s = self.0.write().await;
s.retain(|c| c.id != correlation.id);
s.push(correlation.clone());
Ok(())
/// Create correlation associated with the user
pub async fn create(
&self,
mut correlation: CorrelationConfig,
session_key: &SessionKey,
) -> Result<CorrelationConfig, CorrelationError> {
correlation.id = get_hash(Utc::now().timestamp_micros().to_string().as_str());
correlation.validate(session_key).await?;

// Update in storage
let correlation_bytes = serde_json::to_vec(&correlation)?.into();
let path = correlation.path();
CONFIG
.storage()
.get_object_store()
.put_object(&path, correlation_bytes)
.await?;

// Update in memory
self.write().await.insert(
correlation.id.to_owned(),
correlation.clone(),
);

Ok(correlation)
}

pub async fn delete(&self, correlation_id: &str) -> Result<(), CorrelationError> {
// now delete from memory
let read_access = self.0.read().await;
/// Update existing correlation for the user and with the same ID
pub async fn update(
&self,
mut updated_correlation: CorrelationConfig,
session_key: &SessionKey,
) -> Result<CorrelationConfig, CorrelationError> {
// validate whether user has access to this correlation object or not
let correlation = self.get_correlation(&updated_correlation.id).await?;
if correlation.user_id != updated_correlation.user_id {
return Err(CorrelationError::AnyhowError(anyhow::Error::msg(format!(
r#"User "{}" isn't authorized to update correlation with ID - {}"#,
updated_correlation.user_id, correlation.id
))));
}

let index = read_access
.iter()
.enumerate()
.find(|(_, c)| c.id == correlation_id)
.to_owned();

if let Some((index, _)) = index {
// drop the read access in order to get exclusive write access
drop(read_access);
self.0.write().await.remove(index);
trace!("removed correlation from memory");
} else {
warn!("Correlation ID- {correlation_id} not found in memory!");
correlation.validate(session_key).await?;
updated_correlation.update(correlation);

// Update in storage
let correlation_bytes = serde_json::to_vec(&updated_correlation)?.into();
let path = updated_correlation.path();
CONFIG
.storage()
.get_object_store()
.put_object(&path, correlation_bytes)
.await?;

// Update in memory
self.write().await.insert(
updated_correlation.id.to_owned(),
updated_correlation.clone(),
);

Ok(updated_correlation)
}

/// Delete correlation from memory and storage
pub async fn delete(
&self,
correlation_id: &str,
user_id: &str,
) -> Result<(), CorrelationError> {
let correlation = CORRELATIONS.get_correlation(correlation_id).await?;
if correlation.user_id != user_id {
return Err(CorrelationError::AnyhowError(anyhow::Error::msg(format!(
r#"User "{user_id}" isn't authorized to delete correlation with ID - {correlation_id}"#
))));
}

// Delete from memory
self.write().await.remove(&correlation.id);

// Delete from storage
let path = correlation.path();
CONFIG
.storage()
.get_object_store()
.delete_object(&path)
.await?;

Ok(())
}
}

#[derive(Debug, Clone, Serialize, Deserialize)]
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub enum CorrelationVersion {
#[default]
V1,
}

type CorrelationId = String;
type UserId = String;

#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct CorrelationConfig {
#[serde(default)]
pub version: CorrelationVersion,
pub title: String,
pub id: String,
pub user_id: String,
#[serde(default)]
pub id: CorrelationId,
#[serde(default)]
pub user_id: UserId,
pub table_configs: Vec<TableConfig>,
pub join_config: JoinConfig,
pub filter: Option<FilterQuery>,
pub start_time: Option<String>,
pub end_time: Option<String>,
}

impl CorrelationConfig {}

#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct CorrelationRequest {
pub title: String,
pub table_configs: Vec<TableConfig>,
pub join_config: JoinConfig,
pub filter: Option<FilterQuery>,
pub start_time: Option<String>,
pub end_time: Option<String>,
}

impl From<CorrelationRequest> for CorrelationConfig {
fn from(val: CorrelationRequest) -> Self {
Self {
version: CorrelationVersion::V1,
title: val.title,
id: get_hash(Utc::now().timestamp_micros().to_string().as_str()),
user_id: String::default(),
table_configs: val.table_configs,
join_config: val.join_config,
filter: val.filter,
start_time: val.start_time,
end_time: val.end_time,
}
impl CorrelationConfig {
pub fn path(&self) -> RelativePathBuf {
RelativePathBuf::from_iter([
USERS_ROOT_DIR,
&self.user_id,
CORRELATION_DIR,
&format!("{}.json", self.id),
])
}
}

impl CorrelationRequest {
pub fn generate_correlation_config(self, id: String, user_id: String) -> CorrelationConfig {
CorrelationConfig {
version: CorrelationVersion::V1,
title: self.title,
id,
user_id,
table_configs: self.table_configs,
join_config: self.join_config,
filter: self.filter,
start_time: self.start_time,
end_time: self.end_time,
}
pub fn update(&mut self, update: Self) {
self.title = update.title;
self.table_configs = update.table_configs;
self.join_config = update.join_config;
self.filter = update.filter;
self.start_time = update.start_time;
self.end_time = update.end_time;
}

/// This function will validate the TableConfigs, JoinConfig, and user auth
Expand Down
Loading

0 comments on commit 12c507c

Please sign in to comment.