Skip to content

Commit

Permalink
Surface error messages in Python client
Browse files Browse the repository at this point in the history
  • Loading branch information
cswinter committed Apr 9, 2024
1 parent 2895836 commit 00ed155
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 19 deletions.
2 changes: 1 addition & 1 deletion locustdb-compression-utils/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

69 changes: 54 additions & 15 deletions src/logging_client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,16 @@ use std::mem;
use std::sync::atomic::AtomicU64;
use std::sync::{Arc, Condvar, Mutex};
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use std::fmt;

use reqwest::header::CONTENT_TYPE;
use serde::{Deserialize, Serialize};
use tokio::select;
use tokio::time::{self, MissedTickBehavior};
use tokio_util::sync::CancellationToken;

use crate::server::{ColumnNameRequest, EncodingOpts, MultiQueryRequest, QueryResponse};
use locustdb_compression_utils::column;
use crate::server::{EncodingOpts, MultiQueryRequest, QueryResponse, ColumnNameRequest};

#[derive(Default, Serialize, Deserialize, Clone)]
pub struct EventBuffer {
Expand Down Expand Up @@ -67,6 +68,13 @@ struct BackgroundWorker {
request_data: Arc<Mutex<Option<Vec<u8>>>>,
}

#[derive(Debug)]
pub enum Error {
Reqwest(reqwest::Error),
Bincode(bincode::Error),
Client { status_code: u16, msg: String },
}

impl LoggingClient {
pub fn new(
flush_interval: Duration,
Expand Down Expand Up @@ -103,10 +111,7 @@ impl LoggingClient {
}
}

pub async fn multi_query(
&self,
queries: Vec<String>,
) -> Result<Vec<QueryResponse>, reqwest::Error> {
pub async fn multi_query(&self, queries: Vec<String>) -> Result<Vec<QueryResponse>, Error> {
let request_body = MultiQueryRequest {
queries,
encoding_opts: Some(EncodingOpts {
Expand All @@ -115,17 +120,19 @@ impl LoggingClient {
full_precision_cols: Default::default(),
}),
};
let bytes = self
let response = self
.query_client
.post(&self.query_url)
.header(CONTENT_TYPE, "application/json")
.json(&request_body)
.send()
.await?
.error_for_status()?
.bytes()
.await?
.to_vec();
.await?;
if response.status().is_client_error() {
let status_code = response.status().as_u16();
let msg = response.text().await?;
return Err(Error::Client { status_code, msg });
}
let bytes = response.bytes().await?.to_vec();
let mut rsps: Vec<QueryResponse> = bincode::deserialize(&bytes).unwrap();
rsps.iter_mut().for_each(|rsp| {
rsp.columns.iter_mut().for_each(|(_, col)| {
Expand Down Expand Up @@ -175,7 +182,11 @@ impl LoggingClient {
self.total_events += 1;
}

pub async fn columns(&self, table: String, pattern: Option<String>) -> Result<ColumnNameResponse, reqwest::Error> {
pub async fn columns(
&self,
table: String,
pattern: Option<String>,
) -> Result<ColumnNameResponse, Error> {
let request_body = ColumnNameRequest {
tables: vec![table],
pattern,
Expand All @@ -188,9 +199,13 @@ impl LoggingClient {
.header(CONTENT_TYPE, "application/json")
.json(&request_body)
.send()
.await
.unwrap();
response.json::<ColumnNameResponse>().await
.await?;
if response.status().is_client_error() {
let status_code = response.status().as_u16();
let msg = response.text().await?;
return Err(Error::Client { status_code, msg });
}
Ok(response.json::<ColumnNameResponse>().await?)
}
}

Expand Down Expand Up @@ -341,3 +356,27 @@ impl Default for ColumnData {
ColumnData::Dense(Vec::new())
}
}

impl From<reqwest::Error> for Error {
fn from(err: reqwest::Error) -> Self {
Error::Reqwest(err)
}
}

impl From<bincode::Error> for Error {
fn from(err: bincode::Error) -> Self {
Error::Bincode(err)
}
}

impl fmt::Display for Error {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Error::Reqwest(err) => write!(f, "Reqwest error: {}", err),
Error::Bincode(err) => write!(f, "Bincode error: {}", err),
Error::Client { status_code, msg } => {
write!(f, "Client error ({}): {}", status_code, msg)
}
}
}
}
6 changes: 3 additions & 3 deletions src/python.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ impl Client {
fn multi_query(&self, py: Python, queries: Vec<String>) -> PyResult<PyObject> {
let results = RT
.block_on(self.client.multi_query(queries))
.map_err(|e| PyErr::new::<PyException, _>(format!("{:?}", e)))?;
.map_err(|e| PyErr::new::<PyException, _>(format!("{}", e)))?;
let py_result = PyList::new_bound(
py,
results.into_iter().map(|result| {
Expand All @@ -70,7 +70,7 @@ impl Client {
fn query(&self, py: Python, query: String) -> PyResult<PyObject> {
let result = RT
.block_on(self.client.multi_query(vec![query]))
.map_err(|e| PyErr::new::<PyException, _>(format!("{:?}", e)))?;
.map_err(|e| PyErr::new::<PyException, _>(format!("{}", e)))?;
assert_eq!(result.len(), 1);
let columns = PyDict::new_bound(py);
for (key, value) in result.into_iter().next().unwrap().columns {
Expand All @@ -83,7 +83,7 @@ impl Client {
fn columns(&self, py: Python, table: String, pattern: Option<String>) -> PyResult<PyObject> {
let response = RT
.block_on(self.client.columns(table, pattern))
.map_err(|e| PyErr::new::<PyException, _>(format!("{:?}", e)))?;
.map_err(|e| PyErr::new::<PyException, _>(format!("{}", e)))?;
Ok(response.columns.into_py(py))
}
}
Expand Down

0 comments on commit 00ed155

Please sign in to comment.