Compare commits
2 Commits
v1.5.0
...
2713df2804
| Author | SHA1 | Date | |
|---|---|---|---|
|
2713df2804
|
|||
|
8d703167fb
|
120
Cargo.lock
generated
120
Cargo.lock
generated
@@ -173,60 +173,6 @@ version = "1.4.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "ace50bade8e6234aa140d9a2f552bbee1db4d353f69b8217bc503490fc1a9f26"
|
||||
|
||||
[[package]]
|
||||
name = "axum"
|
||||
version = "0.8.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "de45108900e1f9b9242f7f2e254aa3e2c029c921c258fe9e6b4217eeebd54288"
|
||||
dependencies = [
|
||||
"axum-core",
|
||||
"bytes",
|
||||
"form_urlencoded",
|
||||
"futures-util",
|
||||
"http",
|
||||
"http-body",
|
||||
"http-body-util",
|
||||
"hyper",
|
||||
"hyper-util",
|
||||
"itoa",
|
||||
"matchit",
|
||||
"memchr",
|
||||
"mime",
|
||||
"percent-encoding",
|
||||
"pin-project-lite",
|
||||
"rustversion",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"serde_path_to_error",
|
||||
"serde_urlencoded",
|
||||
"sync_wrapper",
|
||||
"tokio",
|
||||
"tower",
|
||||
"tower-layer",
|
||||
"tower-service",
|
||||
"tracing",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "axum-core"
|
||||
version = "0.5.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "68464cd0412f486726fb3373129ef5d2993f90c34bc2bc1c1e9943b2f4fc7ca6"
|
||||
dependencies = [
|
||||
"bytes",
|
||||
"futures-core",
|
||||
"http",
|
||||
"http-body",
|
||||
"http-body-util",
|
||||
"mime",
|
||||
"pin-project-lite",
|
||||
"rustversion",
|
||||
"sync_wrapper",
|
||||
"tower-layer",
|
||||
"tower-service",
|
||||
"tracing",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "backtrace"
|
||||
version = "0.3.71"
|
||||
@@ -1161,12 +1107,6 @@ dependencies = [
|
||||
"tracing",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "hashbrown"
|
||||
version = "0.14.5"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1"
|
||||
|
||||
[[package]]
|
||||
name = "hashbrown"
|
||||
version = "0.15.2"
|
||||
@@ -1533,7 +1473,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "cea70ddb795996207ad57735b50c5982d8844f38ba9ee5f1aedcfb708a2aa11e"
|
||||
dependencies = [
|
||||
"equivalent",
|
||||
"hashbrown 0.15.2",
|
||||
"hashbrown",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -1684,16 +1624,6 @@ dependencies = [
|
||||
"url",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "leon"
|
||||
version = "3.0.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "42a865ffec5587961f5afc6d365bccb304f4feaa1928f4fe94c91c9d210d7310"
|
||||
dependencies = [
|
||||
"miette",
|
||||
"thiserror 2.0.12",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "libc"
|
||||
version = "0.2.172"
|
||||
@@ -1752,7 +1682,7 @@ version = "0.12.5"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "234cf4f4a04dc1f57e24b96cc0cd600cf2af460d4161ac5ecdd0af8e1f3b2a38"
|
||||
dependencies = [
|
||||
"hashbrown 0.15.2",
|
||||
"hashbrown",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -1764,12 +1694,6 @@ dependencies = [
|
||||
"regex-automata 0.1.10",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "matchit"
|
||||
version = "0.8.4"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "47e1ffaa40ddd1f3ed91f717a33c8c0ee23fff369e3aa8772b9605cc1d22f4c3"
|
||||
|
||||
[[package]]
|
||||
name = "md5"
|
||||
version = "0.7.0"
|
||||
@@ -1782,29 +1706,6 @@ version = "2.7.4"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "78ca9ab1a0babb1e7d5695e3530886289c18cf2f87ec19a575a0abdce112e3a3"
|
||||
|
||||
[[package]]
|
||||
name = "miette"
|
||||
version = "7.5.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "1a955165f87b37fd1862df2a59547ac542c77ef6d17c666f619d1ad22dd89484"
|
||||
dependencies = [
|
||||
"cfg-if",
|
||||
"miette-derive",
|
||||
"thiserror 1.0.69",
|
||||
"unicode-width 0.1.14",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "miette-derive"
|
||||
version = "7.5.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "bf45bf44ab49be92fd1227a3be6fc6f617f1a337c06af54981048574d8783147"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "mime"
|
||||
version = "0.3.17"
|
||||
@@ -2862,16 +2763,6 @@ dependencies = [
|
||||
"serde",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "serde_path_to_error"
|
||||
version = "0.1.17"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "59fab13f937fa393d08645bf3a84bdfe86e296747b506ada67bb15f10f218b2a"
|
||||
dependencies = [
|
||||
"itoa",
|
||||
"serde",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "serde_urlencoded"
|
||||
version = "0.7.1"
|
||||
@@ -2965,7 +2856,6 @@ dependencies = [
|
||||
name = "siranga"
|
||||
version = "0.0.0"
|
||||
dependencies = [
|
||||
"axum",
|
||||
"bytes",
|
||||
"clap",
|
||||
"clio",
|
||||
@@ -2978,7 +2868,6 @@ dependencies = [
|
||||
"hyper",
|
||||
"hyper-util",
|
||||
"ldap3",
|
||||
"leon",
|
||||
"pin-project-lite",
|
||||
"rand 0.8.5",
|
||||
"ratatui",
|
||||
@@ -2986,7 +2875,6 @@ dependencies = [
|
||||
"russh",
|
||||
"thiserror 2.0.12",
|
||||
"tokio",
|
||||
"tokio-util",
|
||||
"tracing",
|
||||
"tracing-subscriber",
|
||||
"unicode-width 0.2.0",
|
||||
@@ -3317,8 +3205,6 @@ dependencies = [
|
||||
"bytes",
|
||||
"futures-core",
|
||||
"futures-sink",
|
||||
"futures-util",
|
||||
"hashbrown 0.14.5",
|
||||
"pin-project-lite",
|
||||
"tokio",
|
||||
]
|
||||
@@ -3336,7 +3222,6 @@ dependencies = [
|
||||
"tokio",
|
||||
"tower-layer",
|
||||
"tower-service",
|
||||
"tracing",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -3357,7 +3242,6 @@ version = "0.1.41"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "784e0ac535deb450455cbfa28a6f0df145ea1bb7ae51b821cf5e7927fdcfbdd0"
|
||||
dependencies = [
|
||||
"log",
|
||||
"pin-project-lite",
|
||||
"tracing-attributes",
|
||||
"tracing-core",
|
||||
|
||||
@@ -5,7 +5,6 @@ default-run = "siranga"
|
||||
license = "AGPL-3.0-only"
|
||||
|
||||
[dependencies]
|
||||
axum = "0.8.3"
|
||||
bytes = "1.10.1"
|
||||
clap = { version = "4.5.35", features = ["derive"] }
|
||||
clio = { version = "0.3.5", features = ["clap-parse"] }
|
||||
@@ -18,7 +17,6 @@ http-body-util = { version = "0.1.3", features = ["full"] }
|
||||
hyper = { version = "1.6.0", features = ["full"] }
|
||||
hyper-util = { version = "0.1.11", features = ["full"] }
|
||||
ldap3 = "0.11.5"
|
||||
leon = "3.0.2"
|
||||
pin-project-lite = "0.2.16"
|
||||
rand = "0.8.5"
|
||||
ratatui = { version = "0.29.0", features = ["unstable-backend-writer"] }
|
||||
@@ -26,7 +24,6 @@ reqwest = { version = "0.12.15", features = ["rustls-tls"] }
|
||||
russh = "0.51.1"
|
||||
thiserror = "2.0.12"
|
||||
tokio = { version = "1.44.2", features = ["full"] }
|
||||
tokio-util = { version = "0.7.14", features = ["rt"] }
|
||||
tracing = "0.1.41"
|
||||
tracing-subscriber = { version = "0.3.19", features = ["json", "env-filter"] }
|
||||
unicode-width = "0.2.0"
|
||||
|
||||
@@ -31,12 +31,8 @@ spec:
|
||||
cpu: 50m
|
||||
memory: 100Mi
|
||||
ports:
|
||||
- name: ssh
|
||||
containerPort: 2222
|
||||
- name: http
|
||||
containerPort: 3000
|
||||
- name: metrics
|
||||
containerPort: 4000
|
||||
- containerPort: 3000
|
||||
- containerPort: 2222
|
||||
volumeMounts:
|
||||
- name: credentials
|
||||
readOnly: true
|
||||
@@ -55,23 +51,12 @@ spec:
|
||||
value: ldap://lldap.lldap.svc.cluster.local:3890
|
||||
- name: LDAP_BASE
|
||||
value: ou=people,dc=huizinga,dc=dev
|
||||
- name: LDAP_SEARCH_FILTER
|
||||
value: (uid={username})
|
||||
- name: LDAP_BIND_DN
|
||||
valueFrom:
|
||||
secretKeyRef:
|
||||
name: siranga-lldap-credentials
|
||||
key: bind_dn
|
||||
value: uid=siranga.siranga,ou=people,dc=huizinga,dc=dev
|
||||
- name: LDAP_PASSWORD_FILE
|
||||
value: /secrets/credentials/password
|
||||
- name: PRIVATE_KEY_FILE
|
||||
value: /secrets/key/private.pem
|
||||
livenessProbe:
|
||||
httpGet:
|
||||
path: /health
|
||||
port: metrics
|
||||
initialDelaySeconds: 3
|
||||
periodSeconds: 3
|
||||
volumes:
|
||||
- name: credentials
|
||||
secret:
|
||||
|
||||
@@ -6,7 +6,6 @@ spec:
|
||||
ports:
|
||||
- name: http
|
||||
port: 3000
|
||||
targetPort: http
|
||||
selector:
|
||||
app: siranga
|
||||
---
|
||||
@@ -21,6 +20,6 @@ spec:
|
||||
ports:
|
||||
- name: ssh
|
||||
port: 22
|
||||
targetPort: ssh
|
||||
targetPort: 2222
|
||||
selector:
|
||||
app: siranga
|
||||
|
||||
@@ -19,15 +19,15 @@ pub struct Stats {
|
||||
|
||||
impl Stats {
|
||||
pub fn add_connection(&self) {
|
||||
self.connections.fetch_add(1, Ordering::Relaxed);
|
||||
self.connections.store(1, Ordering::Relaxed);
|
||||
}
|
||||
|
||||
pub fn add_rx_bytes(&self, n: usize) {
|
||||
self.rx.fetch_add(n, Ordering::Relaxed);
|
||||
self.rx.store(n, Ordering::Relaxed);
|
||||
}
|
||||
|
||||
pub fn add_tx_bytes(&self, n: usize) {
|
||||
self.tx.fetch_add(n, Ordering::Relaxed);
|
||||
self.tx.store(n, Ordering::Relaxed);
|
||||
}
|
||||
|
||||
pub fn connections(&self) -> usize {
|
||||
|
||||
55
src/ldap.rs
55
src/ldap.rs
@@ -1,16 +1,10 @@
|
||||
use ldap3::{LdapConnAsync, SearchEntry};
|
||||
use leon::{Template, vals};
|
||||
use russh::keys::PublicKey;
|
||||
use tokio::select;
|
||||
use tokio::task::JoinHandle;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::{debug, error};
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct Ldap {
|
||||
base: String,
|
||||
ldap: ldap3::Ldap,
|
||||
search_filter: String,
|
||||
}
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
@@ -23,24 +17,16 @@ pub enum LdapError {
|
||||
MissingEnvironmentVariable(&'static str),
|
||||
#[error("Could not read password file: {0}")]
|
||||
CouldNotReadPasswordFile(#[from] std::io::Error),
|
||||
#[error("Failed to parse search filter: {0}")]
|
||||
FailedToParseSearchFilter(#[from] leon::ParseError),
|
||||
#[error("Failed to render search filter: {0}")]
|
||||
FailedToRenderSearchFilter(#[from] leon::RenderError),
|
||||
}
|
||||
|
||||
impl Ldap {
|
||||
pub async fn start_from_env(
|
||||
token: CancellationToken,
|
||||
) -> Result<(Ldap, JoinHandle<()>), LdapError> {
|
||||
pub async fn start_from_env() -> Result<Ldap, LdapError> {
|
||||
let address = std::env::var("LDAP_ADDRESS")
|
||||
.map_err(|_| LdapError::MissingEnvironmentVariable("LDAP_ADDRESS"))?;
|
||||
let base = std::env::var("LDAP_BASE")
|
||||
.map_err(|_| LdapError::MissingEnvironmentVariable("LDAP_BASE"))?;
|
||||
let bind_dn = std::env::var("LDAP_BIND_DN")
|
||||
.map_err(|_| LdapError::MissingEnvironmentVariable("LDAP_BIND_DN"))?;
|
||||
let search_filter = std::env::var("LDAP_SEARCH_FILTER")
|
||||
.map_err(|_| LdapError::MissingEnvironmentVariable("LDAP_SEARCH_FILTER"))?;
|
||||
|
||||
let password = std::env::var("LDAP_PASSWORD_FILE").map_or_else(
|
||||
|_| {
|
||||
@@ -55,57 +41,24 @@ impl Ldap {
|
||||
)?;
|
||||
|
||||
let (conn, mut ldap) = LdapConnAsync::new(&address).await?;
|
||||
let handle = tokio::spawn(async move {
|
||||
select! {
|
||||
res = conn.drive() => {
|
||||
if let Err(err) = res {
|
||||
error!("LDAP connection error: {}", err);
|
||||
} else {
|
||||
error!("LDAP connection lost");
|
||||
token.cancel();
|
||||
}
|
||||
}
|
||||
_ = token.cancelled() => {
|
||||
debug!("Graceful shutdown");
|
||||
}
|
||||
}
|
||||
});
|
||||
ldap3::drive!(conn);
|
||||
|
||||
ldap.simple_bind(&bind_dn, &password).await?.success()?;
|
||||
|
||||
Ok((
|
||||
Self {
|
||||
base,
|
||||
ldap,
|
||||
search_filter,
|
||||
},
|
||||
handle,
|
||||
))
|
||||
Ok(Self { base, ldap })
|
||||
}
|
||||
|
||||
pub async fn get_ssh_keys(
|
||||
&mut self,
|
||||
user: impl AsRef<str>,
|
||||
) -> Result<Vec<PublicKey>, LdapError> {
|
||||
let search_filter = Template::parse(&self.search_filter)?;
|
||||
|
||||
let search_filter = search_filter.render(&&vals(|key| {
|
||||
if key == "username" {
|
||||
Some(user.as_ref().to_string().into())
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}))?;
|
||||
|
||||
debug!("search_filter = {search_filter}");
|
||||
|
||||
Ok(self
|
||||
.ldap
|
||||
.search(
|
||||
&self.base,
|
||||
ldap3::Scope::Subtree,
|
||||
// TODO: Make this not hardcoded
|
||||
&search_filter,
|
||||
&format!("(uid={})", user.as_ref()),
|
||||
vec!["sshkeys"],
|
||||
)
|
||||
.await?
|
||||
|
||||
@@ -1,6 +1,5 @@
|
||||
#![feature(let_chains)]
|
||||
#![feature(iter_intersperse)]
|
||||
#![feature(future_join)]
|
||||
mod helper;
|
||||
mod io;
|
||||
pub mod ldap;
|
||||
|
||||
111
src/main.rs
111
src/main.rs
@@ -1,13 +1,10 @@
|
||||
#![feature(future_join)]
|
||||
use std::future::join;
|
||||
use std::net::SocketAddr;
|
||||
use std::path::Path;
|
||||
use std::time::Duration;
|
||||
|
||||
use axum::routing::get;
|
||||
use axum::{Json, Router};
|
||||
use color_eyre::eyre::Context;
|
||||
use dotenvy::dotenv;
|
||||
use hyper::server::conn::http1::{self};
|
||||
use hyper_util::rt::TokioIo;
|
||||
use rand::rngs::OsRng;
|
||||
use siranga::VERSION;
|
||||
use siranga::ldap::Ldap;
|
||||
@@ -15,51 +12,11 @@ use siranga::ssh::Server;
|
||||
use siranga::tunnel::Registry;
|
||||
use siranga::web::{ForwardAuth, Service};
|
||||
use tokio::net::TcpListener;
|
||||
use tokio::select;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::{debug, error, info, warn};
|
||||
use tracing::{error, info, warn};
|
||||
use tracing_subscriber::EnvFilter;
|
||||
use tracing_subscriber::layer::SubscriberExt;
|
||||
use tracing_subscriber::util::SubscriberInitExt;
|
||||
|
||||
#[cfg(unix)]
|
||||
async fn sigterm() {
|
||||
use tokio::signal::unix::SignalKind;
|
||||
|
||||
let mut sigterm =
|
||||
tokio::signal::unix::signal(SignalKind::terminate()).expect("should be able to initialize");
|
||||
sigterm.recv().await;
|
||||
}
|
||||
|
||||
#[cfg(not(unix))]
|
||||
async fn sigterm() {
|
||||
std::future::pending::<()>().await;
|
||||
}
|
||||
|
||||
async fn shutdown_task(token: CancellationToken) {
|
||||
select! {
|
||||
_ = tokio::signal::ctrl_c() => {
|
||||
debug!("Received SIGINT");
|
||||
}
|
||||
_ = sigterm() => {
|
||||
debug!("Received SIGTERM");
|
||||
}
|
||||
_ = token.cancelled() => {
|
||||
debug!("Application called for graceful shutdown");
|
||||
}
|
||||
}
|
||||
info!("Starting graceful shutdown");
|
||||
token.cancel();
|
||||
select! {
|
||||
_ = tokio::time::sleep(Duration::from_secs(5)) => {}
|
||||
_ = tokio::signal::ctrl_c() => {}
|
||||
}
|
||||
}
|
||||
|
||||
async fn axum_graceful_shutdown(token: CancellationToken) {
|
||||
token.cancelled().await;
|
||||
}
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> color_eyre::Result<()> {
|
||||
color_eyre::install()?;
|
||||
@@ -91,56 +48,46 @@ async fn main() -> color_eyre::Result<()> {
|
||||
russh::keys::PrivateKey::random(&mut OsRng, russh::keys::Algorithm::Ed25519)?
|
||||
};
|
||||
|
||||
let ssh_port = std::env::var("SSH_PORT")
|
||||
.map(|port| port.parse().wrap_err_with(|| format!("SSH_PORT={port}")))
|
||||
.unwrap_or(Ok(2222))?;
|
||||
let http_port = std::env::var("HTTP_PORT")
|
||||
.map(|port| port.parse().wrap_err_with(|| format!("HTTP_PORT={port}")))
|
||||
.unwrap_or(Ok(3000))?;
|
||||
let metrics_port = std::env::var("METRICS_PORT")
|
||||
.map(|port| {
|
||||
port.parse()
|
||||
.wrap_err_with(|| format!("METRICS_PORT={port}"))
|
||||
})
|
||||
.unwrap_or(Ok(4000))?;
|
||||
let ssh_port = std::env::var("SSH_PORT")
|
||||
.map(|port| port.parse().wrap_err_with(|| format!("SSH_PORT={port}")))
|
||||
.unwrap_or(Ok(2222))?;
|
||||
|
||||
let domain =
|
||||
std::env::var("TUNNEL_DOMAIN").unwrap_or_else(|_| format!("localhost:{http_port}"));
|
||||
let authz_address = std::env::var("AUTHZ_ENDPOINT").wrap_err("AUTHZ_ENDPOINT is not set")?;
|
||||
|
||||
let ldap = Ldap::start_from_env().await?;
|
||||
let registry = Registry::new(domain);
|
||||
|
||||
let token = CancellationToken::new();
|
||||
|
||||
let (ldap, ldap_handle) = Ldap::start_from_env(token.clone()).await?;
|
||||
|
||||
let ssh = Server::new(ldap, registry.clone(), token.clone());
|
||||
let ssh_addr = SocketAddr::from(([0, 0, 0, 0], ssh_port));
|
||||
let ssh_task = ssh.run(key, ssh_addr);
|
||||
info!("SSH is available on {ssh_addr}");
|
||||
let mut ssh = Server::new(ldap, registry.clone());
|
||||
let addr = SocketAddr::from(([0, 0, 0, 0], ssh_port));
|
||||
tokio::spawn(async move { ssh.run(key, addr).await });
|
||||
info!("SSH is available on {addr}");
|
||||
|
||||
let auth = ForwardAuth::new(authz_address);
|
||||
let service = Service::new(registry, auth);
|
||||
let http_addr = SocketAddr::from(([0, 0, 0, 0], http_port));
|
||||
let http_listener = TcpListener::bind(http_addr).await?;
|
||||
let http_task = service.serve(http_listener, token.clone());
|
||||
info!("HTTP is available on {http_addr}");
|
||||
let addr = SocketAddr::from(([0, 0, 0, 0], http_port));
|
||||
let listener = TcpListener::bind(addr).await?;
|
||||
info!("HTTP is available on {addr}");
|
||||
|
||||
let metrics_app = Router::new().route("/health", get(async || Json("healthy")));
|
||||
let metrics_addr = SocketAddr::from(([0, 0, 0, 0], metrics_port));
|
||||
let metrics_listener = TcpListener::bind(metrics_addr).await?;
|
||||
let metrics = axum::serve(metrics_listener, metrics_app)
|
||||
.with_graceful_shutdown(axum_graceful_shutdown(token.clone()));
|
||||
info!("Metrics are available on {http_addr}");
|
||||
// TODO: Graceful shutdown
|
||||
loop {
|
||||
let (stream, _) = listener.accept().await?;
|
||||
let io = TokioIo::new(stream);
|
||||
|
||||
select! {
|
||||
_ = join!(ldap_handle, ssh_task, http_task, metrics.into_future()) => {
|
||||
info!("Shutdown gracefully");
|
||||
let service = service.clone();
|
||||
tokio::spawn(async move {
|
||||
if let Err(err) = http1::Builder::new()
|
||||
.preserve_header_case(true)
|
||||
.title_case_headers(true)
|
||||
.serve_connection(io, service)
|
||||
.with_upgrades()
|
||||
.await
|
||||
{
|
||||
error!("Failed to serve connection: {err:?}");
|
||||
}
|
||||
_ = shutdown_task(token.clone()) => {
|
||||
error!("Failed to shutdown gracefully");
|
||||
});
|
||||
}
|
||||
};
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -8,10 +8,8 @@ use ratatui::{Terminal, TerminalOptions, Viewport};
|
||||
use russh::ChannelId;
|
||||
use russh::keys::ssh_key::PublicKey;
|
||||
use russh::server::{Auth, Msg, Session};
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::{debug, trace, warn};
|
||||
|
||||
use super::renderer::Renderer;
|
||||
use crate::VERSION;
|
||||
use crate::io::{Input, TerminalHandle};
|
||||
use crate::ldap::{Ldap, LdapError};
|
||||
@@ -64,7 +62,7 @@ pub struct Handler {
|
||||
}
|
||||
|
||||
impl Handler {
|
||||
pub fn new(ldap: Ldap, registry: Registry, token: CancellationToken) -> Self {
|
||||
pub fn new(ldap: Ldap, registry: Registry) -> Self {
|
||||
Self {
|
||||
ldap,
|
||||
registry,
|
||||
@@ -72,7 +70,7 @@ impl Handler {
|
||||
user: None,
|
||||
pty_channel: None,
|
||||
|
||||
renderer: Renderer::new(token),
|
||||
renderer: Default::default(),
|
||||
selected: None,
|
||||
rename_input: None,
|
||||
}
|
||||
@@ -330,26 +328,6 @@ impl russh::server::Handler for Handler {
|
||||
Ok(session.channel_success(channel)?)
|
||||
}
|
||||
|
||||
async fn channel_close(
|
||||
&mut self,
|
||||
channel: ChannelId,
|
||||
session: &mut Session,
|
||||
) -> Result<(), Self::Error> {
|
||||
if let Some(pty_channel) = self.pty_channel
|
||||
&& pty_channel == channel
|
||||
{
|
||||
debug!("Pty channel closed");
|
||||
|
||||
session.disconnect(
|
||||
russh::Disconnect::ByApplication,
|
||||
"Remaining active connections have been closed",
|
||||
"EN",
|
||||
)?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn tcpip_forward(
|
||||
&mut self,
|
||||
address: &str,
|
||||
|
||||
@@ -11,9 +11,7 @@ use russh::MethodKind;
|
||||
use russh::keys::PrivateKey;
|
||||
use russh::server::Server as _;
|
||||
use tokio::net::ToSocketAddrs;
|
||||
use tokio::select;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::{debug, error, warn};
|
||||
use tracing::{debug, warn};
|
||||
|
||||
use crate::ldap::Ldap;
|
||||
use crate::tunnel::Registry;
|
||||
@@ -21,30 +19,18 @@ use crate::tunnel::Registry;
|
||||
pub struct Server {
|
||||
ldap: Ldap,
|
||||
registry: Registry,
|
||||
token: CancellationToken,
|
||||
}
|
||||
|
||||
async fn graceful_shutdown(token: CancellationToken) {
|
||||
token.cancelled().await;
|
||||
let duration = 1;
|
||||
// All pty sessions will close once the token is cancelled, but to properly allow the sessions
|
||||
// to close the ssh server still needs to be driven, so we let it run a little bit longer.
|
||||
// TODO: Figure out a way to wait for all connections to be closed, would require also closing
|
||||
// non-pty sessions somehow
|
||||
debug!("Waiting for {duration}s before stopping");
|
||||
tokio::time::sleep(Duration::from_secs(duration)).await;
|
||||
}
|
||||
|
||||
impl Server {
|
||||
pub fn new(ldap: Ldap, registry: Registry, token: CancellationToken) -> Self {
|
||||
Server {
|
||||
ldap,
|
||||
registry,
|
||||
token,
|
||||
}
|
||||
pub fn new(ldap: Ldap, registry: Registry) -> Self {
|
||||
Server { ldap, registry }
|
||||
}
|
||||
|
||||
pub async fn run(mut self, key: PrivateKey, addr: impl ToSocketAddrs + Send + std::fmt::Debug) {
|
||||
pub fn run(
|
||||
&mut self,
|
||||
key: PrivateKey,
|
||||
addr: impl ToSocketAddrs + Send + std::fmt::Debug,
|
||||
) -> impl Future<Output = Result<(), std::io::Error>> + Send {
|
||||
let config = russh::server::Config {
|
||||
inactivity_timeout: Some(Duration::from_secs(3600)),
|
||||
auth_rejection_time: Duration::from_secs(1),
|
||||
@@ -61,17 +47,7 @@ impl Server {
|
||||
|
||||
debug!(?addr, "Running ssh");
|
||||
|
||||
let token = self.token.clone();
|
||||
select! {
|
||||
res = self.run_on_address(config, addr) => {
|
||||
if let Err(err) = res {
|
||||
error!("SSH Server error: {err}");
|
||||
}
|
||||
}
|
||||
_ = graceful_shutdown(token) => {
|
||||
debug!("Graceful shutdown");
|
||||
}
|
||||
}
|
||||
async move { self.run_on_address(config, addr).await }
|
||||
}
|
||||
}
|
||||
|
||||
@@ -79,7 +55,7 @@ impl russh::server::Server for Server {
|
||||
type Handler = Handler;
|
||||
|
||||
fn new_client(&mut self, _peer_addr: Option<SocketAddr>) -> Self::Handler {
|
||||
Handler::new(self.ldap.clone(), self.registry.clone(), self.token.clone())
|
||||
Handler::new(self.ldap.clone(), self.registry.clone())
|
||||
}
|
||||
|
||||
fn handle_session_error(&mut self, error: <Self::Handler as russh::server::Handler>::Error) {
|
||||
|
||||
@@ -14,8 +14,7 @@ use ratatui::widgets::{
|
||||
use ratatui::{Frame, Terminal};
|
||||
use tokio::select;
|
||||
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::{debug, error};
|
||||
use tracing::error;
|
||||
use unicode_width::UnicodeWidthStr;
|
||||
|
||||
use crate::VERSION;
|
||||
@@ -37,8 +36,6 @@ struct RendererInner {
|
||||
rows: Vec<TunnelRow>,
|
||||
input: Option<String>,
|
||||
rx: UnboundedReceiver<Message>,
|
||||
|
||||
token: CancellationToken,
|
||||
}
|
||||
|
||||
fn compute_widths(rows: &Vec<Vec<Span<'static>>>) -> Vec<u16> {
|
||||
@@ -78,13 +75,12 @@ fn compute_column_skip(
|
||||
}
|
||||
|
||||
impl RendererInner {
|
||||
fn new(rx: UnboundedReceiver<Message>, token: CancellationToken) -> Self {
|
||||
fn new(rx: UnboundedReceiver<Message>) -> Self {
|
||||
Self {
|
||||
state: Default::default(),
|
||||
rows: Default::default(),
|
||||
input: None,
|
||||
rx,
|
||||
token,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -307,10 +303,6 @@ impl RendererInner {
|
||||
self.render(frame);
|
||||
})?;
|
||||
}
|
||||
_ = self.token.cancelled() => {
|
||||
debug!("Graceful shutdown");
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -318,24 +310,16 @@ impl RendererInner {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
#[derive(Debug, Default, Clone)]
|
||||
pub struct Renderer {
|
||||
tx: Option<UnboundedSender<Message>>,
|
||||
token: CancellationToken,
|
||||
}
|
||||
|
||||
impl Renderer {
|
||||
pub fn new(token: CancellationToken) -> Self {
|
||||
Self {
|
||||
tx: Default::default(),
|
||||
token,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn start(&mut self, terminal: Terminal<CrosstermBackend<TerminalHandle>>) {
|
||||
let (tx, rx) = unbounded_channel();
|
||||
|
||||
let mut inner = RendererInner::new(rx, self.token.clone());
|
||||
let mut inner = RendererInner::new(rx);
|
||||
|
||||
tokio::spawn(async move {
|
||||
if let Err(err) = inner.start(terminal).await {
|
||||
|
||||
150
src/web/mod.rs
150
src/web/mod.rs
@@ -10,14 +10,10 @@ use bytes::Bytes;
|
||||
use http_body_util::combinators::BoxBody;
|
||||
use http_body_util::{BodyExt as _, Empty};
|
||||
use hyper::body::Incoming;
|
||||
use hyper::header::{self, HOST, UPGRADE};
|
||||
use hyper::{Request, Response, StatusCode, client, server};
|
||||
use hyper_util::rt::TokioIo;
|
||||
use hyper::client::conn::http1::Builder;
|
||||
use hyper::header::{self, HOST};
|
||||
use hyper::{Request, Response, StatusCode};
|
||||
use response::response;
|
||||
use tokio::net::TcpListener;
|
||||
use tokio::select;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tokio_util::task::TaskTracker;
|
||||
use tracing::{debug, error, trace, warn};
|
||||
|
||||
use crate::tunnel::{Registry, TunnelAccess};
|
||||
@@ -26,83 +22,11 @@ use crate::tunnel::{Registry, TunnelAccess};
|
||||
pub struct Service {
|
||||
registry: Registry,
|
||||
auth: ForwardAuth,
|
||||
task_tracker: TaskTracker,
|
||||
}
|
||||
|
||||
pub fn empty() -> BoxBody<Bytes, hyper::Error> {
|
||||
Empty::<Bytes>::new()
|
||||
.map_err(|never| match never {})
|
||||
.boxed()
|
||||
}
|
||||
|
||||
fn copy_request_parts<T>(req: Request<T>) -> (Request<T>, Request<BoxBody<Bytes, hyper::Error>>) {
|
||||
let (parts, body) = req.into_parts();
|
||||
let req = Request::from_parts(parts.clone(), body);
|
||||
let forwarded_req = Request::from_parts(parts, empty());
|
||||
|
||||
(req, forwarded_req)
|
||||
}
|
||||
|
||||
fn copy_response_parts<T>(
|
||||
resp: Response<T>,
|
||||
) -> (Response<T>, Response<BoxBody<Bytes, hyper::Error>>) {
|
||||
let (parts, body) = resp.into_parts();
|
||||
let resp = Response::from_parts(parts.clone(), body);
|
||||
let forwarded_resp = Response::from_parts(parts, empty());
|
||||
|
||||
(resp, forwarded_resp)
|
||||
}
|
||||
|
||||
impl Service {
|
||||
pub fn new(registry: Registry, auth: ForwardAuth) -> Self {
|
||||
Self {
|
||||
registry,
|
||||
auth,
|
||||
task_tracker: Default::default(),
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn handle_connection(&self, listener: &TcpListener) -> std::io::Result<()> {
|
||||
let (stream, _) = listener.accept().await?;
|
||||
|
||||
let io = TokioIo::new(stream);
|
||||
let connection = server::conn::http1::Builder::new()
|
||||
.preserve_header_case(true)
|
||||
.title_case_headers(true)
|
||||
.serve_connection(io, self.clone())
|
||||
.with_upgrades();
|
||||
|
||||
self.task_tracker.spawn(async move {
|
||||
if let Err(err) = connection.await {
|
||||
error!("Failed to serve connection: {err:?}");
|
||||
}
|
||||
});
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn serve(self, listener: TcpListener, token: CancellationToken) {
|
||||
loop {
|
||||
select! {
|
||||
res = self.handle_connection(&listener) => {
|
||||
if let Err(err) = res {
|
||||
error!("Failed to accept connection: {err}")
|
||||
}
|
||||
}
|
||||
_ = token.cancelled() => {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
debug!(
|
||||
"Waiting for {} connections to close",
|
||||
self.task_tracker.len()
|
||||
);
|
||||
self.task_tracker.close();
|
||||
self.task_tracker.wait().await;
|
||||
|
||||
debug!("Graceful shutdown");
|
||||
Self { registry, auth }
|
||||
}
|
||||
}
|
||||
|
||||
@@ -135,9 +59,10 @@ impl hyper::service::Service<Request<Incoming>> for Service {
|
||||
|
||||
debug!(authority, "Tunnel request");
|
||||
|
||||
let s = self.clone();
|
||||
let registry = self.registry.clone();
|
||||
let auth = self.auth.clone();
|
||||
Box::pin(async move {
|
||||
let Some(entry) = s.registry.get(&authority).await else {
|
||||
let Some(entry) = registry.get(&authority).await else {
|
||||
debug!(tunnel = authority, "Unknown tunnel");
|
||||
let resp = response(StatusCode::NOT_FOUND, "Unknown tunnel");
|
||||
|
||||
@@ -145,7 +70,7 @@ impl hyper::service::Service<Request<Incoming>> for Service {
|
||||
};
|
||||
|
||||
if !entry.is_public().await {
|
||||
let user = match s.auth.check(req.method(), req.headers()).await {
|
||||
let user = match auth.check(req.method(), req.headers()).await {
|
||||
Ok(AuthStatus::Authenticated(user)) => user,
|
||||
Ok(AuthStatus::Unauthenticated(location)) => {
|
||||
let resp = Response::builder()
|
||||
@@ -204,72 +129,19 @@ impl hyper::service::Service<Request<Incoming>> for Service {
|
||||
}
|
||||
};
|
||||
|
||||
let (mut sender, conn) = client::conn::http1::Builder::new()
|
||||
let (mut sender, conn) = Builder::new()
|
||||
.preserve_header_case(true)
|
||||
.title_case_headers(true)
|
||||
.handshake(io)
|
||||
.await?;
|
||||
|
||||
let conn = conn.with_upgrades();
|
||||
s.task_tracker.spawn(async move {
|
||||
tokio::spawn(async move {
|
||||
if let Err(err) = conn.await {
|
||||
warn!(runnel = authority, "Connection failed: {err}");
|
||||
}
|
||||
});
|
||||
|
||||
let (mut req, forwarded_req) = copy_request_parts(req);
|
||||
|
||||
let resp = sender.send_request(forwarded_req).await?;
|
||||
|
||||
if req.headers().contains_key(UPGRADE)
|
||||
&& req.headers().get(UPGRADE) == resp.headers().get(UPGRADE)
|
||||
{
|
||||
let (mut resp, forwarded_resp) = copy_response_parts(resp);
|
||||
|
||||
debug!("UPGRADE established");
|
||||
match hyper::upgrade::on(&mut resp).await {
|
||||
Ok(upgraded_resp) => {
|
||||
s.task_tracker.spawn(async move {
|
||||
match hyper::upgrade::on(&mut req).await {
|
||||
Ok(upgraded_req) => {
|
||||
let mut upgraded_req = TokioIo::new(upgraded_req);
|
||||
let mut upgraded_resp = TokioIo::new(upgraded_resp);
|
||||
|
||||
match tokio::io::copy_bidirectional(
|
||||
&mut upgraded_req,
|
||||
&mut upgraded_resp,
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok((rx, tx)) => {
|
||||
debug!(
|
||||
"Received {rx} bytes and send {tx} bytes over upgraded tunnel"
|
||||
);
|
||||
}
|
||||
Err(err) => {
|
||||
// Likely due to channel being closed
|
||||
// TODO: Show warning if not channel closed, otherwise ignore
|
||||
debug!("Upgraded connection error: {err:?}");
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(err) => {
|
||||
error!("Failed to upgrade: {err}");
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
return Ok(forwarded_resp.map(|b| b.boxed()));
|
||||
}
|
||||
Err(err) => {
|
||||
error!("Failed to upgrade req: {err}");
|
||||
return Ok(response(StatusCode::BAD_REQUEST, "Failed to upgrade"));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
trace!("{resp:#?}");
|
||||
|
||||
let resp = sender.send_request(req).await?;
|
||||
Ok(resp.map(|b| b.boxed()))
|
||||
})
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user