Skip to content

Commit

Permalink
Merge pull request #1923 from levydsa/main
Browse files Browse the repository at this point in the history
Prevent reporting HttpDispatch errors in `sync_offline`
  • Loading branch information
penberg authored Jan 23, 2025
2 parents 89cc8cd + 111c277 commit e3d2414
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 25 deletions.
30 changes: 27 additions & 3 deletions libsql/src/local/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -495,7 +495,7 @@ impl Connection {
Ok(buf)
}

pub(crate) fn wal_insert_begin(&self) -> Result<()> {
fn wal_insert_begin(&self) -> Result<()> {
let rc = unsafe { libsql_sys::ffi::libsql_wal_insert_begin(self.handle()) };
if rc != 0 {
return Err(crate::errors::Error::SqliteFailure(
Expand All @@ -506,7 +506,7 @@ impl Connection {
Ok(())
}

pub(crate) fn wal_insert_end(&self) -> Result<()> {
fn wal_insert_end(&self) -> Result<()> {
let rc = unsafe { libsql_sys::ffi::libsql_wal_insert_end(self.handle()) };
if rc != 0 {
return Err(crate::errors::Error::SqliteFailure(
Expand All @@ -517,7 +517,7 @@ impl Connection {
Ok(())
}

pub(crate) fn wal_insert_frame(&self, frame: &[u8]) -> Result<()> {
fn wal_insert_frame(&self, frame: &[u8]) -> Result<()> {
let rc = unsafe {
libsql_sys::ffi::libsql_wal_insert_frame(
self.handle(),
Expand All @@ -534,6 +534,30 @@ impl Connection {
}
Ok(())
}

pub(crate) fn wal_insert_handle(&self) -> Result<WalInsertHandle<'_>> {
self.wal_insert_begin()?;
Ok(WalInsertHandle { conn: self })
}
}

pub(crate) struct WalInsertHandle<'a> {
conn: &'a Connection,
}

impl WalInsertHandle<'_> {
pub fn insert(&self, frame: &[u8]) -> Result<()> {
self.conn.wal_insert_frame(frame)
}
}

impl Drop for WalInsertHandle<'_> {
fn drop(&mut self) {
if let Err(err) = self.conn.wal_insert_end() {
tracing::error!("{:?}", err);
Err(err).unwrap()
}
}
}

impl fmt::Debug for Connection {
Expand Down
50 changes: 28 additions & 22 deletions libsql/src/local/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -478,9 +478,7 @@ impl Database {
Err(Error::Sync(err)) => {
// Retry the sync because we are ahead of the server and we need to push some older
// frames.
if let Some(SyncError::InvalidPushFrameNoLow(_, _)) =
err.downcast_ref::<SyncError>()
{
if let Some(SyncError::InvalidPushFrameNoLow(_, _)) = err.downcast_ref() {
tracing::debug!("got InvalidPushFrameNo, retrying push");
self.try_push(&mut sync_ctx, &conn).await
} else {
Expand All @@ -492,6 +490,22 @@ impl Database {
} else {
self.try_pull(&mut sync_ctx, &conn).await
}
.or_else(|err| {
let Error::Sync(err) = err else {
return Err(err);
};

// TODO(levy): upcasting should be done *only* at the API boundary, doing this in
// internal code just sucks.
let Some(SyncError::HttpDispatch(_)) = err.downcast_ref() else {
return Err(Error::Sync(err));
};

Ok(crate::database::Replicated {
frame_no: None,
frames_synced: 0,
})
})
}

#[cfg(feature = "sync")]
Expand Down Expand Up @@ -557,37 +571,29 @@ impl Database {
) -> Result<crate::database::Replicated> {
let generation = sync_ctx.generation();
let mut frame_no = sync_ctx.durable_frame_num() + 1;
conn.wal_insert_begin()?;

let mut err = None;
let insert_handle = conn.wal_insert_handle()?;

loop {
match sync_ctx.pull_one_frame(generation, frame_no).await {
Ok(Some(frame)) => {
conn.wal_insert_frame(&frame)?;
insert_handle.insert(&frame)?;
frame_no += 1;
}
Ok(None) => {
break;
sync_ctx.write_metadata().await?;
return Ok(crate::database::Replicated {
frame_no: None,
frames_synced: 1,
});
}
Err(e) => {
tracing::debug!("pull_one_frame error: {:?}", e);
err.replace(e);
break;
Err(err) => {
tracing::debug!("pull_one_frame error: {:?}", err);
sync_ctx.write_metadata().await?;
return Err(err);
}
}
}
conn.wal_insert_end()?;
sync_ctx.write_metadata().await?;

if let Some(err) = err {
Err(err)
} else {
Ok(crate::database::Replicated {
frame_no: None,
frames_synced: 1,
})
}
}

pub(crate) fn path(&self) -> &str {
Expand Down

0 comments on commit e3d2414

Please sign in to comment.