feat: cargo clippy and allow lint clippy:to_string_trait_impl (#737)
zhuliquan authored Sep 10, 2024
1 parent fa7bc5e commit 18283b8
Showing 47 changed files with 165 additions and 177 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/ci.yml
@@ -91,6 +91,8 @@ jobs:
       run: |
         cd webui
         pnpm build
+      - name: Run Clippy
+        run: cargo clippy --all-targets --workspace -- -D warnings
       - name: Build
        run: cargo build --all-features
      - name: Test
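The new CI step gates every workspace target on a clean clippy run; `-D warnings` promotes all lint warnings to hard errors, so the job fails on any newly introduced lint. Lints a crate deliberately keeps must then be allowed explicitly, which is what the rest of this commit does. A minimal sketch (hypothetical function, not from this repo) of the per-item escape hatch:

    // Under `cargo clippy -- -D warnings`, this function would fail the build
    // without the allow attribute: clippy::too_many_arguments fires at eight
    // or more parameters by default.
    #[allow(clippy::too_many_arguments)]
    fn configure(a: u8, b: u8, c: u8, d: u8, e: u8, f: u8, g: u8, h: u8) -> u8 {
        a + b + c + d + e + f + g + h
    }

    fn main() {
        println!("{}", configure(1, 2, 3, 4, 5, 6, 7, 8));
    }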
16 changes: 7 additions & 9 deletions crates/arroyo-api/src/connection_tables.rs
@@ -236,11 +236,10 @@ pub(crate) async fn get_all_connection_tables(
     let vec: Vec<ConnectionTable> = tables
         .into_iter()
         .map(|t| t.try_into())
-        .map(|result| {
-            if let Err(err) = &result {
+        .inspect(|result| {
+            if let Err(err) = result {
                 debug!("Error building connection table: {}", err);
             }
-            result
         })
         .filter_map(Result::ok)
         .collect();
@@ -414,11 +413,10 @@ pub(crate) async fn get_connection_tables(
     let tables: Vec<ConnectionTable> = tables
         .into_iter()
         .map(|t| t.try_into())
-        .map(|result| {
-            if let Err(err) = &result {
+        .inspect(|result| {
+            if let Err(err) = result {
                 debug!("Error building connection table: {}", err);
             }
-            result
         })
         .filter_map(Result::ok)
         .collect();
@@ -580,10 +578,10 @@ async fn expand_proto_schema(
         .into_iter()
         .map(|(name, s)| {
             if s.schema_type != ConfluentSchemaType::Protobuf {
-                return Err(bad_request(format!(
+                Err(bad_request(format!(
                     "Schema reference {} has type {:?}, but must be protobuf",
                     name, s.schema_type
-                )));
+                )))
             } else {
                 Ok((name, s.schema))
             }
@@ -818,7 +816,7 @@ pub(crate) async fn test_schema(
 
     match schema_def {
         SchemaDefinition::JsonSchema(schema) => {
-            if let Err(e) = json::schema::to_arrow("test", &schema) {
+            if let Err(e) = json::schema::to_arrow("test", schema) {
                 Err(bad_request(e.to_string()))
             } else {
                 Ok(())
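Both hunks swap a pass-through `map` for `Iterator::inspect`, which borrows each item for a side effect (here, logging failures) without having to return it; clippy flags the old shape (likely its `manual_inspect` lint, added in recent toolchains). A standalone sketch of the same pattern:

    // Log parse failures as they stream past, then keep only the Ok values.
    fn main() {
        let inputs = vec!["1", "two", "3"];
        let parsed: Vec<i32> = inputs
            .into_iter()
            .map(|s| s.parse::<i32>())
            .inspect(|result| {
                if let Err(err) = result {
                    eprintln!("skipping unparsable value: {}", err);
                }
            })
            .filter_map(Result::ok)
            .collect();
        assert_eq!(parsed, vec![1, 3]);
    }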
5 changes: 3 additions & 2 deletions crates/arroyo-api/src/pipelines.rs
@@ -285,6 +285,7 @@ async fn register_schemas(compiled_sql: &mut CompiledSql) -> anyhow::Result<()>
     Ok(())
 }
 
+#[allow(clippy::too_many_arguments)]
 pub(crate) async fn create_pipeline_int<'a>(
     name: String,
     query: String,
@@ -388,7 +389,7 @@ pub(crate) async fn create_pipeline_int<'a>(
     api_queries::fetch_get_pipeline_id(&db.client().await?, &pub_id, &auth.organization_id)
         .await
         .map_err(log_and_map)?
-        .get(0)
+        .first()
         .unwrap()
         .id;
 
@@ -410,7 +411,7 @@ pub(crate) async fn create_pipeline_int<'a>(
         checkpoint_interval,
         is_preview,
         &auth,
-        &db,
+        db,
     )
     .await?;
 
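Three mechanical fixes here: `#[allow(clippy::too_many_arguments)]` silences the arity lint on `create_pipeline_int` rather than bundling its parameters into a struct, `.first()` replaces `.get(0)` (clippy's `get_first`), and the `&db` → `db` change drops a borrow of what is presumably already a reference. A quick sketch of the `get_first` equivalence:

    // slice::first() is the idiomatic spelling of get(0); both return Option<&T>.
    fn main() {
        let ids = vec![42, 7];
        assert_eq!(ids.get(0), ids.first()); // clippy::get_first flags the left-hand call
    }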
6 changes: 3 additions & 3 deletions crates/arroyo-api/src/rest_utils.rs
@@ -32,18 +32,18 @@ pub enum ApiError {
 
 pub fn map_insert_err(name: &str, error: DbError) -> ErrorResp {
     if error == DbError::DuplicateViolation {
-        return bad_request(format!("{} with that name already exists", name));
+        bad_request(format!("{} with that name already exists", name))
     } else {
         error.into()
     }
 }
 
 pub fn map_delete_err(name: &str, user: &str, error: DbError) -> ErrorResp {
     if error == DbError::ForeignKeyViolation {
-        return bad_request(format!(
+        bad_request(format!(
             "Cannot delete {}; it is still being used by {}",
             name, user
-        ));
+        ))
     } else {
         error.into()
     }
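In both functions the `if`/`else` sits in tail position, so each branch is already the function's value and the `return` keyword plus trailing semicolon are redundant (clippy's `needless_return`). A minimal sketch with a made-up function:

    // An if/else in tail position is an expression; no `return` needed.
    fn describe(n: i32) -> String {
        if n < 0 {
            format!("{} is negative", n) // was: return format!(...);
        } else {
            format!("{} is non-negative", n)
        }
    }

    fn main() {
        println!("{}", describe(-3));
    }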
2 changes: 1 addition & 1 deletion crates/arroyo-compiler-service/src/lib.rs
@@ -46,7 +46,7 @@ pub async fn start_service() -> anyhow::Result<()> {
 
     wrap_start(
         "compiler service",
-        addr.clone(),
+        addr,
         arroyo_server_common::grpc_server()
             .add_service(CompilerGrpcServer::new(service))
             .serve(addr),
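Dropping `.clone()` works because `addr` is presumably a `SocketAddr`, which is `Copy`: passing it by value copies it implicitly (note it is still used again in `.serve(addr)`), and clippy flags the explicit clone as `clone_on_copy`. A sketch under that assumption:

    use std::net::SocketAddr;

    fn main() {
        let addr: SocketAddr = "127.0.0.1:9000".parse().unwrap();
        let first_use = addr; // was: addr.clone()
        let second_use = addr; // Copy lets both uses share the value
        println!("{} {}", first_use, second_use);
    }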
3 changes: 3 additions & 0 deletions crates/arroyo-connectors/Cargo.toml
@@ -3,6 +3,9 @@ name = "arroyo-connectors"
 version = "0.12.0-dev"
 edition = "2021"
 
+[lints.clippy]
+to_string_trait_impl = "allow"
+
 [features]
 default = []
 
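The `[lints.clippy]` table (supported since Cargo 1.74) sets a lint level crate-wide, so `to_string_trait_impl` stays allowed without per-item attributes. That lint fires when a type implements `ToString` directly instead of `Display`; implementing `Display` is the usual fix, since the blanket `impl<T: Display> ToString for T` then provides `to_string` for free. A hypothetical example of code that trips it:

    // clippy::to_string_trait_impl: prefer implementing Display instead.
    struct Topic(String);

    impl ToString for Topic {
        fn to_string(&self) -> String {
            format!("topic/{}", self.0)
        }
    }

    fn main() {
        println!("{}", Topic("orders".into()).to_string());
    }

Presumably this crate carries direct `ToString` impls (possibly macro-generated) that are not worth refactoring, which is why the commit allows the lint instead.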
2 changes: 1 addition & 1 deletion crates/arroyo-connectors/src/filesystem/mod.rs
@@ -94,7 +94,7 @@ impl Connector for FileSystemConnector {
             let message = TestSourceMessage {
                 error: failed,
                 done: true,
-                message: message,
+                message,
             };
             tx.send(message).await.unwrap();
         });
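`message: message,` collapses to the field-init shorthand (clippy's `redundant_field_names`): when a local binding has the same name as the struct field, naming it once is enough. A standalone sketch:

    struct Reply {
        error: bool,
        message: String,
    }

    fn main() {
        let message = String::from("connection ok");
        let reply = Reply { error: false, message }; // was: message: message
        println!("{} (error: {})", reply.message, reply.error);
    }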
10 changes: 5 additions & 5 deletions crates/arroyo-connectors/src/filesystem/sink/delta.rs
@@ -34,16 +34,16 @@ pub(crate) async fn commit_files_to_delta(
         return Ok(None);
     }
 
-    let add_actions = create_add_actions(&finished_files, &relative_table_path)?;
-    let table_path = build_table_path(&storage_provider, &relative_table_path);
-    let storage_options = configure_storage_options(&table_path, &storage_provider).await?;
+    let add_actions = create_add_actions(finished_files, relative_table_path)?;
+    let table_path = build_table_path(storage_provider, relative_table_path);
+    let storage_options = configure_storage_options(&table_path, storage_provider).await?;
     let mut table = load_or_create_table(&table_path, storage_options, &schema).await?;
 
     if let Some(new_version) = check_existing_files(
         &mut table,
         last_version,
-        &finished_files,
-        &relative_table_path,
+        finished_files,
+        relative_table_path,
     )
     .await?
     {
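These parameters are presumably already references (e.g. something like `finished_files: &[FinishedFile]`), so taking `&` again builds a `&&T` that the compiler immediately auto-derefs; clippy's `needless_borrow` asks for the reference to be passed through as-is. A sketch under that assumption, with made-up names:

    // If `files` is already a reference, re-borrowing it is redundant.
    fn count(files: &[String]) -> usize {
        files.len()
    }

    fn main() {
        let files = vec!["part-0.parquet".to_string()];
        let files_ref: &Vec<String> = &files;
        // clippy::needless_borrow would flag count(&files_ref)
        assert_eq!(count(files_ref), 1);
    }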
2 changes: 1 addition & 1 deletion crates/arroyo-connectors/src/filesystem/sink/local.rs
@@ -305,7 +305,7 @@ impl<V: LocalWriter + Send + 'static> TwoPhaseCommitter for LocalFileSystemWrite
             );
             tokio::fs::rename(tmp_file, destination).await?;
             finished_files.push(FinishedFile {
-                filename: object_store::path::Path::parse(&destination.to_string_lossy())?
+                filename: object_store::path::Path::parse(destination.to_string_lossy())?
                     .to_string(),
                 partition: None,
                 size: destination.metadata()?.len() as usize,
2 changes: 1 addition & 1 deletion crates/arroyo-connectors/src/impulse/operator.rs
@@ -80,7 +80,7 @@ impl ImpulseSourceFunc {
             return 8192;
         }
         let batch_size = Duration::from_millis(100).as_micros() / duration_micros;
-        batch_size.max(1).min(8192) as usize
+        batch_size.clamp(1, 8192) as usize
     }
 
     fn delay(&self, ctx: &mut ArrowContext) -> Duration {
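`max(1).min(8192)` and `clamp(1, 8192)` compute the same bound, but `clamp` states the intent directly and panics if the bounds are inverted, catching transposed limits early (clippy's `manual_clamp`). Quick sketch:

    fn main() {
        let batch_size: u128 = 25_000;
        assert_eq!(batch_size.max(1).min(8192), batch_size.clamp(1, 8192));
    }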
3 changes: 2 additions & 1 deletion crates/arroyo-connectors/src/kafka/mod.rs
@@ -49,6 +49,7 @@ import_types!(
         {type = "string", format = "var-str"} = VarStr
     }
 );
+
 import_types!(schema = "src/kafka/table.json");
 
 impl KafkaTable {
@@ -421,7 +422,7 @@ impl KafkaTester {
         client_config
             .set(
                 "bootstrap.servers",
-                &self.connection.bootstrap_servers.to_string(),
+                self.connection.bootstrap_servers.to_string(),
             )
             .set("enable.auto.commit", "false")
             .set("auto.offset.reset", "earliest")
12 changes: 5 additions & 7 deletions crates/arroyo-connectors/src/kafka/sink/mod.rs
@@ -183,7 +183,7 @@ impl KafkaSinkFunc {
                 rec = rec.timestamp(ts);
             }
             if let Some(k) = k.as_ref() {
-                rec = rec.key(&k);
+                rec = rec.key(k);
             }
 
             rec.payload(&v)
@@ -260,15 +260,13 @@ impl ArrowOperator for KafkaSinkFunc {
 
         for (i, v) in values.enumerate() {
             // kafka timestamp as unix millis
-            let timestamp = if let Some(ts) = timestamps {
-                Some(if ts.is_null(i) {
+            let timestamp = timestamps.map(|ts| {
+                if ts.is_null(i) {
                     0
                 } else {
                     ts.value(i) / 1_000_000
-                })
-            } else {
-                None
-            };
+                }
+            });
             // TODO: this copy should be unnecessary but likely needs a custom trait impl
             let key = keys.map(|k| k.value(i).as_bytes().to_vec());
             self.publish(timestamp, key, v, ctx).await;
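The `if let Some(ts) ... else { None }` chain is exactly `Option::map` (clippy's `manual_map`): the closure runs only when a timestamp column exists, and `None` propagates otherwise. A self-contained sketch with a made-up nanosecond value:

    fn main() {
        let nanos: Option<i64> = Some(1_500_000_000);
        // was: if let Some(ts) = nanos { Some(ts / 1_000_000) } else { None }
        let millis = nanos.map(|ts| ts / 1_000_000);
        assert_eq!(millis, Some(1_500));
    }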
1 change: 1 addition & 0 deletions crates/arroyo-connectors/src/mqtt/mod.rs
@@ -38,6 +38,7 @@ import_types!(
         {type = "string", format = "var-str"} = VarStr
     }
 );
+
 import_types!(schema = "src/mqtt/table.json");
 pub struct MqttConnector {}
 
38 changes: 19 additions & 19 deletions crates/arroyo-connectors/src/nats/mod.rs
@@ -32,6 +32,7 @@ import_types!(
         {type = "string", format = "var-str"} = VarStr
     }
 );
+
 import_types!(schema = "src/nats/table.json");
 
 #[derive(Clone, Debug, Encode, Decode, PartialEq, PartialOrd)]
#[derive(Clone, Debug, Encode, Decode, PartialEq, PartialOrd)]
Expand All @@ -48,7 +49,7 @@ impl NatsConnector {
) -> anyhow::Result<NatsConfig> {
let nats_servers = VarStr::new(pull_opt("servers", options)?);
let nats_auth = options.remove("auth.type");
let nats_auth: NatsConfigAuthentication = match nats_auth.as_ref().map(|t| t.as_str()) {
let nats_auth: NatsConfigAuthentication = match nats_auth.as_deref() {
Some("none") | None => NatsConfigAuthentication::None {},
Some("credentials") => NatsConfigAuthentication::Credentials {
username: VarStr::new(pull_opt("auth.username", options)?),
@@ -78,71 +79,70 @@ impl NatsConnector {
                     .remove("consumer.ack_policy")
                     .unwrap_or_default()
                     .parse()
-                    .unwrap_or_else(|_| AcknowledgmentPolicy::Explicit),
+                    .unwrap_or(AcknowledgmentPolicy::Explicit),
                 replay_policy: options
                     .remove("consumer.replay_policy")
                     .unwrap_or_default()
                     .parse()
-                    .unwrap_or_else(|_| ReplayPolicy::Instant),
+                    .unwrap_or(ReplayPolicy::Instant),
                 ack_wait: options
                     .remove("consumer.ack_wait")
                     .unwrap_or_default()
                     .parse()
-                    .unwrap_or_else(|_| 30),
-                filter_subjects: options.remove("consumer.filter_subjects").map_or_else(
-                    || Vec::new(),
-                    |s| s.split(',').map(String::from).collect(),
-                ),
+                    .unwrap_or(30),
+                filter_subjects: options
+                    .remove("consumer.filter_subjects")
+                    .map_or_else(Vec::new, |s| s.split(',').map(String::from).collect()),
                 sample_frequency: options
                     .remove("consumer.sample_frequency")
                     .unwrap_or_default()
                     .parse()
-                    .unwrap_or_else(|_| 0),
+                    .unwrap_or(0),
                 num_replicas: options
                     .remove("consumer.num_replicas")
                     .unwrap_or_default()
                     .parse()
-                    .unwrap_or_else(|_| 1),
+                    .unwrap_or(1),
                 inactive_threshold: options
                     .remove("consumer.inactive_threshold")
                     .unwrap_or_default()
                     .parse()
-                    .unwrap_or_else(|_| 600),
+                    .unwrap_or(600),
                 rate_limit: options
                     .remove("consumer.rate_limit")
                     .unwrap_or_default()
                     .parse()
-                    .unwrap_or_else(|_| -1),
+                    .unwrap_or(-1),
                 max_ack_pending: options
                     .remove("consumer.max_ack_pending")
                     .unwrap_or_default()
                     .parse()
-                    .unwrap_or_else(|_| -1),
+                    .unwrap_or(-1),
                 max_deliver: options
                     .remove("consumer.max_deliver")
                     .unwrap_or_default()
                     .parse()
-                    .unwrap_or_else(|_| -1),
+                    .unwrap_or(-1),
                 max_waiting: options
                     .remove("consumer.max_waiting")
                     .unwrap_or_default()
                     .parse()
-                    .unwrap_or_else(|_| 1000000),
+                    .unwrap_or(1000000),
                 max_batch: options
                     .remove("consumer.max_batch")
                     .unwrap_or_default()
                     .parse()
-                    .unwrap_or_else(|_| 10000),
+                    .unwrap_or(10000),
                 max_bytes: options
                     .remove("consumer.max_bytes")
                     .unwrap_or_default()
                     .parse()
-                    .unwrap_or_else(|_| 104857600),
+                    .unwrap_or(104857600),
                 max_expires: options
                     .remove("consumer.max_expires")
                     .unwrap_or_default()
                     .parse()
-                    .unwrap_or_else(|_| 300000),
+                    .unwrap_or(300000),
             }),
             (None, Some(subject)) => Some(SourceType::Core { subject }),
             (Some(_), Some(_)) => bail!("Exactly one of `stream` or `subject` must be set"),
@@ -325,7 +325,7 @@ impl Connector for NatsConnector {
 
         let table = Self::table_from_options(options)?;
 
-        Self::from_config(&self, None, name, connection, table, schema)
+        Self::from_config(self, None, name, connection, table, schema)
     }
 
     fn make_operator(
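Two idioms recur through this file: `Option<String>::as_deref()` replaces `as_ref().map(|t| t.as_str())` in one call, and `unwrap_or(v)` replaces `unwrap_or_else(|_| v)` whenever the default is a cheap constant, since there is nothing to evaluate lazily (clippy's `unnecessary_lazy_evaluations`). A sketch with made-up option values:

    fn main() {
        let auth: Option<String> = Some("credentials".to_string());
        // as_deref(): Option<String> -> Option<&str>
        match auth.as_deref() {
            Some("credentials") => println!("using credentials"),
            Some("none") | None => println!("no auth"),
            Some(other) => println!("unknown auth type: {}", other),
        }

        // unwrap_or: a constant default needs no closure.
        let ack_wait: u64 = "not-a-number".parse().unwrap_or(30);
        assert_eq!(ack_wait, 30);
    }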
6 changes: 2 additions & 4 deletions crates/arroyo-connectors/src/nats/sink/mod.rs
@@ -75,9 +75,7 @@ impl ArrowOperator for NatsSinkFunc {
     }
 
     async fn process_batch(&mut self, batch: RecordBatch, ctx: &mut ArrowContext) {
-        let s = match &self.sink_type {
-            SinkType::Subject(s) => s,
-        };
+        let SinkType::Subject(s) = &self.sink_type;
         let nats_subject = async_nats::Subject::from(s.clone());
         for msg in self.serializer.serialize(&batch) {
             let publisher = self
@@ -97,7 +95,7 @@ impl ArrowOperator for NatsSinkFunc {
                 })
                 .await
                 .expect("Something went wrong, data will never be received.");
-                panic!("Panicked while processing element: {}", e.to_string());
+                panic!("Panicked while processing element: {}", e);
             }
         }
     }
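Because `SinkType` here has a single variant, the pattern in a plain `let` is irrefutable and the one-arm `match` can go; likewise `panic!("{}", e)` drops the redundant `e.to_string()`, since the format machinery already calls `Display` (clippy's `to_string_in_format_args`). A standalone sketch of the destructuring:

    enum SinkType {
        Subject(String),
    }

    fn main() {
        let sink_type = SinkType::Subject("events.orders".to_string());
        let SinkType::Subject(s) = &sink_type; // replaces a one-arm match
        println!("publishing to {}", s);
    }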