diff --git a/src/main.rs b/src/main.rs index 9ed955f..116661b 100644 --- a/src/main.rs +++ b/src/main.rs @@ -5,7 +5,7 @@ use hyper::server::conn::http1::{self}; use hyper_util::rt::TokioIo; use rand::rngs::OsRng; use tokio::net::TcpListener; -use tracing::warn; +use tracing::{info, warn}; use tracing_subscriber::{EnvFilter, Registry, layer::SubscriberExt, util::SubscriberInitExt}; use tunnel_rs::ssh::Server; @@ -30,12 +30,16 @@ async fn main() { let domain = std::env::var("TUNNEL_DOMAIN").unwrap_or_else(|_| format!("localhost:{port}")); let mut ssh = Server::new(domain); - let tunnels = ssh.tunnels(); - tokio::spawn(async move { ssh.run(key, ("0.0.0.0", 2222)).await }); + let addr = SocketAddr::from(([0, 0, 0, 0], 2222)); + tokio::spawn(async move { ssh.run(key, addr).await }); + info!("SSH is available on {addr}"); let addr = SocketAddr::from(([0, 0, 0, 0], port)); let listener = TcpListener::bind(addr).await.unwrap(); + info!("HTTP is available on {addr}"); + + // TODO: Graceful shutdown loop { let (stream, _) = listener.accept().await.unwrap(); let io = TokioIo::new(stream); diff --git a/src/ssh.rs b/src/ssh.rs index 775260d..8756c02 100644 --- a/src/ssh.rs +++ b/src/ssh.rs @@ -9,7 +9,7 @@ use tokio::{ net::ToSocketAddrs, sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel}, }; -use tracing::{debug, error}; +use tracing::{debug, trace, warn}; use crate::tunnel::{Tunnel, Tunnels}; @@ -35,26 +35,24 @@ impl russh::server::Handler for Handler { channel: russh::Channel, _session: &mut Session, ) -> Result { - debug!("channel_open_session"); + trace!("channel_open_session"); let Some(mut rx) = self.rx.take() else { return Err(russh::Error::Inconsistent); }; tokio::spawn(async move { - debug!("Waiting for message to send to client..."); loop { - let message = rx.recv().await; - debug!("Message!"); + let Some(message) = rx.recv().await else { + break; + }; - let Some(message) = message else { break }; + trace!("Sending message to client"); if channel.data(message.as_ref()).await.is_err() { break; } } - - debug!("Ending receive task"); }); Ok(true) @@ -77,6 +75,7 @@ impl russh::server::Handler for Handler { data: &[u8], _session: &mut Session, ) -> Result<(), Self::Error> { + // TODO: Graceful shutdown if data == [3] { return Err(russh::Error::Disconnect); } @@ -90,7 +89,7 @@ impl russh::server::Handler for Handler { data: &[u8], _session: &mut Session, ) -> Result<(), Self::Error> { - debug!("exec_request data {data:?}"); + trace!(data, "exec_request"); Ok(()) } @@ -101,7 +100,7 @@ impl russh::server::Handler for Handler { port: &mut u32, session: &mut Session, ) -> Result { - debug!("{address}:{port}"); + trace!(address, port, "tcpip_forward"); let tunnel = Tunnel::new(session.handle(), address, *port); let Some(address) = self.all_tunnels.add_tunnel(address, tunnel).await else { @@ -125,8 +124,6 @@ impl Drop for Handler { tokio::spawn(async move { all_tunnels.remove_tunnels(tunnels.clone()).await; - - debug!("{all_tunnels:?}"); }); } } @@ -149,7 +146,7 @@ impl Server { pub fn run( &mut self, key: PrivateKey, - addr: impl ToSocketAddrs + Send, + addr: impl ToSocketAddrs + Send + std::fmt::Debug, ) -> impl Future> + Send { let config = russh::server::Config { inactivity_timeout: Some(Duration::from_secs(3600)), @@ -163,6 +160,8 @@ impl Server { }; let config = Arc::new(config); + debug!(?addr, "Running ssh"); + async move { self.run_on_address(config, addr).await } } } @@ -182,6 +181,6 @@ impl russh::server::Server for Server { } fn handle_session_error(&mut self, error: ::Error) { - error!("Session error: {error:#?}"); + warn!("Session error: {error:#?}"); } } diff --git a/src/tunnel.rs b/src/tunnel.rs index 5a9be4f..0452ca6 100644 --- a/src/tunnel.rs +++ b/src/tunnel.rs @@ -37,6 +37,7 @@ impl Tunnel { } pub async fn open_tunnel(&self) -> Result, russh::Error> { + trace!(tunnel = self.address, "Opening tunnel"); self.handle .channel_open_forwarded_tcpip(&self.address, self.port, &self.address, self.port) .await @@ -76,6 +77,7 @@ impl Tunnels { let address = format!("{address}.{}", self.domain); + trace!(tunnel = address, "Adding tunnel"); all_tunnels.insert(address.clone(), tunnel); Some(address) @@ -84,13 +86,10 @@ impl Tunnels { pub async fn remove_tunnels(&mut self, tunnels: HashSet) { let mut all_tunnels = self.tunnels.write().await; for tunnel in tunnels { + trace!(tunnel, "Removing tunnel"); all_tunnels.remove(&tunnel); } } - - pub async fn get_tunnel(&self, address: &str) -> Option { - self.tunnels.read().await.get(address).cloned() - } } impl Service> for Tunnels { @@ -128,24 +127,25 @@ impl Service> for Tunnels { return Box::pin(async { Ok(resp) }); }; - debug!("Request for {authority:?}"); + debug!(tunnel = authority, "Request"); - let tunnels = self.clone(); + let tunnels = self.tunnels.clone(); Box::pin(async move { - let Some(tunnel) = tunnels.get_tunnel(&authority).await else { + let tunnels = tunnels.read().await; + let Some(tunnel) = tunnels.get(&authority) else { + debug!(tunnel = authority, "Unknown tunnel"); let resp = response(StatusCode::NOT_FOUND, "Unknown tunnel"); - return Ok::<_, hyper::Error>(resp); + return Ok(resp); }; - debug!("Opening channel"); let channel = match tunnel.open_tunnel().await { Ok(channel) => channel, Err(err) => { - warn!("Failed to open tunnel: {err}"); + warn!(tunnel = authority, "Failed to open tunnel: {err}"); let resp = response(StatusCode::INTERNAL_SERVER_ERROR, "Failed to open tunnel"); - return Ok::<_, hyper::Error>(resp); + return Ok(resp); } }; let io = TokioIo::new(channel.into_stream()); @@ -158,7 +158,7 @@ impl Service> for Tunnels { tokio::spawn(async move { if let Err(err) = conn.await { - warn!("Connection failed: {err}"); + warn!(runnel = authority, "Connection failed: {err}"); } });