From 13f9ecd05e3780618d289b5cdb83bab73a80e317 Mon Sep 17 00:00:00 2001 From: carter Date: Wed, 4 Dec 2024 11:46:01 -0700 Subject: [PATCH] Move to updated version of roslibrust_serde_rosmsg so that we can avoid a lot of byte shuffling --- Cargo.lock | 5 ++- roslibrust/Cargo.toml | 2 +- roslibrust_zenoh/Cargo.toml | 2 +- roslibrust_zenoh/src/lib.rs | 71 +++++++++++++++---------------------- 4 files changed, 33 insertions(+), 47 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a8d8de8..ea02e78 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2493,14 +2493,13 @@ dependencies = [ [[package]] name = "roslibrust_serde_rosmsg" -version = "0.3.0" +version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ee7ea0fc21625dd94a69e126b9f2c1eab4b60dd80a6abf3e0a3e284f9d648586" +checksum = "dfcb4a6411b4969947dd4fcc23488c4fd7e2e3c180c8d167bece0661df5c0fae" dependencies = [ "byteorder", "error-chain", "serde", - "serde_derive", ] [[package]] diff --git a/roslibrust/Cargo.toml b/roslibrust/Cargo.toml index b881043..7bc8db0 100644 --- a/roslibrust/Cargo.toml +++ b/roslibrust/Cargo.toml @@ -38,7 +38,7 @@ roslibrust_codegen_macro = { path = "../roslibrust_codegen_macro", version = "0. roslibrust_codegen = { path = "../roslibrust_codegen", version = "0.11.1" } reqwest = { version = "0.11", optional = true } # Only used with native ros1 serde_xmlrpc = { version = "0.2", optional = true } # Only used with native ros1 -roslibrust_serde_rosmsg = { version = "0.3", optional = true } # Only used with native ros1 +roslibrust_serde_rosmsg = { version = "0.4", optional = true } # Only used with native ros1 hyper = { version = "0.14", features = [ "server", ], optional = true } # Only used with native ros1 diff --git a/roslibrust_zenoh/Cargo.toml b/roslibrust_zenoh/Cargo.toml index 53d1e43..729b2f9 100644 --- a/roslibrust_zenoh/Cargo.toml +++ b/roslibrust_zenoh/Cargo.toml @@ -15,7 +15,7 @@ roslibrust_codegen = { path = "../roslibrust_codegen" } zenoh = "1.0" hex = "0.4" anyhow = "1.0" -roslibrust_serde_rosmsg = "0.3" +roslibrust_serde_rosmsg = "0.4" log = "0.4" [dev-dependencies] diff --git a/roslibrust_zenoh/src/lib.rs b/roslibrust_zenoh/src/lib.rs index 523cdae..64ad1ac 100644 --- a/roslibrust_zenoh/src/lib.rs +++ b/roslibrust_zenoh/src/lib.rs @@ -25,13 +25,11 @@ pub struct ZenohPublisher { impl Publish for ZenohPublisher { async fn publish(&self, data: &T) -> RosLibRustResult<()> { - let bytes = roslibrust_serde_rosmsg::to_vec(data).map_err(|e| { + let bytes = roslibrust_serde_rosmsg::to_vec_skip_length(data).map_err(|e| { RosLibRustError::SerializationError(format!("Failed to serialize message: {e:?}")) })?; - // Note: serde_rosmsg places the length of the message as the first four bytes - // Which zenoh ros1 bridge does not expect, so we need to strip it off. - match self.publisher.put(&bytes[4..]).await { + match self.publisher.put(&bytes).await { Ok(()) => Ok(()), Err(e) => Err(RosLibRustError::Unexpected(anyhow::anyhow!( "Failed to publish message to zenoh: {e:?}" @@ -63,16 +61,12 @@ impl Subscribe for ZenohSubscriber { }; let bytes = sample.payload().to_bytes(); - - // This is messy roslibrust expects the starting bytes to be total message size - // which Zenoh or the bridge is stripping somewhere, so I'm just manually sticking them back for now - // This is very inefficient, but it works for now. - let starting_bytes = (bytes.len() as u32).to_le_bytes(); - let bytes = [&starting_bytes, &bytes[..]].concat(); - - let msg = roslibrust_serde_rosmsg::from_slice(&bytes).map_err(|e| { - RosLibRustError::SerializationError(format!("Failed to deserialize sample: {e:?}")) - })?; + // Note: Zenoh decided to not make the 4 byte length header part of the payload + // So we use the known length version of the deserialization + let msg = roslibrust_serde_rosmsg::from_slice_known_length(&bytes, bytes.len() as u32) + .map_err(|e| { + RosLibRustError::SerializationError(format!("Failed to deserialize sample: {e:?}")) + })?; Ok(msg) } } @@ -157,7 +151,8 @@ pub struct ZenohServiceClient { impl Service for ZenohServiceClient { async fn call(&self, request: &T::Request) -> RosLibRustResult { - let request_bytes = roslibrust_serde_rosmsg::to_vec(request).map_err(|e| { + // Note: Zenoh decided the 4 byte length header is not part of the payload + let request_bytes = roslibrust_serde_rosmsg::to_vec_skip_length(request).map_err(|e| { RosLibRustError::SerializationError(format!("Failed to serialize message: {e:?}")) })?; debug!("request bytes: {request_bytes:?}"); @@ -165,8 +160,7 @@ impl Service for ZenohServiceClient { let query = match self .session .get(&self.zenoh_query) - .payload(&request_bytes[4..]) - // .timeout(tokio::time::Duration::from_secs(1)) + .payload(&request_bytes) .await { Ok(query) => query, @@ -199,15 +193,11 @@ impl Service for ZenohServiceClient { let bytes = sample.payload().to_bytes(); - // This is messy roslibrust expects the starting bytes to be total message size - // which Zenoh or the bridge is stripping somewhere, so I'm just manually sticking them back for now - // This is very inefficient, but it works for now. - let starting_bytes = (bytes.len() as u32).to_le_bytes(); - let bytes = [&starting_bytes, &bytes[..]].concat(); - - let msg = roslibrust_serde_rosmsg::from_slice(&bytes).map_err(|e| { - RosLibRustError::SerializationError(format!("Failed to deserialize sample: {e:?}")) - })?; + // Note: Zenoh decided to not make the 4 byte length header part of the payload + let msg = roslibrust_serde_rosmsg::from_slice_known_length(&bytes, bytes.len() as u32) + .map_err(|e| { + RosLibRustError::SerializationError(format!("Failed to deserialize sample: {e:?}")) + })?; Ok(msg) } } @@ -275,15 +265,14 @@ impl ServiceProvider for ZenohClient { }; let bytes = payload.to_bytes(); debug!("Got bytes: {bytes:?}"); - // TODO MAJOR HACK HERE STILL - // Our deserialization still expects the first four bytes to be the total message size - // So we're just going to manually add the bytes back in - let starting_bytes = (bytes.len() as u32).to_le_bytes(); - let bytes = [&starting_bytes, &bytes[..]].concat(); - - let Ok(request) = roslibrust_serde_rosmsg::from_slice(&bytes).map_err(|e| { - error!("Failed to deserialize request: {e:?}"); - }) else { + + // Note: Zenoh decided the 4 byte length header is not part of the payload + let Ok(request) = + roslibrust_serde_rosmsg::from_slice_known_length(&bytes, bytes.len() as u32) + .map_err(|e| { + error!("Failed to deserialize request: {e:?}"); + }) + else { continue; }; @@ -293,16 +282,14 @@ impl ServiceProvider for ZenohClient { continue; }; - let Ok(response_bytes) = roslibrust_serde_rosmsg::to_vec(&response).map_err(|e| { - error!("Failed to serialize response: {e:?}"); - }) else { + let Ok(response_bytes) = roslibrust_serde_rosmsg::to_vec_skip_length(&response) + .map_err(|e| { + error!("Failed to serialize response: {e:?}"); + }) + else { continue; }; - // TODO HACK HERE STILL - // Zenoh doesn't want the first four bytes that are the overall message size: - let response_bytes = &response_bytes[4..]; - let _ = query .reply(query.key_expr(), response_bytes) .await