Skip to content

Commit

Permalink
fix tests
Browse files Browse the repository at this point in the history
  • Loading branch information
seriousben committed Jan 9, 2025
1 parent 5cb8ac9 commit edf60e2
Show file tree
Hide file tree
Showing 6 changed files with 66 additions and 43 deletions.
4 changes: 2 additions & 2 deletions server/data_model/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use filter::LabelsFilter;
use indexify_utils::{default_creation_time, get_epoch_time_in_ms};
use serde::{Deserialize, Serialize};
use sha2::{Digest, Sha256};
use strum::{AsRefStr, IntoStaticStr};
use strum::AsRefStr;

// Invoke graph for all existing payloads
#[derive(Debug, Clone, Serialize, Deserialize)]
Expand Down Expand Up @@ -984,7 +984,7 @@ impl StateChangeId {
}

/// Return key to store in k/v db
pub fn to_key(&self) -> [u8; 8] {
fn to_key(&self) -> [u8; 8] {
self.0.to_be_bytes()
}
}
Expand Down
61 changes: 34 additions & 27 deletions server/processor/src/namespace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,10 @@ impl ProcessorLogic for NamespaceProcessor {
"running namespace processor, requests_len={}",
requests.len()
);
let timer_kvs = &[KeyValue::new("processor", "namespace")];
let timer_kvs = &[KeyValue::new(
"processor",
ProcessorType::Namespace.as_ref().to_string(),
)];
let _timer = Timer::start_with_labels(&self.metrics.processors_duration, timer_kvs);

for request in requests {
Expand All @@ -98,17 +101,19 @@ impl ProcessorLogic for NamespaceProcessor {
};
}

let mut create_task_requests = vec![];
let mut processed_state_changes = vec![];
let mut new_reduction_tasks = vec![];
let mut processed_reduction_tasks = vec![];

let state_changes = self
.indexify_state
.reader()
.get_unprocessed_state_changes(ProcessorId::new(ProcessorType::Namespace))?;

