Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: OrgId, WorkspaceId, SchemaName cleanup and refactor #379

Merged
merged 1 commit into from
Jan 23, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ SUPERPOSITION_VERSION="v0.1.0"
HOSTNAME="<application_name>-<deployment_id>-<replicaset>-<pod>"
ACTIX_KEEP_ALIVE=120
MAX_DB_CONNECTION_POOL_SIZE=3
ENABLE_TENANT_AND_SCOPE=true
TENANTS=dev,test,superposition
TENANT_MIDDLEWARE_EXCLUSION_LIST="/health,/assets/favicon.ico,/pkg/frontend.js,/pkg,/pkg/frontend_bg.wasm,/pkg/tailwind.css,/pkg/style.css,/assets,/admin,/oidc/login,/admin/organisations,/organisations,/organisations/switch/{organisation_id},/"
SERVICE_PREFIX=""
Expand Down
6 changes: 3 additions & 3 deletions crates/context_aware_config/src/api/audit_log/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use actix_web::{
};
use chrono::{Duration, Utc};
use diesel::{ExpressionMethods, QueryDsl, RunQueryDsl};
use service_utils::service::types::{DbConnection, Tenant};
use service_utils::service::types::{DbConnection, SchemaName};
use superposition_types::{
database::{models::cac::EventLog, schema::event_log::dsl as event_log},
result as superposition, PaginatedResponse,
Expand All @@ -21,12 +21,12 @@ pub fn endpoints() -> Scope {
async fn get_audit_logs(
filters: Query<AuditQueryFilters>,
db_conn: DbConnection,
tenant: Tenant,
schema_name: SchemaName,
) -> superposition::Result<Json<PaginatedResponse<EventLog>>> {
let DbConnection(mut conn) = db_conn;

let query_builder = |filters: &AuditQueryFilters| {
let mut builder = event_log::event_log.schema_name(&tenant).into_boxed();
let mut builder = event_log::event_log.schema_name(&schema_name).into_boxed();
if let Some(tables) = filters.table.clone() {
builder = builder.filter(event_log::table_name.eq_any(tables.0));
}
Expand Down
85 changes: 45 additions & 40 deletions crates/context_aware_config/src/api/config/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,9 @@ use itertools::Itertools;
use serde_json::{json, Map, Value};
#[cfg(feature = "high-performance-mode")]
use service_utils::service::types::AppState;
use service_utils::service::types::Tenant;
use service_utils::{
helpers::extract_dimensions,
service::types::{AppHeader, DbConnection},
service::types::{AppHeader, DbConnection, SchemaName},
};
#[cfg(feature = "high-performance-mode")]
use superposition_macros::response_error;
Expand Down Expand Up @@ -81,13 +80,13 @@ fn validate_version_in_params(
pub fn add_audit_id_to_header(
conn: &mut DBConnection,
resp_builder: &mut HttpResponseBuilder,
tenant: &Tenant,
schema_name: &SchemaName,
) {
if let Ok(uuid) = event_log::event_log
.select(event_log::id)
.filter(event_log::table_name.eq("contexts"))
.order_by(event_log::timestamp.desc())
.schema_name(tenant)
.schema_name(schema_name)
.first::<Uuid>(conn)
{
resp_builder.insert_header((AppHeader::XAuditId.to_string(), uuid.to_string()));
Expand Down Expand Up @@ -126,12 +125,12 @@ fn add_config_version_to_header(

fn get_max_created_at(
conn: &mut DBConnection,
tenant: &Tenant,
schema_name: &SchemaName,
) -> Result<NaiveDateTime, diesel::result::Error> {
event_log::event_log
.select(max(event_log::timestamp))
.filter(event_log::table_name.eq_any(vec!["contexts", "default_configs"]))
.schema_name(tenant)
.schema_name(schema_name)
.first::<Option<NaiveDateTime>>(conn)
.and_then(|res| res.ok_or(diesel::result::Error::NotFound))
}
Expand All @@ -156,14 +155,14 @@ fn is_not_modified(max_created_at: Option<NaiveDateTime>, req: &HttpRequest) ->
pub fn generate_config_from_version(
version: &mut Option<i64>,
conn: &mut DBConnection,
tenant: &Tenant,
schema_name: &SchemaName,
) -> superposition::Result<Config> {
if let Some(val) = version {
let val = val.clone();
let config = config_versions::config_versions
.select(config_versions::config)
.filter(config_versions::id.eq(val))
.schema_name(tenant)
.schema_name(schema_name)
.get_result::<Value>(conn)
.map_err(|err| {
log::error!("failed to fetch config with error: {}", err);
Expand All @@ -177,19 +176,19 @@ pub fn generate_config_from_version(
match config_versions::config_versions
.select((config_versions::id, config_versions::config))
.order(config_versions::created_at.desc())
.schema_name(tenant)
.schema_name(schema_name)
.first::<(i64, Value)>(conn)
{
Ok((latest_version, config)) => {
*version = Some(latest_version);
serde_json::from_value::<Config>(config).or_else(|err| {
log::error!("failed to decode config: {}", err);
generate_cac(conn, tenant)
generate_cac(conn, schema_name)
})
}
Err(err) => {
log::error!("failed to find latest config: {err}");
generate_cac(conn, tenant)
generate_cac(conn, schema_name)
}
}
}
Expand Down Expand Up @@ -410,15 +409,15 @@ fn construct_new_payload(

#[allow(clippy::too_many_arguments)]
async fn reduce_config_key(
user: User,
user: &User,
conn: &mut DBConnection,
mut og_contexts: Vec<Context>,
mut og_overrides: HashMap<String, Overrides>,
check_key: &str,
dimension_schema_map: &HashMap<String, DimensionData>,
default_config: Map<String, Value>,
is_approve: bool,
tenant: Tenant,
schema_name: &SchemaName,
) -> superposition::Result<Config> {
let default_config_val =
default_config
Expand Down Expand Up @@ -489,15 +488,21 @@ async fn reduce_config_key(

if *to_be_deleted {
if is_approve {
let _ = context::delete(cid.clone(), user.clone(), conn, &tenant);
let _ = context::delete(cid.clone(), user, conn, schema_name);
}
og_contexts.retain(|x| x.id != *cid);
} else {
if is_approve {
let _ = context::delete(cid.clone(), user.clone(), conn, &tenant);
let _ = context::delete(cid.clone(), user, conn, schema_name);
if let Ok(put_req) = construct_new_payload(request_payload) {
let _ =
context::put(put_req, conn, false, &user, &tenant, false);
let _ = context::put(
put_req,
conn,
false,
&user,
schema_name,
false,
);
}
}

Expand Down Expand Up @@ -541,7 +546,7 @@ async fn reduce_config(
req: HttpRequest,
user: User,
db_conn: DbConnection,
tenant: Tenant,
schema_name: SchemaName,
) -> superposition::Result<HttpResponse> {
let DbConnection(mut conn) = db_conn;
let is_approve = req
Expand All @@ -550,28 +555,28 @@ async fn reduce_config(
.and_then(|value| value.to_str().ok().and_then(|s| s.parse::<bool>().ok()))
.unwrap_or(false);

let dimensions_vec = get_dimension_data(&mut conn, &tenant)?;
let dimensions_vec = get_dimension_data(&mut conn, &schema_name)?;
let dimensions_data_map = get_dimension_data_map(&dimensions_vec)?;
let mut config = generate_cac(&mut conn, &tenant)?;
let mut config = generate_cac(&mut conn, &schema_name)?;
let default_config = (config.default_configs).clone();
for (key, _) in default_config {
let contexts = config.contexts;
let overrides = config.overrides;
let default_config = config.default_configs;
config = reduce_config_key(
user.clone(),
&user,
&mut conn,
contexts.clone(),
overrides.clone(),
key.as_str(),
&dimensions_data_map,
default_config.clone(),
is_approve,
tenant.clone(),
&schema_name,
)
.await?;
if is_approve {
config = generate_cac(&mut conn, &tenant)?;
config = generate_cac(&mut conn, &schema_name)?;
}
}

Expand All @@ -581,16 +586,16 @@ async fn reduce_config(
#[cfg(feature = "high-performance-mode")]
#[get("/fast")]
async fn get_config_fast(
tenant: Tenant,
schema_name: SchemaName,
state: Data<AppState>,
) -> superposition::Result<HttpResponse> {
use fred::interfaces::MetricsInterface;

log::debug!("Started redis fetch");
let config_key = format!("{}::cac_config", *tenant);
let last_modified_at_key = format!("{}::cac_config::last_modified_at", *tenant);
let audit_id_key = format!("{}::cac_config::audit_id", *tenant);
let config_version_key = format!("{}::cac_config::config_version", *tenant);
let config_key = format!("{}::cac_config", *schema_name);
let last_modified_at_key = format!("{}::cac_config::last_modified_at", *schema_name);
let audit_id_key = format!("{}::cac_config::audit_id", *schema_name);
let config_version_key = format!("{}::cac_config::config_version", *schema_name);
let client = state.redis.next_connected();
let config = client.get::<String, String>(config_key).await;
let metrics = client.take_latency_metrics();
Expand Down Expand Up @@ -672,11 +677,11 @@ async fn get_config(
req: HttpRequest,
db_conn: DbConnection,
query_map: superposition_query::Query<QueryMap>,
tenant: Tenant,
schema_name: SchemaName,
) -> superposition::Result<HttpResponse> {
let DbConnection(mut conn) = db_conn;

let max_created_at = get_max_created_at(&mut conn, &tenant)
let max_created_at = get_max_created_at(&mut conn, &schema_name)
.map_err(|e| log::error!("failed to fetch max timestamp from event_log: {e}"))
.ok();

Expand All @@ -691,7 +696,7 @@ async fn get_config(
let mut query_params_map = query_map.into_inner();
let mut config_version = validate_version_in_params(&mut query_params_map)?;
let mut config =
generate_config_from_version(&mut config_version, &mut conn, &tenant)?;
generate_config_from_version(&mut config_version, &mut conn, &schema_name)?;

config = apply_prefix_filter_to_config(&mut query_params_map, config)?;

Expand All @@ -701,7 +706,7 @@ async fn get_config(

let mut response = HttpResponse::Ok();
add_last_modified_to_header(max_created_at, &mut response);
add_audit_id_to_header(&mut conn, &mut response, &tenant);
add_audit_id_to_header(&mut conn, &mut response, &schema_name);
add_config_version_to_header(&config_version, &mut response);
Ok(response.json(config))
}
Expand All @@ -711,12 +716,12 @@ async fn get_resolved_config(
req: HttpRequest,
db_conn: DbConnection,
query_map: superposition_query::Query<QueryMap>,
tenant: Tenant,
schema_name: SchemaName,
) -> superposition::Result<HttpResponse> {
let DbConnection(mut conn) = db_conn;
let mut query_params_map = query_map.into_inner();

let max_created_at = get_max_created_at(&mut conn, &tenant)
let max_created_at = get_max_created_at(&mut conn, &schema_name)
.map_err(|e| log::error!("failed to fetch max timestamp from event_log : {e}"))
.ok();

Expand All @@ -728,7 +733,7 @@ async fn get_resolved_config(

let mut config_version = validate_version_in_params(&mut query_params_map)?;
let mut config =
generate_config_from_version(&mut config_version, &mut conn, &tenant)?;
generate_config_from_version(&mut config_version, &mut conn, &schema_name)?;

config = apply_prefix_filter_to_config(&mut query_params_map, config)?;

Expand Down Expand Up @@ -772,7 +777,7 @@ async fn get_resolved_config(
};
let mut resp = HttpResponse::Ok();
add_last_modified_to_header(max_created_at, &mut resp);
add_audit_id_to_header(&mut conn, &mut resp, &tenant);
add_audit_id_to_header(&mut conn, &mut resp, &schema_name);
add_config_version_to_header(&config_version, &mut resp);

Ok(resp.json(response))
Expand All @@ -782,13 +787,13 @@ async fn get_resolved_config(
async fn get_config_versions(
db_conn: DbConnection,
filters: Query<PaginationParams>,
tenant: Tenant,
schema_name: SchemaName,
) -> superposition::Result<Json<PaginatedResponse<ConfigVersion>>> {
let DbConnection(mut conn) = db_conn;

if let Some(true) = filters.all {
let config_versions: Vec<ConfigVersion> = config_versions::config_versions
.schema_name(&tenant)
.schema_name(&schema_name)
.get_results(&mut conn)?;
return Ok(Json(PaginatedResponse {
total_pages: 1,
Expand All @@ -799,12 +804,12 @@ async fn get_config_versions(

let n_version: i64 = config_versions::config_versions
.count()
.schema_name(&tenant)
.schema_name(&schema_name)
.get_result(&mut conn)?;

let limit = filters.count.unwrap_or(10);
let mut builder = config_versions::config_versions
.schema_name(&tenant)
.schema_name(&schema_name)
.into_boxed()
.order(config_versions::created_at.desc())
.limit(limit);
Expand Down
Loading
Loading