Moved tunnel service impl to correct file

This commit is contained in:
Dreaded_X 2025-04-05 02:10:01 +02:00
parent 516e618f93
commit add640eaaf
Signed by: Dreaded_X
GPG Key ID: FA5F485356B0D2D4
2 changed files with 86 additions and 84 deletions

View File

@ -1,12 +1,5 @@
use std::{collections::HashSet, net::SocketAddr, pin::Pin, sync::Arc, time::Duration}; use std::{collections::HashSet, net::SocketAddr, sync::Arc, time::Duration};
use bytes::Bytes;
use http_body_util::{BodyExt as _, Full, combinators::BoxBody};
use hyper::{
Request, Response, StatusCode, body::Incoming, client::conn::http1::Builder, header::HOST,
service::Service,
};
use hyper_util::rt::TokioIo;
use russh::{ use russh::{
ChannelId, ChannelId,
keys::PrivateKey, keys::PrivateKey,
@ -16,7 +9,7 @@ use tokio::{
net::ToSocketAddrs, net::ToSocketAddrs,
sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel}, sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel},
}; };
use tracing::{debug, error, trace, warn}; use tracing::{debug, error};
use crate::tunnel::{Tunnel, Tunnels}; use crate::tunnel::{Tunnel, Tunnels};
@ -195,78 +188,3 @@ impl russh::server::Server for Server {
error!("Session error: {error:#?}"); error!("Session error: {error:#?}");
} }
} }
impl Service<Request<Incoming>> for Tunnels {
type Response = Response<BoxBody<Bytes, hyper::Error>>;
type Error = hyper::Error;
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
fn call(&self, req: Request<Incoming>) -> Self::Future {
fn response(
status_code: StatusCode,
body: impl Into<String>,
) -> Response<BoxBody<Bytes, hyper::Error>> {
Response::builder()
.status(status_code)
.body(Full::new(Bytes::from(body.into())))
.unwrap()
.map(|b| b.map_err(|never| match never {}).boxed())
}
trace!(?req);
let Some(authority) = req
.uri()
.authority()
.as_ref()
.map(|a| a.to_string())
.or_else(|| {
req.headers()
.get(HOST)
.map(|h| h.to_str().unwrap().to_owned())
})
else {
let resp = response(StatusCode::BAD_REQUEST, "Missing authority or host header");
return Box::pin(async { Ok(resp) });
};
debug!("Request for {authority:?}");
let tunnels = self.clone();
Box::pin(async move {
let Some(tunnel) = tunnels.get_tunnel(&authority).await else {
let resp = response(StatusCode::NOT_FOUND, "Unknown tunnel");
return Ok::<_, hyper::Error>(resp);
};
debug!("Opening channel");
let channel = match tunnel.open_tunnel().await {
Ok(channel) => channel,
Err(err) => {
warn!("Failed to open tunnel: {err}");
let resp = response(StatusCode::INTERNAL_SERVER_ERROR, "Failed to open tunnel");
return Ok::<_, hyper::Error>(resp);
}
};
let io = TokioIo::new(channel.into_stream());
let (mut sender, conn) = Builder::new()
.preserve_header_case(true)
.title_case_headers(true)
.handshake(io)
.await?;
tokio::spawn(async move {
if let Err(err) = conn.await {
warn!("Connection failed: {err}");
}
});
let resp = sender.send_request(req).await.unwrap();
Ok(resp.map(|b| b.boxed()))
})
}
}

View File

@ -1,7 +1,16 @@
use bytes::Bytes;
use http_body_util::{BodyExt as _, Full, combinators::BoxBody};
use hyper::{
Request, Response, StatusCode, body::Incoming, client::conn::http1::Builder, header::HOST,
service::Service,
};
use hyper_util::rt::TokioIo;
use std::{ use std::{
collections::{HashMap, HashSet}, collections::{HashMap, HashSet},
pin::Pin,
sync::Arc, sync::Arc,
}; };
use tracing::{debug, trace, warn};
use russh::{ use russh::{
Channel, Channel,
@ -83,3 +92,78 @@ impl Default for Tunnels {
Self::new() Self::new()
} }
} }
impl Service<Request<Incoming>> for Tunnels {
type Response = Response<BoxBody<Bytes, hyper::Error>>;
type Error = hyper::Error;
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
fn call(&self, req: Request<Incoming>) -> Self::Future {
fn response(
status_code: StatusCode,
body: impl Into<String>,
) -> Response<BoxBody<Bytes, hyper::Error>> {
Response::builder()
.status(status_code)
.body(Full::new(Bytes::from(body.into())))
.unwrap()
.map(|b| b.map_err(|never| match never {}).boxed())
}
trace!(?req);
let Some(authority) = req
.uri()
.authority()
.as_ref()
.map(|a| a.to_string())
.or_else(|| {
req.headers()
.get(HOST)
.map(|h| h.to_str().unwrap().to_owned())
})
else {
let resp = response(StatusCode::BAD_REQUEST, "Missing authority or host header");
return Box::pin(async { Ok(resp) });
};
debug!("Request for {authority:?}");
let tunnels = self.clone();
Box::pin(async move {
let Some(tunnel) = tunnels.get_tunnel(&authority).await else {
let resp = response(StatusCode::NOT_FOUND, "Unknown tunnel");
return Ok::<_, hyper::Error>(resp);
};
debug!("Opening channel");
let channel = match tunnel.open_tunnel().await {
Ok(channel) => channel,
Err(err) => {
warn!("Failed to open tunnel: {err}");
let resp = response(StatusCode::INTERNAL_SERVER_ERROR, "Failed to open tunnel");
return Ok::<_, hyper::Error>(resp);
}
};
let io = TokioIo::new(channel.into_stream());
let (mut sender, conn) = Builder::new()
.preserve_header_case(true)
.title_case_headers(true)
.handshake(io)
.await?;
tokio::spawn(async move {
if let Err(err) = conn.await {
warn!("Connection failed: {err}");
}
});
let resp = sender.send_request(req).await.unwrap();
Ok(resp.map(|b| b.boxed()))
})
}
}