Skip to content

Commit

Permalink
rust repro of group stuck on prior epoch while streaming
Browse files Browse the repository at this point in the history
  • Loading branch information
cameronvoell committed Feb 4, 2025
1 parent ae4eb9c commit 9de4394
Show file tree
Hide file tree
Showing 2 changed files with 104 additions and 2 deletions.
98 changes: 96 additions & 2 deletions bindings_ffi/src/mls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1867,6 +1867,11 @@ impl FfiConversation {
let conversation_type = self.inner.conversation_type(&provider).await?;
Ok(conversation_type.into())
}

pub async fn epoch(&self) -> u64 {
let provider = self.inner.mls_provider().unwrap();
self.inner.epoch(&provider).await.unwrap()
}
}

#[uniffi::export]
Expand Down Expand Up @@ -4296,8 +4301,6 @@ mod tests {
assert_eq!(alix_members.len(), 1);
}

// test is also showing intermittent failures with database locked msg
#[ignore]
#[tokio::test(flavor = "multi_thread", worker_threads = 5)]
async fn test_can_stream_and_update_name_without_forking_group() {
let alix = new_test_client().await;
Expand Down Expand Up @@ -6448,4 +6451,95 @@ mod tests {
.unwrap();
assert_eq!(dms.len(), 1, "Should still have one DM conversation");
}

#[tokio::test(flavor = "multi_thread", worker_threads = 5)]
async fn test_can_stream_and_receive_metadata_update() {
// Create test clients
let alix = new_test_client().await;
let bo = new_test_client().await;

// If we comment out this stream, the test passes
let stream_callback = Arc::new(RustStreamCallback::default());
let stream = bo
.conversations()
.stream_all_messages(stream_callback.clone())
.await;
stream.wait_for_ready().await;

// Create group and perform actions
let alix_group = alix
.conversations()
.create_group(
vec![bo.account_address.clone()],
FfiCreateGroupOptions::default(),
)
.await
.unwrap();

// Send first message
let mut buf = Vec::new();
TextCodec::encode("hello1".to_string())
.unwrap()
.encode(&mut buf)
.unwrap();
alix_group.send(buf).await.unwrap();

// Update group name
alix_group
.update_group_name("hello".to_string())
.await
.unwrap();

// Send second message
let mut buf = Vec::new();
TextCodec::encode("hello2".to_string())
.unwrap()
.encode(&mut buf)
.unwrap();
alix_group.send(buf).await.unwrap();

// Sync Bo's client
bo.conversations().sync().await.unwrap();

// Get Bo's groups and verify count
let bo_groups = bo
.conversations()
.list_groups(FfiListConversationsOptions::default())
.unwrap();
assert_eq!(bo_groups.len(), 1);
let bo_group = bo_groups[0].conversation.clone();

// Sync both groups
bo_group.sync().await.unwrap();
alix_group.sync().await.unwrap();

// Get Bo's messages and verify content types
let bo_messages = bo_group
.find_messages(FfiListMessagesOptions::default())
.await
.unwrap();
assert_eq!(bo_messages.len(), 3);

// Verify message content types
let message_types: Vec<String> = bo_messages
.iter()
.map(|msg| {
let encoded_content = EncodedContent::decode(msg.content.as_slice()).unwrap();
encoded_content.r#type.unwrap().type_id
})
.collect();

assert_eq!(message_types[0], "text");
assert_eq!(message_types[1], "group_updated");
assert_eq!(message_types[2], "text");

// this assertion is failing even though bo_group has the group_updated msg in the DB (returned from find_messages() call)
assert_eq!(alix_group.epoch().await, bo_group.epoch().await);
assert_eq!(alix_group.group_name().unwrap(), "hello");
// this assertion will also fail
assert_eq!(bo_group.group_name().unwrap(), "hello");

// Clean up stream
stream.end_and_wait().await.unwrap();
}
}
8 changes: 8 additions & 0 deletions xmtp_mls/src/groups/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1332,6 +1332,14 @@ impl<ScopedClient: ScopedGroupClient> MlsGroup<ScopedClient> {
.await
}

/// Get the current epoch number of the group.
pub async fn epoch(&self, provider: &XmtpOpenMlsProvider) -> Result<u64, GroupError> {
self.load_mls_group_with_lock_async(provider, |mls_group| {
futures::future::ready(Ok(mls_group.epoch().as_u64()))
})
.await
}

/// Get the `GroupMutableMetadata` of the group.
pub fn mutable_metadata(
&self,
Expand Down

0 comments on commit 9de4394

Please sign in to comment.