10 Commits

Author SHA1 Message Date
0fe043acb5 Revert "Use store instead of fetch_add for atomics"
All checks were successful
Build and deploy / Build container and manifests (push) Successful in 6m1s
This reverts commit d4bd0ef1ca.
2025-04-21 02:21:18 +02:00
878df8da40 Start graceful shutdown on SIGTERM
All checks were successful
Build and deploy / Build container and manifests (push) Successful in 5m32s
2025-04-20 00:58:18 +02:00
27f6119905 Second ctrl-c forces application to stop directly
All checks were successful
Build and deploy / Build container and manifests (push) Successful in 5m21s
2025-04-20 00:26:23 +02:00
c7b0cfc888 Gracefully shutdown if LDAP connection is lost 2025-04-20 00:24:32 +02:00
7851d6bb12 Implemented more graceful shutdown
All checks were successful
Build and deploy / Build container and manifests (push) Successful in 7m27s
2025-04-20 00:14:24 +02:00
49fd6d8a3a Added suggestion to enable quiet mode for ssh client 2025-04-20 00:14:23 +02:00
ca742fe332 Updated authelia acl helper 2025-04-20 00:14:20 +02:00
b5c832fb38 Updated crates
All checks were successful
Build and deploy / Build container and manifests (push) Successful in 7m33s
2025-04-18 16:08:09 +02:00
526b9b0e0c Update rust 1.85 -> 1.86
All checks were successful
Build and deploy / Build container and manifests (push) Successful in 7m24s
2025-04-18 15:56:49 +02:00
e92b61b1a7 Set rust toolchain
All checks were successful
Build and deploy / Build container and manifests (push) Successful in 6m53s
2025-04-18 15:30:38 +02:00
16 changed files with 313 additions and 119 deletions

2
.cargo/config.toml Normal file
View File

@@ -0,0 +1,2 @@
[env]
RUSTC_BOOTSTRAP = "1"

View File

@@ -1,4 +1,4 @@
*
!queries
!src
!Cargo.*
!.cargo/config.toml

139
Cargo.lock generated
View File

