Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make pool read-only, with a single write connection. #1517

Merged
merged 46 commits into from
Feb 3, 2025
Merged
Show file tree
Hide file tree
Changes from 32 commits
Commits
Show all changes
46 commits
Select commit Hold shift + click to select a range
4dca047
wip
codabrink Jan 16, 2025
d1dc212
cleanup
codabrink Jan 16, 2025
b5205c1
all should be well
codabrink Jan 16, 2025
895587c
all should be well
codabrink Jan 16, 2025
cb4b5a0
all should not be well
codabrink Jan 16, 2025
0889db8
all should not be well
codabrink Jan 16, 2025
521aed9
all should be better
codabrink Jan 16, 2025
cf01439
move the pragma
codabrink Jan 16, 2025
d982f5d
guard
codabrink Jan 16, 2025
b73f318
Merge remote-tracking branch 'origin/main' into coda/write-flag
codabrink Jan 16, 2025
efd639e
cleanup
codabrink Jan 16, 2025
0282c31
cleanup
codabrink Jan 16, 2025
5b8cd12
test
codabrink Jan 23, 2025
d3fa5b9
funnel
codabrink Jan 23, 2025
139ae1b
undo funneling in the raw query read
codabrink Jan 23, 2025
a4a521c
cleanup
codabrink Jan 23, 2025
93544a6
Merge branch 'main' into coda/write-flag
codabrink Jan 23, 2025
980c661
clone
codabrink Jan 23, 2025
9c9ddd9
Merge branch 'coda/write-flag' of github.com:xmtp/libxmtp into coda/w…
codabrink Jan 23, 2025
c743f67
Merge remote-tracking branch 'origin/main' into coda/write-flag
codabrink Feb 3, 2025
58af868
lint
codabrink Feb 3, 2025
dc76452
cleanup
codabrink Feb 3, 2025
b83e183
Merge branch 'main' into coda/write-flag
codabrink Feb 3, 2025
7971689
fix wasm
codabrink Feb 3, 2025
530fec9
Merge branch 'coda/write-flag' of github.com:xmtp/libxmtp into coda/w…
codabrink Feb 3, 2025
1eabf95
lint
codabrink Feb 3, 2025
853db39
test cleanup
codabrink Feb 3, 2025
446354a
add comment
codabrink Feb 3, 2025
f637c2e
Merge branch 'main' into coda/write-flag
codabrink Feb 3, 2025
6afe56e
cleanup
codabrink Feb 3, 2025
80c4fea
Merge branch 'coda/write-flag' of github.com:xmtp/libxmtp into coda/w…
codabrink Feb 3, 2025
1f0a2f8
undo ignore
codabrink Feb 3, 2025
b75faed
should never happen, but lets handle gracefully anyway
codabrink Feb 3, 2025
de2fa02
bump the ephemeral connection count to 2
codabrink Feb 3, 2025
17a8d80
cleanup
codabrink Feb 3, 2025
484dafd
cleanup wasm
codabrink Feb 3, 2025
9741ef8
test fix
codabrink Feb 3, 2025
4fb6475
wasm fix
codabrink Feb 3, 2025
bd90ca6
global transaction lock
codabrink Feb 3, 2025
3687bbf
comments
codabrink Feb 3, 2025
fe0c8eb
lint
codabrink Feb 3, 2025
39230c7
fix wasm
codabrink Feb 3, 2025
0cd2375
cleanup clone
codabrink Feb 3, 2025
6ae423f
cleanup unused error
codabrink Feb 3, 2025
3ce7d1d
a little more sync work
codabrink Feb 3, 2025
de95bab
naming
codabrink Feb 3, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion xmtp_mls/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1072,7 +1072,7 @@ pub(crate) mod tests {
.unwrap();

let conn = amal.store().conn().unwrap();
conn.raw_query(|conn| diesel::delete(identity_updates::table).execute(conn))
conn.raw_query_write(|conn| diesel::delete(identity_updates::table).execute(conn))
.unwrap();

let members = group.members().await.unwrap();
Expand Down Expand Up @@ -1424,6 +1424,7 @@ pub(crate) mod tests {
.unwrap();
assert_eq!(amal_group.members().await.unwrap().len(), 1);
tracing::info!("Syncing bolas welcomes");

// See if Bola can see that they were added to the group
bola.sync_welcomes(&bola.mls_provider().unwrap())
.await
Expand Down
6 changes: 4 additions & 2 deletions xmtp_mls/src/groups/intents.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,12 @@ impl<ScopedClient: ScopedGroupClient> MlsGroup<ScopedClient> {
intent_kind: IntentKind,
intent_data: Vec<u8>,
) -> Result<StoredGroupIntent, GroupError> {
provider.transaction(|provider| {
let res = provider.transaction(|provider| {
let conn = provider.conn_ref();
self.queue_intent_with_conn(conn, intent_kind, intent_data)
})
});

res
}

fn queue_intent_with_conn(
Expand Down
14 changes: 8 additions & 6 deletions xmtp_mls/src/groups/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -933,6 +933,8 @@ impl<ScopedClient: ScopedGroupClient> MlsGroup<ScopedClient> {
intent_data.into(),
)?;

tracing::warn!("This makes it here?");
codabrink marked this conversation as resolved.
Show resolved Hide resolved

self.sync_until_intent_resolved(provider, intent.id).await
}

Expand Down Expand Up @@ -2216,7 +2218,7 @@ pub(crate) mod tests {

// The dm shows up
let alix_groups = alix_conn
.raw_query(|conn| groups::table.load::<StoredGroup>(conn))
.raw_query_read(|conn| groups::table.load::<StoredGroup>(conn))
.unwrap();
assert_eq!(alix_groups.len(), 2);
// They should have the same ID
Expand Down Expand Up @@ -3787,18 +3789,18 @@ pub(crate) mod tests {
let conn_1: XmtpOpenMlsProvider = bo.store().conn().unwrap().into();
let conn_2 = bo.store().conn().unwrap();
conn_2
.raw_query(|c| {
.raw_query_write(|c| {
c.batch_execute("BEGIN EXCLUSIVE").unwrap();
Ok::<_, diesel::result::Error>(())
})
.unwrap();

let process_result = bo_group.process_messages(bo_messages, &conn_1).await;
if let Some(GroupError::ReceiveErrors(errors)) = process_result.err() {
assert_eq!(errors.len(), 2);
assert!(errors
.iter()
.any(|err| err.to_string().contains("database is locked")));
assert_eq!(errors.len(), 1);
assert!(errors.iter().any(|err| err
.to_string()
.contains("cannot start a transaction within a transaction")));
} else {
panic!("Expected error")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ impl StoredAssociationState {
);

let association_states =
conn.raw_query(|query_conn| query.load::<StoredAssociationState>(query_conn))?;
conn.raw_query_read(|query_conn| query.load::<StoredAssociationState>(query_conn))?;

association_states
.into_iter()
Expand Down
6 changes: 3 additions & 3 deletions xmtp_mls/src/storage/encrypted_store/consent_record.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ impl DbConnection {
entity: String,
entity_type: ConsentType,
) -> Result<Option<StoredConsentRecord>, StorageError> {
Ok(self.raw_query(|conn| -> diesel::QueryResult<_> {
Ok(self.raw_query_read(|conn| -> diesel::QueryResult<_> {
dsl::consent_records
.filter(dsl::entity.eq(entity))
.filter(dsl::entity_type.eq(entity_type))
Expand Down Expand Up @@ -77,7 +77,7 @@ impl DbConnection {
);
}

let changed = self.raw_query(|conn| -> diesel::QueryResult<_> {
let changed = self.raw_query_write(|conn| -> diesel::QueryResult<_> {
let existing: Vec<StoredConsentRecord> = query.load(conn)?;
let changed: Vec<_> = records
.iter()
Expand Down Expand Up @@ -107,7 +107,7 @@ impl DbConnection {
&self,
record: &StoredConsentRecord,
) -> Result<Option<StoredConsentRecord>, StorageError> {
self.raw_query(|conn| {
self.raw_query_write(|conn| {
let maybe_inserted_consent_record: Option<StoredConsentRecord> =
diesel::insert_into(dsl::consent_records)
.values(record)
Expand Down
8 changes: 4 additions & 4 deletions xmtp_mls/src/storage/encrypted_store/conversation_list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ impl DbConnection {
.select(conversation_list::all_columns())
.order(conversation_list_dsl::created_at_ns.asc());

self.raw_query(|conn| query.load::<ConversationListItem>(conn))?
self.raw_query_read(|conn| query.load::<ConversationListItem>(conn))?
} else {
// Only include the specified states
let query = query
Expand All @@ -153,19 +153,19 @@ impl DbConnection {
.select(conversation_list::all_columns())
.order(conversation_list_dsl::created_at_ns.asc());

self.raw_query(|conn| query.load::<ConversationListItem>(conn))?
self.raw_query_read(|conn| query.load::<ConversationListItem>(conn))?
}
} else {
// Handle the case where `consent_states` is `None`
self.raw_query(|conn| query.load::<ConversationListItem>(conn))?
self.raw_query_read(|conn| query.load::<ConversationListItem>(conn))?
};

// Were sync groups explicitly asked for? Was the include_sync_groups flag set to true?
// Then query for those separately
if matches!(conversation_type, Some(ConversationType::Sync)) || *include_sync_groups {
let query = conversation_list_dsl::conversation_list
.filter(conversation_list_dsl::conversation_type.eq(ConversationType::Sync));
let mut sync_groups = self.raw_query(|conn| query.load(conn))?;
let mut sync_groups = self.raw_query_read(|conn| query.load(conn))?;
conversations.append(&mut sync_groups);
}

Expand Down
89 changes: 75 additions & 14 deletions xmtp_mls/src/storage/encrypted_store/db_connection.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,15 @@
use crate::storage::{xmtp_openmls_provider::XmtpOpenMlsProvider, StorageError};
use diesel::connection::TransactionManager;
use parking_lot::Mutex;
use std::fmt;
use std::sync::Arc;
use std::{
fmt,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
};

use crate::storage::xmtp_openmls_provider::XmtpOpenMlsProvider;
use super::XmtpDb;

#[cfg(not(target_arch = "wasm32"))]
pub type DbConnection = DbConnectionPrivate<super::RawDbConnection>;
Expand All @@ -18,38 +25,83 @@ pub type DbConnection = DbConnectionPrivate<sqlite_web::connection::WasmSqliteCo
// Do not derive clone here.
// callers should be able to accomplish everything with one conn/reference.
#[doc(hidden)]
#[derive(Clone)]
codabrink marked this conversation as resolved.
Show resolved Hide resolved
pub struct DbConnectionPrivate<C> {
inner: Arc<Mutex<C>>,
read: Arc<Mutex<C>>,
write: Option<Arc<Mutex<C>>>,
codabrink marked this conversation as resolved.
Show resolved Hide resolved
// This field will funnel all reads / writes to the write connection if true.
pub(super) in_transaction: Arc<AtomicBool>,
}

/// Owned DBConnection Methods
impl<C> DbConnectionPrivate<C> {
/// Create a new [`DbConnectionPrivate`] from an existing Arc<Mutex<C>>
pub(super) fn from_arc_mutex(conn: Arc<Mutex<C>>) -> Self {
Self { inner: conn }
pub(super) fn from_arc_mutex(read: Arc<Mutex<C>>, write: Option<Arc<Mutex<C>>>) -> Self {
Self {
read,
write,
in_transaction: Arc::new(AtomicBool::new(false)),
}
}
}

impl<C> DbConnectionPrivate<C>
where
C: diesel::Connection,
{
pub(crate) fn start_transaction<Db: XmtpDb<Connection = C>>(
&self,
) -> Result<TransactionGuard, StorageError> {
let mut write = self
.write
.as_ref()
.expect("Tried to open transaction on read-only connection")
codabrink marked this conversation as resolved.
Show resolved Hide resolved
.lock();
<Db as XmtpDb>::TransactionManager::begin_transaction(&mut *write)?;

if self.in_transaction.swap(true, Ordering::SeqCst) {
panic!("Already in transaction.");
codabrink marked this conversation as resolved.
Show resolved Hide resolved
}

Ok(TransactionGuard {
in_transaction: self.in_transaction.clone(),
})
}

fn in_transaction(&self) -> bool {
self.in_transaction.load(Ordering::SeqCst)
}

/// Do a scoped query with a mutable [`diesel::Connection`]
/// reference
pub(crate) fn raw_query<T, E, F>(&self, fun: F) -> Result<T, E>
pub(crate) fn raw_query_read<T, E, F>(&self, fun: F) -> Result<T, E>
where
F: FnOnce(&mut C) -> Result<T, E>,
{
let mut lock = self.inner.lock();
if self.in_transaction() {
if let Some(write) = &self.write {
let mut lock = write.lock();
return fun(&mut lock);
};
}

let mut lock = self.read.lock();
fun(&mut lock)
}

/// Internal-only API to get the underlying `diesel::Connection` reference
/// without a scope
/// Must be used with care. holding this reference while calling `raw_query`
/// will cause a deadlock.
pub(super) fn inner_mut_ref(&self) -> parking_lot::MutexGuard<'_, C> {
self.inner.lock()
/// Do a scoped query with a mutable [`diesel::Connection`]
/// reference
pub(crate) fn raw_query_write<T, E, F>(&self, fun: F) -> Result<T, E>
where
F: FnOnce(&mut C) -> Result<T, E>,
{
if let Some(write_conn) = &self.write {
let mut lock = write_conn.lock();
return fun(&mut lock);
}

let mut lock = self.read.lock();
fun(&mut lock)
}
}

Expand All @@ -71,3 +123,12 @@ impl<C> fmt::Debug for DbConnectionPrivate<C> {
.finish()
}
}

pub struct TransactionGuard {
in_transaction: Arc<AtomicBool>,
}
impl Drop for TransactionGuard {
fn drop(&mut self) {
self.in_transaction.store(false, Ordering::SeqCst);
}
}
Loading
Loading