From c58ed4c4845959993fd659430dcea6c742e0d763 Mon Sep 17 00:00:00 2001
From: Jackmin801 <56836461+Jackmin801@users.noreply.github.com>
Date: Thu, 16 Jan 2025 09:48:02 -0800
Subject: [PATCH] Dont return quorum if requester isnt involved (#72)

* dont return quorum if requester isnt involved

* refactor: make cloning more consistent

* add unit test

* refactor: take out some boilerplate
---
 src/lighthouse.rs | 130 +++++++++++++++++++++++++++++++++++++++++++++-
 1 file changed, 128 insertions(+), 2 deletions(-)

diff --git a/src/lighthouse.rs b/src/lighthouse.rs
index 420bb3c..20d72b8 100644
--- a/src/lighthouse.rs
+++ b/src/lighthouse.rs
@@ -458,7 +458,7 @@ impl LighthouseService for Arc<Lighthouse> {
                 requester.replica_id.clone(),
                 QuorumMemberDetails {
                     joined: Instant::now(),
-                    member: requester,
+                    member: requester.clone(),
                 },
             );
             let rx = state.channel.subscribe();
@@ -471,7 +471,28 @@ impl LighthouseService for Arc<Lighthouse> {
             rx
         };
 
-        let quorum = rx.recv().await.map_err(|e| Status::from_error(e.into()))?;
+        let quorum = loop {
+            let current_quorum = rx.recv().await.map_err(|e| Status::from_error(e.into()))?;
+
+            if current_quorum
+                .participants
+                .iter()
+                .any(|p| p.replica_id == requester.replica_id)
+            {
+                break current_quorum;
+            }
+
+            // Only continue the loop if the replica is not in the quorum
+            let mut state = self.state.lock().await;
+            state.participants.insert(
+                requester.replica_id.clone(),
+                QuorumMemberDetails {
+                    joined: Instant::now(),
+                    member: requester.clone(),
+                },
+            );
+            info!("Replica {} not in quorum, retrying", &requester.replica_id);
+        };
 
         let reply = LighthouseQuorumResponse {
             quorum: Some(quorum),
@@ -995,4 +1016,109 @@ mod tests {
         // replica_id changed
         assert!(quorum_changed(&a, &c));
     }
+    #[tokio::test]
+
+    async fn test_lighthouse_join_during_shrink() -> Result<()> {
+        fn create_member(id: &str, addr_num: &str, step: i64, shrink_only: bool) -> QuorumMember {
+            QuorumMember {
+                replica_id: id.to_string(),
+                address: format!("addr{}", addr_num),
+                store_address: format!("store{}", addr_num),
+                step,
+                world_size: 1,
+                shrink_only,
+            }
+        }
+
+        fn create_request(member: &QuorumMember) -> tonic::Request<LighthouseQuorumRequest> {
+            tonic::Request::new(LighthouseQuorumRequest {
+                requester: Some(member.clone()),
+            })
+        }
+
+        let opt = LighthouseOpt {
+            min_replicas: 2,
+            bind: "[::]:0".to_string(),
+            join_timeout_ms: 1000,
+            quorum_tick_ms: 10,
+            heartbeat_timeout_ms: 5000,
+        };
+
+        // Start the lighthouse service
+        let lighthouse = Lighthouse::new(opt).await?;
+        let lighthouse_task = tokio::spawn(lighthouse.clone().run());
+
+        // Create client to interact with lighthouse
+        let mut client = lighthouse_client_new(lighthouse.address()).await?;
+
+        // 1. First quorum
+        let mut first_request = create_request(&create_member("replica0", "1", 1, false));
+        let mut second_request = create_request(&create_member("replica1", "2", 1, false));
+
+        tokio::spawn({
+            let mut client = client.clone();
+            async move { client.quorum(first_request).await }
+        });
+        let first_response = client.quorum(second_request).await?;
+        let first_quorum = first_response.into_inner().quorum.unwrap();
+        assert_eq!(first_quorum.participants.len(), 2);
+        assert_eq!(first_quorum.participants[0].replica_id, "replica0");
+        assert_eq!(first_quorum.participants[1].replica_id, "replica1");
+        assert_eq!(first_quorum.participants[1].step, 1);
+
+        // 2. Quorum without joiner
+        let joining_request = create_request(&create_member("joiner", "3", 1, false));
+        let joining_task = tokio::spawn({
+            let mut client = client.clone();
+            async move { client.quorum(joining_request).await }
+        });
+
+        // Try to shrink only
+        first_request = create_request(&create_member("replica0", "1", 2, true));
+        second_request = create_request(&create_member("replica1", "2", 2, false));
+
+        tokio::spawn({
+            let mut client = client.clone();
+            async move { client.quorum(first_request).await }
+        });
+        let second_response = client.quorum(second_request).await?;
+        let second_quorum = second_response.into_inner().quorum.unwrap();
+        assert!(second_quorum
+            .participants
+            .iter()
+            .all(|p| p.replica_id != "joiner"));
+        assert_eq!(second_quorum.participants.len(), 2);
+        assert_eq!(second_quorum.participants[0].replica_id, "replica0");
+        assert_eq!(second_quorum.participants[1].replica_id, "replica1");
+        assert_eq!(second_quorum.participants[1].step, 2);
+
+        // 3. Quorum with joiner
+        first_request = create_request(&create_member("replica0", "1", 3, false));
+        second_request = create_request(&create_member("replica1", "2", 3, false));
+
+        tokio::spawn({
+            let mut client = client.clone();
+            async move { client.quorum(first_request).await }
+        });
+        let third_response = client.quorum(second_request).await?;
+        let third_quorum = third_response.into_inner().quorum.unwrap();
+        assert!(third_quorum
+            .participants
+            .iter()
+            .any(|p| p.replica_id == "joiner"));
+        assert_eq!(third_quorum.participants.len(), 3);
+        assert_eq!(third_quorum.participants[2].step, 3);
+
+        let join_result = joining_task.await?;
+        let join_quorum = join_result.unwrap().into_inner().quorum.unwrap();
+        assert!(join_quorum
+            .participants
+            .iter()
+            .any(|p| p.replica_id == "joiner"));
+        assert_eq!(join_quorum.participants.len(), 3);
+        assert_eq!(join_quorum.participants[2].step, 3);
+
+        lighthouse_task.abort();
+        Ok(())
+    }
 }