Cleanup and improve logging

This commit is contained in:
Dreaded_X 2025-04-05 03:04:59 +02:00
parent 72819faa8f
commit 4a1c7f0279
Signed by: Dreaded_X
GPG Key ID: FA5F485356B0D2D4
3 changed files with 32 additions and 29 deletions

View File

@ -5,7 +5,7 @@ use hyper::server::conn::http1::{self};
use hyper_util::rt::TokioIo; use hyper_util::rt::TokioIo;
use rand::rngs::OsRng; use rand::rngs::OsRng;
use tokio::net::TcpListener; use tokio::net::TcpListener;
use tracing::warn; use tracing::{info, warn};
use tracing_subscriber::{EnvFilter, Registry, layer::SubscriberExt, util::SubscriberInitExt}; use tracing_subscriber::{EnvFilter, Registry, layer::SubscriberExt, util::SubscriberInitExt};
use tunnel_rs::ssh::Server; use tunnel_rs::ssh::Server;
@ -30,12 +30,16 @@ async fn main() {
let domain = std::env::var("TUNNEL_DOMAIN").unwrap_or_else(|_| format!("localhost:{port}")); let domain = std::env::var("TUNNEL_DOMAIN").unwrap_or_else(|_| format!("localhost:{port}"));
let mut ssh = Server::new(domain); let mut ssh = Server::new(domain);
let tunnels = ssh.tunnels(); let tunnels = ssh.tunnels();
tokio::spawn(async move { ssh.run(key, ("0.0.0.0", 2222)).await }); let addr = SocketAddr::from(([0, 0, 0, 0], 2222));
tokio::spawn(async move { ssh.run(key, addr).await });
info!("SSH is available on {addr}");
let addr = SocketAddr::from(([0, 0, 0, 0], port)); let addr = SocketAddr::from(([0, 0, 0, 0], port));
let listener = TcpListener::bind(addr).await.unwrap(); let listener = TcpListener::bind(addr).await.unwrap();
info!("HTTP is available on {addr}");
// TODO: Graceful shutdown
loop { loop {
let (stream, _) = listener.accept().await.unwrap(); let (stream, _) = listener.accept().await.unwrap();
let io = TokioIo::new(stream); let io = TokioIo::new(stream);

View File

@ -9,7 +9,7 @@ use tokio::{
net::ToSocketAddrs, net::ToSocketAddrs,
sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel}, sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel},
}; };
use tracing::{debug, error}; use tracing::{debug, trace, warn};
use crate::tunnel::{Tunnel, Tunnels}; use crate::tunnel::{Tunnel, Tunnels};
@ -35,26 +35,24 @@ impl russh::server::Handler for Handler {
channel: russh::Channel<Msg>, channel: russh::Channel<Msg>,
_session: &mut Session, _session: &mut Session,
) -> Result<bool, Self::Error> { ) -> Result<bool, Self::Error> {
debug!("channel_open_session"); trace!("channel_open_session");
let Some(mut rx) = self.rx.take() else { let Some(mut rx) = self.rx.take() else {
return Err(russh::Error::Inconsistent); return Err(russh::Error::Inconsistent);
}; };
tokio::spawn(async move { tokio::spawn(async move {
debug!("Waiting for message to send to client...");
loop { loop {
let message = rx.recv().await; let Some(message) = rx.recv().await else {
debug!("Message!"); break;
};
let Some(message) = message else { break }; trace!("Sending message to client");
if channel.data(message.as_ref()).await.is_err() { if channel.data(message.as_ref()).await.is_err() {
break; break;
} }
} }
debug!("Ending receive task");
}); });
Ok(true) Ok(true)
@ -77,6 +75,7 @@ impl russh::server::Handler for Handler {
data: &[u8], data: &[u8],
_session: &mut Session, _session: &mut Session,
) -> Result<(), Self::Error> { ) -> Result<(), Self::Error> {
// TODO: Graceful shutdown
if data == [3] { if data == [3] {
return Err(russh::Error::Disconnect); return Err(russh::Error::Disconnect);
} }
@ -90,7 +89,7 @@ impl russh::server::Handler for Handler {
data: &[u8], data: &[u8],
_session: &mut Session, _session: &mut Session,
) -> Result<(), Self::Error> { ) -> Result<(), Self::Error> {
debug!("exec_request data {data:?}"); trace!(data, "exec_request");
Ok(()) Ok(())
} }
@ -101,7 +100,7 @@ impl russh::server::Handler for Handler {
port: &mut u32, port: &mut u32,
session: &mut Session, session: &mut Session,
) -> Result<bool, Self::Error> { ) -> Result<bool, Self::Error> {
debug!("{address}:{port}"); trace!(address, port, "tcpip_forward");
let tunnel = Tunnel::new(session.handle(), address, *port); let tunnel = Tunnel::new(session.handle(), address, *port);
let Some(address) = self.all_tunnels.add_tunnel(address, tunnel).await else { let Some(address) = self.all_tunnels.add_tunnel(address, tunnel).await else {
@ -125,8 +124,6 @@ impl Drop for Handler {
tokio::spawn(async move { tokio::spawn(async move {
all_tunnels.remove_tunnels(tunnels.clone()).await; all_tunnels.remove_tunnels(tunnels.clone()).await;
debug!("{all_tunnels:?}");
}); });
} }
} }
@ -149,7 +146,7 @@ impl Server {
pub fn run( pub fn run(
&mut self, &mut self,
key: PrivateKey, key: PrivateKey,
addr: impl ToSocketAddrs + Send, addr: impl ToSocketAddrs + Send + std::fmt::Debug,
) -> impl Future<Output = Result<(), std::io::Error>> + Send { ) -> impl Future<Output = Result<(), std::io::Error>> + Send {
let config = russh::server::Config { let config = russh::server::Config {
inactivity_timeout: Some(Duration::from_secs(3600)), inactivity_timeout: Some(Duration::from_secs(3600)),
@ -163,6 +160,8 @@ impl Server {
}; };
let config = Arc::new(config); let config = Arc::new(config);
debug!(?addr, "Running ssh");
async move { self.run_on_address(config, addr).await } async move { self.run_on_address(config, addr).await }
} }
} }
@ -182,6 +181,6 @@ impl russh::server::Server for Server {
} }
fn handle_session_error(&mut self, error: <Self::Handler as russh::server::Handler>::Error) { fn handle_session_error(&mut self, error: <Self::Handler as russh::server::Handler>::Error) {
error!("Session error: {error:#?}"); warn!("Session error: {error:#?}");
} }
} }

