Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ADD: publish, subscribe #344

Open
wants to merge 24 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 23 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 53 additions & 5 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ members = [
"primitives",
"protocol",
"protocol/examples/reqres",
"protocol/examples/ros",
"rpc",
"runtime/local",
"runtime/alpha",
Expand Down
1 change: 1 addition & 0 deletions bin/node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ parity-util-mem = { version = "0.11", default-features = false, features = [
"jemalloc-global",
] }
robonomics-service = { path = "../../service", default-features = false }
tokio = { version = "1.23.0", features = ["full"] }

[features]
default = ["robonomics-service/kusama", "robonomics-service/wasmtime"]
Expand Down
6 changes: 6 additions & 0 deletions bin/node/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,12 @@

#![warn(missing_docs)]

#[cfg(not(feature = "discovery"))]
fn main() -> robonomics_service::Result<()> {
robonomics_service::run()
}
#[cfg(feature = "discovery")]
#[tokio::main]
async fn main() -> robonomics_service::Result<()> {
robonomics_service::run().await
}
5 changes: 2 additions & 3 deletions protocol/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ rand = "0.8.4"
rust-base58 = "0.0.4"
chrono = "0.4"
libp2p = { version = "0.49", features = [
"identify",
"gossipsub",
"kad",
"mdns",
Expand All @@ -38,6 +39,4 @@ libp2p = { version = "0.49", features = [
"tokio",
"rsa",
] }

tokio = { version = "1", features = ["full"] }

tokio = { version = "1.23.0", features = ["full"] }
1 change: 1 addition & 0 deletions protocol/examples/ros/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
/target
10 changes: 10 additions & 0 deletions protocol/examples/ros/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
[package]
name = "robonomics-ros-example"
version = "0.1.0"
edition = "2021"

[dependencies]
robonomics-protocol = { path = "../.." }
tokio = { version = "1.23", features = ["full"] }
futures = "0.3"
libp2p = "0.49"
99 changes: 99 additions & 0 deletions protocol/examples/ros/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
use futures::StreamExt;
use libp2p::{
gossipsub::{GossipsubEvent, IdentTopic as Topic},
identity,
swarm::{SwarmBuilder, SwarmEvent},
Multiaddr, PeerId,
};
use std::{env, error::Error, time::Duration};
use tokio::io::{self, AsyncBufReadExt};

use robonomics_protocol::network::behaviour::{OutEvent, RobonomicsNetworkBehaviour};

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
// Create a random PeerId
let local_key = identity::Keypair::generate_ed25519();
let local_peer_id = PeerId::from(local_key.public());
println!("Local peer id: {local_peer_id}");

// Set params
let heartbeat_interval = Duration::from_millis(1000);
let disable_pubsub = false;
let disable_mdns = true;
let disable_kad = true;
let network_listen_address = "/ip4/0.0.0.0/tcp/0"
.parse()
.expect("network listen address");

// Set up an encrypted DNS-enabled TCP Transport over the Mplex protocol
let transport = libp2p::tokio_development_transport(local_key.clone())?;

// Create robonomics network behaviour
let behaviour = RobonomicsNetworkBehaviour::new(
local_key,
local_peer_id,
heartbeat_interval,
disable_pubsub,
disable_mdns,
disable_kad,
)
.expect("Correct behaviour");

// Create swarm
let mut swarm = SwarmBuilder::new(transport, behaviour, local_peer_id)
.executor(Box::new(|fut| {
tokio::spawn(fut);
}))
.build();

// Create topic
let topic = Topic::new("ROS");

// Subscribe to topic
if let Some(pubsub) = swarm.behaviour_mut().pubsub.as_mut() {
pubsub.subscribe(&topic)?;
}

// Add nodes
if let Some(to_dial) = env::args().nth(1) {
let addr: Multiaddr = to_dial.parse()?;
swarm.dial(addr.clone())?;

// Add node to pubsub swarm
if let Some(pubsub) = swarm.behaviour_mut().pubsub.as_mut() {
if let Some(peer) = PeerId::try_from_multiaddr(&addr) {
pubsub.add_explicit_peer(&peer);
}
}
}

// Read full lines from stdin
let mut stdin = io::BufReader::new(io::stdin()).lines();

// Listen on all interfaces and whatever port the OS assigns
swarm.listen_on(network_listen_address)?;

println!("Enter messages via STDIN and they will be sent to connected peers using Pubsub");

loop {
tokio::select! {
line = stdin.next_line() => {
if let Some(pubsub) = swarm.behaviour_mut().pubsub.as_mut() {
pubsub.publish(topic.clone(), line.expect("Stdin not to close").expect("").as_bytes())?;
}
},
event = swarm.select_next_some() => match event {
SwarmEvent::Behaviour(OutEvent::Pubsub(GossipsubEvent::Message {
propagation_source: peer_id,
message_id: id,
message,
})) => println!(
"Got message: '{}' with id: {id} from peer: {peer_id}",
String::from_utf8_lossy(&message.data),
),
_ => {},
}
}
}
}
Loading