From b2f9e622ef6cc81d0d798878b3cb951a81bd5a54 Mon Sep 17 00:00:00 2001 From: Adam Doyle Date: Fri, 11 Oct 2024 00:26:24 -0400 Subject: [PATCH] single thread > lockup --- server/src/game_handler.rs | 1 - server/src/main.rs | 74 +++++++++++++++++++++----------------- server/src/user_handler.rs | 13 ++++--- server/src/websocket.rs | 13 +++---- 4 files changed, 54 insertions(+), 47 deletions(-) diff --git a/server/src/game_handler.rs b/server/src/game_handler.rs index cab3189..0247e2a 100644 --- a/server/src/game_handler.rs +++ b/server/src/game_handler.rs @@ -391,7 +391,6 @@ impl GameHandler { // Register game to user self.register_user_in_game(game_id.clone(), host.read().unwrap().uuid.clone()); - self.send_game_state_update_all(vec![game_id.clone()]); self.send_game_meta_update(vec![game_id]); diff --git a/server/src/main.rs b/server/src/main.rs index 9a14ab9..0944083 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -15,22 +15,23 @@ use tokio::sync::{broadcast, mpsc}; use tower::ServiceBuilder; use tower_governor::{governor::GovernorConfigBuilder, GovernorLayer}; use tower_http::{compression::CompressionLayer, services::ServeDir}; -// use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; +use tracing_subscriber::prelude::*; use user_handler::UserHandler; -#[tokio::main] +#[tokio::main(flavor = "current_thread")] +// #[tokio::main] async fn main() -> Result<()> { - // // stuff for logging - // tracing_subscriber::registry() - // .with( - // tracing_subscriber::EnvFilter::try_from_default_env().unwrap_or_else(|_| { - // "server=trace,tower_http=trace,lib=trace,tokio=trace,runtime=trace".into() - // }), - // ) - // .with(tracing_subscriber::fmt::layer()) - // .init(); - - console_subscriber::init(); + // stuff for logging + tracing_subscriber::registry() + .with( + tracing_subscriber::EnvFilter::try_from_default_env().unwrap_or_else(|_| { + "server=trace,tower_http=trace,lib=trace".into() + // "server=trace,tower_http=trace,lib=trace,tokio=trace,runtime=trace".into() + }), + ) + // .with(console_subscriber::ConsoleLayer::builder().spawn()) + .with(tracing_subscriber::fmt::layer()) + .init(); // Handle command-line args let matches = command!() @@ -67,10 +68,10 @@ async fn main() -> Result<()> { }); // Set up state - let (broadcast_tx, _rx) = broadcast::channel(1000); - let (users_tx, mut users_rx) = mpsc::channel(1000); - let (messages_tx, mut messages_rx) = mpsc::channel(1000); - let (games_tx, mut games_rx) = mpsc::channel(1000); + let (broadcast_tx, _rx) = broadcast::channel(1000000); + let (users_tx, mut users_rx) = mpsc::channel(1000000); + let (messages_tx, mut messages_rx) = mpsc::channel(1000000); + let (games_tx, mut games_rx) = mpsc::channel(1000000); let first_names = load_names("data/first.txt")?; let last_names = load_names("data/last.txt")?; let reserved_names = RwLock::new(HashSet::::new()); @@ -101,30 +102,39 @@ async fn main() -> Result<()> { // Spawn task to handle incoming messages, also handles outging messages let message_handler = MessageHandler::new(app_state.clone()); - tokio::spawn(async move { - while let Some((addr, message)) = messages_rx.recv().await { - message_handler.handle(addr, message).await; - } - }); + tokio::task::Builder::new() + .name("Message Handler") + .spawn(async move { + while let Some((addr, message)) = messages_rx.recv().await { + message_handler.handle(addr, message).await + } + }) + .unwrap(); // TODO: Restart handler threads if they crash // TODO: Make an outgoing message handler handler? // Spawn task to handle User things let user_handler = UserHandler::new(app_state.clone()); - tokio::spawn(async move { - while let Some(message) = users_rx.recv().await { - user_handler.handle(message).await; - } - }); + tokio::task::Builder::new() + .name("User Handler") + .spawn(async move { + while let Some(message) = users_rx.recv().await { + user_handler.handle(message).await + } + }) + .unwrap(); // Spawn task to handle Game things let game_handler = GameHandler::new(app_state.clone()); - tokio::spawn(async move { - while let Some(message) = games_rx.recv().await { - game_handler.handle(message).await; - } - }); + tokio::task::Builder::new() + .name("Game Handler") + .spawn(async move { + while let Some(message) = games_rx.recv().await { + game_handler.handle(message).await + } + }) + .unwrap(); // Router let app = Router::new() diff --git a/server/src/user_handler.rs b/server/src/user_handler.rs index 3acc242..eb7cbdf 100644 --- a/server/src/user_handler.rs +++ b/server/src/user_handler.rs @@ -376,17 +376,20 @@ pub fn meta_server_summary_update(state: &Arc) -> String { /// Generate games list update pub fn meta_games_browser_update(state: &Arc) -> String { // TODO: this may get expensive if there are many games - let mut games = vec![]; - for game in state.games.read().unwrap().values() { - games.push(GameBrowserMeta { + let games = state + .games + .read() + .unwrap() + .values() + .map(|game| GameBrowserMeta { uuid: game.read().unwrap().uuid.to_string(), name: game.read().unwrap().name.clone(), host: game.read().unwrap().host.read().unwrap().name.clone(), players: game.read().unwrap().players.len(), packs: game.read().unwrap().packs.clone(), - }); - } + }) + .collect::>(); to_string::(&GamesUpdate { games }).unwrap() } diff --git a/server/src/websocket.rs b/server/src/websocket.rs index 1bf68cf..25a20b8 100644 --- a/server/src/websocket.rs +++ b/server/src/websocket.rs @@ -31,7 +31,7 @@ pub async fn websocket_on_connection(stream: WebSocket, state: Arc, ad let (mut sender, mut receiver) = stream.split(); // Create channel for direct messages - let (dm_tx, mut dm_rx) = mpsc::channel(30); + let (dm_tx, mut dm_rx) = mpsc::channel(1000000); let mut map = HashMap::new(); map.insert(addr, dm_tx.clone()); @@ -51,14 +51,9 @@ pub async fn websocket_on_connection(stream: WebSocket, state: Arc, ad } } - state - .users_tx - .send(UserHandlerMessage::NewUser( - User::new(username, dm_tx.clone()), - addr, - )) - .await - .expect("User handler is down"); + let tx = state.users_tx.clone(); + let msg = UserHandlerMessage::NewUser(User::new(username, dm_tx.clone()), addr); + tokio::spawn(async move { tx.send(msg).await.expect("User handler is down") }); // Subscribe to receive from global broadcast channel let mut rx = state.broadcast_tx.subscribe();