Skip to content

Commit

Permalink
fix: build fix for Kafka (#1079)
Browse files Browse the repository at this point in the history
Signed-off-by: parmesant <[email protected]>
Co-authored-by: Nikhil Sinha <[email protected]>
Co-authored-by: Devdutt Shenoi <[email protected]>
  • Loading branch information
3 people authored Jan 8, 2025
1 parent 8a9448d commit ddef8ee
Show file tree
Hide file tree
Showing 6 changed files with 32 additions and 16 deletions.
5 changes: 4 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,6 @@ once_cell = "1.17.1"
opentelemetry-proto = {git = "https://github.com/parseablehq/opentelemetry-rust", branch="fix-metrics-u64-serialization"}
prometheus = { version = "0.13", features = ["process"] }
rand = "0.8.5"
rdkafka = { version = "0.36.2", default-features = false, features = ["tokio"] }
regex = "1.7.3"
relative-path = { version = "1.7", features = ["serde"] }
reqwest = { version = "0.11.27", default-features = false, features = [
Expand Down Expand Up @@ -136,3 +135,7 @@ debug = []
inherits = "release"
lto = "fat"
codegen-units = 1

# rdkafka is declared in this target-specific table because, on unsupported platforms, a failing dependency causes cargo to skip any other dependencies listed after it
[target.'cfg(all(target_os = "linux", target_arch = "x86_64"))'.dependencies]
rdkafka = { version = "0.36.2", default-features = false, features = ["tokio"] }
30 changes: 18 additions & 12 deletions src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,16 @@ use std::path::PathBuf;
use url::Url;

use crate::{
kafka::SslProtocol,
oidc::{self, OpenidConfig},
option::{validation, Compression, Mode},
};

#[cfg(all(target_os = "linux", target_arch = "x86_64"))]
use crate::kafka::SslProtocol as KafkaSslProtocol;

#[cfg(not(all(target_os = "linux", target_arch = "x86_64")))]
use std::string::String as KafkaSslProtocol;

#[derive(Debug, Default)]
pub struct Cli {
/// The location of TLS Cert file
Expand Down Expand Up @@ -107,7 +112,7 @@ pub struct Cli {
pub kafka_host: Option<String>,
pub kafka_group: Option<String>,
pub kafka_client_id: Option<String>,
pub kafka_security_protocol: Option<SslProtocol>,
pub kafka_security_protocol: Option<KafkaSslProtocol>,
pub kafka_partitions: Option<String>,

// Audit Logging env vars
Expand Down Expand Up @@ -502,16 +507,17 @@ impl FromArgMatches for Cli {
}

fn update_from_arg_matches(&mut self, m: &clap::ArgMatches) -> Result<(), clap::Error> {
self.kafka_topics = m.get_one::<String>(Self::KAFKA_TOPICS).cloned();
self.kafka_security_protocol = m
.get_one::<SslProtocol>(Self::KAFKA_SECURITY_PROTOCOL)
.cloned();
self.kafka_group = m.get_one::<String>(Self::KAFKA_GROUP).cloned();
self.kafka_client_id = m.get_one::<String>(Self::KAFKA_CLIENT_ID).cloned();
self.kafka_security_protocol = m
.get_one::<SslProtocol>(Self::KAFKA_SECURITY_PROTOCOL)
.cloned();
self.kafka_partitions = m.get_one::<String>(Self::KAFKA_PARTITIONS).cloned();
#[cfg(all(target_os = "linux", target_arch = "x86_64"))]
{
self.kafka_topics = m.get_one::<String>(Self::KAFKA_TOPICS).cloned();
self.kafka_security_protocol = m
.get_one::<KafkaSslProtocol>(Self::KAFKA_SECURITY_PROTOCOL)
.cloned();
self.kafka_group = m.get_one::<String>(Self::KAFKA_GROUP).cloned();
self.kafka_client_id = m.get_one::<String>(Self::KAFKA_CLIENT_ID).cloned();
self.kafka_host = m.get_one::<String>(Self::KAFKA_HOST).cloned();
self.kafka_partitions = m.get_one::<String>(Self::KAFKA_PARTITIONS).cloned();
}

self.audit_logger = m.get_one::<Url>(Self::AUDIT_LOGGER).cloned();
self.audit_username = m.get_one::<String>(Self::AUDIT_USERNAME).cloned();
Expand Down
2 changes: 1 addition & 1 deletion src/handlers/http/health_check.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ pub async fn handle_signals(shutdown_signal: Arc<Mutex<Option<oneshot::Sender<()
{
tokio::select! {
_ = ctrl_c() => {
log::info!("Received SIGINT signal at Readiness Probe Handler");
info!("Received SIGINT signal at Readiness Probe Handler");
shutdown(shutdown_signal).await;
}
}
Expand Down
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ pub mod correlation;
mod event;
pub mod handlers;
pub mod hottier;
#[cfg(all(target_os = "linux", target_arch = "x86_64"))]
pub mod kafka;
mod livetail;
mod metadata;
Expand Down
6 changes: 5 additions & 1 deletion src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,15 @@
*/

use parseable::{
banner, kafka,
banner,
option::{Mode, CONFIG},
rbac, storage, IngestServer, ParseableServer, QueryServer, Server,
};
use tracing_subscriber::EnvFilter;

#[cfg(all(target_os = "linux", target_arch = "x86_64"))]
use parseable::kafka;

#[actix_web::main]
async fn main() -> anyhow::Result<()> {
tracing_subscriber::fmt()
Expand All @@ -46,6 +49,7 @@ async fn main() -> anyhow::Result<()> {
// keep metadata info in mem
metadata.set_global();

#[cfg(all(target_os = "linux", target_arch = "x86_64"))]
// load kafka server
if CONFIG.parseable.mode != Mode::Query {
tokio::task::spawn(kafka::setup_integration());
Expand Down
4 changes: 3 additions & 1 deletion src/query/stream_schema_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,9 @@ impl StandardTableProvider {
#[cfg(windows)]
{
if CONFIG.storage_name.eq("drive") {
file_path = object_store::path::Path::from_absolute_path(file_path).unwrap();
file_path = object_store::path::Path::from_absolute_path(file_path)
.unwrap()
.to_string();
}
}
let pf = PartitionedFile::new(file_path, file.file_size);
Expand Down

0 comments on commit ddef8ee

Please sign in to comment.