From 2e1f75588beb4afa5f8a8baac461a99f2ce1e3a0 Mon Sep 17 00:00:00 2001 From: Pratik Mishra Date: Mon, 3 Jun 2024 14:10:46 +0530 Subject: [PATCH] feat: snapshot changes - apis --- Cargo.lock | 2 + crates/context_aware_config/Cargo.toml | 2 +- .../2024-04-22-122806_config_verions/down.sql | 1 + .../2024-04-22-122806_config_verions/up.sql | 14 ++ .../src/api/config/handlers.rs | 116 ++++++++--------- .../src/api/config/mod.rs | 2 +- .../src/api/config/types.rs | 7 +- .../src/api/context/handlers.rs | 123 +++++++++++++----- .../src/api/context/types.rs | 2 +- .../src/api/default_config/handlers.rs | 93 +++++++------ crates/context_aware_config/src/db/models.rs | 15 ++- .../context_aware_config/src/db/schema.patch | 19 +++ crates/context_aware_config/src/db/schema.rs | 11 ++ crates/context_aware_config/src/helpers.rs | 104 ++++++++++++++- .../context_aware_config/tests/cac_tests.rs | 42 ------ crates/experimentation_platform/Cargo.toml | 1 + .../src/api/experiments/handlers.rs | 42 +++++- .../src/api/experiments/types.rs | 3 +- .../experimentation_platform/src/db/models.rs | 13 +- crates/service_utils/Cargo.toml | 1 + crates/service_utils/src/helpers.rs | 47 ++++++- crates/service_utils/src/service/types.rs | 24 +++- crates/superposition/src/main.rs | 5 +- postman/cac/.meta.json | 2 +- postman/cac/config/Get Config/event.test.js | 2 +- 25 files changed, 489 insertions(+), 204 deletions(-) create mode 100644 crates/context_aware_config/migrations/2024-04-22-122806_config_verions/down.sql create mode 100644 crates/context_aware_config/migrations/2024-04-22-122806_config_verions/up.sql create mode 100644 crates/context_aware_config/src/db/schema.patch delete mode 100644 crates/context_aware_config/tests/cac_tests.rs diff --git a/Cargo.lock b/Cargo.lock index e52fc1c4..d50dd18e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1510,6 +1510,7 @@ name = "experimentation_platform" version = "0.15.1" dependencies = [ "actix", + "actix-http", "actix-web", "anyhow", "chrono", @@ -3922,6 +3923,7 @@ dependencies = [ "jsonschema", "log", "once_cell", + "regex", "reqwest", "rs-snowflake", "rusoto_core", diff --git a/crates/context_aware_config/Cargo.toml b/crates/context_aware_config/Cargo.toml index abd09d5e..544288e6 100644 --- a/crates/context_aware_config/Cargo.toml +++ b/crates/context_aware_config/Cargo.toml @@ -59,5 +59,5 @@ leptos_meta = { workspace = true } leptos_router = { workspace = true } actix-files = { version = "0.6" } anyhow = { workspace = true } -regex = { workspace = true } superposition_types = { path = "../superposition_types" } +regex = { workspace = true } diff --git a/crates/context_aware_config/migrations/2024-04-22-122806_config_verions/down.sql b/crates/context_aware_config/migrations/2024-04-22-122806_config_verions/down.sql new file mode 100644 index 00000000..d9a93fe9 --- /dev/null +++ b/crates/context_aware_config/migrations/2024-04-22-122806_config_verions/down.sql @@ -0,0 +1 @@ +-- This file should undo anything in `up.sql` diff --git a/crates/context_aware_config/migrations/2024-04-22-122806_config_verions/up.sql b/crates/context_aware_config/migrations/2024-04-22-122806_config_verions/up.sql new file mode 100644 index 00000000..0800b5b8 --- /dev/null +++ b/crates/context_aware_config/migrations/2024-04-22-122806_config_verions/up.sql @@ -0,0 +1,14 @@ +-- Your SQL goes here +-- Name: functions; Type: TABLE; Schema: public; Owner: - +-- +CREATE TABLE public.config_versions ( + id bigint PRIMARY KEY, + config json NOT NULL, + config_hash TEXT NOT NULL, + tags varchar(100)[] check 
(array_position(tags, null) is null), + created_at timestamp without time zone DEFAULT CURRENT_TIMESTAMP NOT NULL +); + +CREATE INDEX IF NOT EXISTS config_verions_tags_index ON public.config_versions USING gin(tags); +CREATE INDEX IF NOT EXISTS config_versions_id_index ON public.config_versions(id); +-- \ No newline at end of file diff --git a/crates/context_aware_config/src/api/config/handlers.rs b/crates/context_aware_config/src/api/config/handlers.rs index d63dba06..037ec90e 100644 --- a/crates/context_aware_config/src/api/config/handlers.rs +++ b/crates/context_aware_config/src/api/config/handlers.rs @@ -9,12 +9,12 @@ use crate::api::context::{ delete_context_api, hash, put, validate_dimensions_and_calculate_priority, PutReq, }; use crate::api::dimension::get_all_dimension_schema_map; -use crate::db::schema::{ - contexts::dsl as ctxt, default_configs::dsl as def_conf, event_log::dsl as event_log, +use crate::{ + db::schema::{config_versions::dsl as config_versions, event_log::dsl as event_log}, + helpers::generate_cac, }; use actix_http::header::{HeaderName, HeaderValue}; -use actix_web::web; -use actix_web::{get, put, web::Query, HttpRequest, HttpResponse, Scope}; +use actix_web::{get, put, web, web::Query, HttpRequest, HttpResponse, Scope}; use cac_client::{eval_cac, eval_cac_with_reasoning, MergeStrategy}; use chrono::{DateTime, NaiveDateTime, TimeZone, Timelike, Utc}; use diesel::{ @@ -23,15 +23,18 @@ use diesel::{ ExpressionMethods, PgConnection, QueryDsl, RunQueryDsl, }; use serde_json::{json, Map, Value}; -use service_utils::service::types::DbConnection; -use service_utils::{bad_argument, db_error, unexpected_error}; +use service_utils::{ + bad_argument, db_error, result as superposition, service::types::DbConnection, + unexpected_error, +}; use itertools::Itertools; use jsonschema::JSONSchema; use service_utils::helpers::extract_dimensions; -use service_utils::result::{self as superposition, AppError}; +use service_utils::result::AppError; use superposition_types::User; use uuid::Uuid; + pub fn endpoints() -> Scope { Scope::new("") .service(get) @@ -40,6 +43,25 @@ pub fn endpoints() -> Scope { .service(get_filtered_config) } +fn validate_version_in_params( + query_params_map: &mut Map, +) -> superposition::Result> { + query_params_map + .remove("version") + .map_or(Ok(None), |version| { + version + .as_str() + .map_or(None, |val| val.to_owned().parse::().ok()) + .map_or_else( + || { + log::error!("failed to decode version as integer: {}", version); + Err(bad_argument!("version is not of type integer")) + }, + |v| Ok(Some(v)), + ) + }) +} + pub fn add_audit_header( conn: &mut PooledConnection>, mut res: HttpResponse, @@ -106,53 +128,26 @@ fn is_not_modified(max_created_at: Option, req: &HttpRequest) -> max_created_at.is_some() && parsed_max <= last_modified } -async fn generate_cac( +pub fn generate_config_from_version( + version: Option, conn: &mut PooledConnection>, ) -> superposition::Result { - let contexts_vec = ctxt::contexts - .select((ctxt::id, ctxt::value, ctxt::override_id, ctxt::override_)) - .order_by((ctxt::priority.asc(), ctxt::created_at.asc())) - .load::<(String, Value, String, Value)>(conn) - .map_err(|err| { - log::error!("failed to fetch contexts with error: {}", err); - db_error!(err) - })?; - - let (contexts, overrides) = contexts_vec.into_iter().fold( - (Vec::new(), Map::new()), - |(mut ctxts, mut overrides), (id, condition, override_id, override_)| { - let ctxt = super::types::Context { - id, - condition, - override_with_keys: [override_id.to_owned()], - 
}; - ctxts.push(ctxt); - overrides.insert(override_id, override_); - (ctxts, overrides) - }, - ); - - let default_config_vec = def_conf::default_configs - .select((def_conf::key, def_conf::value)) - .load::<(String, Value)>(conn) - .map_err(|err| { - log::error!("failed to fetch default_configs with error: {}", err); - db_error!(err) - })?; - - let default_configs = - default_config_vec - .into_iter() - .fold(Map::new(), |mut acc, item| { - acc.insert(item.0, item.1); - acc - }); - - Ok(Config { - contexts, - overrides, - default_configs, - }) + if let Some(val) = version { + let config = config_versions::config_versions + .select(config_versions::config) + .filter(config_versions::id.eq(val)) + .get_result::(conn) + .map_err(|err| { + log::error!("failed to fetch config with error: {}", err); + db_error!(err) + })?; + serde_json::from_value::(config).map_err(|err| { + log::error!("failed to decode config: {}", err); + unexpected_error!("failed to decode config") + }) + } else { + generate_cac(conn) + } } fn generate_subsets(map: &Map) -> Vec> { @@ -396,12 +391,12 @@ async fn reduce_config_key( ) => { if *to_be_deleted { if is_approve { - let _ = delete_context_api(cid.clone(), user.clone(), conn).await; + let _ = delete_context_api(cid.clone(), user.clone(), conn); } og_contexts.retain(|x| x.id != *cid); } else { if is_approve { - let _ = delete_context_api(cid.clone(), user.clone(), conn).await; + let _ = delete_context_api(cid.clone(), user.clone(), conn); let put_req = construct_new_payload(request_payload); let _ = put(put_req, conn, false, &user); } @@ -454,7 +449,7 @@ async fn reduce_config( .unwrap_or(false); let dimensions_schema_map = get_all_dimension_schema_map(&mut conn)?; - let mut config = generate_cac(&mut conn).await?; + let mut config = generate_cac(&mut conn)?; let default_config = (config.default_configs).clone(); for (key, _) in default_config { let contexts = config.contexts; @@ -472,7 +467,7 @@ async fn reduce_config( ) .await?; if is_approve { - config = generate_cac(&mut conn).await?; + config = generate_cac(&mut conn)?; } } @@ -514,7 +509,8 @@ async fn get( ); } - let mut config = generate_cac(&mut conn).await?; + let config_version = validate_version_in_params(&mut query_params_map)?; + let mut config = generate_config_from_version(config_version, &mut conn)?; if let Some(prefix) = query_params_map.get("prefix") { let prefix_list: HashSet<&str> = prefix .as_str() @@ -572,7 +568,8 @@ async fn get_resolved_config( return Ok(HttpResponse::NotModified().finish()); } - let res = generate_cac(&mut conn).await?; + let config_version = validate_version_in_params(&mut query_params_map)?; + let res = generate_config_from_version(config_version, &mut conn)?; let cac_client_contexts = res .contexts @@ -645,7 +642,8 @@ async fn get_filtered_config( .map_or_else(|_| json!(value), |int_val| json!(int_val)), ); } - let config = generate_cac(&mut conn).await?; + let config_version = validate_version_in_params(&mut query_params_map)?; + let config = generate_config_from_version(config_version, &mut conn)?; let contexts = config.contexts; let filtered_context = filter_context(&contexts, &query_params_map)?; diff --git a/crates/context_aware_config/src/api/config/mod.rs b/crates/context_aware_config/src/api/config/mod.rs index d4417bb8..c7fb9def 100644 --- a/crates/context_aware_config/src/api/config/mod.rs +++ b/crates/context_aware_config/src/api/config/mod.rs @@ -1,4 +1,4 @@ mod handlers; -mod types; +pub mod types; pub use handlers::endpoints; mod helpers; diff --git 
a/crates/context_aware_config/src/api/config/types.rs b/crates/context_aware_config/src/api/config/types.rs index 332bfc03..4ae24fb8 100644 --- a/crates/context_aware_config/src/api/config/types.rs +++ b/crates/context_aware_config/src/api/config/types.rs @@ -1,16 +1,17 @@ -use serde::Serialize; +use serde::{Deserialize, Serialize}; use serde_json::{Map, Value}; -#[derive(Serialize)] +#[derive(Serialize, Deserialize)] pub struct Config { pub contexts: Vec, pub overrides: Map, pub default_configs: Map, } -#[derive(Serialize, Clone)] +#[derive(Serialize, Clone, Deserialize)] pub struct Context { pub id: String, pub condition: Value, + pub priority: i32, pub override_with_keys: [String; 1], } diff --git a/crates/context_aware_config/src/api/context/handlers.rs b/crates/context_aware_config/src/api/context/handlers.rs index 2700b56d..56649d5a 100644 --- a/crates/context_aware_config/src/api/context/handlers.rs +++ b/crates/context_aware_config/src/api/context/handlers.rs @@ -2,7 +2,8 @@ extern crate base64; use std::str; use crate::helpers::{ - calculate_context_priority, json_to_sorted_string, validate_context_jsonschema, + add_config_version, calculate_context_priority, json_to_sorted_string, + validate_context_jsonschema, }; use crate::{ api::{ @@ -20,6 +21,9 @@ use crate::{ }, }, }; +use actix_web::web::Data; +use service_utils::service::types::{AppState, CustomHeaders}; + use actix_web::{ delete, get, put, web::{Json, Path, Query}, @@ -35,7 +39,7 @@ use diesel::{ }; use jsonschema::{Draft, JSONSchema, ValidationError}; use serde_json::{from_value, json, Map, Value}; -use service_utils::helpers::validation_err_to_str; +use service_utils::helpers::{parse_config_tags, validation_err_to_str}; use service_utils::service::types::DbConnection; use service_utils::{db_error, not_found, unexpected_error, validation_error}; use std::collections::HashMap; @@ -303,31 +307,44 @@ pub fn put( #[put("")] async fn put_handler( + state: Data, + custom_headers: CustomHeaders, req: Json, mut db_conn: DbConnection, user: User, ) -> superposition::Result> { - put(req, &mut db_conn, false, &user) - .map(|resp| Json(resp)) - .map_err(|err: superposition::AppError| { - log::info!("context put failed with error: {:?}", err); - err - }) + let tags = parse_config_tags(custom_headers.config_tags)?; + db_conn.transaction::<_, superposition::AppError, _>(|transaction_conn| { + let put_response = put(req, transaction_conn, true, &user) + .map(|resp| Json(resp)) + .map_err(|err: superposition::AppError| { + log::info!("context put failed with error: {:?}", err); + err + })?; + add_config_version(&state, tags, transaction_conn)?; + Ok(put_response) + }) } fn override_helper( req: Json, conn: &mut PooledConnection>, + already_under_txn: bool, user: &User, ) -> superposition::Result> { use contexts::dsl::contexts; let new_ctx = create_ctx_from_put_req(req, conn, user)?; + if already_under_txn { + diesel::sql_query("SAVEPOINT insert_ctx_savepoint").execute(conn)?; + } let insert = diesel::insert_into(contexts).values(&new_ctx).execute(conn); - match insert { Ok(_) => Ok(Json(get_put_resp(new_ctx))), Err(DatabaseError(UniqueViolation, _)) => { + if already_under_txn { + diesel::sql_query("ROLLBACK TO insert_ctx_savepoint").execute(conn)?; + } replace_override_of_existing_ctx(conn, new_ctx) // no need for .map(Json) } Err(e) => { @@ -339,13 +356,22 @@ fn override_helper( #[put("/overrides")] async fn update_override_handler( + state: Data, + custom_headers: CustomHeaders, req: Json, mut db_conn: DbConnection, user: User, ) -> 
superposition::Result> { - override_helper(req, &mut db_conn, &user).map_err(|err: superposition::AppError| { - log::info!("context put failed with error: {:?}", err); - err + let tags = parse_config_tags(custom_headers.config_tags)?; + db_conn.transaction::<_, superposition::AppError, _>(|transaction_conn| { + let override_resp = override_helper(req, transaction_conn, true, &user).map_err( + |err: superposition::AppError| { + log::info!("context put failed with error: {:?}", err); + err + }, + )?; + add_config_version(&state, tags, transaction_conn)?; + Ok(override_resp) }) } @@ -431,17 +457,24 @@ fn r#move( #[put("/move/{ctx_id}")] async fn move_handler( + state: Data, path: Path, + custom_headers: CustomHeaders, req: Json, mut db_conn: DbConnection, user: User, ) -> superposition::Result> { - r#move(path.into_inner(), req, &mut db_conn, false, &user) - .map(|resp| Json(resp)) - .map_err(|err| { - log::info!("move api failed with error: {:?}", err); - err - }) + let tags = parse_config_tags(custom_headers.config_tags)?; + db_conn.transaction::<_, superposition::AppError, _>(|transaction_conn| { + let move_reponse = r#move(path.into_inner(), req, transaction_conn, true, &user) + .map(|resp| Json(resp)) + .map_err(|err| { + log::info!("move api failed with error: {:?}", err); + err + })?; + add_config_version(&state, tags, transaction_conn)?; + Ok(move_reponse) + }) } #[get("/{ctx_id}")] @@ -493,7 +526,7 @@ async fn list_contexts( Ok(Json(result)) } -pub async fn delete_context_api( +pub fn delete_context_api( ctx_id: String, user: User, conn: &mut PooledConnection>, @@ -515,22 +548,32 @@ pub async fn delete_context_api( #[delete("/{ctx_id}")] async fn delete_context( + state: Data, path: Path, + custom_headers: CustomHeaders, user: User, mut db_conn: DbConnection, ) -> superposition::Result { let ctx_id = path.into_inner(); - delete_context_api(ctx_id, user, &mut db_conn).await + let tags = parse_config_tags(custom_headers.config_tags)?; + db_conn.transaction::<_, superposition::AppError, _>(|transaction_conn| { + let delete_resp = delete_context_api(ctx_id, user, transaction_conn)?; + add_config_version(&state, tags, transaction_conn)?; + Ok(delete_resp) + }) } #[put("/bulk-operations")] async fn bulk_operations( + state: Data, + custom_headers: CustomHeaders, reqs: Json>, db_conn: DbConnection, user: User, ) -> superposition::Result>> { use contexts::dsl::contexts; let DbConnection(mut conn) = db_conn; + let tags = parse_config_tags(custom_headers.config_tags)?; let mut response = Vec::::new(); conn.transaction::<_, superposition::AppError, _>(|transaction_conn| { @@ -585,6 +628,7 @@ async fn bulk_operations( } } } + add_config_version(&state, tags, transaction_conn)?; Ok(()) // Commit the transaction })?; Ok(Json(response)) @@ -592,8 +636,11 @@ async fn bulk_operations( #[put("/priority/recompute")] async fn priority_recompute( + state: Data, + custom_headers: CustomHeaders, db_conn: DbConnection, -) -> superposition::Result { + _user: User, +) -> superposition::Result>> { use crate::db::schema::contexts::dsl::*; let DbConnection(mut conn) = db_conn; @@ -604,6 +651,7 @@ async fn priority_recompute( let dimension_schema_map = get_all_dimension_schema_map(&mut conn)?; let mut response: Vec = vec![]; + let tags = parse_config_tags(custom_headers.config_tags)?; let update_contexts = result .clone() @@ -637,20 +685,23 @@ async fn priority_recompute( }) .collect::>>()?; - let insert = diesel::insert_into(contexts) - .values(&update_contexts) - .on_conflict(id) - .do_update() - 
.set(priority.eq(excluded(priority))) - .execute(&mut conn); - - match insert { - Ok(_) => Ok(HttpResponse::Ok().json(response)), - Err(err) => { - log::error!( - "Failed to execute query while recomputing priority, error: {err}" - ); - Err(db_error!(err)) + conn.transaction::<_, superposition::AppError, _>(|transaction_conn| { + let insert = diesel::insert_into(contexts) + .values(&update_contexts) + .on_conflict(id) + .do_update() + .set(priority.eq(excluded(priority))) + .execute(transaction_conn); + add_config_version(&state, tags, transaction_conn)?; + + match insert { + Ok(_) => Ok(Json(response)), + Err(err) => { + log::error!( + "Failed to execute query while recomputing priority, error: {err}" + ); + Err(db_error!(err)) + } } - } + }) } diff --git a/crates/context_aware_config/src/api/context/types.rs b/crates/context_aware_config/src/api/context/types.rs index 88f616e0..6db85ac8 100644 --- a/crates/context_aware_config/src/api/context/types.rs +++ b/crates/context_aware_config/src/api/context/types.rs @@ -30,7 +30,7 @@ pub struct PaginationParams { pub size: Option, } -#[derive(serde::Deserialize)] +#[derive(serde::Deserialize, Clone)] pub enum ContextAction { PUT(PutReq), DELETE(String), diff --git a/crates/context_aware_config/src/api/default_config/handlers.rs b/crates/context_aware_config/src/api/default_config/handlers.rs index 58a39766..931f0e99 100644 --- a/crates/context_aware_config/src/api/default_config/handlers.rs +++ b/crates/context_aware_config/src/api/default_config/handlers.rs @@ -1,8 +1,11 @@ extern crate base64; use super::types::CreateReq; -use service_utils::helpers::validation_err_to_str; use service_utils::{ - bad_argument, db_error, not_found, unexpected_error, validation_error, + bad_argument, db_error, + helpers::{parse_config_tags, validation_err_to_str}, + not_found, result as superposition, + service::types::{AppState, CustomHeaders, DbConnection}, + unexpected_error, validation_error, }; use superposition_types::{SuperpositionUser, User}; @@ -15,7 +18,7 @@ use crate::{ models::{Context, DefaultConfig}, schema::{contexts::dsl::contexts, default_configs::dsl::default_configs}, }, - helpers::validate_jsonschema, + helpers::{add_config_version, validate_jsonschema}, }; use actix_web::{ delete, get, put, @@ -23,6 +26,7 @@ use actix_web::{ HttpResponse, Scope, }; use chrono::Utc; +use diesel::Connection; use diesel::{ r2d2::{ConnectionManager, PooledConnection}, ExpressionMethods, PgConnection, QueryDsl, RunQueryDsl, @@ -30,10 +34,6 @@ use diesel::{ use jsonschema::{Draft, JSONSchema, ValidationError}; use regex::Regex; use serde_json::{from_value, json, Map, Value}; -use service_utils::{ - result as superposition, - service::types::{AppState, DbConnection}, -}; const KEY_NAME_REGEX: &str = "^[a-zA-Z0-9-_]([a-zA-Z0-9-_.]{0,62}[a-zA-Z0-9-_])?$"; @@ -45,6 +45,7 @@ pub fn endpoints() -> Scope { async fn create( state: Data, key: web::Path, + custom_headers: CustomHeaders, request: web::Json, db_conn: DbConnection, user: User, @@ -52,6 +53,7 @@ async fn create( let DbConnection(mut conn) = db_conn; let req = request.into_inner(); let key = key.into_inner(); + let tags = parse_config_tags(custom_headers.config_tags)?; let regex = Regex::new(KEY_NAME_REGEX).map_err(|err| { unexpected_error!("could not parse regex due to: {}", err.to_string()) @@ -164,25 +166,27 @@ async fn create( )?; } } - - let upsert = diesel::insert_into(default_configs) - .values(&default_config) - .on_conflict(db::schema::default_configs::key) - .do_update() - .set(&default_config) - 
.execute(&mut conn); - - match upsert { - Ok(_) => Ok(HttpResponse::Ok().json(json!({ - "message": "DefaultConfig created/updated successfully." - }))), - Err(e) => { - log::info!("DefaultConfig creation failed with error: {e}"); - Err(unexpected_error!( - "Something went wrong, failed to create DefaultConfig" - )) - } - } + conn.transaction::<_, superposition::AppError, _>(|transaction_conn| { + let upsert = diesel::insert_into(default_configs) + .values(&default_config) + .on_conflict(db::schema::default_configs::key) + .do_update() + .set(&default_config) + .execute(transaction_conn); + add_config_version(&state, tags, transaction_conn)?; + let ok_resp = match upsert { + Ok(_) => Ok(HttpResponse::Ok().json(json!({ + "message": "DefaultConfig created/updated successfully." + }))), + Err(e) => { + log::info!("DefaultConfig creation failed with error: {e}"); + Err(unexpected_error!( + "Something went wrong, failed to create DefaultConfig" + )) + } + }?; + Ok(ok_resp) + }) } fn fetch_default_key( @@ -232,32 +236,41 @@ pub fn get_key_usage_context_ids( #[delete("/{key}")] async fn delete( + state: Data, path: Path, + custom_headers: CustomHeaders, db_conn: DbConnection, user: User, ) -> superposition::Result { let DbConnection(mut conn) = db_conn; + let tags = parse_config_tags(custom_headers.config_tags)?; let key = path.into_inner(); fetch_default_key(&key, &mut conn)?; let context_ids = get_key_usage_context_ids(&key, &mut conn) .map_err(|_| unexpected_error!("Something went wrong"))?; if context_ids.is_empty() { - let deleted_row = diesel::delete( - default_configs.filter(db::schema::default_configs::key.eq(&key)), - ) - .execute(&mut conn); - match deleted_row { - Ok(0) => Err(not_found!("default config key `{}` doesn't exists", key)), - Ok(_) => { - log::info!("default config key: {key} deleted by {}", user.get_email()); - Ok(HttpResponse::NoContent().finish()) - } - Err(e) => { - log::error!("default config delete query failed with error: {e}"); - Err(unexpected_error!("Something went wrong.")) + conn.transaction::<_, superposition::AppError, _>(|transaction_conn| { + let deleted_row = diesel::delete( + default_configs.filter(db::schema::default_configs::key.eq(&key)), + ) + .execute(transaction_conn); + match deleted_row { + Ok(0) => Err(not_found!("default config key `{}` doesn't exists", key)), + Ok(_) => { + add_config_version(&state, tags, transaction_conn)?; + log::info!( + "default config key: {key} deleted by {}", + user.get_email() + ); + Ok(HttpResponse::NoContent().finish()) + } + Err(e) => { + log::error!("default config delete query failed with error: {e}"); + Err(unexpected_error!("Something went wrong.")) + } } - } + }) } else { Err(bad_argument!( "Given key already in use in contexts: {}", diff --git a/crates/context_aware_config/src/db/models.rs b/crates/context_aware_config/src/db/models.rs index 86e27552..42c3964b 100644 --- a/crates/context_aware_config/src/db/models.rs +++ b/crates/context_aware_config/src/db/models.rs @@ -1,4 +1,6 @@ -use crate::db::schema::{contexts, default_configs, dimensions, event_log, functions}; +use crate::db::schema::{ + config_versions, contexts, default_configs, dimensions, event_log, functions, +}; use chrono::{offset::Utc, DateTime, NaiveDateTime}; use diesel::{AsChangeset, Insertable, Queryable, Selectable}; use serde::Serialize; @@ -74,3 +76,14 @@ pub struct EventLog { pub new_data: Option, pub query: String, } + +#[derive(Queryable, Selectable, Insertable, Serialize, Clone, Debug)] +#[diesel(check_for_backend(diesel::pg::Pg))] 
+#[diesel(primary_key(id))] +pub struct ConfigVersion { + pub id: i64, + pub config: Value, + pub config_hash: String, + pub tags: Option>, + pub created_at: NaiveDateTime, +} diff --git a/crates/context_aware_config/src/db/schema.patch b/crates/context_aware_config/src/db/schema.patch new file mode 100644 index 00000000..acf43751 --- /dev/null +++ b/crates/context_aware_config/src/db/schema.patch @@ -0,0 +1,19 @@ +diff --git a/crates/context_aware_config/src/db/schema.rs b/crates/context_aware_config/src/db/schema.rs +index 15c2eee..25c8088 100644 +--- a/crates/context_aware_config/src/db/schema.rs ++++ b/crates/context_aware_config/src/db/schema.rs +@@ -2,13 +2,13 @@ + + diesel::table! { + config_versions (id) { + id -> Int8, + config -> Json, + config_hash -> Text, +- tags -> Nullable>>, ++ tags -> Nullable>, + created_at -> Timestamp, + } + } + + diesel::table! { + contexts (id) { diff --git a/crates/context_aware_config/src/db/schema.rs b/crates/context_aware_config/src/db/schema.rs index 99d135ac..25c8088c 100644 --- a/crates/context_aware_config/src/db/schema.rs +++ b/crates/context_aware_config/src/db/schema.rs @@ -1,5 +1,15 @@ // @generated automatically by Diesel CLI. +diesel::table! { + config_versions (id) { + id -> Int8, + config -> Json, + config_hash -> Text, + tags -> Nullable>, + created_at -> Timestamp, + } +} + diesel::table! { contexts (id) { id -> Varchar, @@ -602,6 +612,7 @@ diesel::joinable!(default_configs -> functions (function_name)); diesel::joinable!(dimensions -> functions (function_name)); diesel::allow_tables_to_appear_in_same_query!( + config_versions, contexts, default_configs, dimensions, diff --git a/crates/context_aware_config/src/helpers.rs b/crates/context_aware_config/src/helpers.rs index 663707d4..b37844a0 100644 --- a/crates/context_aware_config/src/helpers.rs +++ b/crates/context_aware_config/src/helpers.rs @@ -1,9 +1,29 @@ +use crate::{ + api::config::types::{Config, Context}, + db::{ + models::ConfigVersion, + schema::{ + config_versions, contexts::dsl as ctxt, default_configs::dsl as def_conf, + }, + }, +}; use actix_web::http::header::{HeaderMap, HeaderName, HeaderValue}; +use actix_web::web::Data; +use chrono::Utc; +use diesel::{ + r2d2::{ConnectionManager, PooledConnection}, + ExpressionMethods, PgConnection, QueryDsl, RunQueryDsl, +}; + use itertools::{self, Itertools}; use jsonschema::{Draft, JSONSchema, ValidationError}; -use serde_json::{json, Value}; +use serde_json::{json, Map, Value}; use service_utils::{ - helpers::validation_err_to_str, result as superposition, validation_error, + db_error, + helpers::{generate_snowflake_id, validation_err_to_str}, + result as superposition, + service::types::AppState, + validation_error, }; use std::collections::HashMap; @@ -287,6 +307,86 @@ pub fn calculate_context_priority( } } +pub fn generate_cac( + conn: &mut PooledConnection>, +) -> superposition::Result { + let contexts_vec = ctxt::contexts + .select(( + ctxt::id, + ctxt::value, + ctxt::priority, + ctxt::override_id, + ctxt::override_, + )) + .order_by((ctxt::priority.asc(), ctxt::created_at.asc())) + .load::<(String, Value, i32, String, Value)>(conn) + .map_err(|err| { + log::error!("failed to fetch contexts with error: {}", err); + db_error!(err) + })?; + + let (contexts, overrides) = contexts_vec.into_iter().fold( + (Vec::new(), Map::new()), + |(mut ctxts, mut overrides), + (id, condition, priority_, override_id, override_)| { + let ctxt = Context { + id, + condition, + priority: priority_, + override_with_keys: [override_id.to_owned()], + 
}; + ctxts.push(ctxt); + overrides.insert(override_id, override_); + (ctxts, overrides) + }, + ); + + let default_config_vec = def_conf::default_configs + .select((def_conf::key, def_conf::value)) + .load::<(String, Value)>(conn) + .map_err(|err| { + log::error!("failed to fetch default_configs with error: {}", err); + db_error!(err) + })?; + + let default_configs = + default_config_vec + .into_iter() + .fold(Map::new(), |mut acc, item| { + acc.insert(item.0, item.1); + acc + }); + + Ok(Config { + contexts, + overrides, + default_configs, + }) +} + +pub fn add_config_version( + state: &Data, + tags: Option>, + db_conn: &mut PooledConnection>, +) -> superposition::Result { + use config_versions::dsl::config_versions; + let version_id = generate_snowflake_id(state)?; + let config = generate_cac(db_conn)?; + let json_config = json!(config); + let config_hash = blake3::hash(json_config.to_string().as_bytes()).to_string(); + let config_version = ConfigVersion { + id: version_id, + config: json_config, + config_hash, + tags: tags, + created_at: Utc::now().naive_utc(), + }; + diesel::insert_into(config_versions) + .values(&config_version) + .execute(db_conn)?; + Ok(version_id) +} + // ************ Tests ************* #[cfg(test)] diff --git a/crates/context_aware_config/tests/cac_tests.rs b/crates/context_aware_config/tests/cac_tests.rs deleted file mode 100644 index cd812a1d..00000000 --- a/crates/context_aware_config/tests/cac_tests.rs +++ /dev/null @@ -1,42 +0,0 @@ -use context_aware_config::validation_functions::{compile_fn, execute_fn}; -use serde_json::json; -use service_utils::result as superposition; - -// #[test] //todo : currently there is issue in running this test -fn test_execute_fn() { - let code_ok = r#" - function validate() { - return true; - }; - "#; - - let execute_code_error = r#" - function validate() { - return false; - } - "#; - - let compile_code_error = r#" - function validate( { - return true; - } - "#; - - let err_execute = - match execute_fn(&(execute_code_error.to_owned()), "test", json!(10)) { - Ok(_) => false, - Err((e, _)) => e.contains("Bad schema"), - }; - let err_compile = match compile_fn(&(compile_code_error.to_owned())) { - Ok(()) => false, - Err(superposition::AppError::ValidationError(_)) => true, - _ => false, - }; - assert_eq!( - execute_fn(&(code_ok.to_owned()), "test", json!(10)), - Ok("true".to_string()) - ); - assert_eq!(err_execute, true); - assert_eq!(compile_fn(&(code_ok.to_owned())).unwrap(), ()); - assert_eq!(err_compile, true); -} diff --git a/crates/experimentation_platform/Cargo.toml b/crates/experimentation_platform/Cargo.toml index 36f92bb7..55965bd7 100644 --- a/crates/experimentation_platform/Cargo.toml +++ b/crates/experimentation_platform/Cargo.toml @@ -11,6 +11,7 @@ dotenv = { workspace = true } # Https server framework actix = { workspace = true } actix-web = { workspace = true } +actix-http = "3.3.1" # To help generate snowflake ids rs-snowflake = { workspace = true } # To help with generating uuids diff --git a/crates/experimentation_platform/src/api/experiments/handlers.rs b/crates/experimentation_platform/src/api/experiments/handlers.rs index 06c03333..afdb2c41 100644 --- a/crates/experimentation_platform/src/api/experiments/handlers.rs +++ b/crates/experimentation_platform/src/api/experiments/handlers.rs @@ -1,5 +1,6 @@ use std::collections::{HashMap, HashSet}; +use actix_http::header::{HeaderMap, HeaderName, HeaderValue}; use actix_web::{ get, patch, post, put, web::{self, Data, Json, Query}, @@ -14,7 +15,7 @@ use diesel::{ use 
service_utils::{ bad_argument, - helpers::{construct_request_headers, request}, + helpers::{construct_request_headers, generate_snowflake_id, request}, response_error, result::{self as superposition, AppError}, unexpected_error, @@ -23,7 +24,7 @@ use service_utils::{ use superposition_types::{SuperpositionUser, User}; use reqwest::{Method, Response, StatusCode}; -use service_utils::service::types::{AppState, DbConnection, Tenant}; +use service_utils::service::types::{AppState, CustomHeaders, DbConnection, Tenant}; use super::{ helpers::{ @@ -58,6 +59,25 @@ pub fn endpoints(scope: Scope) -> Scope { .service(update_overrides) } +fn construct_header_map( + tenant: &str, + config_tags: Option, +) -> superposition::Result { + let mut headers = HeaderMap::new(); + let tenant_val = HeaderValue::from_str(tenant).map_err(|err| { + log::error!("failed to set header: {}", err); + unexpected_error!("Something went wrong") + })?; + headers.insert(HeaderName::from_static("x-tenant"), tenant_val); + if let Some(val) = config_tags { + let tag_val = HeaderValue::from_str(val.as_str()).map_err(|err| { + log::error!("failed to set header: {}", err); + unexpected_error!("Something went wrong") + })?; + headers.insert(HeaderName::from_static("x-config-tags"), tag_val); + } + Ok(headers) +} async fn parse_error_response( response: reqwest::Response, ) -> superposition::Result<(StatusCode, superposition::ErrorResponse)> { @@ -105,6 +125,7 @@ async fn process_cac_http_response( #[post("")] async fn create( state: Data, + custom_headers: CustomHeaders, req: web::Json, db_conn: DbConnection, tenant: Tenant, @@ -163,8 +184,7 @@ async fn create( } // generating snowflake id for experiment - let mut snowflake_generator = state.snowflake_generator.lock().unwrap(); - let experiment_id = snowflake_generator.real_time_generate(); + let experiment_id = generate_snowflake_id(&state)?; //create overrides in CAC, if successfull then create experiment in DB let mut cac_operations: Vec = vec![]; @@ -195,11 +215,12 @@ async fn create( // creating variants' context in CAC let http_client = reqwest::Client::new(); let url = state.cac_host.clone() + "/context/bulk-operations"; + let headers_map = construct_header_map(tenant.as_str(), custom_headers.config_tags)?; // Step 1: Perform the HTTP request and handle errors let response = http_client .put(&url) - .header("x-tenant", tenant.as_str()) + .headers(headers_map.into()) .header( "Authorization", format!("{} {}", user.get_auth_type(), user.get_auth_token()), @@ -256,6 +277,7 @@ async fn create( async fn conclude_handler( state: Data, path: web::Path, + custom_headers: CustomHeaders, req: web::Json, db_conn: DbConnection, tenant: Tenant, @@ -265,6 +287,7 @@ async fn conclude_handler( let response = conclude( state, path.into_inner(), + custom_headers.config_tags, req.into_inner(), conn, tenant, @@ -277,6 +300,7 @@ async fn conclude_handler( pub async fn conclude( state: Data, experiment_id: i64, + config_tags: Option, req: ConcludeExperimentRequest, mut conn: PooledConnection>, tenant: Tenant, @@ -368,9 +392,11 @@ pub async fn conclude( // calling CAC bulk api with operations as payload let http_client = reqwest::Client::new(); let url = state.cac_host.clone() + "/context/bulk-operations"; + let headers_map = construct_header_map(tenant.as_str(), config_tags)?; + let response = http_client .put(&url) - .header("x-tenant", tenant.as_str()) + .headers(headers_map.into()) .header( "Authorization", format!("{} {}", user.get_auth_type(), user.get_auth_token()), @@ -541,6 +567,7 @@ async 
fn ramp( async fn update_overrides( params: web::Path, state: Data, + custom_headers: CustomHeaders, db_conn: DbConnection, req: web::Json, tenant: Tenant, @@ -684,10 +711,11 @@ async fn update_overrides( let http_client = reqwest::Client::new(); let url = state.cac_host.clone() + "/context/bulk-operations"; + let headers_map = construct_header_map(tenant.as_str(), custom_headers.config_tags)?; let response = http_client .put(&url) - .header("x-tenant", tenant.as_str()) + .headers(headers_map.into()) .header( "Authorization", format!("{} {}", user.get_auth_type(), user.get_auth_token()), diff --git a/crates/experimentation_platform/src/api/experiments/types.rs b/crates/experimentation_platform/src/api/experiments/types.rs index d4a1c0b5..f6db7c71 100644 --- a/crates/experimentation_platform/src/api/experiments/types.rs +++ b/crates/experimentation_platform/src/api/experiments/types.rs @@ -25,7 +25,6 @@ pub struct Variant { #[derive(Deserialize)] pub struct ExperimentCreateRequest { pub name: String, - pub context: Value, pub variants: Vec, } @@ -106,7 +105,7 @@ pub struct ContextPutReq { pub r#override: Value, } -#[derive(Deserialize, Serialize)] +#[derive(Deserialize, Serialize, Clone)] pub enum ContextAction { PUT(ContextPutReq), DELETE(String), diff --git a/crates/experimentation_platform/src/db/models.rs b/crates/experimentation_platform/src/db/models.rs index 3711c011..830f9042 100644 --- a/crates/experimentation_platform/src/db/models.rs +++ b/crates/experimentation_platform/src/db/models.rs @@ -1,12 +1,21 @@ use crate::db::schema::*; use chrono::{DateTime, NaiveDateTime, Utc}; -use diesel::{Insertable, Queryable, QueryableByName, Selectable}; +use diesel::{ + query_builder::QueryId, Insertable, Queryable, QueryableByName, Selectable, +}; use serde::{Deserialize, Serialize}; use serde_json::Value; #[derive( - Debug, Clone, Copy, PartialEq, Deserialize, Serialize, diesel_derive_enum::DbEnum, + Debug, + Clone, + Copy, + PartialEq, + Deserialize, + Serialize, + diesel_derive_enum::DbEnum, + QueryId, )] #[DbValueStyle = "UPPERCASE"] #[ExistingTypePath = "crate::db::schema::sql_types::ExperimentStatusType"] diff --git a/crates/service_utils/Cargo.toml b/crates/service_utils/Cargo.toml index eb4bd6c9..1dc51c6d 100644 --- a/crates/service_utils/Cargo.toml +++ b/crates/service_utils/Cargo.toml @@ -34,3 +34,4 @@ derive_more = { workspace = true } reqwest = { workspace = true } thiserror = { workspace = true } once_cell = { workspace = true } +regex = { workspace = true } diff --git a/crates/service_utils/src/helpers.rs b/crates/service_utils/src/helpers.rs index 1ee5c331..b47453d4 100644 --- a/crates/service_utils/src/helpers.rs +++ b/crates/service_utils/src/helpers.rs @@ -1,16 +1,20 @@ -use actix_web::{error::ErrorInternalServerError, Error}; +use super::result; +use crate::service::types::AppState; +use actix_web::{error::ErrorInternalServerError, web::Data, Error}; +use anyhow::anyhow; use jsonschema::{error::ValidationErrorKind, ValidationError}; use log::info; +use regex::Regex; use reqwest::header::{HeaderMap, HeaderName, HeaderValue}; use serde::de::{self, IntoDeserializer}; +use serde_json::{Map, Value}; use std::{ env::VarError, fmt::{self, Display}, str::FromStr, }; -use super::result; -use serde_json::{Map, Value}; +const CONFIG_TAG_REGEX: &str = "^[a-zA-Z0-9_-]{1,64}$"; //WARN Do NOT use this fxn inside api requests, instead add the required //env to AppState and get value from there. 
As this panics, it should @@ -369,3 +373,40 @@ where response.json::().await } +pub fn generate_snowflake_id(state: &Data) -> result::Result { + let mut snowflake_generator = state.snowflake_generator.lock().map_err(|e| { + log::error!("snowflake_id generation failed {}", e); + result::AppError::UnexpectedError(anyhow!("snowflake_id generation failed {}", e)) + })?; + let id = snowflake_generator.real_time_generate(); + // explicitly dropping snowflake_generator so that lock is released and it can be acquired in bulk-operations handler + drop(snowflake_generator); + Ok(id) +} + +pub fn parse_config_tags( + config_tags: Option, +) -> result::Result>> { + let regex = Regex::new(CONFIG_TAG_REGEX).map_err(|err| { + log::error!("regex match failed for tags {}", err); + result::AppError::UnexpectedError(anyhow!("Something went wrong")) + })?; + match config_tags { + None => Ok(None), + Some(val) => { + let tags = val + .split(",") + .map(|s| { + if !regex.is_match(s) { + return Err(result::AppError::BadArgument( + "Invalid config_tags value".to_string(), + )); + } else { + Ok(s.to_owned()) + } + }) + .collect::>>()?; + Ok(Some(tags)) + } + } +} diff --git a/crates/service_utils/src/service/types.rs b/crates/service_utils/src/service/types.rs index 7774a06e..f2ec0ace 100644 --- a/crates/service_utils/src/service/types.rs +++ b/crates/service_utils/src/service/types.rs @@ -7,6 +7,7 @@ use std::{ collections::HashSet, future::{ready, Ready}, str::FromStr, + sync::Arc, }; use actix_web::{error, web::Data, Error, FromRequest, HttpMessage}; @@ -37,7 +38,7 @@ pub struct AppState { pub default_config_validation_schema: JSONSchema, pub meta_schema: JSONSchema, pub experimentation_flags: ExperimentationFlags, - pub snowflake_generator: Mutex, + pub snowflake_generator: Arc>, pub enable_tenant_and_scope: bool, pub tenant_middleware_exclusion_list: HashSet, pub service_prefix: String, @@ -208,3 +209,24 @@ impl FromRequest for DbConnection { ready(result) } } + +pub struct CustomHeaders { + pub config_tags: Option, +} +impl FromRequest for CustomHeaders { + type Error = Error; + type Future = Ready>; + + fn from_request( + req: &actix_web::HttpRequest, + _: &mut actix_web::dev::Payload, + ) -> Self::Future { + let header_val = req.headers(); + let val = CustomHeaders { + config_tags: header_val.get("x-config-tags").and_then(|header_val| { + header_val.to_str().map_or(None, |v| Some(v.to_string())) + }), + }; + ready(Ok(val)) + } +} diff --git a/crates/superposition/src/main.rs b/crates/superposition/src/main.rs index 14a65fb2..2a539331 100644 --- a/crates/superposition/src/main.rs +++ b/crates/superposition/src/main.rs @@ -7,6 +7,7 @@ use context_aware_config::helpers::{ }; use dotenv; use experimentation_platform::api::*; +use std::sync::Arc; use std::{collections::HashSet, io::Result}; use superposition_types::User; @@ -128,6 +129,8 @@ async fn main() -> Result<()> { return view! 
{ }; }); + let snowflake_generator = Arc::new(Mutex::new(SnowflakeIdGenerator::new(1, 1))); + HttpServer::new(move || { let leptos_options = &conf.leptos_options; let site_root = &leptos_options.site_root; @@ -155,7 +158,7 @@ async fn main() -> Result<()> { allow_same_keys_non_overlapping_ctx.to_owned(), }, - snowflake_generator: Mutex::new(SnowflakeIdGenerator::new(1,1)), + snowflake_generator: snowflake_generator.clone(), meta_schema: get_meta_schema(), app_env: app_env.to_owned(), enable_tenant_and_scope: enable_tenant_and_scope.to_owned(), diff --git a/postman/cac/.meta.json b/postman/cac/.meta.json index 7d010895..1e7dc4e1 100644 --- a/postman/cac/.meta.json +++ b/postman/cac/.meta.json @@ -1,7 +1,7 @@ { "childrenOrder": [ - "config", "Default Config", + "config", "Dimension", "Context", "audit log" diff --git a/postman/cac/config/Get Config/event.test.js b/postman/cac/config/Get Config/event.test.js index bb89bec7..9cb78dde 100644 --- a/postman/cac/config/Get Config/event.test.js +++ b/postman/cac/config/Get Config/event.test.js @@ -4,7 +4,7 @@ pm.test("200 check", function() { let expected_response = { "contexts": [], "overrides": {}, - "default_configs": {} + "default_configs": {"key1": "value1"} }; pm.expect(JSON.stringify(response)).to.be.eq(JSON.stringify(expected_response)); })
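
Notes (not part of the applied diff): the sketch below shows how a client might exercise the snapshot behaviour introduced by this patch -- a mutating context call tagged through the new x-config-tags header (validated by parse_config_tags against ^[a-zA-Z0-9_-]{1,64}$), followed by a read of a specific snapshot via the new `version` query parameter on GET /config. The host, tenant, Authorization value, request payload shape and the version id are illustrative placeholders, and the example assumes the reqwest (json feature), serde_json and tokio crates that this workspace already depends on.

    // Hypothetical usage sketch; the endpoint paths and the x-config-tags /
    // version parameters come from this patch, everything else is assumed.
    use reqwest::Client;
    use serde_json::Value;

    #[tokio::main]
    async fn main() -> Result<(), reqwest::Error> {
        let client = Client::new();
        let host = "http://localhost:8080"; // placeholder CAC host

        // 1. Mutating call: PUT /context now runs the context write and
        //    add_config_version in one transaction, so a tagged row is
        //    inserted into public.config_versions alongside the context.
        client
            .put(format!("{host}/context"))
            .header("x-tenant", "dev")               // placeholder tenant
            .header("x-config-tags", "release-42")   // tag recorded on the snapshot
            .header("Authorization", "Bearer token") // placeholder auth
            .json(&serde_json::json!({
                "context": { "os": "android" },      // illustrative payload shape
                "override": { "key1": "value1" }
            }))
            .send()
            .await?
            .error_for_status()?;

        // 2. Read back a specific snapshot by id. Without `version`, the
        //    handler falls back to generate_cac and serves the live config.
        let snapshot: Value = client
            .get(format!("{host}/config"))
            .header("x-tenant", "dev")
            .query(&[("version", "7158965018757140480")]) // placeholder snowflake id
            .send()
            .await?
            .error_for_status()?
            .json()
            .await?;

        println!("snapshot contexts: {}", snapshot["contexts"]);
        Ok(())
    }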