Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Go: Reset connection when it's returned to a pool #1100

Merged
merged 2 commits into from
Feb 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions bindings/c/include/libsql.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ void libsql_close(libsql_database_t db);

int libsql_connect(libsql_database_t db, libsql_connection_t *out_conn, const char **out_err_msg);

int libsql_reset(libsql_connection_t conn, const char **out_err_msg);

void libsql_disconnect(libsql_connection_t conn);

int libsql_prepare(libsql_connection_t conn, const char *sql, libsql_stmt_t *out_stmt, const char **out_err_msg);
Expand Down
18 changes: 16 additions & 2 deletions bindings/c/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,20 @@ pub unsafe extern "C" fn libsql_connect(
0
}

#[no_mangle]
pub unsafe extern "C" fn libsql_reset(
conn: libsql_connection_t,
out_err_msg: *mut *const std::ffi::c_char,
) -> std::ffi::c_int {
if conn.is_null() {
set_err_msg("Null connection".to_string(), out_err_msg);
return 1;
}
let conn = conn.get_ref();
RT.block_on(conn.reset());
0
}

#[no_mangle]
pub unsafe extern "C" fn libsql_disconnect(conn: libsql_connection_t) {
if conn.is_null() {
Expand Down Expand Up @@ -564,13 +578,13 @@ pub unsafe extern "C" fn libsql_column_type(
#[no_mangle]
pub unsafe extern "C" fn libsql_changes(conn: libsql_connection_t) -> u64 {
let conn = conn.get_ref();
conn.changes()
RT.block_on(conn.changes())
}

#[no_mangle]
pub unsafe extern "C" fn libsql_last_insert_rowid(conn: libsql_connection_t) -> i64 {
let conn = conn.get_ref();
conn.last_insert_rowid()
RT.block_on(conn.last_insert_rowid())
}

#[no_mangle]
Expand Down
6 changes: 3 additions & 3 deletions libsql-server/tests/hrana/batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,17 +134,17 @@ fn affected_rows_and_last_rowid() {

let r = conn.execute("insert into t(x) values('a');", ()).await?;
assert_eq!(r, 1, "1st row inserted");
assert_eq!(conn.last_insert_rowid(), 1, "1st row id");
assert_eq!(conn.last_insert_rowid().await, 1, "1st row id");

let r = conn
.execute("insert into t(x) values('b'),('c');", ())
.await?;
assert_eq!(r, 2, "2nd and 3rd rows inserted");
assert_eq!(conn.last_insert_rowid(), 3, "3rd row id");
assert_eq!(conn.last_insert_rowid().await, 3, "3rd row id");

let r = conn.execute("update t set x = 'd';", ()).await?;
assert_eq!(r, 3, "all three rows updated");
assert_eq!(conn.last_insert_rowid(), 3, "last row id unchanged");
assert_eq!(conn.last_insert_rowid().await, 3, "last row id unchanged");

Ok(())
});
Expand Down
12 changes: 6 additions & 6 deletions libsql-server/tests/standalone/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -336,26 +336,26 @@ fn is_autocommit() {
let db = Database::open_remote_with_connector("http://primary:8080", "", TurmoilConnector)?;
let conn = db.connect()?;

assert!(conn.is_autocommit());
assert!(conn.is_autocommit().await);
conn.execute("create table test (x)", ()).await?;

conn.execute("begin;", ()).await?;
assert!(!conn.is_autocommit());
assert!(!conn.is_autocommit().await);
conn.execute("insert into test values (12);", ()).await?;
conn.execute("commit;", ()).await?;
assert!(conn.is_autocommit());
assert!(conn.is_autocommit().await);

// make an explicit transaction
{
let tx = conn.transaction().await?;
assert!(!tx.is_autocommit());
assert!(conn.is_autocommit()); // connection is still autocommit
assert!(!tx.is_autocommit().await);
assert!(conn.is_autocommit().await); // connection is still autocommit

tx.execute("insert into test values (12);", ()).await?;
// transaction rolls back
}

assert!(conn.is_autocommit());
assert!(conn.is_autocommit().await);

let mut rows = conn.query("select count(*) from test", ()).await?;
assert_eq!(
Expand Down
24 changes: 15 additions & 9 deletions libsql/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,13 @@ pub(crate) trait Conn {

async fn transaction(&self, tx_behavior: TransactionBehavior) -> Result<Transaction>;

fn is_autocommit(&self) -> bool;
async fn is_autocommit(&self) -> bool;

fn changes(&self) -> u64;
async fn changes(&self) -> u64;

fn last_insert_rowid(&self) -> i64;
async fn last_insert_rowid(&self) -> i64;

async fn reset(&self);
}

/// A connection to some libsql database, this can be a remote one or a local one.
Expand Down Expand Up @@ -98,17 +100,21 @@ impl Connection {
}

/// Check weather libsql is in `autocommit` or not.
pub fn is_autocommit(&self) -> bool {
self.conn.is_autocommit()
pub async fn is_autocommit(&self) -> bool {
self.conn.is_autocommit().await
}

/// Check the amount of changes the last query created.
pub fn changes(&self) -> u64 {
self.conn.changes()
pub async fn changes(&self) -> u64 {
self.conn.changes().await
}

/// Check the last inserted row id.
pub fn last_insert_rowid(&self) -> i64 {
self.conn.last_insert_rowid()
pub async fn last_insert_rowid(&self) -> i64 {
self.conn.last_insert_rowid().await
}

pub async fn reset(&self) {
self.conn.reset().await
}
}
20 changes: 14 additions & 6 deletions libsql/src/hrana/hyper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,17 +146,21 @@ impl Conn for HttpConnection<HttpSender> {
})
}

fn is_autocommit(&self) -> bool {
async fn is_autocommit(&self) -> bool {
self.is_autocommit()
}

fn changes(&self) -> u64 {
async fn changes(&self) -> u64 {
self.affected_row_count()
}

fn last_insert_rowid(&self) -> i64 {
async fn last_insert_rowid(&self) -> i64 {
self.last_insert_rowid()
}

async fn reset(&self) {
self.current_stream().reset().await;
}
}

#[async_trait::async_trait]
Expand Down Expand Up @@ -273,15 +277,19 @@ impl Conn for HranaStream<HttpSender> {
todo!("sounds like nested transactions innit?")
}

fn is_autocommit(&self) -> bool {
async fn is_autocommit(&self) -> bool {
false // for streams this method is callable only when we're within explicit transaction
}

fn changes(&self) -> u64 {
async fn changes(&self) -> u64 {
self.affected_row_count()
}

fn last_insert_rowid(&self) -> i64 {
async fn last_insert_rowid(&self) -> i64 {
self.last_insert_rowid()
}

async fn reset(&self) {
self.reset().await;
}
}
4 changes: 4 additions & 0 deletions libsql/src/hrana/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,10 @@ where
pub fn is_autocommit(&self) -> bool {
self.inner.is_autocommit.load(Ordering::SeqCst)
}

pub async fn reset(&self) {
(*self.inner).stream.lock().await.reset();
}
}

#[derive(Debug)]
Expand Down
8 changes: 5 additions & 3 deletions libsql/src/local/impls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,17 +48,19 @@ impl Conn for LibsqlConnection {
})
}

fn is_autocommit(&self) -> bool {
async fn is_autocommit(&self) -> bool {
self.conn.is_autocommit()
}

fn changes(&self) -> u64 {
async fn changes(&self) -> u64 {
self.conn.changes()
}

fn last_insert_rowid(&self) -> i64 {
async fn last_insert_rowid(&self) -> i64 {
self.conn.last_insert_rowid()
}

async fn reset(&self) {}
}

impl Drop for LibsqlConnection {
Expand Down
10 changes: 6 additions & 4 deletions libsql/src/replication/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ impl RemoteConnection {
// and will return false if no rollback happened and the
// execute was valid.
pub(self) async fn maybe_execute_rollback(&self) -> Result<bool> {
if self.inner.lock().state != State::TxnReadOnly && !self.local.is_autocommit() {
if self.inner.lock().state != State::TxnReadOnly && !self.local.is_autocommit().await {
self.local.execute("ROLLBACK", Params::None).await?;
Ok(true)
} else {
Expand Down Expand Up @@ -332,17 +332,19 @@ impl Conn for RemoteConnection {
})
}

fn is_autocommit(&self) -> bool {
async fn is_autocommit(&self) -> bool {
self.is_state_init()
}

fn changes(&self) -> u64 {
async fn changes(&self) -> u64 {
self.inner.lock().changes
}

fn last_insert_rowid(&self) -> i64 {
async fn last_insert_rowid(&self) -> i64 {
self.inner.lock().last_insert_rowid
}

async fn reset(&self) {}
}

pub struct ColumnMeta {
Expand Down
Loading