@@ -304,9 +304,9 @@ dependencies = [
[[package]]
name = "cc"
version = "1.2.17"
version = "1.2.19"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1fcb57c740ae1daf453ae85f16e37396f672b039e00d9d866e07ddb24e328e3a"
checksum = "8e3a13707ac958681c13b39b458c073d0d9bc8a22cb1b2f4c8e55eb72c13f362"
dependencies = [
"shlex",
]
@@ -360,9 +360,9 @@ dependencies = [
[[package]]
name = "clap"
version = "4.5.35"
version = "4.5.36"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d8aa86934b44c19c50f87cc2790e19f54f7a67aedb64101c2e1a2e5ecfb73944"
checksum = "2df961d8c8a0d08aa9945718ccf584145eee3f3aa06cddbeac12933781102e04"
dependencies = [
"clap_builder",
"clap_derive",
@@ -370,9 +370,9 @@ dependencies = [
[[package]]
name = "clap_builder"
version = "4.5.35"
version = "4.5.36"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2414dbb2dd0695280da6ea9261e327479e9d37b0630f6b53ba2a11c60c679fd9"
checksum = "132dbda40fb6753878316a489d5a1242a8ef2f0d9e47ba01c951ea8aa7d013a5"
dependencies = [
"anstream",
"anstyle",
@@ -648,9 +648,9 @@ dependencies = [
[[package]]
name = "data-encoding"
version = "2.8.0"
version = "2.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "575f75dfd25738df5b91b8e43e14d44bda14637a58fae779fd2b064f8bf3e010"
checksum = "2a2330da5de22e8a3cb63252ce2abb30116bf5265e89c0e01bc17015ce30a476"
[[package]]
name = "delegate"
@@ -870,12 +870,12 @@ checksum = "28dea519a9695b9977216879a3ebfddf92f1c08c05d984f8996aecd6ecdc811d"
[[package]]
name = "flate2"
version = "1.1.0"
version = "1.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "11faaf5a5236997af9848be0bef4db95824b1d534ebc64d0f0c6cf3e67bd38dc"
checksum = "7ced92e76e966ca2fd84c8f7aa01a4aea65b0eb6648d72f7c8f3e2764a67fece"
dependencies = [
"crc32fast",
"miniz_oxide 0.8.5",
"miniz_oxide 0.8.8",
]
[[package]]
@@ -1090,9 +1090,9 @@ dependencies = [
[[package]]
name = "h2"
version = "0.4.8"
version = "0.4.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5017294ff4bb30944501348f6f8e42e6ad28f42c8bbef7a74029aff064a4e3c2"
checksum = "75249d144030531f8dee69fe9cea04d3edf809a017ae445e2abdff6629e86633"
dependencies = [
"atomic-waker",
"bytes",
@@ -1107,6 +1107,12 @@ dependencies = [
"tracing",
]
[[package]]
name = "hashbrown"
version = "0.14.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1"
[[package]]
name = "hashbrown"
version = "0.15.2"
@@ -1293,9 +1299,9 @@ dependencies = [
[[package]]
name = "iana-time-zone"
version = "0.1.62"
version = "0.1.63"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b2fd658b06e56721792c5df4475705b6cda790e9298d19d2f8af083457bcd127"
checksum = "b0c919e5debc312ad217002b8048a17b7d83f80703865bbfcfebb0458b0b27d8"
dependencies = [
"android_system_properties",
"core-foundation-sys",
@@ -1303,7 +1309,7 @@ dependencies = [
"js-sys",
"log",
"wasm-bindgen",
"windows-core 0.52.0",
"windows-core 0.61.0",
]
[[package]]
@@ -1473,7 +1479,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cea70ddb795996207ad57735b50c5982d8844f38ba9ee5f1aedcfb708a2aa11e"
dependencies = [
"equivalent",
"hashbrown",
"hashbrown 0.15.2",
]
[[package]]
@@ -1626,9 +1632,9 @@ dependencies = [
[[package]]
name = "libc"
version = "0.2.171"
version = "0.2.172"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c19937216e9d3aa9956d9bb8dfc0b0c8beb6058fc4f7a4dc4d850edf86a237d6"
checksum = "d750af042f7ef4f724306de029d18836c26c1765a54a6a3f094cbd23a7267ffa"
[[package]]
name = "libm"
@@ -1644,9 +1650,9 @@ checksum = "d26c52dbd32dccf2d10cac7725f8eae5296885fb5703b261f7d0a0739ec807ab"
[[package]]
name = "linux-raw-sys"
version = "0.9.3"
version = "0.9.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fe7db12097d22ec582439daf8618b8fdd1a7bef6270e9af3b1ebcd30893cf413"
checksum = "cd945864f07fe9f5371a27ad7b52a172b4b499999f1d97574c9fa68373937e12"
[[package]]
name = "litemap"
@@ -1682,7 +1688,7 @@ version = "0.12.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "234cf4f4a04dc1f57e24b96cc0cd600cf2af460d4161ac5ecdd0af8e1f3b2a38"
dependencies = [
"hashbrown",
"hashbrown 0.15.2",
]
[[package]]
@@ -1729,9 +1735,9 @@ dependencies = [
[[package]]
name = "miniz_oxide"
version = "0.8.5"
version = "0.8.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8e3e04debbb59698c15bacbb6d93584a8c0ca9cc3213cb423d31f760d8843ce5"
checksum = "3be647b768db090acb35d5ec5db2b0e1f1de11133ca123b9eacf5137868f892a"
dependencies = [
"adler2",
]
@@ -2150,9 +2156,9 @@ dependencies = [
[[package]]
name = "proc-macro2"
version = "1.0.94"
version = "1.0.95"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a31971752e70b8b2686d7e46ec17fb38dad4051d94024c88df49b667caea9c84"
checksum = "02b3e5e68a3a1a02aad3ec490a98007cbc13c37cbe84a3cd7b8e406d76e7f778"
dependencies = [
"unicode-ident",
]
@@ -2185,7 +2191,7 @@ checksum = "b820744eb4dc9b57a3398183639c511b5a26d2ed702cedd3febaa1393caa22cc"
dependencies = [
"bytes",
"getrandom 0.3.2",
"rand 0.9.0",
"rand 0.9.1",
"ring",
"rustc-hash",
"rustls",
@@ -2199,9 +2205,9 @@ dependencies = [
[[package]]
name = "quinn-udp"
version = "0.5.10"
version = "0.5.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e46f3055866785f6b92bc6164b76be02ca8f2eb4b002c0354b28cf4c119e5944"
checksum = "541d0f57c6ec747a90738a52741d3221f7960e8ac2f0ff4b1a63680e033b4ab5"
dependencies = [
"cfg_aliases",
"libc",
@@ -2239,13 +2245,12 @@ dependencies = [
[[package]]
name = "rand"
version = "0.9.0"
version = "0.9.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3779b94aeb87e8bd4e834cee3650289ee9e0d5677f976ecdb6d219e5f4f6cd94"
checksum = "9fbfd9d094a40bf3ae768db9361049ace4c0e04a4fd6b359518bd7b73a73dd97"
dependencies = [
"rand_chacha 0.9.0",
"rand_core 0.9.3",
"zerocopy",
]
[[package]]
@@ -2309,9 +2314,9 @@ dependencies = [
[[package]]
name = "redox_syscall"
version = "0.5.10"
version = "0.5.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0b8c0c260b63a8219631167be35e6a988e9554dbd323f8bd08439c8ed1302bd1"
checksum = "d2f103c6d277498fbceb16e84d317e2a400f160f46904d5f5410848c829511a3"
dependencies = [
"bitflags",
]
@@ -2586,15 +2591,15 @@ dependencies = [
"bitflags",
"errno",
"libc",
"linux-raw-sys 0.9.3",
"linux-raw-sys 0.9.4",
"windows-sys 0.59.0",
]
[[package]]
name = "rustls"
version = "0.23.25"
version = "0.23.26"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "822ee9188ac4ec04a2f0531e55d035fb2de73f18b41a63c70c2712503b6fb13c"
checksum = "df51b5869f3a441595eac5e8ff14d486ff285f7b8c0df8770e49c3b56351f0f0"
dependencies = [
"once_cell",
"ring",
@@ -2876,6 +2881,7 @@ dependencies = [
"russh",
"thiserror 2.0.12",
"tokio",
"tokio-util",
"tracing",
"tracing-subscriber",
"unicode-width 0.2.0",
@@ -2892,9 +2898,9 @@ dependencies = [
[[package]]
name = "smallvec"
version = "1.14.0"
version = "1.15.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7fcf8323ef1faaee30a44a340193b1ac6814fd9b7b4e88e9d4519a3e4abe1cfd"
checksum = "8917285742e9f3e1683f0a9c4e6b57960b7314d0b08d30d1ecd426713ee2eee9"
[[package]]
name = "socket2"
@@ -3206,6 +3212,8 @@ dependencies = [
"bytes",
"futures-core",
"futures-sink",
"futures-util",
"hashbrown 0.14.5",
"pin-project-lite",
"tokio",
]
@@ -3608,24 +3616,28 @@ dependencies = [
[[package]]
name = "windows-core"
version = "0.52.0"
version = "0.58.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "33ab640c8d7e35bf8ba19b884ba838ceb4fba93a4e8c65a9059d08afcfc683d9"
checksum = "6ba6d44ec8c2591c134257ce647b7ea6b20335bf6379a27dac5f1641fcf59f99"
dependencies = [
"windows-implement 0.58.0",
"windows-interface 0.58.0",
"windows-result 0.2.0",
"windows-strings 0.1.0",
"windows-targets 0.52.6",
]
[[package]]
name = "windows-core"
version = "0.58.0"
version = "0.61.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6ba6d44ec8c2591c134257ce647b7ea6b20335bf6379a27dac5f1641fcf59f99"
checksum = "4763c1de310c86d75a878046489e2e5ba02c649d185f21c67d4cf8a56d098980"
dependencies = [
"windows-implement",
"windows-interface",
"windows-result 0.2.0",
"windows-strings 0.1.0",
"windows-targets 0.52.6",
"windows-implement 0.60.0",
"windows-interface 0.59.1",
"windows-link",
"windows-result 0.3.2",
"windows-strings 0.4.0",
]
[[package]]
@@ -3639,6 +3651,17 @@ dependencies = [
"syn",
]
[[package]]
name = "windows-implement"
version = "0.60.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a47fddd13af08290e67f4acabf4b459f647552718f683a7b415d290ac744a836"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "windows-interface"
version = "0.58.0"
@@ -3650,6 +3673,17 @@ dependencies = [
"syn",
]
[[package]]
name = "windows-interface"
version = "0.59.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bd9211b69f8dcdfa817bfd14bf1c97c9188afa36f4750130fcdf3f400eca9fa8"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "windows-link"
version = "0.1.1"
@@ -3704,6 +3738,15 @@ dependencies = [
"windows-link",
]
[[package]]
name = "windows-strings"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7a2ba9642430ee452d5a7aa78d72907ebe8cfda358e8cb7918a2050581322f97"
dependencies = [
"windows-link",
]
[[package]]
name = "windows-sys"
version = "0.42.0"

View File

@@ -24,6 +24,7 @@ reqwest = { version = "0.12.15", features = ["rustls-tls"] }
russh = "0.51.1"
thiserror = "2.0.12"
tokio = { version = "1.44.2", features = ["full"] }
tokio-util = { version = "0.7.14", features = ["rt"] }
tracing = "0.1.41"
tracing-subscriber = { version = "0.3.19", features = ["json", "env-filter"] }
unicode-width = "0.2.0"

View File

@@ -1,4 +1,4 @@
FROM rust:1.85 AS base
FROM rust:1.86 AS base
ENV CARGO_REGISTRIES_CRATES_IO_PROTOCOL=sparse
RUN cargo install cargo-chef --locked --version 0.1.71 && \
cargo install cargo-auditable --locked --version 0.6.6
@@ -16,8 +16,6 @@ RUN cargo chef cook --release --recipe-path recipe.json
COPY . .
ARG RELEASE_VERSION
ENV RELEASE_VERSION=${RELEASE_VERSION}
# HACK: Enable the use of features on stable
ENV RUSTC_BOOTSTRAP=1
RUN cargo auditable build --release
FROM gcr.io/distroless/cc-debian12:nonroot AS runtime

View File

@@ -29,7 +29,7 @@ This deployment runs in a cluster with [Authelia](https://github.com/authelia/au
A tunnel can be opened using the following command:
```
ssh <username>@<host> [-p <ssh port>] -t -R <local port>:localhost:<local port>
ssh <username>@<host> [-p <ssh port>] -tq -R <local port>:localhost:<local port>
```
This will open a new tunnel with a randomly generated name, you can specify a name for the tunnel by instead using `-R <name>:<local port>:localhost:<local port>`.
@@ -48,6 +48,7 @@ Host tunnel
Port <ssh port>
User <username>
RequestTTY yes
LogLevel QUIET
```
You can now connect with `ssh tunnel -R <local port>:localhost:<local port>`.

View File

@@ -1,10 +1,7 @@
apiVersion: v1
kind: ConfigMap
apiVersion: authelia.huizinga.dev/v1
kind: AccessControlRule
metadata:
name: authelia-acl
annotations:
config.huizinga.dev/fragment: authelia-acl
data:
rules: |
- domain: "*.tunnel.huizinga.dev"
name: tunnel-dev
spec:
domain: "*.tunnel.huizinga.dev"
policy: one_factor

4
rust-toolchain.toml Normal file
View File

@@ -0,0 +1,4 @@
[toolchain]
channel = "1.86"
profile = "default"
components = ["rust-analyzer"]

View File

@@ -19,15 +19,15 @@ pub struct Stats {
impl Stats {
pub fn add_connection(&self) {
self.connections.store(1, Ordering::Relaxed);
self.connections.fetch_add(1, Ordering::Relaxed);
}
pub fn add_rx_bytes(&self, n: usize) {
self.rx.store(n, Ordering::Relaxed);
self.rx.fetch_add(n, Ordering::Relaxed);
}
pub fn add_tx_bytes(&self, n: usize) {
self.tx.store(n, Ordering::Relaxed);
self.tx.fetch_add(n, Ordering::Relaxed);
}
pub fn connections(&self) -> usize {

View File

@@ -1,5 +1,9 @@
use ldap3::{LdapConnAsync, SearchEntry};
use russh::keys::PublicKey;
use tokio::select;
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;
use tracing::{debug, error};
#[derive(Debug, Clone)]
pub struct Ldap {
@@ -20,7 +24,9 @@ pub enum LdapError {
}
impl Ldap {
pub async fn start_from_env() -> Result<Ldap, LdapError> {
pub async fn start_from_env(
token: CancellationToken,
) -> Result<(Ldap, JoinHandle<()>), LdapError> {
let address = std::env::var("LDAP_ADDRESS")
.map_err(|_| LdapError::MissingEnvironmentVariable("LDAP_ADDRESS"))?;
let base = std::env::var("LDAP_BASE")
@@ -41,11 +47,25 @@ impl Ldap {
)?;
let (conn, mut ldap) = LdapConnAsync::new(&address).await?;
ldap3::drive!(conn);
let handle = tokio::spawn(async move {
select! {
res = conn.drive() => {
if let Err(err) = res {
error!("LDAP connection error: {}", err);
} else {
error!("LDAP connection lost");
token.cancel();
}
}
_ = token.cancelled() => {
debug!("Graceful shutdown");
}
}
});
ldap.simple_bind(&bind_dn, &password).await?.success()?;
Ok(Self { base, ldap })
Ok((Self { base, ldap }, handle))
}
pub async fn get_ssh_keys(

View File

@@ -1,5 +1,6 @@
#![feature(let_chains)]
#![feature(iter_intersperse)]
#![feature(future_join)]
mod helper;
mod io;
pub mod ldap;

View File

@@ -1,10 +1,11 @@
#![feature(future_join)]
use std::future::join;
use std::net::SocketAddr;
use std::path::Path;
use std::time::Duration;
use color_eyre::eyre::Context;
use dotenvy::dotenv;
use hyper::server::conn::http1::{self};
use hyper_util::rt::TokioIo;
use rand::rngs::OsRng;
use siranga::VERSION;
use siranga::ldap::Ldap;
@@ -12,11 +13,47 @@ use siranga::ssh::Server;
use siranga::tunnel::Registry;
use siranga::web::{ForwardAuth, Service};
use tokio::net::TcpListener;
use tracing::{error, info, warn};
use tokio::select;
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info, warn};
use tracing_subscriber::EnvFilter;
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::util::SubscriberInitExt;
#[cfg(unix)]
async fn sigterm() {
use tokio::signal::unix::SignalKind;
let mut sigterm =
tokio::signal::unix::signal(SignalKind::terminate()).expect("should be able to initialize");
sigterm.recv().await;
}
#[cfg(not(unix))]
async fn sigterm() {
std::future::pending::<()>().await;
}
async fn shutdown_task(token: CancellationToken) {
select! {
_ = tokio::signal::ctrl_c() => {
debug!("Received SIGINT");
}
_ = sigterm() => {
debug!("Received SIGTERM");
}
_ = token.cancelled() => {
debug!("Application called for graceful shutdown");
}
}
info!("Starting graceful shutdown");
token.cancel();
select! {
_ = tokio::time::sleep(Duration::from_secs(5)) => {}
_ = tokio::signal::ctrl_c() => {}
}
}
#[tokio::main]
async fn main() -> color_eyre::Result<()> {
color_eyre::install()?;
@@ -59,35 +96,32 @@ async fn main() -> color_eyre::Result<()> {
std::env::var("TUNNEL_DOMAIN").unwrap_or_else(|_| format!("localhost:{http_port}"));
let authz_address = std::env::var("AUTHZ_ENDPOINT").wrap_err("AUTHZ_ENDPOINT is not set")?;
let ldap = Ldap::start_from_env().await?;
let registry = Registry::new(domain);
let mut ssh = Server::new(ldap, registry.clone());
let addr = SocketAddr::from(([0, 0, 0, 0], ssh_port));
tokio::spawn(async move { ssh.run(key, addr).await });
info!("SSH is available on {addr}");
let token = CancellationToken::new();
let (ldap, ldap_handle) = Ldap::start_from_env(token.clone()).await?;
let ssh = Server::new(ldap, registry.clone(), token.clone());
let ssh_addr = SocketAddr::from(([0, 0, 0, 0], ssh_port));
let ssh_task = ssh.run(key, ssh_addr);
info!("SSH is available on {ssh_addr}");
let auth = ForwardAuth::new(authz_address);
let service = Service::new(registry, auth);
let addr = SocketAddr::from(([0, 0, 0, 0], http_port));
let listener = TcpListener::bind(addr).await?;
info!("HTTP is available on {addr}");
let http_addr = SocketAddr::from(([0, 0, 0, 0], http_port));
let http_listener = TcpListener::bind(http_addr).await?;
let http_task = service.serve(http_listener, token.clone());
info!("HTTP is available on {http_addr}");
// TODO: Graceful shutdown
loop {
let (stream, _) = listener.accept().await?;
let io = TokioIo::new(stream);
select! {
_ = join!(ldap_handle, ssh_task, http_task) => {
info!("Shutdown gracefully");
}
_ = shutdown_task(token.clone()) => {
error!("Failed to shut down gracefully");
}
};
let service = service.clone();
tokio::spawn(async move {
if let Err(err) = http1::Builder::new()
.preserve_header_case(true)
.title_case_headers(true)
.serve_connection(io, service)
.with_upgrades()
.await
{
error!("Failed to serve connection: {err:?}");
}
});
}
Ok(())
}

View File

@@ -8,8 +8,10 @@ use ratatui::{Terminal, TerminalOptions, Viewport};
use russh::ChannelId;
use russh::keys::ssh_key::PublicKey;
use russh::server::{Auth, Msg, Session};
use tokio_util::sync::CancellationToken;
use tracing::{debug, trace, warn};
use super::renderer::Renderer;
use crate::VERSION;
use crate::io::{Input, TerminalHandle};
use crate::ldap::{Ldap, LdapError};
@@ -62,7 +64,7 @@ pub struct Handler {
}
impl Handler {
pub fn new(ldap: Ldap, registry: Registry) -> Self {
pub fn new(ldap: Ldap, registry: Registry, token: CancellationToken) -> Self {
Self {
ldap,
registry,
@@ -70,7 +72,7 @@ impl Handler {
user: None,
pty_channel: None,
renderer: Default::default(),
renderer: Renderer::new(token),
selected: None,
rename_input: None,
}

View File

@@ -11,7 +11,9 @@ use russh::MethodKind;
use russh::keys::PrivateKey;
use russh::server::Server as _;
use tokio::net::ToSocketAddrs;
use tracing::{debug, warn};
use tokio::select;
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, warn};
use crate::ldap::Ldap;
use crate::tunnel::Registry;
@@ -19,18 +21,30 @@ use crate::tunnel::Registry;
pub struct Server {
ldap: Ldap,
registry: Registry,
token: CancellationToken,
}
async fn graceful_shutdown(token: CancellationToken) {
token.cancelled().await;
let duration = 1;
// All pty sessions will close once the token is cancelled, but to properly allow the sessions
// to close the ssh server still needs to be driven, so we let it run a little bit longer.
// TODO: Figure out a way to wait for all connections to be closed, would require also closing
// non-pty sessions somehow
debug!("Waiting for {duration}s before stopping");
tokio::time::sleep(Duration::from_secs(duration)).await;
}
impl Server {
pub fn new(ldap: Ldap, registry: Registry) -> Self {
Server { ldap, registry }
pub fn new(ldap: Ldap, registry: Registry, token: CancellationToken) -> Self {
Server {
ldap,
registry,
token,
}
}
pub fn run(
&mut self,
key: PrivateKey,
addr: impl ToSocketAddrs + Send + std::fmt::Debug,
) -> impl Future<Output = Result<(), std::io::Error>> + Send {
pub async fn run(mut self, key: PrivateKey, addr: impl ToSocketAddrs + Send + std::fmt::Debug) {
let config = russh::server::Config {
inactivity_timeout: Some(Duration::from_secs(3600)),
auth_rejection_time: Duration::from_secs(1),
@@ -47,7 +61,17 @@ impl Server {
debug!(?addr, "Running ssh");
async move { self.run_on_address(config, addr).await }
let token = self.token.clone();
select! {
res = self.run_on_address(config, addr) => {
if let Err(err) = res {
error!("SSH Server error: {err}");
}
}
_ = graceful_shutdown(token) => {
debug!("Graceful shutdown");
}
}
}
}
@@ -55,7 +79,7 @@ impl russh::server::Server for Server {
type Handler = Handler;
fn new_client(&mut self, _peer_addr: Option<SocketAddr>) -> Self::Handler {
Handler::new(self.ldap.clone(), self.registry.clone())
Handler::new(self.ldap.clone(), self.registry.clone(), self.token.clone())
}
fn handle_session_error(&mut self, error: <Self::Handler as russh::server::Handler>::Error) {

View File

@@ -14,7 +14,8 @@ use ratatui::widgets::{
use ratatui::{Frame, Terminal};
use tokio::select;
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};
use tracing::error;
use tokio_util::sync::CancellationToken;
use tracing::{debug, error};
use unicode_width::UnicodeWidthStr;
use crate::VERSION;
@@ -36,6 +37,8 @@ struct RendererInner {
rows: Vec<TunnelRow>,
input: Option<String>,
rx: UnboundedReceiver<Message>,
token: CancellationToken,
}
fn compute_widths(rows: &Vec<Vec<Span<'static>>>) -> Vec<u16> {
@@ -75,12 +78,13 @@ fn compute_column_skip(
}
impl RendererInner {
fn new(rx: UnboundedReceiver<Message>) -> Self {
fn new(rx: UnboundedReceiver<Message>, token: CancellationToken) -> Self {
Self {
state: Default::default(),
rows: Default::default(),
input: None,
rx,
token,
}
}
@@ -303,6 +307,10 @@ impl RendererInner {
self.render(frame);
})?;
}
_ = self.token.cancelled() => {
debug!("Graceful shutdown");
break;
}
}
}
@@ -310,16 +318,24 @@ impl RendererInner {
}
}
#[derive(Debug, Default, Clone)]
#[derive(Debug, Clone)]
pub struct Renderer {
tx: Option<UnboundedSender<Message>>,
token: CancellationToken,
}
impl Renderer {
pub fn new(token: CancellationToken) -> Self {
Self {
tx: Default::default(),
token,
}
}
pub fn start(&mut self, terminal: Terminal<CrosstermBackend<TerminalHandle>>) {
let (tx, rx) = unbounded_channel();
let mut inner = RendererInner::new(rx);
let mut inner = RendererInner::new(rx, self.token.clone());
tokio::spawn(async move {
if let Err(err) = inner.start(terminal).await {

View File

@@ -1,6 +1,7 @@
mod auth;
mod response;
use std::future::join;
use std::ops::Deref;
use std::pin::Pin;
@@ -12,8 +13,14 @@ use http_body_util::{BodyExt as _, Empty};
use hyper::body::Incoming;
use hyper::client::conn::http1::Builder;
use hyper::header::{self, HOST};
use hyper::server::conn::http1;
use hyper::{Request, Response, StatusCode};
use hyper_util::rt::TokioIo;
use hyper_util::server::graceful::GracefulShutdown;
use response::response;
use tokio::net::TcpListener;
use tokio::select;
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, trace, warn};
use crate::tunnel::{Registry, TunnelAccess};
@@ -28,6 +35,49 @@ impl Service {
pub fn new(registry: Registry, auth: ForwardAuth) -> Self {
Self { registry, auth }
}
pub async fn handle_connection(
&self,
listener: &TcpListener,
graceful_shutdown: &GracefulShutdown,
) -> std::io::Result<()> {
let (stream, _) = listener.accept().await?;
let io = TokioIo::new(stream);
let connection = http1::Builder::new()
.preserve_header_case(true)
.title_case_headers(true)
.serve_connection(io, self.clone());
let connection = graceful_shutdown.watch(connection);
tokio::spawn(async move {
if let Err(err) = connection.await {
error!("Failed to serve connection: {err:?}");
}
});
Ok(())
}
pub async fn serve(self, listener: TcpListener, token: CancellationToken) {
let graceful_shutdown = GracefulShutdown::new();
loop {
select! {
res = self.handle_connection(&listener, &graceful_shutdown) => {
if let Err(err) = res {
error!("Failed to accept connection: {err}")
}
}
_ = token.cancelled() => {
debug!("Graceful shutdown");
break;
}
}
}
graceful_shutdown.shutdown().await;
}
}
impl hyper::service::Service<Request<Incoming>> for Service {
@@ -135,14 +185,15 @@ impl hyper::service::Service<Request<Incoming>> for Service {
.handshake(io)
.await?;
tokio::spawn(async move {
let conn = async {
if let Err(err) = conn.await {
warn!(runnel = authority, "Connection failed: {err}");
}
});
};
let resp = sender.send_request(req).await?;
Ok(resp.map(|b| b.boxed()))
let (resp, _) = join!(sender.send_request(req), conn).await;
Ok(resp?.map(|b| b.boxed()))
})
}
}