6 Commits

Author SHA1 Message Date
e9673211c1 Added liveness probe
All checks were successful
Build and deploy / Build container and manifests (push) Successful in 5m57s
2025-04-21 03:22:34 +02:00
f0bf60c78a Use named container ports 2025-04-21 02:36:59 +02:00
8cafe2b3ca Added support for upgrade requests
All checks were successful
Build and deploy / Build container and manifests (push) Successful in 6m9s
2025-04-21 02:21:22 +02:00
ed7770f792 Fixed spelling of shutdown during forceful shutdown 2025-04-21 02:21:22 +02:00
dc1f75aee3 Close any remaining connections once the tui exits 2025-04-21 02:21:22 +02:00
0fe043acb5 Revert "Use store instead of fetch_add for atomics"
All checks were successful
Build and deploy / Build container and manifests (push) Successful in 6m1s
This reverts commit d4bd0ef1ca.
2025-04-21 02:21:18 +02:00
8 changed files with 242 additions and 41 deletions

73
Cargo.lock generated
View File

@@ -173,6 +173,60 @@ version = "1.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ace50bade8e6234aa140d9a2f552bbee1db4d353f69b8217bc503490fc1a9f26"
[[package]]
name = "axum"
version = "0.8.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "de45108900e1f9b9242f7f2e254aa3e2c029c921c258fe9e6b4217eeebd54288"
dependencies = [
"axum-core",
"bytes",
"form_urlencoded",
"futures-util",
"http",
"http-body",
"http-body-util",
"hyper",
"hyper-util",
"itoa",
"matchit",
"memchr",
"mime",
"percent-encoding",
"pin-project-lite",
"rustversion",
"serde",
"serde_json",
"serde_path_to_error",
"serde_urlencoded",
"sync_wrapper",
"tokio",
"tower",
"tower-layer",
"tower-service",
"tracing",
]
[[package]]
name = "axum-core"
version = "0.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "68464cd0412f486726fb3373129ef5d2993f90c34bc2bc1c1e9943b2f4fc7ca6"
dependencies = [
"bytes",
"futures-core",
"http",
"http-body",
"http-body-util",
"mime",
"pin-project-lite",
"rustversion",
"sync_wrapper",
"tower-layer",
"tower-service",
"tracing",
]
[[package]]
name = "backtrace"
version = "0.3.71"
@@ -1700,6 +1754,12 @@ dependencies = [
"regex-automata 0.1.10",
]
[[package]]
name = "matchit"
version = "0.8.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "47e1ffaa40ddd1f3ed91f717a33c8c0ee23fff369e3aa8772b9605cc1d22f4c3"
[[package]]
name = "md5"
version = "0.7.0"
@@ -2769,6 +2829,16 @@ dependencies = [
"serde",
]
[[package]]
name = "serde_path_to_error"
version = "0.1.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "59fab13f937fa393d08645bf3a84bdfe86e296747b506ada67bb15f10f218b2a"
dependencies = [
"itoa",
"serde",
]
[[package]]
name = "serde_urlencoded"
version = "0.7.1"
@@ -2862,6 +2932,7 @@ dependencies = [
name = "siranga"
version = "0.0.0"
dependencies = [
"axum",
"bytes",
"clap",
"clio",
@@ -3231,6 +3302,7 @@ dependencies = [
"tokio",
"tower-layer",
"tower-service",
"tracing",
]
[[package]]
@@ -3251,6 +3323,7 @@ version = "0.1.41"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "784e0ac535deb450455cbfa28a6f0df145ea1bb7ae51b821cf5e7927fdcfbdd0"
dependencies = [
"log",
"pin-project-lite",
"tracing-attributes",
"tracing-core",

View File

@@ -5,6 +5,7 @@ default-run = "siranga"
license = "AGPL-3.0-only"
[dependencies]
axum = "0.8.3"
bytes = "1.10.1"
clap = { version = "4.5.35", features = ["derive"] }
clio = { version = "0.3.5", features = ["clap-parse"] }

View File

@@ -31,8 +31,12 @@ spec:
cpu: 50m
memory: 100Mi
ports:
- containerPort: 3000
- containerPort: 2222
- name: ssh
containerPort: 2222
- name: http
containerPort: 3000
- name: metrics
containerPort: 4000
volumeMounts:
- name: credentials
readOnly: true
@@ -57,6 +61,12 @@ spec:
value: /secrets/credentials/password
- name: PRIVATE_KEY_FILE
value: /secrets/key/private.pem
livenessProbe:
httpGet:
path: /health
port: metrics
initialDelaySeconds: 3
periodSeconds: 3
volumes:
- name: credentials
secret:

View File

@@ -6,6 +6,7 @@ spec:
ports:
- name: http
port: 3000
targetPort: http
selector:
app: siranga
---
@@ -20,6 +21,6 @@ spec:
ports:
- name: ssh
port: 22
targetPort: 2222
targetPort: ssh
selector:
app: siranga

View File

@@ -19,15 +19,15 @@ pub struct Stats {
impl Stats {
pub fn add_connection(&self) {
self.connections.store(1, Ordering::Relaxed);
self.connections.fetch_add(1, Ordering::Relaxed);
}
pub fn add_rx_bytes(&self, n: usize) {
self.rx.store(n, Ordering::Relaxed);
self.rx.fetch_add(n, Ordering::Relaxed);
}
pub fn add_tx_bytes(&self, n: usize) {
self.tx.store(n, Ordering::Relaxed);
self.tx.fetch_add(n, Ordering::Relaxed);
}
pub fn connections(&self) -> usize {

View File

@@ -4,6 +4,8 @@ use std::net::SocketAddr;
use std::path::Path;
use std::time::Duration;
use axum::routing::get;
use axum::{Json, Router};
use color_eyre::eyre::Context;
use dotenvy::dotenv;
use rand::rngs::OsRng;
@@ -54,6 +56,10 @@ async fn shutdown_task(token: CancellationToken) {
}
}
async fn axum_graceful_shutdown(token: CancellationToken) {
token.cancelled().await;
}
#[tokio::main]
async fn main() -> color_eyre::Result<()> {
color_eyre::install()?;
@@ -85,12 +91,18 @@ async fn main() -> color_eyre::Result<()> {
russh::keys::PrivateKey::random(&mut OsRng, russh::keys::Algorithm::Ed25519)?
};
let http_port = std::env::var("HTTP_PORT")
.map(|port| port.parse().wrap_err_with(|| format!("HTTP_PORT={port}")))
.unwrap_or(Ok(3000))?;
let ssh_port = std::env::var("SSH_PORT")
.map(|port| port.parse().wrap_err_with(|| format!("SSH_PORT={port}")))
.unwrap_or(Ok(2222))?;
let http_port = std::env::var("HTTP_PORT")
.map(|port| port.parse().wrap_err_with(|| format!("HTTP_PORT={port}")))
.unwrap_or(Ok(3000))?;
let metrics_port = std::env::var("METRICS_PORT")
.map(|port| {
port.parse()
.wrap_err_with(|| format!("METRICS_PORT={port}"))
})
.unwrap_or(Ok(4000))?;
let domain =
std::env::var("TUNNEL_DOMAIN").unwrap_or_else(|_| format!("localhost:{http_port}"));
@@ -114,8 +126,15 @@ async fn main() -> color_eyre::Result<()> {
let http_task = service.serve(http_listener, token.clone());
info!("HTTP is available on {http_addr}");
let metrics_app = Router::new().route("/health", get(async || Json("healthy")));
let metrics_addr = SocketAddr::from(([0, 0, 0, 0], metrics_port));
let metrics_listener = TcpListener::bind(metrics_addr).await?;
let metrics = axum::serve(metrics_listener, metrics_app)
.with_graceful_shutdown(axum_graceful_shutdown(token.clone()));
info!("Metrics are available on {http_addr}");
select! {
_ = join!(ldap_handle, ssh_task, http_task) => {
_ = join!(ldap_handle, ssh_task, http_task, metrics.into_future()) => {
info!("Shutdown gracefully");
}
_ = shutdown_task(token.clone()) => {

View File

@@ -330,6 +330,26 @@ impl russh::server::Handler for Handler {
Ok(session.channel_success(channel)?)
}
async fn channel_close(
&mut self,
channel: ChannelId,
session: &mut Session,
) -> Result<(), Self::Error> {
if let Some(pty_channel) = self.pty_channel
&& pty_channel == channel
{
debug!("Pty channel closed");
session.disconnect(
russh::Disconnect::ByApplication,
"Remaining active connections have been closed",
"EN",
)?;
}
Ok(())
}
async fn tcpip_forward(
&mut self,
address: &str,

View File

@@ -1,7 +1,6 @@
mod auth;
mod response;
use std::future::join;
use std::ops::Deref;
use std::pin::Pin;
@@ -11,16 +10,14 @@ use bytes::Bytes;
use http_body_util::combinators::BoxBody;
use http_body_util::{BodyExt as _, Empty};
use hyper::body::Incoming;
use hyper::client::conn::http1::Builder;
use hyper::header::{self, HOST};
use hyper::server::conn::http1;
use hyper::{Request, Response, StatusCode};
use hyper::header::{self, HOST, UPGRADE};
use hyper::{Request, Response, StatusCode, client, server};
use hyper_util::rt::TokioIo;
use hyper_util::server::graceful::GracefulShutdown;
use response::response;
use tokio::net::TcpListener;
use tokio::select;
use tokio_util::sync::CancellationToken;
use tokio_util::task::TaskTracker;
use tracing::{debug, error, trace, warn};
use crate::tunnel::{Registry, TunnelAccess};
@@ -29,29 +26,53 @@ use crate::tunnel::{Registry, TunnelAccess};
pub struct Service {
registry: Registry,
auth: ForwardAuth,
task_tracker: TaskTracker,
}
pub fn empty() -> BoxBody<Bytes, hyper::Error> {
Empty::<Bytes>::new()
.map_err(|never| match never {})
.boxed()
}
fn copy_request_parts<T>(req: Request<T>) -> (Request<T>, Request<BoxBody<Bytes, hyper::Error>>) {
let (parts, body) = req.into_parts();
let req = Request::from_parts(parts.clone(), body);
let forwarded_req = Request::from_parts(parts, empty());
(req, forwarded_req)
}
fn copy_response_parts<T>(
resp: Response<T>,
) -> (Response<T>, Response<BoxBody<Bytes, hyper::Error>>) {
let (parts, body) = resp.into_parts();
let resp = Response::from_parts(parts.clone(), body);
let forwarded_resp = Response::from_parts(parts, empty());
(resp, forwarded_resp)
}
impl Service {
pub fn new(registry: Registry, auth: ForwardAuth) -> Self {
Self { registry, auth }
Self {
registry,
auth,
task_tracker: Default::default(),
}
}
pub async fn handle_connection(
&self,
listener: &TcpListener,
graceful_shutdown: &GracefulShutdown,
) -> std::io::Result<()> {
pub async fn handle_connection(&self, listener: &TcpListener) -> std::io::Result<()> {
let (stream, _) = listener.accept().await?;
let io = TokioIo::new(stream);
let connection = http1::Builder::new()
let connection = server::conn::http1::Builder::new()
.preserve_header_case(true)
.title_case_headers(true)
.serve_connection(io, self.clone());
.serve_connection(io, self.clone())
.with_upgrades();
let connection = graceful_shutdown.watch(connection);
tokio::spawn(async move {
self.task_tracker.spawn(async move {
if let Err(err) = connection.await {
error!("Failed to serve connection: {err:?}");
}
@@ -61,22 +82,27 @@ impl Service {
}
pub async fn serve(self, listener: TcpListener, token: CancellationToken) {
let graceful_shutdown = GracefulShutdown::new();
loop {
select! {
res = self.handle_connection(&listener, &graceful_shutdown) => {
res = self.handle_connection(&listener) => {
if let Err(err) = res {
error!("Failed to accept connection: {err}")
}
}
_ = token.cancelled() => {
debug!("Graceful shutdown");
break;
}
}
}
graceful_shutdown.shutdown().await;
debug!(
"Waiting for {} connections to close",
self.task_tracker.len()
);
self.task_tracker.close();
self.task_tracker.wait().await;
debug!("Graceful shutdown");
}
}
@@ -109,10 +135,9 @@ impl hyper::service::Service<Request<Incoming>> for Service {
debug!(authority, "Tunnel request");
let registry = self.registry.clone();
let auth = self.auth.clone();
let s = self.clone();
Box::pin(async move {
let Some(entry) = registry.get(&authority).await else {
let Some(entry) = s.registry.get(&authority).await else {
debug!(tunnel = authority, "Unknown tunnel");
let resp = response(StatusCode::NOT_FOUND, "Unknown tunnel");
@@ -120,7 +145,7 @@ impl hyper::service::Service<Request<Incoming>> for Service {
};
if !entry.is_public().await {
let user = match auth.check(req.method(), req.headers()).await {
let user = match s.auth.check(req.method(), req.headers()).await {
Ok(AuthStatus::Authenticated(user)) => user,
Ok(AuthStatus::Unauthenticated(location)) => {
let resp = Response::builder()
@@ -179,21 +204,73 @@ impl hyper::service::Service<Request<Incoming>> for Service {
}
};
let (mut sender, conn) = Builder::new()
let (mut sender, conn) = client::conn::http1::Builder::new()
.preserve_header_case(true)
.title_case_headers(true)
.handshake(io)
.await?;
let conn = async {
let conn = conn.with_upgrades();
s.task_tracker.spawn(async move {
if let Err(err) = conn.await {
warn!(runnel = authority, "Connection failed: {err}");
}
};
});
let (resp, _) = join!(sender.send_request(req), conn).await;
let (mut req, forwarded_req) = copy_request_parts(req);
Ok(resp?.map(|b| b.boxed()))
let resp = sender.send_request(forwarded_req).await?;
if req.headers().contains_key(UPGRADE)
&& req.headers().get(UPGRADE) == resp.headers().get(UPGRADE)
{
let (mut resp, forwarded_resp) = copy_response_parts(resp);
debug!("UPGRADE established");
match hyper::upgrade::on(&mut resp).await {
Ok(upgraded_resp) => {
s.task_tracker.spawn(async move {
match hyper::upgrade::on(&mut req).await {
Ok(upgraded_req) => {
let mut upgraded_req = TokioIo::new(upgraded_req);
let mut upgraded_resp = TokioIo::new(upgraded_resp);
match tokio::io::copy_bidirectional(
&mut upgraded_req,
&mut upgraded_resp,
)
.await
{
Ok((rx, tx)) => {
debug!(
"Received {rx} bytes and send {tx} bytes over upgraded tunnel"
);
}
Err(err) => {
// Likely due to channel being closed
// TODO: Show warning if not channel closed, otherwise ignore
debug!("Upgraded connection error: {err:?}");
}
}
}
Err(err) => {
error!("Failed to upgrade: {err}");
}
}
});
return Ok(forwarded_resp.map(|b| b.boxed()));
}
Err(err) => {
error!("Failed to upgrade req: {err}");
return Ok(response(StatusCode::BAD_REQUEST, "Failed to upgrade"));
}
}
}
trace!("{resp:#?}");
Ok(resp.map(|b| b.boxed()))
})
}
}