Skip to content

Commit

Permalink
Improve CLI (#136)
Browse files Browse the repository at this point in the history
* Improve CLI

* Fix doc-tests for rust sdk

* Format hyvectl files

* Apply review suggestion

Co-authored-by: Linus Meierhöfer <[email protected]>

---------

Co-authored-by: Linus Meierhöfer <[email protected]>
  • Loading branch information
Zollerboy1 and Lxixnxuxs authored Jan 14, 2025
1 parent 8f87756 commit d39223f
Show file tree
Hide file tree
Showing 14 changed files with 89 additions and 92 deletions.
6 changes: 4 additions & 2 deletions crates/core/src/req_resp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,8 +170,10 @@ impl From<Response> for grpc::Response {
}
}

impl From<Response> for Result<Vec<u8>, ResponseError> {
fn from(response: Response) -> Self {
impl TryFrom<Response> for Vec<u8> {
type Error = ResponseError;

fn try_from(response: Response) -> Result<Self, Self::Error> {
match response {
Response::Data(data) => Ok(data),
Response::Error(e) => Err(e),
Expand Down
6 changes: 5 additions & 1 deletion crates/hyvectl-commands/src/families/reqres.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,11 @@ pub enum ReqRes {
},

/// Retrieve a stream of messages from peers
Receive {},
Receive {
/// Topic to listen for
#[arg(long)]
topic: Option<String>,
},

/// Respond to messages from peers
Respond {
Expand Down
6 changes: 3 additions & 3 deletions crates/hyvectl/src/families/file.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use futures::{stream::BoxStream, StreamExt};
use futures::{stream::BoxStream, TryStreamExt as _};
use hyvectl_commands::families::file::File;
use hyveos_core::file_transfer::{Cid, DownloadEvent};
use hyveos_sdk::Connection;
Expand Down Expand Up @@ -36,8 +36,8 @@ impl CommandFamily for File {

yield CommandOutput::message("Starting Download...");

while let Some(event) = download_stream.next().await {
match event? {
while let Some(event) = download_stream.try_next().await? {
match event {
DownloadEvent::Progress(p) => {
yield CommandOutput::progress(p)
}
Expand Down
2 changes: 1 addition & 1 deletion crates/hyvectl/src/families/hyve.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use futures::{stream::BoxStream, StreamExt};
use futures::stream::BoxStream;
use hyvectl_commands::families::hyve::Hyve;
use hyveos_sdk::{services::ScriptingConfig, Connection, PeerId};
use ulid::Ulid;
Expand Down
51 changes: 20 additions & 31 deletions crates/hyvectl/src/families/inspect.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
use futures::{stream::BoxStream, StreamExt};
use futures::{stream::BoxStream, TryStreamExt as _};
use hyvectl_commands::families::inspect::Inspect;
use hyveos_core::{
debug::{MessageDebugEvent, MessageDebugEventType},
discovery::NeighbourEvent,
req_resp::Response,
};
use hyveos_sdk::Connection;

Expand All @@ -18,9 +17,9 @@ impl TryFrom<MessageDebugEvent> for CommandOutput {
type Error = HyveCtlError;

fn try_from(event: MessageDebugEvent) -> Result<Self, Self::Error> {
let mut out = CommandOutput::result();
let out = CommandOutput::result();

out = match event.event {
Ok(match event.event {
MessageDebugEventType::Request(req) => out
.with_field("service", "req-res/request".to_string())
.with_field("receiver", req.receiver.to_string())
Expand All @@ -29,28 +28,19 @@ impl TryFrom<MessageDebugEvent> for CommandOutput {
.with_field("data", String::from_utf8(req.msg.data)?)
.with_tty_template("💬 {{ receiver: {receiver}, id: {id}, data: {data} }}")
.with_non_tty_template("{service},{receiver},{id},{topic},{data}"),
MessageDebugEventType::Response(res) => {
out = out
.with_field("service", "resp-res/response".to_string())
.with_field("id", res.req_id.to_string());

match res.response {
Response::Data(data) => out
.with_field("data", String::from_utf8(data)?)
.with_tty_template("🗨️ {{ id: {id}, data: {data} }}")
.with_non_tty_template("{service},{id},{data}"),
Response::Error(e) => Err(e)?,
}
}
MessageDebugEventType::Response(res) => out
.with_field("service", "resp-res/response".to_string())
.with_field("id", res.req_id.to_string())
.with_field("data", String::from_utf8(res.response.try_into()?)?)
.with_tty_template("🗨️ {{ id: {id}, data: {data} }}")
.with_non_tty_template("{service},{id},{data}"),
MessageDebugEventType::GossipSub(msg) => out
.with_field("service", "pub-sub".to_string())
.with_field("topic", msg.topic.to_string())
.with_field("data", String::from_utf8(msg.data)?)
.with_tty_template("📨 {{ topic: {topic}, data: {data} }}")
.with_non_tty_template("{service},{topic},{data}"),
};

Ok(out)
})
}
}

Expand All @@ -64,23 +54,22 @@ impl CommandFamily for Inspect {
match self {
Inspect::Mesh { .. } => {
boxed_try_stream! {
let mut stream = debug.subscribe_mesh_topology().await?;

yield CommandOutput::spinner("Waiting for Topology Events...", &["◐", "◒", "◑", "◓"]);

while let Some(event) = stream.next().await {
let event = event?;
let mut stream = debug.subscribe_mesh_topology().await?;

while let Some(event) = stream.try_next().await? {
let out = CommandOutput::result()
.with_field("source", event.peer_id.to_string());
.with_field("source", event.peer_id.to_string());

match event.event {
NeighbourEvent::Init(peers) => {
for peer in peers {
yield out.clone().with_field("type", "connected".to_string())
.with_field("peer", peer.to_string())
.with_tty_template("📡 Connected { {peer} } to { {source} }")
.with_non_tty_template("{peer},{source}")
yield out.clone()
.with_field("type", "connected".to_string())
.with_field("peer", peer.to_string())
.with_tty_template("📡 Connected { {peer} } to { {source} }")
.with_non_tty_template("{peer},{source}")
}
},
NeighbourEvent::Discovered(peer) => {
Expand All @@ -105,8 +94,8 @@ impl CommandFamily for Inspect {

yield CommandOutput::spinner("Waiting for Service Events...", &["◐", "◑", "◒", "◓"]);

while let Some(event) = stream.next().await {
yield event?.try_into()?
while let Some(event) = stream.try_next().await? {
yield event.try_into()?
}
}
}
Expand Down
28 changes: 12 additions & 16 deletions crates/hyvectl/src/families/kv.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
use futures::{stream::BoxStream, StreamExt};
use futures::{stream::BoxStream, TryStreamExt as _};
use hyvectl_commands::families::kv::Kv;
use hyveos_sdk::Connection;

use crate::{boxed_try_stream, error::HyveCtlResult, out::CommandOutput, util::CommandFamily};

impl CommandFamily for Kv {
async fn run(
self,
Expand All @@ -26,7 +27,7 @@ impl CommandFamily for Kv {
Some(res) => yield CommandOutput::result()
.with_field("topic", topic)
.with_field("key", key)
.with_field("value", String::from_utf8(res)?.into())
.with_field("value", String::from_utf8(res)?)
.with_tty_template(template)
.with_non_tty_template("{value}"),
None => yield CommandOutput::result()
Expand Down Expand Up @@ -65,7 +66,6 @@ impl CommandFamily for Kv {

let topic = topic.unwrap_or_default();


dht.provide(&topic, key.clone()).await?;

yield CommandOutput::result()
Expand All @@ -79,21 +79,17 @@ impl CommandFamily for Kv {
boxed_try_stream! {
let topic = topic.clone().unwrap_or_default();

let mut providers_stream = dht.get_providers(topic.clone(), key.clone()).await?;

yield CommandOutput::spinner("Fetching Providers...", &["◐", "◑", "◒", "◓"]);

while let Some(event) = providers_stream.next().await {

match event {
Ok(provider) => yield CommandOutput::result()
.with_field("topic", topic.clone())
.with_field("key", key.clone())
.with_field("provider", provider.to_string())
.with_tty_template("🤖 { {provider} }")
.with_non_tty_template("{provider}"),
Err(e) => Err(e)?
}
let mut providers_stream = dht.get_providers(topic.clone(), key.clone()).await?;

while let Some(provider) = providers_stream.try_next().await? {
yield CommandOutput::result()
.with_field("topic", topic.clone())
.with_field("key", key.clone())
.with_field("provider", provider.to_string())
.with_tty_template("🤖 { {provider} }")
.with_non_tty_template("{provider}");
}
}
}
Expand Down
13 changes: 7 additions & 6 deletions crates/hyvectl/src/families/pubsub.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use futures::{stream::BoxStream, StreamExt};
use futures::{stream::BoxStream, TryStreamExt as _};
use hyvectl_commands::families::pubsub::PubSub;
use hyveos_core::gossipsub::ReceivedMessage;
use hyveos_sdk::Connection;
Expand All @@ -19,12 +19,13 @@ impl TryFrom<ReceivedMessage> for CommandOutput {
.with_field("topic", value.message.topic)
.with_field("message", String::from_utf8(value.message.data)?)
.with_field("message_id", String::from_utf8(value.message_id.0)?)
.with_tty_template("📨 {{ topic: {topic}, message: {message} }}")
.with_tty_template("📨 { topic: {topic}, message: {message} }")
.with_non_tty_template("{topic},{message}");

match value.source {
Some(source) => Ok(output
.with_field("source", source.to_string())
.with_tty_template("📨 { topic: {topic}, message: {message}, source: {source} }")
.with_non_tty_template("{topic},{message},{source}")),
None => Ok(output),
}
Expand Down Expand Up @@ -52,12 +53,12 @@ impl CommandFamily for PubSub {
}
PubSub::Get { topic } => {
boxed_try_stream! {
let mut message_stream = pubsub.subscribe(&topic).await?;

yield CommandOutput::spinner("Waiting for Messages...", &["◐", "◒", "◑", "◓"]);

while let Some(event) = message_stream.next().await {
yield event?.try_into()?
let mut message_stream = pubsub.subscribe(&topic).await?;

while let Some(event) = message_stream.try_next().await? {
yield event.try_into()?
}
}
}
Expand Down
36 changes: 20 additions & 16 deletions crates/hyvectl/src/families/reqres.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use futures::{stream::BoxStream, StreamExt, TryStreamExt};
use futures::{stream::BoxStream, TryStreamExt as _};
use hyvectl_commands::families::reqres::ReqRes;
use hyveos_core::req_resp::Response;
use hyveos_core::req_resp::{Response, TopicQuery};
use hyveos_sdk::{Connection, PeerId};

use crate::{boxed_try_stream, error::HyveCtlResult, out::CommandOutput, util::CommandFamily};
Expand All @@ -13,18 +13,22 @@ impl CommandFamily for ReqRes {
let mut reqres = connection.req_resp();

match self {
ReqRes::Receive {} => {
ReqRes::Receive { topic } => {
boxed_try_stream! {
yield CommandOutput::spinner("Waiting for Requests...", &["◐", "◒", "◑", "◓"]);

while let Some(request) = reqres.recv(None).await?.try_next().await? {
let query = topic.map(Into::into).map(TopicQuery::String);

let mut stream = reqres.recv(query).await?;

while let Some(request) = stream.try_next().await? {
yield CommandOutput::result()
.with_field("peer_id", request.0.peer_id.to_string())
.with_field("topic", request.0.topic.unwrap_or_default())
.with_field("data", String::from_utf8(request.0.data)?.into())
.with_field("id", request.1.id().to_string())
.with_tty_template("💬 [ID: {id}] { peer: {peer_id}, topic: {topic}, data: {data} }")
.with_non_tty_template("{id},{peer_id},{topic},{data}");
.with_field("peer_id", request.0.peer_id.to_string())
.with_field("topic", request.0.topic.unwrap_or_default())
.with_field("data", String::from_utf8(request.0.data)?.into())
.with_field("id", request.1.id().to_string())
.with_tty_template("💬 [ID: {id}] { peer: {peer_id}, topic: {topic}, data: {data} }")
.with_non_tty_template("{id},{peer_id},{topic},{data}");
}
}
}
Expand Down Expand Up @@ -53,8 +57,8 @@ impl CommandFamily for ReqRes {
};

yield output
.with_tty_template("🗨 {response}")
.with_non_tty_template("{response}")
.with_tty_template("🗨 {response}")
.with_non_tty_template("{response}")
}
}
ReqRes::Respond {
Expand All @@ -65,10 +69,10 @@ impl CommandFamily for ReqRes {
reqres.respond(id, Response::Data(message.clone().into())).await?;

yield CommandOutput::result()
.with_field("id", id.to_string())
.with_field("response", message)
.with_tty_template("Sent { {response} } for { {id} }")
.with_non_tty_template("{id},{response}")
.with_field("id", id.to_string())
.with_field("response", message)
.with_tty_template("Sent { {response} } for { {id} }")
.with_non_tty_template("{id},{response}")
}
}
}
Expand Down
3 changes: 2 additions & 1 deletion crates/hyvectl/src/families/whoami.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
use futures::{stream::BoxStream, StreamExt};
use futures::stream::BoxStream;
use hyvectl_commands::families::whoami::Whoami;
use hyveos_sdk::Connection;

use crate::{boxed_try_stream, error::HyveCtlResult, out::CommandOutput, util::CommandFamily};

impl CommandFamily for Whoami {
async fn run(
self,
Expand Down
18 changes: 8 additions & 10 deletions crates/hyvectl/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,11 @@
mod color;
mod error;
mod families;
mod out;
mod util;

use std::{
io::{stdout, IsTerminal, Write},
path::PathBuf,
time::Duration,
};

use clap::Parser;
use futures::{stream::BoxStream, StreamExt};
use futures::{stream::BoxStream, TryStreamExt as _};
use hyvectl_commands::command::{Cli, Families};
use hyveos_sdk::Connection;
use indicatif::ProgressStyle;
Expand All @@ -23,6 +17,12 @@ use crate::{
out::CommandOutput,
};

mod color;
mod error;
mod families;
mod out;
mod util;

impl CommandFamily for Families {
async fn run(
self,
Expand Down Expand Up @@ -103,9 +103,7 @@ async fn main() -> miette::Result<()> {
let mut progress_bar = None;
let mut spinner = None;

while let Some(output) = output_stream.next().await {
let command_output = output?;

while let Some(command_output) = output_stream.try_next().await? {
match command_output {
CommandOutput::Progress(p) => {
if is_tty {
Expand Down
4 changes: 3 additions & 1 deletion crates/hyvectl/src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ pub trait CommandFamily {
#[macro_export]
macro_rules! boxed_try_stream {
($($body:tt)*) => {
async_stream::try_stream!{ $($body)* }.boxed()
::futures::stream::StreamExt::boxed(
::async_stream::try_stream!{ $($body)* }
)
}
}
2 changes: 1 addition & 1 deletion sdks/rust/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -558,7 +558,7 @@ impl Connection {
/// .await
/// .unwrap();
///
/// let data = Result::from(response).unwrap();
/// let data = Vec::try_from(response).unwrap();
/// println!("Received response: {}", String::from_utf8(data).unwrap());
/// # }
/// ```
Expand Down
2 changes: 1 addition & 1 deletion sdks/rust/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
//! .await
//! .unwrap();
//!
//! let data = Result::from(response).unwrap();
//! let data = Vec::try_from(response).unwrap();
//! println!("Received response: {}", String::from_utf8(data).unwrap());
//! }
//! ```
Expand Down
Loading

0 comments on commit d39223f

Please sign in to comment.