use anyhow::Result; use std::time::Duration; use tokio::time::sleep; use tracing::error; use tracing::info; use tracing::info_span; use tracing::Instrument; use tracing_subscriber::filter::LevelFilter; use tracing_subscriber::EnvFilter; use wtransport::endpoint::IncomingSession; use wtransport::tls::Certificate; use wtransport::ServerConfig; use wtransport::{Connection, Endpoint}; #[tokio::main] async fn main() -> Result<()> { init_logging(); let config = ServerConfig::builder() .with_bind_default(4433) .with_certificate(Certificate::load("cert.crt", "cert.key")?) .keep_alive_interval(Some(Duration::from_secs(3))) .build(); let server = Endpoint::server(config)?; info!("Server listening at localhost:4433"); for id in 0.. { let incoming_session = server.accept().await; tokio::spawn(handle_connection(incoming_session).instrument(info_span!("Connection", id))); } Ok(()) } async fn handle_connection(incoming_session: IncomingSession) { let result = handle_connection_impl(incoming_session).await; error!("{:?}", result); } async fn handle_connection_impl(incoming_session: IncomingSession) -> Result<()> { let mut buffer = vec![0; 65536].into_boxed_slice(); info!("Waiting for session request..."); let session_request = incoming_session.await?; info!( "New session: Authority: '{}', Path: '{}'", session_request.authority(), session_request.path() ); let connection = session_request.accept().await?; info!("Waiting for data from client..."); open_bidir_stream(&connection).await?; open_uni_stream(&connection).await?; loop { tokio::select! { stream = connection.accept_bi() => { let mut stream = stream?; info!("Accepted BI stream"); let bytes_read = match stream.1.read(&mut buffer).await? { Some(bytes_read) => bytes_read, None => continue, }; let str_data = std::str::from_utf8(&buffer[..bytes_read])?; info!("Received (bi) '{str_data}' from client"); stream.0.write_all(b"ACK").await?; } stream = connection.accept_uni() => { let mut stream = stream?; info!("Accepted UNI stream"); let bytes_read = match stream.read(&mut buffer).await? { Some(bytes_read) => bytes_read, None => continue, }; let str_data = std::str::from_utf8(&buffer[..bytes_read])?; info!("Received (uni) '{str_data}' from client"); let mut stream = connection.open_uni().await?.await?; stream.write_all(b"ACK").await?; } dgram = connection.receive_datagram() => { let dgram = dgram?; let str_data = std::str::from_utf8(&dgram)?; info!("Received (dgram) '{str_data}' from client"); connection.send_datagram(b"ACK")?; } } } } async fn open_bidir_stream(connection: &Connection) -> Result<()> { sleep(Duration::from_millis(2000)).await; let mut buffer = vec![0; 65536].into_boxed_slice(); let mut stream = connection.open_bi().await?.await?; info!("Opened BI stream"); stream.0.write_all(b"Hello from server").await?; info!("Sent 'Hello from server' to client"); let bytes_read = match stream.1.read(&mut buffer).await? { Some(bytes_read) => bytes_read, None => { info!("Server opened bidir stream closed"); return Ok(()); } }; let str_data = std::str::from_utf8(&buffer[..bytes_read])?; info!("Received (bi) '{str_data}' from client"); stream.0.write_all(b"ACK").await?; Ok(()) } async fn open_uni_stream(connection: &Connection) -> Result<()> { sleep(Duration::from_millis(2000)).await; let mut stream = connection.open_uni().await?.await?; info!("Opened UNI stream"); stream.write_all(b"Hello from server").await?; info!("Sent 'Hello from server' to client"); Ok(()) } fn init_logging() { let env_filter = EnvFilter::builder() .with_default_directive(LevelFilter::INFO.into()) .from_env_lossy(); tracing_subscriber::fmt() .with_target(true) .with_level(true) .with_env_filter(env_filter) .init(); }