diff --git a/src/lib.rs b/src/lib.rs index fbd71f1..13073e5 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -8,6 +8,7 @@ mod input; mod io; mod ldap; mod server; +mod stats; mod tui; mod tunnel; mod units; diff --git a/src/stats.rs b/src/stats.rs new file mode 100644 index 0000000..3f7a265 --- /dev/null +++ b/src/stats.rs @@ -0,0 +1,36 @@ +use std::sync::atomic::{AtomicUsize, Ordering}; + +use crate::units::Unit; + +#[derive(Debug, Default)] +pub struct Stats { + connections: AtomicUsize, + rx: AtomicUsize, + tx: AtomicUsize, +} + +impl Stats { + pub fn add_connection(&self) { + self.connections.fetch_add(1, Ordering::Relaxed); + } + + pub fn add_rx_bytes(&self, n: usize) { + self.rx.fetch_add(n, Ordering::Relaxed); + } + + pub fn add_tx_bytes(&self, n: usize) { + self.tx.fetch_add(n, Ordering::Relaxed); + } + + pub fn connections(&self) -> usize { + self.connections.load(Ordering::Relaxed) + } + + pub fn rx(&self) -> Unit { + Unit::new(self.rx.load(Ordering::Relaxed), "B") + } + + pub fn tx(&self) -> Unit { + Unit::new(self.tx.load(Ordering::Relaxed), "B") + } +} diff --git a/src/tunnel.rs b/src/tunnel.rs index 382cec1..5113478 100644 --- a/src/tunnel.rs +++ b/src/tunnel.rs @@ -11,10 +11,7 @@ use std::{ collections::{HashMap, hash_map::Entry}, ops::Deref, pin::Pin, - sync::{ - Arc, - atomic::{AtomicUsize, Ordering}, - }, + sync::Arc, }; use tracing::{debug, error, trace, warn}; @@ -25,7 +22,7 @@ use crate::{ animals::get_animal_name, auth::{AuthStatus, ForwardAuth}, helper::response, - units::Unit, + stats::Stats, wrapper::Wrapper, }; @@ -46,25 +43,19 @@ pub struct Tunnel { domain: Option, port: u32, access: Arc>, - connection_count: Arc, - bytes_rx: Arc, - bytes_tx: Arc, + stats: Arc, } impl Tunnel { pub async fn open_tunnel(&self) -> Result { trace!(tunnel = self.name, "Opening tunnel"); - self.connection_count.fetch_add(1, Ordering::Relaxed); + self.stats.add_connection(); let channel = self .handle .channel_open_forwarded_tcpip(&self.address, self.port, &self.address, self.port) .await?; - Ok(Wrapper::new( - channel.into_stream(), - self.bytes_rx.clone(), - self.bytes_tx.clone(), - )) + Ok(Wrapper::new(channel.into_stream(), self.stats.clone())) } pub async fn set_access(&self, access: TunnelAccess) { @@ -80,18 +71,6 @@ impl Tunnel { .clone() .map(|domain| format!("{}.{domain}", self.name)) } - - pub fn get_connections(&self) -> usize { - self.connection_count.load(Ordering::Relaxed) - } - - pub fn get_rx_string(&self) -> String { - Unit::new(self.bytes_rx.load(Ordering::Relaxed), "B").to_string() - } - - pub fn get_tx_string(&self) -> String { - Unit::new(self.bytes_tx.load(Ordering::Relaxed), "B").to_string() - } } #[derive(Debug, Clone)] @@ -146,9 +125,7 @@ impl Tunnels { domain: Some(self.domain.clone()), port, access: Arc::new(RwLock::new(TunnelAccess::Private(user.into()))), - connection_count: Default::default(), - bytes_rx: Default::default(), - bytes_tx: Default::default(), + stats: Default::default(), }; if tunnel.name == "localhost" { diff --git a/src/tunnel/tui.rs b/src/tunnel/tui.rs index d086a91..6c60806 100644 --- a/src/tunnel/tui.rs +++ b/src/tunnel/tui.rs @@ -34,8 +34,8 @@ pub async fn to_row(tunnel: &Tunnel) -> Vec> { access, tunnel.port.to_string().into(), address, - tunnel.get_connections().to_string().into(), - tunnel.get_rx_string().into(), - tunnel.get_tx_string().into(), + tunnel.stats.connections().to_string().into(), + tunnel.stats.rx().to_string().into(), + tunnel.stats.tx().to_string().into(), ] } diff --git a/src/units.rs b/src/units.rs index 5e7bb6b..4c9c36a 100644 --- a/src/units.rs +++ b/src/units.rs @@ -2,22 +2,13 @@ use std::fmt; pub struct Unit { value: usize, - prefix: UnitPrefix, unit: String, } impl Unit { - pub fn new(mut value: usize, unit: impl Into) -> Self { - let mut prefix = UnitPrefix::None; - - while value > 10000 { - value /= 1000; - prefix = prefix.next(); - } - + pub fn new(value: usize, unit: impl Into) -> Self { Self { value, - prefix, unit: unit.into(), } } @@ -25,7 +16,15 @@ impl Unit { impl fmt::Display for Unit { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "{} {}{}", self.value, self.prefix, self.unit) + let mut value = self.value; + let mut prefix = UnitPrefix::None; + + while value > 10000 { + value /= 1000; + prefix = prefix.next(); + } + + write!(f, "{} {}{}", value, prefix, self.unit) } } diff --git a/src/wrapper.rs b/src/wrapper.rs index 219772d..4e1a162 100644 --- a/src/wrapper.rs +++ b/src/wrapper.rs @@ -1,35 +1,25 @@ use std::{ pin::Pin, - sync::{ - Arc, - atomic::{AtomicUsize, Ordering}, - }, + sync::Arc, task::{Context, Poll}, }; use pin_project_lite::pin_project; use russh::{ChannelStream, server::Msg}; +use crate::stats::Stats; + pin_project! { pub struct Wrapper { #[pin] inner: ChannelStream, - bytes_rx: Arc, - bytes_tx: Arc + stats: Arc, } } impl Wrapper { - pub fn new( - inner: ChannelStream, - bytes_rx: Arc, - bytes_tx: Arc, - ) -> Self { - Self { - inner, - bytes_rx, - bytes_tx, - } + pub fn new(inner: ChannelStream, stats: Arc) -> Self { + Self { inner, stats } } } @@ -48,7 +38,7 @@ impl hyper::rt::Read for Wrapper { } }; - project.bytes_tx.fetch_add(n, Ordering::Relaxed); + project.stats.add_tx_bytes(n); unsafe { buf.advance(n); @@ -66,7 +56,7 @@ impl hyper::rt::Write for Wrapper { let project = self.project(); tokio::io::AsyncWrite::poll_write(project.inner, cx, buf).map(|res| { res.inspect(|n| { - project.bytes_rx.fetch_add(*n, Ordering::Relaxed); + project.stats.add_rx_bytes(*n); }) }) } @@ -94,7 +84,7 @@ impl hyper::rt::Write for Wrapper { let project = self.project(); tokio::io::AsyncWrite::poll_write_vectored(project.inner, cx, bufs).map(|res| { res.inspect(|n| { - project.bytes_rx.fetch_add(*n, Ordering::Relaxed); + project.stats.add_rx_bytes(*n); }) }) } diff --git a/test.yaml b/test.yaml new file mode 100644 index 0000000..32b954d --- /dev/null +++ b/test.yaml @@ -0,0 +1,10 @@ +apiVersion: v1 +kind: ConfigMap +metadata: + name: tunnel-demo-acl + annotations: + config.huizinga.dev/fragment: authelia-acl +data: + rules: | + - domain: "*.tunnel.huizinga.dev" + policy: one_factor