Added tunnel stats
This commit is contained in:
parent
cfa8a75962
commit
a8e3fd9d2a
1
Cargo.lock
generated
1
Cargo.lock
generated
|
@ -3294,6 +3294,7 @@ dependencies = [
|
||||||
"hyper",
|
"hyper",
|
||||||
"hyper-util",
|
"hyper-util",
|
||||||
"ldap3",
|
"ldap3",
|
||||||
|
"pin-project-lite",
|
||||||
"rand 0.8.5",
|
"rand 0.8.5",
|
||||||
"ratatui",
|
"ratatui",
|
||||||
"reqwest",
|
"reqwest",
|
||||||
|
|
|
@ -16,6 +16,7 @@ http-body-util = { version = "0.1.3", features = ["full"] }
|
||||||
hyper = { version = "1.6.0", features = ["full"] }
|
hyper = { version = "1.6.0", features = ["full"] }
|
||||||
hyper-util = { version = "0.1.11", features = ["full"] }
|
hyper-util = { version = "0.1.11", features = ["full"] }
|
||||||
ldap3 = "0.11.5"
|
ldap3 = "0.11.5"
|
||||||
|
pin-project-lite = "0.2.16"
|
||||||
rand = "0.8.5"
|
rand = "0.8.5"
|
||||||
ratatui = { version = "0.29.0", features = ["unstable-backend-writer"] }
|
ratatui = { version = "0.29.0", features = ["unstable-backend-writer"] }
|
||||||
reqwest = { version = "0.12.15", features = ["rustls-tls"] }
|
reqwest = { version = "0.12.15", features = ["rustls-tls"] }
|
||||||
|
|
|
@ -10,6 +10,8 @@ mod ldap;
|
||||||
mod server;
|
mod server;
|
||||||
mod tui;
|
mod tui;
|
||||||
mod tunnel;
|
mod tunnel;
|
||||||
|
mod units;
|
||||||
|
mod wrapper;
|
||||||
|
|
||||||
pub use ldap::Ldap;
|
pub use ldap::Ldap;
|
||||||
pub use server::Server;
|
pub use server::Server;
|
||||||
|
|
|
@ -7,25 +7,26 @@ use hyper::{
|
||||||
header::{self, HOST},
|
header::{self, HOST},
|
||||||
service::Service,
|
service::Service,
|
||||||
};
|
};
|
||||||
use hyper_util::rt::TokioIo;
|
|
||||||
use std::{
|
use std::{
|
||||||
collections::{HashMap, hash_map::Entry},
|
collections::{HashMap, hash_map::Entry},
|
||||||
ops::Deref,
|
ops::Deref,
|
||||||
pin::Pin,
|
pin::Pin,
|
||||||
sync::Arc,
|
sync::{
|
||||||
|
Arc,
|
||||||
|
atomic::{AtomicUsize, Ordering},
|
||||||
|
},
|
||||||
};
|
};
|
||||||
use tracing::{debug, error, trace, warn};
|
use tracing::{debug, error, trace, warn};
|
||||||
|
|
||||||
use russh::{
|
use russh::server::Handle;
|
||||||
Channel,
|
|
||||||
server::{Handle, Msg},
|
|
||||||
};
|
|
||||||
use tokio::sync::RwLock;
|
use tokio::sync::RwLock;
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
animals::get_animal_name,
|
animals::get_animal_name,
|
||||||
auth::{AuthStatus, ForwardAuth},
|
auth::{AuthStatus, ForwardAuth},
|
||||||
helper::response,
|
helper::response,
|
||||||
|
units::Unit,
|
||||||
|
wrapper::Wrapper,
|
||||||
};
|
};
|
||||||
|
|
||||||
pub mod tui;
|
pub mod tui;
|
||||||
|
@ -45,14 +46,25 @@ pub struct Tunnel {
|
||||||
domain: Option<String>,
|
domain: Option<String>,
|
||||||
port: u32,
|
port: u32,
|
||||||
access: Arc<RwLock<TunnelAccess>>,
|
access: Arc<RwLock<TunnelAccess>>,
|
||||||
|
connection_count: Arc<AtomicUsize>,
|
||||||
|
bytes_rx: Arc<AtomicUsize>,
|
||||||
|
bytes_tx: Arc<AtomicUsize>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Tunnel {
|
impl Tunnel {
|
||||||
pub async fn open_tunnel(&self) -> Result<Channel<Msg>, russh::Error> {
|
pub async fn open_tunnel(&self) -> Result<Wrapper, russh::Error> {
|
||||||
trace!(tunnel = self.name, "Opening tunnel");
|
trace!(tunnel = self.name, "Opening tunnel");
|
||||||
self.handle
|
self.connection_count.fetch_add(1, Ordering::Relaxed);
|
||||||
|
let channel = self
|
||||||
|
.handle
|
||||||
.channel_open_forwarded_tcpip(&self.address, self.port, &self.address, self.port)
|
.channel_open_forwarded_tcpip(&self.address, self.port, &self.address, self.port)
|
||||||
.await
|
.await?;
|
||||||
|
|
||||||
|
Ok(Wrapper::new(
|
||||||
|
channel.into_stream(),
|
||||||
|
self.bytes_rx.clone(),
|
||||||
|
self.bytes_tx.clone(),
|
||||||
|
))
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn set_access(&self, access: TunnelAccess) {
|
pub async fn set_access(&self, access: TunnelAccess) {
|
||||||
|
@ -68,6 +80,18 @@ impl Tunnel {
|
||||||
.clone()
|
.clone()
|
||||||
.map(|domain| format!("{}.{domain}", self.name))
|
.map(|domain| format!("{}.{domain}", self.name))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn get_connections(&self) -> usize {
|
||||||
|
self.connection_count.load(Ordering::Relaxed)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn get_rx_string(&self) -> String {
|
||||||
|
Unit::new(self.bytes_rx.load(Ordering::Relaxed), "B").to_string()
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn get_tx_string(&self) -> String {
|
||||||
|
Unit::new(self.bytes_tx.load(Ordering::Relaxed), "B").to_string()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Clone)]
|
#[derive(Debug, Clone)]
|
||||||
|
@ -122,6 +146,9 @@ impl Tunnels {
|
||||||
domain: Some(self.domain.clone()),
|
domain: Some(self.domain.clone()),
|
||||||
port,
|
port,
|
||||||
access: Arc::new(RwLock::new(TunnelAccess::Private(user.into()))),
|
access: Arc::new(RwLock::new(TunnelAccess::Private(user.into()))),
|
||||||
|
connection_count: Default::default(),
|
||||||
|
bytes_rx: Default::default(),
|
||||||
|
bytes_tx: Default::default(),
|
||||||
};
|
};
|
||||||
|
|
||||||
if tunnel.name == "localhost" {
|
if tunnel.name == "localhost" {
|
||||||
|
@ -264,8 +291,8 @@ impl Service<Request<Incoming>> for Tunnels {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
let channel = match tunnel.open_tunnel().await {
|
let io = match tunnel.open_tunnel().await {
|
||||||
Ok(channel) => channel,
|
Ok(io) => io,
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
warn!(tunnel = authority, "Failed to open tunnel: {err}");
|
warn!(tunnel = authority, "Failed to open tunnel: {err}");
|
||||||
let resp = response(StatusCode::INTERNAL_SERVER_ERROR, "Failed to open tunnel");
|
let resp = response(StatusCode::INTERNAL_SERVER_ERROR, "Failed to open tunnel");
|
||||||
|
@ -273,7 +300,6 @@ impl Service<Request<Incoming>> for Tunnels {
|
||||||
return Ok(resp);
|
return Ok(resp);
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
let io = TokioIo::new(channel.into_stream());
|
|
||||||
|
|
||||||
let (mut sender, conn) = Builder::new()
|
let (mut sender, conn) = Builder::new()
|
||||||
.preserve_header_case(true)
|
.preserve_header_case(true)
|
||||||
|
|
|
@ -11,6 +11,9 @@ pub fn header() -> Vec<Span<'static>> {
|
||||||
"Access".into(),
|
"Access".into(),
|
||||||
"Port".into(),
|
"Port".into(),
|
||||||
"Address".into(),
|
"Address".into(),
|
||||||
|
"Conn".into(),
|
||||||
|
"Rx".into(),
|
||||||
|
"Tx".into(),
|
||||||
]
|
]
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -31,5 +34,8 @@ pub async fn to_row(tunnel: &Tunnel) -> Vec<Span<'static>> {
|
||||||
access,
|
access,
|
||||||
tunnel.port.to_string().into(),
|
tunnel.port.to_string().into(),
|
||||||
address,
|
address,
|
||||||
|
tunnel.get_connections().to_string().into(),
|
||||||
|
tunnel.get_rx_string().into(),
|
||||||
|
tunnel.get_tx_string().into(),
|
||||||
]
|
]
|
||||||
}
|
}
|
||||||
|
|
71
src/units.rs
Normal file
71
src/units.rs
Normal file
|
@ -0,0 +1,71 @@
|
||||||
|
use std::fmt;
|
||||||
|
|
||||||
|
pub struct Unit {
|
||||||
|
value: usize,
|
||||||
|
prefix: UnitPrefix,
|
||||||
|
unit: String,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Unit {
|
||||||
|
pub fn new(mut value: usize, unit: impl Into<String>) -> Self {
|
||||||
|
let mut prefix = UnitPrefix::None;
|
||||||
|
|
||||||
|
while value > 10000 {
|
||||||
|
value /= 1000;
|
||||||
|
prefix = prefix.next();
|
||||||
|
}
|
||||||
|
|
||||||
|
Self {
|
||||||
|
value,
|
||||||
|
prefix,
|
||||||
|
unit: unit.into(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl fmt::Display for Unit {
|
||||||
|
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||||
|
write!(f, "{} {}{}", self.value, self.prefix, self.unit)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
enum UnitPrefix {
|
||||||
|
None,
|
||||||
|
Kilo,
|
||||||
|
Mega,
|
||||||
|
Giga,
|
||||||
|
Tera,
|
||||||
|
Peta,
|
||||||
|
Exa,
|
||||||
|
Impossible,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl UnitPrefix {
|
||||||
|
fn next(self) -> Self {
|
||||||
|
match self {
|
||||||
|
UnitPrefix::None => UnitPrefix::Kilo,
|
||||||
|
UnitPrefix::Kilo => UnitPrefix::Mega,
|
||||||
|
UnitPrefix::Mega => UnitPrefix::Giga,
|
||||||
|
UnitPrefix::Giga => UnitPrefix::Tera,
|
||||||
|
UnitPrefix::Tera => UnitPrefix::Peta,
|
||||||
|
UnitPrefix::Peta => UnitPrefix::Exa,
|
||||||
|
UnitPrefix::Exa | UnitPrefix::Impossible => UnitPrefix::Impossible,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl fmt::Display for UnitPrefix {
|
||||||
|
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||||
|
let prefix = match self {
|
||||||
|
UnitPrefix::None => "",
|
||||||
|
UnitPrefix::Kilo => "k",
|
||||||
|
UnitPrefix::Mega => "M",
|
||||||
|
UnitPrefix::Giga => "G",
|
||||||
|
UnitPrefix::Tera => "T",
|
||||||
|
UnitPrefix::Peta => "P",
|
||||||
|
UnitPrefix::Exa => "E",
|
||||||
|
UnitPrefix::Impossible => "x",
|
||||||
|
};
|
||||||
|
f.write_str(prefix)
|
||||||
|
}
|
||||||
|
}
|
101
src/wrapper.rs
Normal file
101
src/wrapper.rs
Normal file
|
@ -0,0 +1,101 @@
|
||||||
|
use std::{
|
||||||
|
pin::Pin,
|
||||||
|
sync::{
|
||||||
|
Arc,
|
||||||
|
atomic::{AtomicUsize, Ordering},
|
||||||
|
},
|
||||||
|
task::{Context, Poll},
|
||||||
|
};
|
||||||
|
|
||||||
|
use pin_project_lite::pin_project;
|
||||||
|
use russh::{ChannelStream, server::Msg};
|
||||||
|
|
||||||
|
pin_project! {
|
||||||
|
pub struct Wrapper {
|
||||||
|
#[pin]
|
||||||
|
inner: ChannelStream<Msg>,
|
||||||
|
bytes_rx: Arc<AtomicUsize>,
|
||||||
|
bytes_tx: Arc<AtomicUsize>
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Wrapper {
|
||||||
|
pub fn new(
|
||||||
|
inner: ChannelStream<Msg>,
|
||||||
|
bytes_rx: Arc<AtomicUsize>,
|
||||||
|
bytes_tx: Arc<AtomicUsize>,
|
||||||
|
) -> Self {
|
||||||
|
Self {
|
||||||
|
inner,
|
||||||
|
bytes_rx,
|
||||||
|
bytes_tx,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl hyper::rt::Read for Wrapper {
|
||||||
|
fn poll_read(
|
||||||
|
self: Pin<&mut Self>,
|
||||||
|
cx: &mut Context<'_>,
|
||||||
|
mut buf: hyper::rt::ReadBufCursor<'_>,
|
||||||
|
) -> Poll<Result<(), std::io::Error>> {
|
||||||
|
let project = self.project();
|
||||||
|
let n = unsafe {
|
||||||
|
let mut tbuf = tokio::io::ReadBuf::uninit(buf.as_mut());
|
||||||
|
match tokio::io::AsyncRead::poll_read(project.inner, cx, &mut tbuf) {
|
||||||
|
Poll::Ready(Ok(())) => tbuf.filled().len(),
|
||||||
|
other => return other,
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
project.bytes_tx.fetch_add(n, Ordering::Relaxed);
|
||||||
|
|
||||||
|
unsafe {
|
||||||
|
buf.advance(n);
|
||||||
|
}
|
||||||
|
Poll::Ready(Ok(()))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl hyper::rt::Write for Wrapper {
|
||||||
|
fn poll_write(
|
||||||
|
self: Pin<&mut Self>,
|
||||||
|
cx: &mut Context<'_>,
|
||||||
|
buf: &[u8],
|
||||||
|
) -> Poll<Result<usize, std::io::Error>> {
|
||||||
|
let project = self.project();
|
||||||
|
tokio::io::AsyncWrite::poll_write(project.inner, cx, buf).map(|res| {
|
||||||
|
res.inspect(|n| {
|
||||||
|
project.bytes_rx.fetch_add(*n, Ordering::Relaxed);
|
||||||
|
})
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), std::io::Error>> {
|
||||||
|
tokio::io::AsyncWrite::poll_flush(self.project().inner, cx)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn poll_shutdown(
|
||||||
|
self: Pin<&mut Self>,
|
||||||
|
cx: &mut Context<'_>,
|
||||||
|
) -> Poll<Result<(), std::io::Error>> {
|
||||||
|
tokio::io::AsyncWrite::poll_shutdown(self.project().inner, cx)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn is_write_vectored(&self) -> bool {
|
||||||
|
tokio::io::AsyncWrite::is_write_vectored(&self.inner)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn poll_write_vectored(
|
||||||
|
self: Pin<&mut Self>,
|
||||||
|
cx: &mut Context<'_>,
|
||||||
|
bufs: &[std::io::IoSlice<'_>],
|
||||||
|
) -> Poll<Result<usize, std::io::Error>> {
|
||||||
|
let project = self.project();
|
||||||
|
tokio::io::AsyncWrite::poll_write_vectored(project.inner, cx, bufs).map(|res| {
|
||||||
|
res.inspect(|n| {
|
||||||
|
project.bytes_rx.fetch_add(*n, Ordering::Relaxed);
|
||||||
|
})
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user