Skip to content

Commit

Permalink
terminal access
Browse files Browse the repository at this point in the history
  • Loading branch information
Smehnov committed Feb 8, 2024
1 parent bd904f2 commit 4d0c3b8
Show file tree
Hide file tree
Showing 5 changed files with 354 additions and 46 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ tokio = { version = "1.35.1", features = ["rt-multi-thread", "macros"] }
tracing = "0.1.40"
tracing-subscriber = "0.3.18"
openssl = { version = "0.10", features = ["vendored"] }

termion = "3.0.0"


114 changes: 96 additions & 18 deletions src/commands/docker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,28 @@ use bollard;
use bollard::Docker;
use rust_socketio::asynchronous::Client;

use futures_util::TryStreamExt;
use futures_util::{StreamExt, TryStreamExt};
use std::sync::{Arc, Mutex};
use std::time::Duration;
use tokio;
use tokio::io::AsyncWriteExt;
use tokio::sync::mpsc;

use std;
use tracing::{error, info};

use serde::{Deserialize, Serialize};

use crate::commands::{RobotJob, RobotJobResult};
use crate::{
commands::{RobotJob, RobotJobResult},
store::{JobManager, Jobs},
};

pub async fn execute_launch(socket: Client, robot_job: RobotJob) {
pub async fn execute_launch(socket: Client, robot_job: RobotJob, jobs: Jobs) {
let args = serde_json::from_str::<DockerLaunchArgs>(&robot_job.args).unwrap();
info!("launching docker job {:?}", args);
let docker_launch = DockerLaunch { args };

let robot_job_result = match docker_launch.execute(robot_job.clone()).await {
let robot_job_result = match docker_launch.execute(robot_job.clone(), jobs).await {
Ok(result) => {
info!("job successfully executed");
result
Expand Down Expand Up @@ -60,6 +68,7 @@ impl DockerLaunch {
pub async fn execute(
&self,
robot_job: RobotJob,
jobs: Jobs,
) -> Result<RobotJobResult, bollard::errors::Error> {
info!("launching docker with image {}", self.args.image);
let docker = Docker::connect_with_socket_defaults().unwrap();
Expand Down Expand Up @@ -116,23 +125,92 @@ impl DockerLaunch {

docker.start_container::<String>(&id, None).await?;

let logs_options: bollard::container::LogsOptions<String> =
bollard::container::LogsOptions {
follow: true,
stdout: true,
stderr: true,
..Default::default()
};

let mut logs = docker.logs(&id, Some(logs_options));
let mut concatenated_logs: String = String::new();

while let Some(log) = logs.try_next().await? {
concatenated_logs.push_str(std::str::from_utf8(&log.into_bytes()).unwrap_or(""));
match &self.args.custom_cmd {
Some(custom_cmd) => {
let exec = docker
.create_exec(
&id,
bollard::exec::CreateExecOptions {
attach_stdout: Some(true),
attach_stderr: Some(true),
attach_stdin: Some(true),
tty: Some(true),
cmd: Some(vec!["sh"]),
..Default::default()
},
)
.await?
.id;

#[cfg(not(windows))]
if let bollard::exec::StartExecResults::Attached {
mut output,
mut input,
} = docker.start_exec(&exec, None).await?
{
// pipe stdin into the docker exec stream input
{
let shared_jobs = Arc::clone(&jobs);
let job_manager = shared_jobs.lock().unwrap();
let channel_to_job_tx =
job_manager.get_channel_to_job_tx_by_job_id(&robot_job.id);
if let Some(channel_to_job_tx) = channel_to_job_tx {
tokio::task::spawn(async move {
let mut channel_to_job_rx = channel_to_job_tx.subscribe();
loop {
let data = channel_to_job_rx.recv().await.unwrap();
for byte in data.as_bytes().iter() {
input.write(&[*byte]).await.ok();
}
}
});
}
}

docker
.resize_exec(
&exec,
bollard::exec::ResizeExecOptions {
height: 35,
width: 100,
},
)
.await?;
{
let shared_jobs = Arc::clone(&jobs);
while let Some(Ok(output)) = output.next().await {
let job_manager = shared_jobs.lock().unwrap();
if let Some(tx) = job_manager.get_channel_by_job_id(&robot_job.id) {
tx.send(output.to_string()).unwrap();
}

info!("{:?}", output.into_bytes());
}
}
}
}
None => {
let logs_options: bollard::container::LogsOptions<String> =
bollard::container::LogsOptions {
follow: true,
stdout: true,
stderr: true,
..Default::default()
};

let mut logs = docker.logs(&id, Some(logs_options));

while let Some(log) = logs.try_next().await? {
concatenated_logs
.push_str(std::str::from_utf8(&log.into_bytes()).unwrap_or(""));
}

info!("log: {}", concatenated_logs);
}
}

info!("log: {}", concatenated_logs);

docker
.remove_container(
&id,
Expand Down
119 changes: 117 additions & 2 deletions src/commands/mod.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,30 @@
use futures_util::future::err;
use std::sync::Arc;
use tokio::sync::{broadcast, mpsc};

use rust_socketio::{asynchronous::Client, Payload};

use serde::{Deserialize, Serialize};

use serde_json::json;
use tracing::info;

use crate::store;

mod docker;

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StartTunnelReq {
pub job_id: String,
pub client_id: String,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MessageToRobot{
pub job_id: String,
pub content: String
}

#[derive(Clone, Serialize, Deserialize)]
pub struct RobotJobResult {
pub job_id: String,
Expand All @@ -22,19 +41,115 @@ pub struct RobotJob {
pub args: String,
}

pub async fn launch_new_job(payload: Payload, socket: Client) {
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RobotStartTunnelResponse{
is_ok: bool,
error: Option<String>
}

pub async fn launch_new_job(payload: Payload, socket: Client, jobs: store::Jobs) {
match payload {
Payload::String(str) => {
let robot_job: RobotJob = serde_json::from_str(&str).unwrap();
info!("{:?}", robot_job);

match robot_job.job_type.as_str() {
"docker-container-launch" => {
info!("container launch");
tokio::spawn(docker::execute_launch(socket, robot_job));
let mut job_manager = jobs.lock().unwrap();
job_manager.new_job(
robot_job.id.clone(),
robot_job.job_type.clone(),
robot_job.status.clone(),
);
let shared_jobs = Arc::clone(&jobs);
tokio::spawn(docker::execute_launch(socket, robot_job, shared_jobs));
}
_ => {}
}
}
Payload::Binary(bin_data) => info!("{:?}", bin_data),
};
}

pub async fn start_tunnel_messanger(
socket: Client,
tx: broadcast::Sender<String>,
client_id: String,
) {
let mut rx = tx.subscribe();
loop {
let data = rx.recv().await.unwrap();
info!("from docker: {}", data);
let _res: Result<(), rust_socketio::Error> = socket
.emit(
"message_to_client",
json!({"client_id": client_id, "content": data}),
)
.await;
}
}

pub async fn start_tunnel(payload: Payload, socket: Client, jobs: store::Jobs) {
let mut ack_result = RobotStartTunnelResponse{
is_ok: false,
error: None
};
match payload {
Payload::String(str) => {
info!("Start tunnel request");
let start_tunnel_request: StartTunnelReq = serde_json::from_str(&str).unwrap();
info!("Start tunnel: {:?}", start_tunnel_request);
let mut job_manager = jobs.lock().unwrap();

match job_manager.get_job_or_none(&start_tunnel_request.job_id) {
Some(job) => {
job_manager.create_job_tunnel(
&start_tunnel_request.job_id,
start_tunnel_request.client_id.clone(),
);
if let Some(channel_tx) =
job_manager.get_channel_by_job_id(&start_tunnel_request.job_id)
{
tokio::spawn(start_tunnel_messanger(
socket,
channel_tx.clone(),
start_tunnel_request.client_id.clone(),
));
ack_result.is_ok = true;
} else {
info!("no channel tx");
}
}
None => {
//todo: socket res
}
}
}
Payload::Binary(bin_data) => info!("{:?}", bin_data),
};
}


pub async fn message_to_robot(payload: Payload, socket: Client, jobs: store::Jobs) {
match payload {
Payload::String(str) => {
info!("Message to robot request");
let message: MessageToRobot = serde_json::from_str(&str).unwrap();
info!("Message to robot: {:?}", message);
let mut job_manager = jobs.lock().unwrap();

match job_manager.get_job_or_none(&message.job_id) {
Some(job) => {
if let Some(channel) = job_manager.get_channel_to_job_tx_by_job_id(&message.job_id){
channel.send(message.content).unwrap();
};
}
None => {
//todo: socket res
}
}
}
Payload::Binary(bin_data) => info!("{:?}", bin_data),
};
}
Loading

0 comments on commit 4d0c3b8

Please sign in to comment.