View File

@ -37,6 +37,7 @@ impl Tunnel {
} }
pub async fn open_tunnel(&self) -> Result<Channel<Msg>, russh::Error> { pub async fn open_tunnel(&self) -> Result<Channel<Msg>, russh::Error> {
trace!(tunnel = self.address, "Opening tunnel");
self.handle self.handle
.channel_open_forwarded_tcpip(&self.address, self.port, &self.address, self.port) .channel_open_forwarded_tcpip(&self.address, self.port, &self.address, self.port)
.await .await
@ -76,6 +77,7 @@ impl Tunnels {
let address = format!("{address}.{}", self.domain); let address = format!("{address}.{}", self.domain);
trace!(tunnel = address, "Adding tunnel");
all_tunnels.insert(address.clone(), tunnel); all_tunnels.insert(address.clone(), tunnel);
Some(address) Some(address)
@ -84,13 +86,10 @@ impl Tunnels {
pub async fn remove_tunnels(&mut self, tunnels: HashSet<String>) { pub async fn remove_tunnels(&mut self, tunnels: HashSet<String>) {
let mut all_tunnels = self.tunnels.write().await; let mut all_tunnels = self.tunnels.write().await;
for tunnel in tunnels { for tunnel in tunnels {
trace!(tunnel, "Removing tunnel");
all_tunnels.remove(&tunnel); all_tunnels.remove(&tunnel);
} }
} }
pub async fn get_tunnel(&self, address: &str) -> Option<Tunnel> {
self.tunnels.read().await.get(address).cloned()
}
} }
impl Service<Request<Incoming>> for Tunnels { impl Service<Request<Incoming>> for Tunnels {
@ -128,24 +127,25 @@ impl Service<Request<Incoming>> for Tunnels {
return Box::pin(async { Ok(resp) }); return Box::pin(async { Ok(resp) });
}; };
debug!("Request for {authority:?}"); debug!(tunnel = authority, "Request");
let tunnels = self.clone(); let tunnels = self.tunnels.clone();
Box::pin(async move { Box::pin(async move {
let Some(tunnel) = tunnels.get_tunnel(&authority).await else { let tunnels = tunnels.read().await;
let Some(tunnel) = tunnels.get(&authority) else {
debug!(tunnel = authority, "Unknown tunnel");
let resp = response(StatusCode::NOT_FOUND, "Unknown tunnel"); let resp = response(StatusCode::NOT_FOUND, "Unknown tunnel");
return Ok::<_, hyper::Error>(resp); return Ok(resp);
}; };
debug!("Opening channel");
let channel = match tunnel.open_tunnel().await { let channel = match tunnel.open_tunnel().await {
Ok(channel) => channel, Ok(channel) => channel,
Err(err) => { Err(err) => {
warn!("Failed to open tunnel: {err}"); warn!(tunnel = authority, "Failed to open tunnel: {err}");
let resp = response(StatusCode::INTERNAL_SERVER_ERROR, "Failed to open tunnel"); let resp = response(StatusCode::INTERNAL_SERVER_ERROR, "Failed to open tunnel");
return Ok::<_, hyper::Error>(resp); return Ok(resp);
} }
}; };
let io = TokioIo::new(channel.into_stream()); let io = TokioIo::new(channel.into_stream());
@ -158,7 +158,7 @@ impl Service<Request<Incoming>> for Tunnels {
tokio::spawn(async move { tokio::spawn(async move {
if let Err(err) = conn.await { if let Err(err) = conn.await {
warn!("Connection failed: {err}"); warn!(runnel = authority, "Connection failed: {err}");
} }
}); });