diff --git a/.github/workflows/kuksa_databroker_build.yml b/.github/workflows/kuksa_databroker_build.yml index 9d204866..7765acab 100644 --- a/.github/workflows/kuksa_databroker_build.yml +++ b/.github/workflows/kuksa_databroker_build.yml @@ -24,6 +24,9 @@ on: QUAY_IO_USERNAME: required: true workflow_dispatch: + # Run every Sunday night to check regressions, for example from clippy + schedule: + - cron: "0 4 * * 0" # suffix to avoid cancellation when running from release workflow concurrency: diff --git a/.gitignore b/.gitignore index d22c11d0..33628b2d 100644 --- a/.gitignore +++ b/.gitignore @@ -5,3 +5,5 @@ __pycache__ databroker/thirdparty databroker-cli/thirdparty .venv/ +databroker/databroker_bin.cdx.json +databroker-cli/databroker-cli_bin.cdx.json diff --git a/Cargo.lock b/Cargo.lock index df1ef1ac..cc557db0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -603,7 +603,7 @@ checksum = "e8566979429cf69b49a5c740c60791108e86440e8be149bbea4fe54d2c32d6e2" [[package]] name = "databroker" -version = "0.5.0" +version = "0.5.0-dev.1" dependencies = [ "anyhow", "async-trait", @@ -640,7 +640,7 @@ dependencies = [ [[package]] name = "databroker-cli" -version = "0.5.0" +version = "0.5.0-dev.1" dependencies = [ "ansi_term", "clap", @@ -660,7 +660,7 @@ dependencies = [ [[package]] name = "databroker-proto" -version = "0.5.0" +version = "0.5.0-dev.1" dependencies = [ "prost", "prost-types", @@ -1890,9 +1890,9 @@ dependencies = [ [[package]] name = "itoa" -version = "1.0.13" +version = "1.0.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "540654e97a3f4470a492cd30ff187bc95d89557a903a2bbf112e2fae98104ef2" +checksum = "d75a2a4b1b190afb6f5425f10f6a8f959d2ea0b9c2b1d79553551850539e4674" [[package]] name = "jemalloc-sys" @@ -1940,7 +1940,7 @@ dependencies = [ [[package]] name = "kuksa" -version = "0.5.0" +version = "0.5.0-dev.1" dependencies = [ "databroker-proto", "http", @@ -1952,7 +1952,7 @@ dependencies = [ [[package]] name = "kuksa-common" -version = "0.5.0" +version = "0.5.0-dev.1" dependencies = [ "databroker-proto", "http", @@ -1963,7 +1963,7 @@ dependencies = [ [[package]] name = "kuksa-sdv" -version = "0.5.0" +version = "0.5.0-dev.1" dependencies = [ "databroker-proto", "http", @@ -2004,9 +2004,9 @@ checksum = "bbd2bcb4c963f2ddae06a2efc7e9f3591312473c50c6685e1f298068316e66fe" [[package]] name = "libc" -version = "0.2.164" +version = "0.2.165" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "433bfe06b8c75da9b2e3fbea6e5329ff87748f0b144ef75306e674c3f6f7c13f" +checksum = "fcb4d3d38eab6c5239a362fa8bae48c03baf980a6e7079f063942d563ef3533e" [[package]] name = "libredox" @@ -3405,9 +3405,9 @@ dependencies = [ [[package]] name = "tracing-core" -version = "0.1.32" +version = "0.1.33" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c06d3da6113f116aaee68e4d601191614c9053067f9ab7f6edbcb161237daa54" +checksum = "e672c95779cf947c5311f83787af4fa8fffd12fb27e4993211a84bdfd9610f9c" dependencies = [ "once_cell", ] diff --git a/databroker-cli/Cargo.toml b/databroker-cli/Cargo.toml index 8159edf5..d5f0b6b0 100644 --- a/databroker-cli/Cargo.toml +++ b/databroker-cli/Cargo.toml @@ -13,7 +13,7 @@ [package] name = "databroker-cli" -version = "0.5.0" +version = "0.5.0-dev.1" authors = ["Eclipse KUKSA Project"] edition = "2021" license = "Apache-2.0" diff --git a/databroker-proto/Cargo.toml b/databroker-proto/Cargo.toml index 24754125..dcc0e7c8 100644 --- a/databroker-proto/Cargo.toml +++ b/databroker-proto/Cargo.toml @@ -13,7 +13,7 @@ [package] name = "databroker-proto" -version = "0.5.0" +version = "0.5.0-dev.1" authors = ["Eclipse KUKSA Project"] edition = "2021" license = "Apache-2.0" diff --git a/databroker/Cargo.toml b/databroker/Cargo.toml index 6b13c456..a7d190c2 100644 --- a/databroker/Cargo.toml +++ b/databroker/Cargo.toml @@ -13,7 +13,7 @@ [package] name = "databroker" -version = "0.5.0" +version = "0.5.0-dev.1" authors = ["Eclipse KUKSA Project"] edition = "2021" license = "Apache-2.0" diff --git a/databroker/src/broker.rs b/databroker/src/broker.rs index 8c777f0e..c0e05788 100644 --- a/databroker/src/broker.rs +++ b/databroker/src/broker.rs @@ -1111,7 +1111,7 @@ pub enum EntryReadAccess<'a> { Err(&'a Metadata, ReadError), } -impl<'a> EntryReadAccess<'a> { +impl EntryReadAccess<'_> { pub fn datapoint(&self) -> Result<&Datapoint, ReadError> { match self { Self::Entry(entry) => Ok(&entry.datapoint), @@ -1151,7 +1151,7 @@ pub struct EntryReadIterator<'a, 'b> { permissions: &'b Permissions, } -impl<'a, 'b> Iterator for EntryReadIterator<'a, 'b> { +impl<'a> Iterator for EntryReadIterator<'a, '_> { type Item = EntryReadAccess<'a>; #[inline] @@ -1167,7 +1167,7 @@ impl<'a, 'b> Iterator for EntryReadIterator<'a, 'b> { } } -impl<'a, 'b> DatabaseReadAccess<'a, 'b> { +impl DatabaseReadAccess<'_, '_> { pub fn get_entry_by_id(&self, id: i32) -> Result<&Entry, ReadError> { match self.db.entries.get(&id) { Some(entry) => match self.permissions.can_read(&entry.metadata.path) { @@ -1203,7 +1203,7 @@ impl<'a, 'b> DatabaseReadAccess<'a, 'b> { } } -impl<'a, 'b> DatabaseWriteAccess<'a, 'b> { +impl DatabaseWriteAccess<'_, '_> { pub fn update_by_path( &mut self, path: &str, @@ -1397,7 +1397,7 @@ impl Database { } } -impl<'a, 'b> query::CompilationInput for DatabaseReadAccess<'a, 'b> { +impl query::CompilationInput for DatabaseReadAccess<'_, '_> { fn get_datapoint_type(&self, path: &str) -> Result { match self.get_metadata_by_path(path) { Some(metadata) => Ok(metadata.data_type.to_owned()), @@ -1670,10 +1670,13 @@ impl<'a, 'b> AuthorizedAccess<'a, 'b> { .await .add_change_subscription(subscription); - let stream = BroadcastStream::new(receiver).filter_map(|result| match result { + let stream = BroadcastStream::new(receiver).filter_map(move |result| match result { Ok(message) => Some(message), Err(err) => { - debug!("Lagged entries: {}", err); + warn!( + "Slow subscriber with capacity {} lagged and missed signal updates: {}", + channel_capacity, err + ); None } }); diff --git a/lib/Cargo.lock b/lib/Cargo.lock index 2a147ff0..b792db35 100644 --- a/lib/Cargo.lock +++ b/lib/Cargo.lock @@ -185,9 +185,20 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" +[[package]] +name = "databroker-examples" +version = "0.1.0" +dependencies = [ + "kuksa", + "kuksa-common", + "kuksa-sdv", + "tokio", + "tokio-stream", +] + [[package]] name = "databroker-proto" -version = "0.5.0" +version = "0.5.0-dev.1" dependencies = [ "prost", "prost-types", @@ -436,13 +447,13 @@ dependencies = [ [[package]] name = "itoa" -version = "1.0.13" +version = "1.0.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "540654e97a3f4470a492cd30ff187bc95d89557a903a2bbf112e2fae98104ef2" +checksum = "d75a2a4b1b190afb6f5425f10f6a8f959d2ea0b9c2b1d79553551850539e4674" [[package]] name = "kuksa" -version = "0.5.0" +version = "0.5.0-dev.1" dependencies = [ "databroker-proto", "http", @@ -454,7 +465,7 @@ dependencies = [ [[package]] name = "kuksa-common" -version = "0.5.0" +version = "0.5.0-dev.1" dependencies = [ "databroker-proto", "http", @@ -465,7 +476,7 @@ dependencies = [ [[package]] name = "kuksa-sdv" -version = "0.5.0" +version = "0.5.0-dev.1" dependencies = [ "databroker-proto", "http", @@ -477,9 +488,9 @@ dependencies = [ [[package]] name = "libc" -version = "0.2.164" +version = "0.2.165" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "433bfe06b8c75da9b2e3fbea6e5329ff87748f0b144ef75306e674c3f6f7c13f" +checksum = "fcb4d3d38eab6c5239a362fa8bae48c03baf980a6e7079f063942d563ef3533e" [[package]] name = "linux-raw-sys" @@ -487,6 +498,16 @@ version = "0.4.14" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "78b3ae25bc7c8c38cec158d1f2757ee79e9b3740fbc7ccf0e59e4b08d793fa89" +[[package]] +name = "lock_api" +version = "0.4.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "07af8b9cdd281b7915f413fa73f29ebd5d55d0d3f0155584dade1ff18cea1b17" +dependencies = [ + "autocfg", + "scopeguard", +] + [[package]] name = "log" version = "0.4.22" @@ -553,6 +574,29 @@ version = "1.20.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1261fe7e33c73b354eab43b1273a57c8f967d0391e80353e51f764ac02cf6775" +[[package]] +name = "parking_lot" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f1bf18183cf54e8d6059647fc3063646a1801cf30896933ec2311622cc4b9a27" +dependencies = [ + "lock_api", + "parking_lot_core", +] + +[[package]] +name = "parking_lot_core" +version = "0.9.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e401f977ab385c9e4e3ab30627d6f26d00e2c73eef317493c4ec6d468726cf8" +dependencies = [ + "cfg-if", + "libc", + "redox_syscall", + "smallvec", + "windows-targets", +] + [[package]] name = "percent-encoding" version = "2.3.1" @@ -730,6 +774,15 @@ dependencies = [ "getrandom", ] +[[package]] +name = "redox_syscall" +version = "0.5.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9b6dfecf2c74bce2466cabf93f6664d6998a69eb21e39f4207930065b27b771f" +dependencies = [ + "bitflags 2.6.0", +] + [[package]] name = "regex" version = "1.11.1" @@ -839,6 +892,12 @@ version = "1.0.18" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0e819f2bc632f285be6d7cd36e25940d45b2391dd6d9b939e79de557f7014248" +[[package]] +name = "scopeguard" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" + [[package]] name = "serde" version = "1.0.215" @@ -865,6 +924,15 @@ version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64" +[[package]] +name = "signal-hook-registry" +version = "1.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a9e9e0b4211b72e7b8b6e85c807d36c212bdb33ea8587f7569562a84df5465b1" +dependencies = [ + "libc", +] + [[package]] name = "slab" version = "0.4.9" @@ -874,6 +942,12 @@ dependencies = [ "autocfg", ] +[[package]] +name = "smallvec" +version = "1.13.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3c5e1a9a646d36c3599cd173a41282daf47c44583ad367b8e6837255952e5c67" + [[package]] name = "socket2" version = "0.5.7" @@ -936,7 +1010,9 @@ dependencies = [ "bytes", "libc", "mio", + "parking_lot", "pin-project-lite", + "signal-hook-registry", "socket2", "tokio-macros", "windows-sys 0.52.0", @@ -1098,9 +1174,9 @@ dependencies = [ [[package]] name = "tracing-core" -version = "0.1.32" +version = "0.1.33" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c06d3da6113f116aaee68e4d601191614c9053067f9ab7f6edbcb161237daa54" +checksum = "e672c95779cf947c5311f83787af4fa8fffd12fb27e4993211a84bdfd9610f9c" dependencies = [ "once_cell", ] diff --git a/lib/Cargo.toml b/lib/Cargo.toml index 0aa82b4c..0f83dbca 100644 --- a/lib/Cargo.toml +++ b/lib/Cargo.toml @@ -4,7 +4,8 @@ resolver = "2" members = [ "common", "kuksa", - "sdv" + "sdv", + "databroker-examples" ] [workspace.dependencies] diff --git a/lib/common/Cargo.toml b/lib/common/Cargo.toml index 47fb915b..45eeb90c 100644 --- a/lib/common/Cargo.toml +++ b/lib/common/Cargo.toml @@ -13,7 +13,7 @@ [package] name = "kuksa-common" -version = "0.5.0" +version = "0.5.0-dev.1" authors = ["Eclipse KUKSA Project"] edition = "2021" license = "Apache-2.0" diff --git a/lib/databroker-examples/Cargo.toml b/lib/databroker-examples/Cargo.toml new file mode 100644 index 00000000..fef04772 --- /dev/null +++ b/lib/databroker-examples/Cargo.toml @@ -0,0 +1,11 @@ +[package] +name = "databroker-examples" +version = "0.1.0" +edition = "2021" + +[dependencies] +kuksa-common = { path = "../common"} +kuksa = { path = "../kuksa"} +kuksa-sdv = { path = "../sdv"} +tokio = {version = "1.17.0", features = ["full"]} +tokio-stream = "0.1.8" diff --git a/lib/databroker-examples/examples/slow_subscriber.rs b/lib/databroker-examples/examples/slow_subscriber.rs new file mode 100644 index 00000000..2575f834 --- /dev/null +++ b/lib/databroker-examples/examples/slow_subscriber.rs @@ -0,0 +1,49 @@ +/******************************************************************************** +* Copyright (c) 2024 Contributors to the Eclipse Foundation +* +* See the NOTICE file(s) distributed with this work for additional +* information regarding copyright ownership. +* +* This program and the accompanying materials are made available under the +* terms of the Apache License 2.0 which is available at +* http://www.apache.org/licenses/LICENSE-2.0 +* +* SPDX-License-Identifier: Apache-2.0 +********************************************************************************/ + +use kuksa::KuksaClient; +use tokio::time::{sleep, Duration}; +use kuksa_common::to_uri; +use std::thread; + +#[tokio::main] +async fn main() { + + // Paths to subscribe + let paths = vec!["Vehicle.Speed"]; + + // Initialize the KuksaClient + let mut client: KuksaClient = KuksaClient::new(to_uri("127.0.0.1:55555").unwrap()); + + // Subscribe to paths + let mut stream = client.subscribe(paths.clone()).await.unwrap(); + + println!("Subscribed to {:?}", paths); + + loop { + match stream.message().await { + Ok(msg) => { + println!("Got message, will wait 5 seconds: {:?}", msg); + // Simulate slow processing by sleeping + sleep(Duration::from_secs(1)).await; + thread::sleep(Duration::from_secs(5)); + } + Err(e) => { + println!("Error while receiving message: {:?}", e); + break; // Exit loop on error + } + } + } + + println!("Exiting subscriber..."); +} \ No newline at end of file diff --git a/lib/kuksa/Cargo.toml b/lib/kuksa/Cargo.toml index ab73e83e..013f2aa2 100644 --- a/lib/kuksa/Cargo.toml +++ b/lib/kuksa/Cargo.toml @@ -13,7 +13,7 @@ [package] name = "kuksa" -version = "0.5.0" +version = "0.5.0-dev.1" authors = ["Eclipse KUKSA Project"] edition = "2021" license = "Apache-2.0" diff --git a/lib/sdv/Cargo.toml b/lib/sdv/Cargo.toml index e39e6f29..02cacc36 100644 --- a/lib/sdv/Cargo.toml +++ b/lib/sdv/Cargo.toml @@ -13,7 +13,7 @@ [package] name = "kuksa-sdv" -version = "0.5.0" +version = "0.5.0-dev.1" authors = ["Eclipse KUKSA Project"] edition = "2021" license = "Apache-2.0"