for state_change in &state_changes {
let mut create_task_requests = vec![];
let mut processed_state_changes = vec![];
let mut new_reduction_tasks = vec![];
let mut processed_reduction_tasks = vec![];

trace!("processing state change: {:?}", state_change);

match self.process_state_change(state_change).await {
Ok(result) => {
processed_state_changes.push(state_change.clone());
Expand All @@ -130,29 +135,31 @@ impl ProcessorLogic for NamespaceProcessor {
continue;
}
}
}

// Do not write an update request if there are no state changes to mark as
// processed since we did no work.
if processed_state_changes.is_empty() {
return Ok(());
}
// Do not write an update request if there are no state changes to mark as
// processed since we did no work.
if processed_state_changes.is_empty() {
return Ok(());
}

let scheduler_update_request = StateMachineUpdateRequest {
payload: RequestPayload::NamespaceProcessorUpdate(NamespaceProcessorUpdateRequest {
task_requests: create_task_requests,
reduction_tasks: ReductionTasks {
new_reduction_tasks,
processed_reduction_tasks,
},
}),
process_state_change: Some(ProcessedStateChange {
processor_id: ProcessorId::new(ProcessorType::Namespace),
state_changes: processed_state_changes,
}),
};
if let Err(err) = self.indexify_state.write(scheduler_update_request).await {
error!("error writing namespace update request: {:?}", err);
let scheduler_update_request = StateMachineUpdateRequest {
payload: RequestPayload::NamespaceProcessorUpdate(
NamespaceProcessorUpdateRequest {
task_requests: create_task_requests,
reduction_tasks: ReductionTasks {
new_reduction_tasks,
processed_reduction_tasks,
},
},
),
process_state_change: Some(ProcessedStateChange {
state_changes: processed_state_changes,
processor_id: ProcessorId::new(ProcessorType::Namespace),
}),
};
if let Err(err) = self.indexify_state.write(scheduler_update_request).await {
error!("error writing namespace update request: {:?}", err);
}
}

Ok(())
Expand Down
5 changes: 4 additions & 1 deletion server/processor/src/task_allocator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,10 @@ impl ProcessorLogic for TaskAllocationProcessor {
requests.len()
);

let timer_kvs = &[KeyValue::new("processor", "task_allocator")];
let timer_kvs = &[KeyValue::new(
"processor",
ProcessorType::TaskAllocator.as_ref().to_string(),
)];
let _timer = Timer::start_with_labels(&self.metrics.processors_duration, timer_kvs);

for request in requests {
Expand Down
34 changes: 23 additions & 11 deletions server/src/integration_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ mod tests {
InvocationPayloadBuilder,
InvokeComputeGraphRequest,
Node,
ProcessorId,
ProcessorType,
RegisterExecutorRequest,
RequestPayload,
RuntimeInformation,
Expand All @@ -39,6 +41,7 @@ mod tests {
state_machine::IndexifyObjectsColumns,
test_state_store,
};
use tracing::error;

use crate::{service::Service, testing};

Expand Down Expand Up @@ -346,7 +349,9 @@ mod tests {
let Service { indexify_state, .. } = test_srv.service.clone();

let invocation_id = test_state_store::with_router_graph(&indexify_state).await;

test_srv.process_all().await?;

let tasks = indexify_state
.reader()
.list_tasks_by_compute_graph(TEST_NAMESPACE, "graph_B", &invocation_id, None, None)?
Expand All @@ -362,7 +367,9 @@ mod tests {
)
.await
.unwrap();

test_srv.process_all().await?;

let tasks = indexify_state
.reader()
.list_tasks_by_compute_graph(TEST_NAMESPACE, "graph_B", &invocation_id, None, None)
Expand All @@ -373,8 +380,14 @@ mod tests {
.reader()
.get_unprocessed_state_changes_all_processors()
.unwrap();
// has task crated state change in it.
assert_eq!(unprocessed_state_changes.len(), 1);

// has task created state change in it.
assert_eq!(
unprocessed_state_changes.len(),
0,
"{:?}",
unprocessed_state_changes
);

// Now finish the router task and we should have 3 tasks
// The last one would be for the edge which the router picks
Expand All @@ -386,20 +399,19 @@ mod tests {
)
.await
.unwrap();

test_srv.process_all().await?;

let tasks = indexify_state
.reader()
.list_tasks_by_compute_graph(TEST_NAMESPACE, "graph_B", &invocation_id, None, None)
.unwrap()
.0;
assert_eq!(tasks.len(), 3);
assert_eq!(tasks.len(), 3, "tasks: {:?}", tasks);

Ok(())
}

// TODO write edge case test case when all fn_map finish state changes are
// handled in the same runloop of the executor!

// test a simple reducer graph
//
// Tasks:
Expand Down Expand Up @@ -521,9 +533,9 @@ mod tests {
.collect();

assert_eq!(
pending_tasks.len(),
expected_num,
"pending tasks: {:?}",
pending_tasks.len(),
"pending tasks: {:#?}",
pending_tasks
);
pending_tasks.iter().for_each(|t| {
Expand Down Expand Up @@ -634,7 +646,7 @@ mod tests {
.invocation_ctx(&graph.namespace, &graph.name, &invocation_payload.id)?
.unwrap();
assert_eq!(graph_ctx.outstanding_tasks, 0);
assert_eq!(graph_ctx.completed, true);
assert!(graph_ctx.completed);
}

Ok(())
Expand Down Expand Up @@ -1479,7 +1491,7 @@ mod tests {
for state_change in all_unprocessed_state_changes_reduce.clone() {
indexify_state.db.delete_cf(
&IndexifyObjectsColumns::UnprocessedStateChanges.cf_db(&indexify_state.db),
&state_change.id.to_key(),
state_change.key(&ProcessorId::new(ProcessorType::Namespace)),
)?;
}

Expand Down Expand Up @@ -1546,7 +1558,7 @@ mod tests {
let serialized_state_change = JsonEncoder::encode(&state_change)?;
indexify_state.db.put_cf(
&IndexifyObjectsColumns::UnprocessedStateChanges.cf_db(&indexify_state.db),
&state_change.id.to_key(),
state_change.key(&ProcessorId::new(ProcessorType::Namespace)),
serialized_state_change,
)?;
}
Expand Down
2 changes: 1 addition & 1 deletion server/src/testing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ pub struct TestService {
impl TestService {
pub async fn new() -> Result<Self> {
let env_filter = tracing_subscriber::EnvFilter::try_from_default_env()
.unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info"));
.unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("trace"));
let _ = subscriber::set_global_default(
tracing_subscriber::registry()
.with(tracing_subscriber::fmt::layer().with_filter(env_filter)),
Expand Down
3 changes: 2 additions & 1 deletion server/state_store/src/state_machine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ use rocksdb::{
TransactionDB,
};
use strum::AsRefStr;
use tracing::{debug, error, info, instrument};
use tracing::{debug, error, info, instrument, trace};

use super::serializer::{JsonEncode, JsonEncoder};
use crate::StateChangeDispatcher;
Expand Down Expand Up @@ -943,6 +943,7 @@ pub(crate) fn mark_state_changes_processed(
process: &ProcessedStateChange,
) -> Result<()> {
for state_change in process.state_changes.iter() {
trace!("Marking state change processed: {:?}", state_change);
let key = &state_change.key(&process.processor_id);
txn.delete_cf(
&IndexifyObjectsColumns::UnprocessedStateChanges.cf_db(&db),
Expand Down

0 comments on commit edf60e2

Please sign in to comment.