diff --git a/config/config.example.toml b/config/config.example.toml index 999266321144..61fa05111b43 100644 --- a/config/config.example.toml +++ b/config/config.example.toml @@ -75,13 +75,10 @@ max_feed_count = 200 # The maximum number of frames that will be fe # This section provides configs for currency conversion api [forex_api] -call_delay = 21600 # Api calls are made after every 6 hrs -local_fetch_retry_count = 5 # Fetch from Local cache has retry count as 5 -local_fetch_retry_delay = 1000 # Retry delay for checking write condition -api_timeout = 20000 # Api timeouts once it crosses 20000 ms -api_key = "YOUR API KEY HERE" # Api key for making request to foreign exchange Api -fallback_api_key = "YOUR API KEY" # Api key for the fallback service -redis_lock_timeout = 26000 # Redis remains write locked for 26000 ms once the acquire_redis_lock is called +call_delay = 21600 # Expiration time for data in cache as well as redis in seconds +api_key = "" # Api key for making request to foreign exchange Api +fallback_api_key = "" # Api key for the fallback service +redis_lock_timeout = 100 # Redis remains write locked for 100 s once the acquire_redis_lock is called # Logging configuration. Logging can be either to file or console or both. diff --git a/config/deployments/env_specific.toml b/config/deployments/env_specific.toml index 967b847dae51..f7d6415e85fb 100644 --- a/config/deployments/env_specific.toml +++ b/config/deployments/env_specific.toml @@ -101,13 +101,10 @@ bucket_name = "bucket" # The AWS S3 bucket name for file storage # This section provides configs for currency conversion api [forex_api] -call_delay = 21600 # Api calls are made after every 6 hrs -local_fetch_retry_count = 5 # Fetch from Local cache has retry count as 5 -local_fetch_retry_delay = 1000 # Retry delay for checking write condition -api_timeout = 20000 # Api timeouts once it crosses 20000 ms -api_key = "YOUR API KEY HERE" # Api key for making request to foreign exchange Api -fallback_api_key = "YOUR API KEY" # Api key for the fallback service -redis_lock_timeout = 26000 # Redis remains write locked for 26000 ms once the acquire_redis_lock is called +call_delay = 21600 # Expiration time for data in cache as well as redis in seconds +api_key = "" # Api key for making request to foreign exchange Api +fallback_api_key = "" # Api key for the fallback service +redis_lock_timeout = 100 # Redis remains write locked for 100 s once the acquire_redis_lock is called [jwekey] # 3 priv/pub key pair vault_encryption_key = "" # public key in pem format, corresponding private key in rust locker diff --git a/config/development.toml b/config/development.toml index 4c9b8516b5ad..b3e3eb3665e1 100644 --- a/config/development.toml +++ b/config/development.toml @@ -78,12 +78,9 @@ ttl_for_storage_in_secs = 220752000 [forex_api] call_delay = 21600 -local_fetch_retry_count = 5 -local_fetch_retry_delay = 1000 -api_timeout = 20000 -api_key = "YOUR API KEY HERE" -fallback_api_key = "YOUR API KEY HERE" -redis_lock_timeout = 26000 +api_key = "" +fallback_api_key = "" +redis_lock_timeout = 100 [jwekey] vault_encryption_key = "" diff --git a/config/docker_compose.toml b/config/docker_compose.toml index 75699d0a9674..5952422f246a 100644 --- a/config/docker_compose.toml +++ b/config/docker_compose.toml @@ -31,12 +31,9 @@ pool_size = 5 [forex_api] call_delay = 21600 -local_fetch_retry_count = 5 -local_fetch_retry_delay = 1000 -api_timeout = 20000 -api_key = "YOUR API KEY HERE" -fallback_api_key = "YOUR API KEY HERE" -redis_lock_timeout = 26000 +api_key = "" +fallback_api_key = "" +redis_lock_timeout = 100 [replica_database] username = "db_user" diff --git a/crates/analytics/docs/README.md b/crates/analytics/docs/README.md index e24dc6c5af79..2eca9c7859cb 100644 --- a/crates/analytics/docs/README.md +++ b/crates/analytics/docs/README.md @@ -115,8 +115,8 @@ To configure the Forex APIs, update the `config/development.toml` or `config/doc ```toml [forex_api] -api_key = "YOUR API KEY HERE" # Replace the placeholder with your Primary API Key -fallback_api_key = "YOUR API KEY HERE" # Replace the placeholder with your Fallback API Key +api_key = "" # Replace the placeholder with your Primary API Key +fallback_api_key = "" # Replace the placeholder with your Fallback API Key ``` ### Important Note ```bash @@ -159,4 +159,4 @@ To view the data on the OpenSearch dashboard perform the following steps: - Select a time field that will be used for time-based queries - Save the index pattern -Now, head on to `Discover` under the `OpenSearch Dashboards` tab, to select the newly created index pattern and query the data \ No newline at end of file +Now, head on to `Discover` under the `OpenSearch Dashboards` tab, to select the newly created index pattern and query the data diff --git a/crates/router/src/configs/settings.rs b/crates/router/src/configs/settings.rs index ad5d9e89aaae..5f5b77fc72c6 100644 --- a/crates/router/src/configs/settings.rs +++ b/crates/router/src/configs/settings.rs @@ -299,16 +299,11 @@ pub struct PaymentLink { #[derive(Debug, Deserialize, Clone, Default)] #[serde(default)] pub struct ForexApi { - pub local_fetch_retry_count: u64, pub api_key: Secret, pub fallback_api_key: Secret, - /// in ms + /// in s pub call_delay: i64, - /// in ms - pub local_fetch_retry_delay: u64, - /// in ms - pub api_timeout: u64, - /// in ms + /// in s pub redis_lock_timeout: u64, } diff --git a/crates/router/src/core/currency.rs b/crates/router/src/core/currency.rs index 912484b014a7..8c1a85128929 100644 --- a/crates/router/src/core/currency.rs +++ b/crates/router/src/core/currency.rs @@ -15,16 +15,11 @@ pub async fn retrieve_forex( ) -> CustomResult, ApiErrorResponse> { let forex_api = state.conf.forex_api.get_inner(); Ok(ApplicationResponse::Json( - get_forex_rates( - &state, - forex_api.call_delay, - forex_api.local_fetch_retry_delay, - forex_api.local_fetch_retry_count, - ) - .await - .change_context(ApiErrorResponse::GenericNotFoundError { - message: "Unable to fetch forex rates".to_string(), - })?, + get_forex_rates(&state, forex_api.call_delay) + .await + .change_context(ApiErrorResponse::GenericNotFoundError { + message: "Unable to fetch forex rates".to_string(), + })?, )) } @@ -53,14 +48,9 @@ pub async fn get_forex_exchange_rates( state: SessionState, ) -> CustomResult { let forex_api = state.conf.forex_api.get_inner(); - let rates = get_forex_rates( - &state, - forex_api.call_delay, - forex_api.local_fetch_retry_delay, - forex_api.local_fetch_retry_count, - ) - .await - .change_context(AnalyticsError::ForexFetchFailed)?; + let rates = get_forex_rates(&state, forex_api.call_delay) + .await + .change_context(AnalyticsError::ForexFetchFailed)?; Ok((*rates.data).clone()) } diff --git a/crates/router/src/utils/currency.rs b/crates/router/src/utils/currency.rs index 9ab2780da732..7bdb8121417d 100644 --- a/crates/router/src/utils/currency.rs +++ b/crates/router/src/utils/currency.rs @@ -1,4 +1,4 @@ -use std::{collections::HashMap, ops::Deref, str::FromStr, sync::Arc, time::Duration}; +use std::{collections::HashMap, ops::Deref, str::FromStr, sync::Arc}; use api_models::enums; use common_utils::{date_time, errors::CustomResult, events::ApiEventMetric, ext_traits::AsyncExt}; @@ -10,7 +10,8 @@ use redis_interface::DelReply; use router_env::{instrument, tracing}; use rust_decimal::Decimal; use strum::IntoEnumIterator; -use tokio::{sync::RwLock, time::sleep}; +use tokio::sync::RwLock; +use tracing_futures::Instrument; use crate::{ logger, @@ -50,10 +51,14 @@ pub enum ForexCacheError { CouldNotAcquireLock, #[error("Provided currency not acceptable")] CurrencyNotAcceptable, + #[error("Forex configuration error: {0}")] + ConfigurationError(String), #[error("Incorrect entries in default Currency response")] DefaultCurrencyParsingError, #[error("Entry not found in cache")] EntryNotFound, + #[error("Forex data unavailable")] + ForexDataUnavailable, #[error("Expiration time invalid")] InvalidLogExpiry, #[error("Error reading local")] @@ -116,35 +121,10 @@ async fn save_forex_to_local( ) -> CustomResult<(), ForexCacheError> { let mut local = FX_EXCHANGE_RATES_CACHE.write().await; *local = Some(exchange_rates_cache_entry); + logger::debug!("forex_log: forex saved in cache"); Ok(()) } -// Alternative handler for handling the case, When no data in local as well as redis -#[allow(dead_code)] -async fn waited_fetch_and_update_caches( - state: &SessionState, - local_fetch_retry_delay: u64, - local_fetch_retry_count: u64, -) -> CustomResult { - for _n in 1..local_fetch_retry_count { - sleep(Duration::from_millis(local_fetch_retry_delay)).await; - //read from redis and update local plus break the loop and return - match retrieve_forex_from_redis(state).await { - Ok(Some(rates)) => { - save_forex_to_local(rates.clone()).await?; - return Ok(rates.clone()); - } - Ok(None) => continue, - Err(error) => { - logger::error!(?error); - continue; - } - } - } - //acquire lock one last time and try to fetch and update local & redis - successive_fetch_and_save_forex(state, None).await -} - impl TryFrom for ExchangeRates { type Error = error_stack::Report; fn try_from(value: DefaultExchangeRates) -> Result { @@ -178,44 +158,39 @@ impl From for CurrencyFactors { pub async fn get_forex_rates( state: &SessionState, call_delay: i64, - local_fetch_retry_delay: u64, - local_fetch_retry_count: u64, ) -> CustomResult { if let Some(local_rates) = retrieve_forex_from_local().await { if local_rates.is_expired(call_delay) { // expired local data - handler_local_expired(state, call_delay, local_rates).await + logger::debug!("forex_log: Forex stored in cache is expired"); + successive_fetch_and_save_forex(state, Some(local_rates)).await } else { // Valid data present in local + logger::debug!("forex_log: forex found in cache"); Ok(local_rates) } } else { // No data in local - handler_local_no_data( - state, - call_delay, - local_fetch_retry_delay, - local_fetch_retry_count, - ) - .await + handler_local_no_data(state, call_delay).await } } async fn handler_local_no_data( state: &SessionState, call_delay: i64, - _local_fetch_retry_delay: u64, - _local_fetch_retry_count: u64, ) -> CustomResult { match retrieve_forex_from_redis(state).await { Ok(Some(data)) => fallback_forex_redis_check(state, data, call_delay).await, Ok(None) => { // No data in local as well as redis - Ok(successive_fetch_and_save_forex(state, None).await?) + successive_fetch_and_save_forex(state, None).await?; + Err(ForexCacheError::ForexDataUnavailable.into()) } Err(error) => { - logger::error!(?error); - Ok(successive_fetch_and_save_forex(state, None).await?) + // Error in deriving forex rates from redis + logger::error!("forex_error: {:?}", error); + successive_fetch_and_save_forex(state, None).await?; + Err(ForexCacheError::ForexDataUnavailable.into()) } } } @@ -224,53 +199,64 @@ async fn successive_fetch_and_save_forex( state: &SessionState, stale_redis_data: Option, ) -> CustomResult { - match acquire_redis_lock(state).await { - Ok(lock_acquired) => { - if !lock_acquired { - return stale_redis_data.ok_or(ForexCacheError::CouldNotAcquireLock.into()); + // spawn a new thread and do the api fetch and write operations on redis. + let forex_api_key = state.conf.forex_api.get_inner().api_key.peek(); + if forex_api_key.is_empty() { + Err(ForexCacheError::ConfigurationError("api_keys not provided".into()).into()) + } else { + let state = state.clone(); + tokio::spawn( + async move { + acquire_redis_lock_and_fetch_data(&state) + .await + .map_err(|err| { + logger::error!(forex_error=?err); + }) + .ok(); } - let api_rates = fetch_forex_rates(state).await; - match api_rates { - Ok(rates) => successive_save_data_to_redis_local(state, rates).await, - Err(error) => { - // API not able to fetch data call secondary service - logger::error!(?error); - let secondary_api_rates = fallback_fetch_forex_rates(state).await; - match secondary_api_rates { - Ok(rates) => Ok(successive_save_data_to_redis_local(state, rates).await?), - Err(error) => stale_redis_data.ok_or({ - logger::error!(?error); - release_redis_lock(state).await?; - ForexCacheError::ApiUnresponsive.into() - }), + .in_current_span(), + ); + stale_redis_data.ok_or(ForexCacheError::EntryNotFound.into()) + } +} + +async fn acquire_redis_lock_and_fetch_data( + state: &SessionState, +) -> CustomResult<(), ForexCacheError> { + let lock_acquired = acquire_redis_lock(state).await?; + if !lock_acquired { + Err(ForexCacheError::CouldNotAcquireLock.into()) + } else { + logger::debug!("forex_log: redis lock acquired"); + let api_rates = fetch_forex_rates(state).await; + match api_rates { + Ok(rates) => successive_save_data_to_redis_local(state, rates).await, + Err(error) => { + logger::error!(forex_error=?error,"primary_forex_error"); + // API not able to fetch data call secondary service + let secondary_api_rates = fallback_fetch_forex_rates(state).await; + match secondary_api_rates { + Ok(rates) => successive_save_data_to_redis_local(state, rates).await, + Err(error) => { + release_redis_lock(state).await?; + Err(error) } } } } - Err(error) => stale_redis_data.ok_or({ - logger::error!(?error); - ForexCacheError::ApiUnresponsive.into() - }), } } async fn successive_save_data_to_redis_local( state: &SessionState, forex: FxExchangeRatesCacheEntry, -) -> CustomResult { - Ok(save_forex_to_redis(state, &forex) +) -> CustomResult<(), ForexCacheError> { + save_forex_to_redis(state, &forex) .await .async_and_then(|_rates| release_redis_lock(state)) .await .async_and_then(|_val| save_forex_to_local(forex.clone())) .await - .map_or_else( - |error| { - logger::error!(?error); - forex.clone() - }, - |_| forex.clone(), - )) } async fn fallback_forex_redis_check( @@ -282,6 +268,7 @@ async fn fallback_forex_redis_check( Some(redis_forex) => { // Valid data present in redis let exchange_rates = FxExchangeRatesCacheEntry::new(redis_forex.as_ref().clone()); + logger::debug!("forex_log: forex response found in redis"); save_forex_to_local(exchange_rates.clone()).await?; Ok(exchange_rates) } @@ -292,47 +279,19 @@ async fn fallback_forex_redis_check( } } -async fn handler_local_expired( - state: &SessionState, - call_delay: i64, - local_rates: FxExchangeRatesCacheEntry, -) -> CustomResult { - match retrieve_forex_from_redis(state).await { - Ok(redis_data) => { - match is_redis_expired(redis_data.as_ref(), call_delay).await { - Some(redis_forex) => { - // Valid data present in redis - let exchange_rates = - FxExchangeRatesCacheEntry::new(redis_forex.as_ref().clone()); - save_forex_to_local(exchange_rates.clone()).await?; - Ok(exchange_rates) - } - None => { - // Redis is expired going for API request - successive_fetch_and_save_forex(state, Some(local_rates)).await - } - } - } - Err(error) => { - // data not present in redis waited fetch - logger::error!(?error); - successive_fetch_and_save_forex(state, Some(local_rates)).await - } - } -} - async fn fetch_forex_rates( state: &SessionState, ) -> Result> { let forex_api_key = state.conf.forex_api.get_inner().api_key.peek(); + logger::debug!("forex_log: Primary api call for forex fetch"); let forex_url: String = format!("{}{}{}", FOREX_BASE_URL, forex_api_key, FOREX_BASE_CURRENCY); let forex_request = services::RequestBuilder::new() .method(services::Method::Get) .url(&forex_url) .build(); - logger::info!(?forex_request); + logger::info!(primary_forex_request=?forex_request,"forex_log: Primary api call for forex fetch"); let response = state .api_client .send_request( @@ -352,7 +311,7 @@ async fn fetch_forex_rates( "Unable to parse response received from primary api into ForexResponse", )?; - logger::info!("{:?}", forex_response); + logger::info!(primary_forex_response=?forex_response,"forex_log"); let mut conversions: HashMap = HashMap::new(); for enum_curr in enums::Currency::iter() { @@ -361,7 +320,10 @@ async fn fetch_forex_rates( let from_factor = match Decimal::new(1, 0).checked_div(**rate) { Some(rate) => rate, None => { - logger::error!("Rates for {} not received from API", &enum_curr); + logger::error!( + "forex_error: Rates for {} not received from API", + &enum_curr + ); continue; } }; @@ -369,7 +331,10 @@ async fn fetch_forex_rates( conversions.insert(enum_curr, currency_factors); } None => { - logger::error!("Rates for {} not received from API", &enum_curr); + logger::error!( + "forex_error: Rates for {} not received from API", + &enum_curr + ); } }; } @@ -392,7 +357,7 @@ pub async fn fallback_fetch_forex_rates( .url(&fallback_forex_url) .build(); - logger::info!(?fallback_forex_request); + logger::info!(fallback_forex_request=?fallback_forex_request,"forex_log: Fallback api call for forex fetch"); let response = state .api_client .send_request( @@ -413,7 +378,8 @@ pub async fn fallback_fetch_forex_rates( "Unable to parse response received from falback api into ForexResponse", )?; - logger::info!("{:?}", fallback_forex_response); + logger::info!(fallback_forex_response=?fallback_forex_response,"forex_log"); + let mut conversions: HashMap = HashMap::new(); for enum_curr in enums::Currency::iter() { match fallback_forex_response.quotes.get( @@ -428,7 +394,10 @@ pub async fn fallback_fetch_forex_rates( let from_factor = match Decimal::new(1, 0).checked_div(**rate) { Some(rate) => rate, None => { - logger::error!("Rates for {} not received from API", &enum_curr); + logger::error!( + "forex_error: Rates for {} not received from API", + &enum_curr + ); continue; } }; @@ -441,7 +410,10 @@ pub async fn fallback_fetch_forex_rates( CurrencyFactors::new(Decimal::new(1, 0), Decimal::new(1, 0)); conversions.insert(enum_curr, currency_factors); } else { - logger::error!("Rates for {} not received from API", &enum_curr); + logger::error!( + "forex_error: Rates for {} not received from API", + &enum_curr + ); } } }; @@ -450,17 +422,18 @@ pub async fn fallback_fetch_forex_rates( let rates = FxExchangeRatesCacheEntry::new(ExchangeRates::new(enums::Currency::USD, conversions)); match acquire_redis_lock(state).await { - Ok(_) => Ok(successive_save_data_to_redis_local(state, rates).await?), - Err(e) => { - logger::error!(?e); + Ok(_) => { + successive_save_data_to_redis_local(state, rates.clone()).await?; Ok(rates) } + Err(e) => Err(e), } } async fn release_redis_lock( state: &SessionState, ) -> Result> { + logger::debug!("forex_log: Releasing redis lock"); state .store .get_redis_conn() @@ -473,6 +446,7 @@ async fn release_redis_lock( async fn acquire_redis_lock(state: &SessionState) -> CustomResult { let forex_api = state.conf.forex_api.get_inner(); + logger::debug!("forex_log: Acquiring redis lock"); state .store .get_redis_conn() @@ -481,11 +455,8 @@ async fn acquire_redis_lock(state: &SessionState) -> CustomResult CustomResult<(), ForexCacheError> { + logger::debug!("forex_log: Saving forex to redis"); app_state .store .get_redis_conn() @@ -511,6 +483,7 @@ async fn save_forex_to_redis( async fn retrieve_forex_from_redis( app_state: &SessionState, ) -> CustomResult, ForexCacheError> { + logger::debug!("forex_log: Retrieving forex from redis"); app_state .store .get_redis_conn() @@ -529,6 +502,7 @@ async fn is_redis_expired( if cache.timestamp + call_delay > date_time::now_unix_timestamp() { Some(cache.data.clone()) } else { + logger::debug!("forex_log: Forex stored in redis is expired"); None } }) @@ -542,14 +516,9 @@ pub async fn convert_currency( from_currency: String, ) -> CustomResult { let forex_api = state.conf.forex_api.get_inner(); - let rates = get_forex_rates( - &state, - forex_api.call_delay, - forex_api.local_fetch_retry_delay, - forex_api.local_fetch_retry_count, - ) - .await - .change_context(ForexCacheError::ApiError)?; + let rates = get_forex_rates(&state, forex_api.call_delay) + .await + .change_context(ForexCacheError::ApiError)?; let to_currency = enums::Currency::from_str(to_currency.as_str()) .change_context(ForexCacheError::CurrencyNotAcceptable) diff --git a/loadtest/config/development.toml b/loadtest/config/development.toml index ec58ab08b878..cd0000d2a9d4 100644 --- a/loadtest/config/development.toml +++ b/loadtest/config/development.toml @@ -48,12 +48,9 @@ ttl_for_storage_in_secs = 220752000 [forex_api] call_delay = 21600 -local_fetch_retry_count = 5 -local_fetch_retry_delay = 1000 -api_timeout = 20000 -api_key = "YOUR API KEY HERE" -fallback_api_key = "YOUR API KEY HERE" -redis_lock_timeout = 26000 +api_key = "" +fallback_api_key = "" +redis_lock_timeout = 100 [eph_key] validity = 1