Skip to content

Commit

Permalink
re-enable history sync of consent update on msg send, move history sy…
Browse files Browse the repository at this point in the history
…nc to dedicated thread
  • Loading branch information
cameronvoell committed Jan 14, 2025
1 parent 03aad1e commit a5a531e
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 35 deletions.
4 changes: 1 addition & 3 deletions bindings_ffi/src/mls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5572,9 +5572,7 @@ mod tests {
for i in 1..=6 {
let conversation = alix_conversation.clone();
let message = format!("Message {}", i);
tasks.spawn(async move {
conversation.send(message.as_bytes().to_vec()).await
});
tasks.spawn(async move { conversation.send(message.as_bytes().to_vec()).await });
}

// Collect results as they complete
Expand Down
61 changes: 38 additions & 23 deletions xmtp_mls/src/groups/device_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -326,30 +326,45 @@ where
}

fn spawn_worker(mut self) {
crate::spawn(None, async move {
let inbox_id = self.client.inbox_id().to_string();
let installation_id = hex::encode(self.client.installation_public_key());
while let Err(err) = self.run().await {
tracing::info!("Running worker..");
match err {
DeviceSyncError::Client(ClientError::Storage(
StorageError::PoolNeedsConnection,
)) => {
tracing::warn!(
inbox_id,
installation_id,
"Pool disconnected. task will restart on reconnect"
);
break;
}
_ => {
tracing::error!(inbox_id, installation_id, "sync worker error {err}");
// Wait 2 seconds before restarting.
xmtp_common::time::sleep(Duration::from_secs(2)).await;
// Create a new runtime for the worker
let runtime = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.expect("Failed to create runtime");

// Spawn on a dedicated thread
std::thread::Builder::new()
.name("sync_worker".into())
.spawn(move || {
runtime.block_on(async move {
let inbox_id = self.client.inbox_id().to_string();
let installation_id = hex::encode(self.client.installation_public_key());

while let Err(err) = self.run().await {
match err {
DeviceSyncError::Client(ClientError::Storage(
StorageError::PoolNeedsConnection,
)) => {
tracing::warn!(
inbox_id,
installation_id,
"Pool disconnected. task will restart on reconnect"
);
break;
}
_ => {
tracing::error!(
inbox_id,
installation_id,
"sync worker error {err}"
);
xmtp_common::time::sleep(Duration::from_secs(2)).await;
}
}
}
}
}
});
});
})
.expect("Failed to spawn sync worker thread");
}
}

Expand Down
18 changes: 9 additions & 9 deletions xmtp_mls/src/groups/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1250,15 +1250,15 @@ impl<ScopedClient: ScopedGroupClient> MlsGroup<ScopedClient> {
);
conn.insert_or_replace_consent_records(&[consent_record.clone()])?;

// if self.client.history_sync_url().is_some() {
// // Dispatch an update event so it can be synced across devices
// let _ = self
// .client
// .local_events()
// .send(LocalEvents::OutgoingPreferenceUpdates(vec![
// UserPreferenceUpdate::ConsentUpdate(consent_record),
// ]));
// }
if self.client.history_sync_url().is_some() {
// Dispatch an update event so it can be synced across devices
let _ = self
.client
.local_events()
.send(LocalEvents::OutgoingPreferenceUpdates(vec![
UserPreferenceUpdate::ConsentUpdate(consent_record),
]));
}

Ok(())
}
Expand Down

0 comments on commit a5a531e

Please sign in to comment.