Skip to content

Commit

Permalink
feat: snapshot header changes
Browse files Browse the repository at this point in the history
  • Loading branch information
Pratik Mishra authored and Pratik Mishra committed May 30, 2024
1 parent b59876e commit 779e975
Show file tree
Hide file tree
Showing 19 changed files with 166 additions and 138 deletions.
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -57,3 +57,4 @@ leptos = { version = "0.5.2" }
leptos_meta = { version = "0.5.2" }
leptos_router = { version = "0.5.2" }
thiserror = { version = "1.0.57" }
regex = { version = "1.9.1" }
2 changes: 1 addition & 1 deletion crates/caclang/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ path = "src/bin.rs"
toml = "0.8.8"
clap = { version = "4.3.0", features = ["derive"] }
inquire = "0.6.2"
regex = "1.9.1"
regex = { workspace = true }
serde = "1.0.163"
blake3 = { workspace = true }
anyhow = { workspace = true }
Expand Down
1 change: 1 addition & 0 deletions crates/context_aware_config/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -60,3 +60,4 @@ leptos_router = { workspace = true }
actix-files = { version = "0.6" }
anyhow = { workspace = true }
superposition_types = { path = "../superposition_types" }
regex = { workspace = true }
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,7 @@ CREATE TABLE public.config_versions (
tags varchar(100)[] check (array_position(tags, null) is null),
created_at timestamp without time zone DEFAULT CURRENT_TIMESTAMP NOT NULL
);

CREATE INDEX IF NOT EXISTS config_verions_tags_index ON public.config_versions USING gin(tags);
CREATE INDEX IF NOT EXISTS config_versions_id_index ON public.config_versions(id);
--
57 changes: 25 additions & 32 deletions crates/context_aware_config/src/api/config/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,25 +2,19 @@ use std::collections::HashSet;
use std::{collections::HashMap, str::FromStr};

use super::helpers::{
filter_config_by_dimensions, filter_config_by_prefix, filter_context
filter_config_by_dimensions, filter_config_by_prefix, filter_context,
};
use super::types::{Config, Context};
use crate::api::context::{
delete_context_api, hash, put, validate_dimensions_and_calculate_priority, PutReq,
};
use crate::api::dimension::get_all_dimension_schema_map;
use crate::{
db::schema::{
contexts::dsl as ctxt, default_configs::dsl as def_conf,
event_log::dsl as event_log, config_versions::dsl as config_versions
},
helpers::{json_to_sorted_string, generate_cac},
db::schema::{config_versions::dsl as config_versions, event_log::dsl as event_log},
helpers::generate_cac,
};
use actix_http::header::{HeaderName, HeaderValue};
use actix_web::web;
use actix_web::{
error::ErrorBadRequest, get, put, web::Query, HttpRequest, HttpResponse, Scope,
};
use actix_web::{get, put, web, web::Query, HttpRequest, HttpResponse, Scope};
use cac_client::{eval_cac, eval_cac_with_reasoning, MergeStrategy};
use chrono::{DateTime, NaiveDateTime, TimeZone, Timelike, Utc};
use diesel::{
Expand Down Expand Up @@ -138,23 +132,24 @@ pub fn generate_config_from_version(
version: Option<i64>,
conn: &mut PooledConnection<ConnectionManager<PgConnection>>,
) -> superposition::Result<Config> {
let config = match version {
None => config_versions::config_versions
let config = if let Some(val) = version {
config_versions::config_versions
.select(config_versions::config)
.order_by(config_versions::created_at.desc())
.first::<Value>(conn)
.filter(config_versions::id.eq(val))
.get_result::<Value>(conn)
.map_err(|err| {
log::error!("failed to fetch config with error: {}", err);
db_error!(err)
}),
Some(val) => config_versions::config_versions
})
} else {
config_versions::config_versions
.select(config_versions::config)
.filter(config_versions::id.eq(val))
.get_result::<Value>(conn)
.order_by(config_versions::created_at.desc())
.first::<Value>(conn)
.map_err(|err| {
log::error!("failed to fetch config with error: {}", err);
db_error!(err)
}),
})
}?;

