From 2bd26f1db2791c5b4f28213d580605b8434069a2 Mon Sep 17 00:00:00 2001 From: Dreaded_X Date: Fri, 4 Apr 2025 16:14:13 +0200 Subject: [PATCH] Reorganization ssh --- src/lib.rs | 2 + src/main.rs | 242 ++------------------------------------------------ src/ssh.rs | 221 +++++++++++++++++++++++++++++++++++++++++++++ src/tunnel.rs | 36 ++++++++ 4 files changed, 267 insertions(+), 234 deletions(-) create mode 100644 src/ssh.rs create mode 100644 src/tunnel.rs diff --git a/src/lib.rs b/src/lib.rs index c2bb831..43f4421 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1 +1,3 @@ pub mod animals; +pub mod ssh; +pub mod tunnel; diff --git a/src/main.rs b/src/main.rs index 4722fbb..cc036ce 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,10 +1,4 @@ -use std::{ - collections::{HashMap, HashSet}, - net::SocketAddr, - path::Path, - sync::Arc, - time::Duration, -}; +use std::{net::SocketAddr, path::Path}; use bytes::Bytes; use http_body_util::{BodyExt, Full, combinators::BoxBody}; @@ -17,20 +11,10 @@ use hyper::{ }; use hyper_util::rt::TokioIo; use rand::rngs::OsRng; -use russh::{ - ChannelId, - server::{self, Handle, Server as _}, -}; -use tokio::{ - net::TcpListener, - sync::{ - RwLock, - mpsc::{self, UnboundedReceiver, UnboundedSender}, - }, -}; -use tracing::{debug, error, trace, warn}; +use tokio::net::TcpListener; +use tracing::{debug, trace, warn}; use tracing_subscriber::{EnvFilter, Registry, layer::SubscriberExt, util::SubscriberInitExt}; -use tunnel_rs::animals::get_animal_name; +use tunnel_rs::ssh::Server; fn full>(chunk: T) -> BoxBody { Full::new(chunk.into()) @@ -53,23 +37,9 @@ async fn main() { russh::keys::PrivateKey::random(&mut OsRng, russh::keys::Algorithm::Ed25519).unwrap() }; - let config = russh::server::Config { - inactivity_timeout: Some(Duration::from_secs(3600)), - auth_rejection_time: Duration::from_secs(3), - auth_rejection_time_initial: Some(Duration::from_secs(0)), - keys: vec![key], - preferred: russh::Preferred { - ..Default::default() - }, - ..Default::default() - }; - - let config = Arc::new(config); - - let mut sh = Server::new(); - - let tunnels = sh.tunnels.clone(); - tokio::spawn(async move { sh.run_on_address(config, ("0.0.0.0", 2222)).await }); + let mut ssh = Server::new(); + let tunnels = ssh.tunnels(); + tokio::spawn(async move { ssh.run(key, ("0.0.0.0", 2222)).await }); let service = service_fn(move |req: Request<_>| { let tunnels = tunnels.clone(); @@ -108,16 +78,7 @@ async fn main() { }; debug!("Opening channel"); - let channel = match tunnel - .handle - .channel_open_forwarded_tcpip( - &tunnel.address, - tunnel.port, - &tunnel.address, - tunnel.port, - ) - .await - { + let channel = match tunnel.open_tunnel().await { Ok(channel) => channel, Err(err) => { warn!("Failed to tunnel: {err}"); @@ -167,190 +128,3 @@ async fn main() { }); } } - -#[derive(Debug, Clone)] -struct Tunnel { - handle: Handle, - address: String, - port: u32, -} - -type Tunnels = Arc>>; - -struct Server { - tunnels: Tunnels, -} - -impl Server { - fn new() -> Self { - Server { - tunnels: Arc::new(RwLock::new(HashMap::new())), - } - } -} - -impl server::Server for Server { - type Handler = Handler; - - fn new_client(&mut self, _peer_addr: Option) -> Self::Handler { - let (tx, rx) = mpsc::unbounded_channel::>(); - - Handler { - tx, - rx: Some(rx), - all_tunnels: self.tunnels.clone(), - tunnels: HashSet::new(), - } - } - - fn handle_session_error(&mut self, error: ::Error) { - error!("Session error: {error:#?}"); - } -} - -struct Handler { - tx: UnboundedSender>, - rx: Option>>, - - all_tunnels: Tunnels, - tunnels: HashSet, -} - -impl Handler { - fn send(&self, data: &str) { - let _ = self.tx.send(data.as_bytes().to_vec()); - } - - async fn full_address(&self, address: &str) -> Option { - let all_tunnels = self.all_tunnels.read().await; - - let address = if address == "localhost" { - loop { - let address = get_animal_name(); - if !all_tunnels.contains_key(address) { - break address; - } - } - } else { - if all_tunnels.contains_key(address) { - return None; - } - address - }; - - Some(format!("{address}.tunnel.huizinga.dev")) - } -} - -impl server::Handler for Handler { - type Error = russh::Error; - - async fn channel_open_session( - &mut self, - channel: russh::Channel, - _session: &mut server::Session, - ) -> Result { - debug!("channel_open_session"); - - let Some(mut rx) = self.rx.take() else { - return Err(russh::Error::Inconsistent); - }; - - tokio::spawn(async move { - debug!("Waiting for message to send to client..."); - loop { - let message = rx.recv().await; - debug!("Message!"); - - let Some(message) = message else { break }; - - if channel.data(message.as_ref()).await.is_err() { - break; - } - } - - debug!("Ending receive task"); - }); - - Ok(true) - } - - async fn auth_publickey( - &mut self, - user: &str, - _public_key: &russh::keys::ssh_key::PublicKey, - ) -> Result { - debug!("Login from {user}"); - - // TODO: Get ssh keys associated with user from ldap - Ok(server::Auth::Accept) - } - - async fn data( - &mut self, - _channel: ChannelId, - data: &[u8], - _session: &mut server::Session, - ) -> Result<(), Self::Error> { - if data == [3] { - return Err(russh::Error::Disconnect); - } - - Ok(()) - } - - async fn exec_request( - &mut self, - _channel: ChannelId, - data: &[u8], - _session: &mut server::Session, - ) -> Result<(), Self::Error> { - debug!("exec_request data {data:?}"); - - Ok(()) - } - - async fn tcpip_forward( - &mut self, - address: &str, - port: &mut u32, - session: &mut server::Session, - ) -> Result { - debug!("{address}:{port}"); - - let Some(full_address) = self.full_address(address).await else { - self.send(&format!("{port} => FAILED ({address} already in use)\r\n")); - return Ok(false); - }; - - self.tunnels.insert(full_address.clone()); - self.all_tunnels.write().await.insert( - full_address.clone(), - Tunnel { - handle: session.handle(), - address: address.into(), - port: *port, - }, - ); - - self.send(&format!("{port} => https://{full_address}\r\n")); - - Ok(true) - } -} - -impl Drop for Handler { - fn drop(&mut self) { - let tunnels = self.tunnels.clone(); - let all_tunnels = self.all_tunnels.clone(); - - tokio::spawn(async move { - let mut all_tunnels = all_tunnels.write().await; - for tunnel in tunnels { - all_tunnels.remove(&tunnel); - } - - debug!("{all_tunnels:?}"); - }); - } -} diff --git a/src/ssh.rs b/src/ssh.rs new file mode 100644 index 0000000..f285edd --- /dev/null +++ b/src/ssh.rs @@ -0,0 +1,221 @@ +use std::{collections::HashSet, net::SocketAddr, sync::Arc, time::Duration}; + +use russh::{ + ChannelId, + keys::PrivateKey, + server::{Auth, Msg, Server as _, Session}, +}; +use tokio::{ + net::ToSocketAddrs, + sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel}, +}; +use tracing::{debug, error}; + +use crate::{ + animals::get_animal_name, + tunnel::{self, Tunnel, Tunnels}, +}; + +pub struct Handler { + tx: UnboundedSender>, + rx: Option>>, + + all_tunnels: Tunnels, + tunnels: HashSet, +} + +impl Handler { + fn send(&self, data: &str) { + let _ = self.tx.send(data.as_bytes().to_vec()); + } + + async fn full_address(&self, address: &str) -> Option { + let all_tunnels = self.all_tunnels.read().await; + + let address = if address == "localhost" { + loop { + let address = get_animal_name(); + if !all_tunnels.contains_key(address) { + break address; + } + } + } else { + if all_tunnels.contains_key(address) { + return None; + } + address + }; + + Some(format!("{address}.tunnel.huizinga.dev")) + } +} + +impl russh::server::Handler for Handler { + type Error = russh::Error; + + async fn channel_open_session( + &mut self, + channel: russh::Channel, + _session: &mut Session, + ) -> Result { + debug!("channel_open_session"); + + let Some(mut rx) = self.rx.take() else { + return Err(russh::Error::Inconsistent); + }; + + tokio::spawn(async move { + debug!("Waiting for message to send to client..."); + loop { + let message = rx.recv().await; + debug!("Message!"); + + let Some(message) = message else { break }; + + if channel.data(message.as_ref()).await.is_err() { + break; + } + } + + debug!("Ending receive task"); + }); + + Ok(true) + } + + async fn auth_publickey( + &mut self, + user: &str, + _public_key: &russh::keys::ssh_key::PublicKey, + ) -> Result { + debug!("Login from {user}"); + + // TODO: Get ssh keys associated with user from ldap + Ok(Auth::Accept) + } + + async fn data( + &mut self, + _channel: ChannelId, + data: &[u8], + _session: &mut Session, + ) -> Result<(), Self::Error> { + if data == [3] { + return Err(russh::Error::Disconnect); + } + + Ok(()) + } + + async fn exec_request( + &mut self, + _channel: ChannelId, + data: &[u8], + _session: &mut Session, + ) -> Result<(), Self::Error> { + debug!("exec_request data {data:?}"); + + Ok(()) + } + + async fn tcpip_forward( + &mut self, + address: &str, + port: &mut u32, + session: &mut Session, + ) -> Result { + debug!("{address}:{port}"); + + let Some(full_address) = self.full_address(address).await else { + self.send(&format!("{port} => FAILED ({address} already in use)\r\n")); + return Ok(false); + }; + + self.tunnels.insert(full_address.clone()); + self.all_tunnels.write().await.insert( + full_address.clone(), + Tunnel::new(session.handle(), address, *port), + ); + + self.send(&format!("{port} => https://{full_address}\r\n")); + + Ok(true) + } +} + +impl Drop for Handler { + fn drop(&mut self) { + let tunnels = self.tunnels.clone(); + let all_tunnels = self.all_tunnels.clone(); + + tokio::spawn(async move { + let mut all_tunnels = all_tunnels.write().await; + for tunnel in tunnels { + all_tunnels.remove(&tunnel); + } + + debug!("{all_tunnels:?}"); + }); + } +} + +pub struct Server { + tunnels: Tunnels, +} + +impl Server { + pub fn new() -> Self { + Server { + tunnels: tunnel::new(), + } + } + + pub fn tunnels(&self) -> Tunnels { + self.tunnels.clone() + } + + pub fn run( + &mut self, + key: PrivateKey, + addr: impl ToSocketAddrs + Send, + ) -> impl Future> + Send { + let config = russh::server::Config { + inactivity_timeout: Some(Duration::from_secs(3600)), + auth_rejection_time: Duration::from_secs(3), + auth_rejection_time_initial: Some(Duration::from_secs(0)), + keys: vec![key], + preferred: russh::Preferred { + ..Default::default() + }, + ..Default::default() + }; + let config = Arc::new(config); + + async move { self.run_on_address(config, addr).await } + } +} + +impl Default for Server { + fn default() -> Self { + Self::new() + } +} + +impl russh::server::Server for Server { + type Handler = Handler; + + fn new_client(&mut self, _peer_addr: Option) -> Self::Handler { + let (tx, rx) = unbounded_channel::>(); + + Handler { + tx, + rx: Some(rx), + all_tunnels: self.tunnels.clone(), + tunnels: HashSet::new(), + } + } + + fn handle_session_error(&mut self, error: ::Error) { + error!("Session error: {error:#?}"); + } +} diff --git a/src/tunnel.rs b/src/tunnel.rs new file mode 100644 index 0000000..68400dc --- /dev/null +++ b/src/tunnel.rs @@ -0,0 +1,36 @@ +use std::{collections::HashMap, sync::Arc}; + +use russh::{ + Channel, + server::{Handle, Msg}, +}; +use tokio::sync::RwLock; + +#[derive(Debug, Clone)] +pub struct Tunnel { + handle: Handle, + address: String, + port: u32, +} + +impl Tunnel { + pub fn new(handle: Handle, address: impl Into, port: u32) -> Self { + Self { + handle, + address: address.into(), + port, + } + } + + pub async fn open_tunnel(&self) -> Result, russh::Error> { + self.handle + .channel_open_forwarded_tcpip(&self.address, self.port, &self.address, self.port) + .await + } +} + +pub type Tunnels = Arc>>; + +pub fn new() -> Tunnels { + Arc::new(RwLock::new(HashMap::new())) +}