Improved how stats are handled
All checks were successful
Build and deploy / Build container and manifests (push) Successful in 7m25s
This commit is contained in:
parent 3ba80ab202
commit f75726b93a
@@ -8,6 +8,7 @@ mod input;
 mod io;
 mod ldap;
 mod server;
+mod stats;
 mod tui;
 mod tunnel;
 mod units;

36 src/stats.rs Normal file

@@ -0,0 +1,36 @@
+use std::sync::atomic::{AtomicUsize, Ordering};
+
+use crate::units::Unit;
+
+#[derive(Debug, Default)]
+pub struct Stats {
+    connections: AtomicUsize,
+    rx: AtomicUsize,
+    tx: AtomicUsize,
+}
+
+impl Stats {
+    pub fn add_connection(&self) {
+        self.connections.fetch_add(1, Ordering::Relaxed);
+    }
+
+    pub fn add_rx_bytes(&self, n: usize) {
+        self.rx.fetch_add(n, Ordering::Relaxed);
+    }
+
+    pub fn add_tx_bytes(&self, n: usize) {
+        self.tx.fetch_add(n, Ordering::Relaxed);
+    }
+
+    pub fn connections(&self) -> usize {
+        self.connections.load(Ordering::Relaxed)
+    }
+
+    pub fn rx(&self) -> Unit {
+        Unit::new(self.rx.load(Ordering::Relaxed), "B")
+    }
+
+    pub fn tx(&self) -> Unit {
+        Unit::new(self.tx.load(Ordering::Relaxed), "B")
+    }
+}
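
Stats folds the three separate Arc<AtomicUsize> counters into one type; every method takes &self and uses relaxed atomics, so a single Arc<Stats> can be cloned into each connection and read from the TUI without locking. A minimal usage sketch, not part of the commit (the demo function and byte sizes are made up):

use std::sync::Arc;

use crate::stats::Stats;

// Hypothetical demo: all counters start at zero via Stats::default().
fn demo() {
    let stats: Arc<Stats> = Arc::default();

    // Each opened connection gets its own clone of the same Arc,
    // e.g. the one handed to Wrapper::new further down.
    let per_connection = stats.clone();
    per_connection.add_connection();
    per_connection.add_rx_bytes(4096);
    per_connection.add_tx_bytes(512);

    // Reads elsewhere (the TUI row) see the accumulated totals.
    assert_eq!(stats.connections(), 1);
    println!("rx: {}, tx: {}", stats.rx(), stats.tx()); // formatted through Unit's Display
}
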
@@ -11,10 +11,7 @@ use std::{
     collections::{HashMap, hash_map::Entry},
     ops::Deref,
     pin::Pin,
-    sync::{
-        Arc,
-        atomic::{AtomicUsize, Ordering},
-    },
+    sync::Arc,
 };
 use tracing::{debug, error, trace, warn};
 
@@ -25,7 +22,7 @@ use crate::{
     animals::get_animal_name,
     auth::{AuthStatus, ForwardAuth},
     helper::response,
-    units::Unit,
+    stats::Stats,
     wrapper::Wrapper,
 };
 
@@ -46,25 +43,19 @@ pub struct Tunnel {
     domain: Option<String>,
     port: u32,
     access: Arc<RwLock<TunnelAccess>>,
-    connection_count: Arc<AtomicUsize>,
-    bytes_rx: Arc<AtomicUsize>,
-    bytes_tx: Arc<AtomicUsize>,
+    stats: Arc<Stats>,
 }
 
 impl Tunnel {
     pub async fn open_tunnel(&self) -> Result<Wrapper, russh::Error> {
         trace!(tunnel = self.name, "Opening tunnel");
-        self.connection_count.fetch_add(1, Ordering::Relaxed);
+        self.stats.add_connection();
         let channel = self
             .handle
             .channel_open_forwarded_tcpip(&self.address, self.port, &self.address, self.port)
             .await?;
 
-        Ok(Wrapper::new(
-            channel.into_stream(),
-            self.bytes_rx.clone(),
-            self.bytes_tx.clone(),
-        ))
+        Ok(Wrapper::new(channel.into_stream(), self.stats.clone()))
     }
 
     pub async fn set_access(&self, access: TunnelAccess) {
@@ -80,18 +71,6 @@ impl Tunnel {
             .clone()
             .map(|domain| format!("{}.{domain}", self.name))
     }
-
-    pub fn get_connections(&self) -> usize {
-        self.connection_count.load(Ordering::Relaxed)
-    }
-
-    pub fn get_rx_string(&self) -> String {
-        Unit::new(self.bytes_rx.load(Ordering::Relaxed), "B").to_string()
-    }
-
-    pub fn get_tx_string(&self) -> String {
-        Unit::new(self.bytes_tx.load(Ordering::Relaxed), "B").to_string()
-    }
 }
 
 #[derive(Debug, Clone)]
@@ -146,9 +125,7 @@ impl Tunnels {
             domain: Some(self.domain.clone()),
             port,
             access: Arc::new(RwLock::new(TunnelAccess::Private(user.into()))),
-            connection_count: Default::default(),
-            bytes_rx: Default::default(),
-            bytes_tx: Default::default(),
+            stats: Default::default(),
         };
 
         if tunnel.name == "localhost" {

@@ -34,8 +34,8 @@ pub async fn to_row(tunnel: &Tunnel) -> Vec<Span<'static>> {
         access,
         tunnel.port.to_string().into(),
         address,
-        tunnel.get_connections().to_string().into(),
-        tunnel.get_rx_string().into(),
-        tunnel.get_tx_string().into(),
+        tunnel.stats.connections().to_string().into(),
+        tunnel.stats.rx().to_string().into(),
+        tunnel.stats.tx().to_string().into(),
     ]
 }

21 src/units.rs

@@ -2,22 +2,13 @@ use std::fmt;
 
 pub struct Unit {
     value: usize,
-    prefix: UnitPrefix,
     unit: String,
 }
 
 impl Unit {
-    pub fn new(mut value: usize, unit: impl Into<String>) -> Self {
-        let mut prefix = UnitPrefix::None;
-
-        while value > 10000 {
-            value /= 1000;
-            prefix = prefix.next();
-        }
-
+    pub fn new(value: usize, unit: impl Into<String>) -> Self {
         Self {
             value,
-            prefix,
             unit: unit.into(),
         }
     }
@@ -25,7 +16,15 @@ impl Unit {
 
 impl fmt::Display for Unit {
     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
-        write!(f, "{} {}{}", self.value, self.prefix, self.unit)
+        let mut value = self.value;
+        let mut prefix = UnitPrefix::None;
+
+        while value > 10000 {
+            value /= 1000;
+            prefix = prefix.next();
+        }
+
+        write!(f, "{} {}{}", value, prefix, self.unit)
     }
 }
 
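
The scaling loop moves out of Unit::new and into the Display impl, so a Unit now stores the exact value and only rounds when it is printed; Stats can hand back raw byte counts without losing precision. A self-contained sketch of the same display-time scaling, with an assumed stand-in for UnitPrefix (the real enum lives elsewhere in src/units.rs and is not shown in this diff):

// Assumed stand-in for the crate's UnitPrefix; names and Display strings are illustrative.
#[derive(Clone, Copy)]
enum UnitPrefix {
    None,
    Kilo,
    Mega,
}

impl UnitPrefix {
    fn next(self) -> Self {
        match self {
            UnitPrefix::None => UnitPrefix::Kilo,
            _ => UnitPrefix::Mega,
        }
    }
}

impl std::fmt::Display for UnitPrefix {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(match self {
            UnitPrefix::None => "",
            UnitPrefix::Kilo => "k",
            UnitPrefix::Mega => "M",
        })
    }
}

// The same loop the new Display impl runs: divide by 1000 until the value
// fits under 10 000, bumping the prefix each step.
fn scaled(mut value: usize) -> (usize, UnitPrefix) {
    let mut prefix = UnitPrefix::None;
    while value > 10000 {
        value /= 1000;
        prefix = prefix.next();
    }
    (value, prefix)
}

fn main() {
    let (value, prefix) = scaled(1_234_567);
    println!("{} {}B", value, prefix); // prints "1234 kB"
}
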
@@ -1,35 +1,25 @@
 use std::{
     pin::Pin,
-    sync::{
-        Arc,
-        atomic::{AtomicUsize, Ordering},
-    },
+    sync::Arc,
     task::{Context, Poll},
 };
 
 use pin_project_lite::pin_project;
 use russh::{ChannelStream, server::Msg};
 
+use crate::stats::Stats;
+
 pin_project! {
     pub struct Wrapper {
         #[pin]
         inner: ChannelStream<Msg>,
-        bytes_rx: Arc<AtomicUsize>,
-        bytes_tx: Arc<AtomicUsize>
+        stats: Arc<Stats>,
     }
 }
 
 impl Wrapper {
-    pub fn new(
-        inner: ChannelStream<Msg>,
-        bytes_rx: Arc<AtomicUsize>,
-        bytes_tx: Arc<AtomicUsize>,
-    ) -> Self {
-        Self {
-            inner,
-            bytes_rx,
-            bytes_tx,
-        }
-    }
+    pub fn new(inner: ChannelStream<Msg>, stats: Arc<Stats>) -> Self {
+        Self { inner, stats }
+    }
 }
 
@@ -48,7 +38,7 @@ impl hyper::rt::Read for Wrapper {
             }
         };
 
-        project.bytes_tx.fetch_add(n, Ordering::Relaxed);
+        project.stats.add_tx_bytes(n);
 
         unsafe {
             buf.advance(n);
@@ -66,7 +56,7 @@ impl hyper::rt::Write for Wrapper {
         let project = self.project();
         tokio::io::AsyncWrite::poll_write(project.inner, cx, buf).map(|res| {
             res.inspect(|n| {
-                project.bytes_rx.fetch_add(*n, Ordering::Relaxed);
+                project.stats.add_rx_bytes(*n);
             })
         })
     }
@@ -94,7 +84,7 @@ impl hyper::rt::Write for Wrapper {
         let project = self.project();
         tokio::io::AsyncWrite::poll_write_vectored(project.inner, cx, bufs).map(|res| {
             res.inspect(|n| {
-                project.bytes_rx.fetch_add(*n, Ordering::Relaxed);
+                project.stats.add_rx_bytes(*n);
            })
        })
    }
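
The hyper::rt::Read/Write plumbing in Wrapper is unchanged; only the bookkeeping call moves behind Stats. The counting pattern itself, recording whatever byte count the inner poll reports, can be shown in isolation against std types only (a sketch, not the commit's hyper/russh code; count_bytes is a made-up stand-in for Stats::add_rx_bytes / add_tx_bytes):

use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::task::Poll;

// Wrap a poll result and bump a shared counter when the inner I/O reports bytes;
// the result itself is passed through untouched.
fn count_bytes(
    counter: &Arc<AtomicUsize>,
    poll: Poll<std::io::Result<usize>>,
) -> Poll<std::io::Result<usize>> {
    poll.map(|res| {
        res.inspect(|n| {
            counter.fetch_add(*n, Ordering::Relaxed);
        })
    })
}

fn main() {
    let tx = Arc::new(AtomicUsize::new(0));
    let _ = count_bytes(&tx, Poll::Ready(Ok(512)));
    assert_eq!(tx.load(Ordering::Relaxed), 512);
}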