serde_json::from_value::<Config>(config).map_err(|err| {
Expand All @@ -169,17 +164,15 @@ fn generate_subsets(map: &Map<String, Value>) -> Vec<Map<String, Value>> {
let all_subsets_keys = generate_subsets_keys(keys);

for subset_keys in &all_subsets_keys {
if subset_keys.len() >= 0 {
let mut subset_map = Map::new();
let mut subset_map = Map::new();

for key in subset_keys {
if let Some(value) = map.get(key) {
subset_map.insert(key.to_string(), value.clone());
}
for key in subset_keys {
if let Some(value) = map.get(key) {
subset_map.insert(key.to_string(), value.clone());
}

subsets.push(subset_map);
}

subsets.push(subset_map);
}

subsets
Expand Down Expand Up @@ -406,12 +399,12 @@ async fn reduce_config_key(
) => {
if *to_be_deleted {
if is_approve {
let _ = delete_context_api(cid.clone(), user.clone(), conn).await;
let _ = delete_context_api(cid.clone(), user.clone(), conn);
}
og_contexts.retain(|x| x.id != *cid);
} else {
if is_approve {
let _ = delete_context_api(cid.clone(), user.clone(), conn).await;
let _ = delete_context_api(cid.clone(), user.clone(), conn);
let put_req = construct_new_payload(request_payload);
let _ = put(put_req, conn, false, &user);
}
Expand Down Expand Up @@ -464,9 +457,9 @@ async fn reduce_config(
.unwrap_or(false);

let dimensions_schema_map = get_all_dimension_schema_map(&mut conn)?;
let mut config = generate_cac(&mut conn).await?;
let mut config = generate_cac(&mut conn)?;
let default_config = (config.default_configs).clone();
for (key, val) in default_config {
for (key, _val) in default_config {
let contexts = config.contexts;
let overrides = config.overrides;
let default_config = config.default_configs;
Expand All @@ -482,7 +475,7 @@ async fn reduce_config(
)
.await?;
if is_approve {
config = generate_cac(&mut conn).await?;
config = generate_cac(&mut conn)?;
}
}

Expand Down
46 changes: 28 additions & 18 deletions crates/context_aware_config/src/api/context/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,8 @@ use crate::helpers::{
use crate::{
api::{
context::types::{
BulkOperationReq, ContextAction, ContextBulkResponse, DeleteReq,
DimensionCondition, MoveReq, PaginationParams, PriorityRecomputeResponse,
PutReq, PutResp,
ContextAction, ContextBulkResponse, DimensionCondition, MoveReq,
PaginationParams, PriorityRecomputeResponse, PutReq, PutResp,
},
dimension::get_all_dimension_schema_map,
},
Expand All @@ -23,7 +22,7 @@ use crate::{
},
};
use actix_web::web::Data;
use service_utils::service::types::AppState;
use service_utils::service::types::{AppState, CustomHeaders};

use actix_web::{
delete, get, put,
Expand All @@ -40,7 +39,7 @@ use diesel::{
};
use jsonschema::{Draft, JSONSchema, ValidationError};
use serde_json::{from_value, json, Map, Value};
use service_utils::helpers::validation_err_to_str;
use service_utils::helpers::{validate_config_tags, validation_err_to_str};
use service_utils::service::types::DbConnection;
use service_utils::{db_error, not_found, unexpected_error, validation_error};
use std::collections::HashMap;
Expand Down Expand Up @@ -309,11 +308,12 @@ pub fn put(
#[put("")]
async fn put_handler(
state: Data<AppState>,
custom_headers: CustomHeaders,
req: Json<PutReq>,
mut db_conn: DbConnection,
user: User,
) -> superposition::Result<Json<PutResp>> {
let tags = req.version_tags.to_owned();
let tags = validate_config_tags(custom_headers.config_tags)?;
db_conn.transaction::<_, superposition::AppError, _>(|transaction_conn| {
let put_response = put(req, transaction_conn, true, &user)
.map(|resp| Json(resp))
Expand All @@ -328,12 +328,13 @@ async fn put_handler(

fn override_helper(
state: &Data<AppState>,
config_tags: Option<String>,
req: Json<PutReq>,
conn: &mut PooledConnection<ConnectionManager<PgConnection>>,
user: &User,
) -> superposition::Result<Json<PutResp>> {
use contexts::dsl::contexts;
let tags = req.version_tags.to_owned();
let tags = validate_config_tags(config_tags)?;
let new_ctx = create_ctx_from_put_req(req, conn, user)?;
(*conn).transaction::<_, superposition::AppError, _>(|transaction_conn| {
let insert = diesel::insert_into(contexts)
Expand All @@ -356,11 +357,12 @@ fn override_helper(
#[put("/overrides")]
async fn update_override_handler(
state: Data<AppState>,
custom_headers: CustomHeaders,
req: Json<PutReq>,
mut db_conn: DbConnection,
user: User,
) -> superposition::Result<Json<PutResp>> {
override_helper(&state, req, &mut db_conn, &user).map_err(
override_helper(&state, custom_headers.config_tags, req, &mut db_conn, &user).map_err(
|err: superposition::AppError| {
log::info!("context put failed with error: {:?}", err);
err
Expand Down Expand Up @@ -452,11 +454,12 @@ fn r#move(
async fn move_handler(
state: Data<AppState>,
path: Path<String>,
custom_headers: CustomHeaders,
req: Json<MoveReq>,
mut db_conn: DbConnection,
user: User,
) -> superposition::Result<Json<PutResp>> {
let tags = req.version_tags.to_owned();
let tags = validate_config_tags(custom_headers.config_tags)?;
db_conn.transaction::<_, superposition::AppError, _>(|transaction_conn| {
let move_reponse = r#move(path.into_inner(), req, transaction_conn, true, &user)
.map(|resp| Json(resp))
Expand Down Expand Up @@ -530,39 +533,46 @@ pub fn delete_context_api(
Ok(_) => {
log::info!("{ctx_id} context deleted by {}", user.get_email());
Ok(HttpResponse::NoContent().finish())
},
}
Err(e) => {
log::error!("context delete query failed with error: {e}");
Err(unexpected_error!("Something went wrong."))
}
}
}
}

#[delete("/{ctx_id}")]
async fn delete_context(
state: Data<AppState>,
path: Path<String>,
custom_headers: CustomHeaders,
user: User,
mut db_conn: DbConnection,
) -> superposition::Result<HttpResponse> {
let ctx_id = path.into_inner();
delete_context_api(ctx_id, user, &mut db_conn)
let tags = validate_config_tags(custom_headers.config_tags)?;
db_conn.transaction::<_, superposition::AppError, _>(|transaction_conn| {
let delete_resp = delete_context_api(ctx_id, user, transaction_conn)?;
add_config_version(&state, tags, transaction_conn)?;
Ok(delete_resp)
})
}

#[put("/bulk-operations")]
async fn bulk_operations(
state: Data<AppState>,
req: Json<BulkOperationReq>,
custom_headers: CustomHeaders,
reqs: Json<Vec<ContextAction>>,
db_conn: DbConnection,
user: User,
) -> superposition::Result<Json<Vec<ContextBulkResponse>>> {
use contexts::dsl::contexts;
let DbConnection(mut conn) = db_conn;
let tags = req.version_tags.to_owned();
let context_actions = req.into_inner().context_actions.clone();
let tags = validate_config_tags(custom_headers.config_tags)?;

let mut response = Vec::<ContextBulkResponse>::new();
conn.transaction::<_, superposition::AppError, _>(|transaction_conn| {
for action in context_actions.into_iter() {
for action in reqs.into_inner().into_iter() {
match action {
ContextAction::PUT(put_req) => {
let put_resp = put(Json(put_req), transaction_conn, true, &user)
Expand Down Expand Up @@ -622,7 +632,7 @@ async fn bulk_operations(
#[put("/priority/recompute")]
async fn priority_recompute(
state: Data<AppState>,
req: Json<DeleteReq>,
custom_headers: CustomHeaders,
db_conn: DbConnection,
_user: User,
) -> superposition::Result<Json<Vec<PriorityRecomputeResponse>>> {
Expand All @@ -636,7 +646,7 @@ async fn priority_recompute(

let dimension_schema_map = get_all_dimension_schema_map(&mut conn)?;
let mut response: Vec<PriorityRecomputeResponse> = vec![];
let tags = req.version_tags.to_owned();
let tags = validate_config_tags(custom_headers.config_tags)?;

let update_contexts = result
.clone()
Expand Down
13 changes: 0 additions & 13 deletions crates/context_aware_config/src/api/context/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,11 @@ use serde_json::{Map, Value};
pub struct PutReq {
pub context: Map<String, Value>,
pub r#override: Map<String, Value>,
pub version_tags: Option<Vec<String>>,
}

#[derive(Deserialize, Clone)]
pub struct MoveReq {
pub context: Map<String, Value>,
pub version_tags: Option<Vec<String>>,
}

#[derive(Deserialize, Clone)]
pub struct DeleteReq {
pub version_tags: Option<Vec<String>>,
}

#[derive(Deserialize, Clone)]
Expand All @@ -37,12 +30,6 @@ pub struct PaginationParams {
pub size: Option<u32>,
}

#[derive(Deserialize, Clone)]
pub struct BulkOperationReq {
pub context_actions: Vec<ContextAction>,
pub version_tags: Option<Vec<String>>,
}

#[derive(serde::Deserialize, Clone)]
pub enum ContextAction {
PUT(PutReq),
Expand Down
Loading

0 comments on commit 779e975

Please sign in to comment.