Skip to content

Commit

Permalink
more verbose logging, router trait refactor, configrouter now using a…
Browse files Browse the repository at this point in the history
…sync mutex
  • Loading branch information
BRA1L0R committed Jul 20, 2022
1 parent fda7099 commit 354bf18
Show file tree
Hide file tree
Showing 8 changed files with 115 additions and 81 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "hopper"
version = "0.2.3"
version = "0.2.4"
edition = "2021"

[profile.release]
Expand All @@ -10,7 +10,7 @@ lto = true

[dependencies]
async-trait = "0.1"
tokio = { version = "1.19", features = ["rt-multi-thread", "net", "macros", "io-util"] }
tokio = { version = "1.19", features = ["rt-multi-thread", "net", "macros", "io-util", "sync"] }
config = { version = "0.13", default-features = false, features = ["toml"] }
serde = { version = "1.0", default-features = false, features = ["derive"] }
serde_json = "1"
Expand Down
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ default = "127.0.0.1:12345" # optional
## How to run

There are two ways to run hopper:
- Using the [docker image](#docker)
- Using the [binaries](#binaries)
- Using the [docker image](#docker-)
- Using the [binaries](#binary-)

### Docker ![GitHub Workflow Status](https://img.shields.io/github/workflow/status/bra1l0r/hopper-rs/Docker%20build%20and%20registry%20push?label=Container%20Build&style=flat-square)

Expand Down
36 changes: 24 additions & 12 deletions src/config/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
use crate::server::{router::RouterError, Client, Router};
use crate::server::{bridge::Bridge, router::RouterError, Client, Router};
use async_trait::async_trait;
use config::{ConfigError, File};
use serde::Deserialize;
use std::{collections::HashMap, net::SocketAddr, sync::Mutex};
use serde::{Deserialize, Deserializer};
use std::{collections::HashMap, net::SocketAddr};
use tokio::sync::Mutex;

#[derive(Deserialize)]
/// Defines the structure of a config file. Extension can be
Expand Down Expand Up @@ -53,18 +55,27 @@ impl Balanced {
}
}

fn deserialize_mutex<'de, D, T: Deserialize<'de>>(deserializer: D) -> Result<Mutex<T>, D::Error>
where
D: Deserializer<'de>,
{
let inner = T::deserialize(deserializer)?;
Ok(Mutex::new(inner))
}

#[derive(Deserialize)]
#[serde(untagged)]
enum RouteType {
Simple(SocketAddr),
#[serde(deserialize_with = "deserialize_mutex")]
Balanced(Mutex<Balanced>),
}

impl RouteType {
fn get(&self) -> SocketAddr {
async fn get(&self) -> SocketAddr {
match self {
RouteType::Simple(route) => *route,
RouteType::Balanced(balancer) => balancer.lock().unwrap().get(),
RouteType::Balanced(balancer) => balancer.lock().await.get(),
}
}
}
Expand All @@ -77,15 +88,16 @@ pub struct RouterConfig {
routes: HashMap<String, RouteType>,
}

#[async_trait]
impl Router for RouterConfig {
fn route(&self, client: &Client) -> Result<SocketAddr, RouterError> {
async fn route(&self, client: &Client) -> Result<Bridge, RouterError> {
let destination = client.destination();
self.routes
// tries to read from hashmap
let route = self
.routes
.get(destination)
.map(|dest| dest.get())
// if not present, uses the optional default
.or_else(|| self.default.as_ref().map(|default| default.get()))
.ok_or(RouterError::NoServer)
.or(self.default.as_ref())
.ok_or(RouterError::NoServer)?;

Bridge::connect(route.get().await).await
}
}
56 changes: 56 additions & 0 deletions src/server/bridge.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
use crate::{protocol::PacketWriteExtAsync, HopperError};

use super::{client::Client, router::RouterError};
use std::net::SocketAddr;
use tokio::{
io::{copy, AsyncWriteExt},
net::{
tcp::{OwnedReadHalf, OwnedWriteHalf},
TcpStream,
},
};

#[derive(Debug)]
pub struct Bridge(TcpStream);

impl Bridge {
pub async fn connect(addr: SocketAddr) -> Result<Self, RouterError> {
let server = TcpStream::connect(addr)
.await
.map_err(RouterError::Unreachable)?;

Ok(Self(server))
}

pub fn address(&self) -> Result<SocketAddr, HopperError> {
self.0.peer_addr().map_err(HopperError::Disconnected)
}

/// handshakes an already connected server and
/// joins two piping futures, bridging the two connections
/// at Layer 4.
///
/// Note: hopper does not care what bytes are shared between
/// the twos
pub async fn bridge(self, client: Client) -> Result<(), HopperError> {
let Bridge(mut server) = self;

// send handshake to server
server.write_serialize(client.data).await?;

let (rc, wc) = client.stream.into_split();
let (rs, ws) = server.into_split();

let pipe = |mut input: OwnedReadHalf, mut output: OwnedWriteHalf| async move {
copy(&mut input, &mut output).await?;
output.shutdown().await
};

// create two futures, one that copies server->client and the other client->server
// then join them together to make them work on the same task concurrently
tokio::try_join!(pipe(rc, ws), pipe(rs, wc))
.map_err(HopperError::Disconnected)
// match the function return signature
.map(drop)
}
}
14 changes: 13 additions & 1 deletion src/server/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use crate::protocol::{
packets::{Disconnect, Handshake, State},
PacketReadExtAsync, PacketWriteExtAsync,
};
use std::{error::Error, net::SocketAddr};
use std::{error::Error, fmt::Display, net::SocketAddr};
use tokio::net::TcpStream;

pub struct Client {
Expand Down Expand Up @@ -49,3 +49,15 @@ impl Client {
})
}
}

impl Display for Client {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"{}=>{} ({:?})",
self.address,
self.destination(),
self.data.next_state
)
}
}
22 changes: 12 additions & 10 deletions src/server/mod.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,14 @@
use std::sync::Arc;
use tokio::net::TcpListener;

pub mod bridge;
mod client;
pub mod router;

pub use crate::HopperError;
pub use client::Client;
pub use router::Router;

use self::router::Bridge;

pub struct Hopper {
router: Arc<dyn Router>,
}
Expand All @@ -21,6 +20,8 @@ impl Hopper {
}

pub async fn listen(&self, listener: TcpListener) -> ! {
log::info!("Listening on {}", listener.local_addr().unwrap());

loop {
let client = listener.accept().await.unwrap();
let router = self.router.clone();
Expand All @@ -32,14 +33,15 @@ impl Hopper {
// routes a client by reading handshake information
// then if a route has been found it connects to the server
// but does not yet send handshaking information
let bridge = match router.route(&client).map(Bridge::connect) {
Ok(future) => future.await,
Err(err) => Err(err),
};

match bridge {
Ok(bridge) => bridge.bridge(client).await,
Err(err) => Err(client.disconnect_err_chain(err).await.into()),
match router.route(&client).await {
Ok(bridge) => {
log::info!("{client} connected to {}", bridge.address()?);
bridge.bridge(client).await
}
Err(err) => {
log::error!("Couldn't connect {client}: {err}");
Err(client.disconnect_err_chain(err).await.into())
}
}
};

Expand Down
58 changes: 5 additions & 53 deletions src/server/router.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,6 @@
use crate::{protocol::PacketWriteExtAsync, HopperError};

use super::client::Client;
use std::net::SocketAddr;
use thiserror::Error;
use tokio::{
io::{copy, AsyncWriteExt},
net::{
tcp::{OwnedReadHalf, OwnedWriteHalf},
TcpStream,
},
};

use super::{bridge::Bridge, Client};

#[derive(Error, Debug)]
pub enum RouterError {
Expand All @@ -20,47 +11,8 @@ pub enum RouterError {
Unreachable(std::io::Error),
}

// #[async_trait::async_trait]
#[async_trait::async_trait]
pub trait Router: Send + Sync {
fn route(&self, client: &Client) -> Result<SocketAddr, RouterError>;
}

#[derive(Debug)]
pub struct Bridge(TcpStream);

impl Bridge {
pub async fn connect(addr: SocketAddr) -> Result<Self, RouterError> {
let server = TcpStream::connect(addr)
.await
.map_err(RouterError::Unreachable)?;

Ok(Self(server))
}
}

impl Bridge {
/// handshakes an already connected server and
/// joins two piping futures, bridging the two connections
/// at Layer 4.
///
/// Note: hopper does not care what bytes are shared between
/// the twos
pub async fn bridge(self, client: Client) -> Result<(), HopperError> {
let Bridge(mut server) = self;

server.write_serialize(client.data).await?;

let (rc, wc) = client.stream.into_split();
let (rs, ws) = server.into_split();

// let client_to_server = async {};
let pipe = |mut input: OwnedReadHalf, mut output: OwnedWriteHalf| async move {
copy(&mut input, &mut output).await?;
output.shutdown().await
};

tokio::try_join!(pipe(rc, ws), pipe(rs, wc))
.map_err(HopperError::Disconnected)
.map(drop)
}
// type Server: ConnectedServer;
async fn route(&self, client: &Client) -> Result<Bridge, RouterError>;
}

0 comments on commit 354bf18

Please sign in to comment.