From e1c0cf185ca2dd24b8b7ea84b776e060a0fa1753 Mon Sep 17 00:00:00 2001 From: Adam Doyle Date: Fri, 13 Dec 2024 04:38:14 -0500 Subject: [PATCH] let user handler own user data --- server/src/game_handler.rs | 260 ++++++++++++------------- server/src/incoming_message_handler.rs | 144 +++++++------- server/src/lib.rs | 7 +- server/src/main.rs | 9 +- server/src/user_handler.rs | 219 +++++++++++++++------ server/src/websocket.rs | 68 ++++--- 6 files changed, 403 insertions(+), 304 deletions(-) diff --git a/server/src/game_handler.rs b/server/src/game_handler.rs index cdb7dfe..538546d 100644 --- a/server/src/game_handler.rs +++ b/server/src/game_handler.rs @@ -5,18 +5,22 @@ use crate::DmUserMethod::*; use crate::GameHandlerMessage::*; use crate::OutgoingMessageHandlerMessage::*; use crate::SendUserMessage::*; -use crate::UserHandlerMessage::*; +use crate::User; +use crate::UserHandlerMessage; use axum::extract::ws::Message; use lib::*; -use std::{collections::HashMap, net::SocketAddr, sync::Arc}; +use std::{ + collections::HashMap, + sync::{Arc, RwLock}, +}; use tokio::sync::mpsc::Sender; /// For interacting with the game handler pub enum GameHandlerMessage { - NewGame(NewGameRequest, SocketAddr), - JoinGame(String, SocketAddr), - MoveRequest(PlayerMoveRequest, SocketAddr), - JudgeDecision(JudgeDecisionRequest, SocketAddr), + NewGame(NewGameRequest, Arc>), + JoinGame(String, Arc>), + MoveRequest(PlayerMoveRequest, Arc>), + JudgeDecision(JudgeDecisionRequest, String), DeleteGame(GameDeleteRequest), SendGameStateUpdate(Vec), SendGameMetaUpdate(Vec), @@ -54,10 +58,14 @@ impl GameHandler { /// Handles incoming messages pub async fn handle(&mut self, message: GameHandlerMessage) { match message { - NewGame(request, addr) => self.create_new_game(request, addr).await, - JoinGame(request, addr) => self.join_game(request, addr).await, - MoveRequest(request, addr) => self.handle_player_move(request, addr).await, - JudgeDecision(request, addr) => self.handle_judging(request, addr).await, + NewGame(request, host) => self.create_new_game(request, host).await, + JoinGame(request, user) => self.join_game(request, user).await, + MoveRequest(move_request, player_user) => { + self.handle_player_move(move_request, player_user).await + } + JudgeDecision(request, player_user_id) => { + self.handle_judging(request, player_user_id).await + } DeleteGame(request) => self.delete_game(request).await, SendGameStateUpdate(game_ids) => self.send_game_state_update_all(game_ids), SendGameMetaUpdate(game_ids) => self.send_game_meta_update(game_ids), @@ -78,20 +86,15 @@ impl GameHandler { } /// Process judging - async fn handle_judging(&mut self, request: JudgeDecisionRequest, addr: SocketAddr) { + async fn handle_judging(&mut self, request: JudgeDecisionRequest, player_user_id: String) { if let Some(this_game) = self.games.get_mut(&request.game_id) { - if let Some(player_user) = self.state.online_users.read().unwrap().get(&addr) { - let player_user_id = player_user.read().unwrap().uuid.to_string(); - let game_id = request.game_id.to_string(); + let game_id = request.game_id.to_string(); - // Send to game - this_game.judge_round(&request, player_user_id); + // Send to game + this_game.judge_round(&request, player_user_id); - self.send_game_state_update_all(vec![game_id.clone()]); - self.send_game_meta_update(vec![game_id]); - } else { - tracing::error!("Received judge request for nonexistent judge player!"); - } + self.send_game_state_update_all(vec![game_id.clone()]); + self.send_game_meta_update(vec![game_id]); } else { tracing::error!("Received judge request for nonexistent game!"); } @@ -102,43 +105,49 @@ impl GameHandler { } /// Process player move request - async fn handle_player_move(&mut self, request: PlayerMoveRequest, addr: SocketAddr) { + async fn handle_player_move( + &mut self, + request: PlayerMoveRequest, + player_user: Arc>, + ) { if let Some(this_game) = self.games.get_mut(&request.game_id) { - if let Some(player_user) = self.state.online_users.read().unwrap().get(&addr) { - let player_user_id = player_user.read().unwrap().uuid.to_string(); + let player_user_id = player_user.read().unwrap().uuid.to_string(); - // Do the stuff - match this_game.player_move(&request, player_user_id) { - Err(err) => { - let message = ChatMessage { text: err }; - let users_tx = self.state.tx_user_handler.clone(); - tokio::spawn(async move { - if let Err(e) = users_tx - .send(DmUser(SendChatMessage(message), Addr(addr))) - .await - { - tracing::error!("Could not send message: {}", e); - } - }); - } - Ok(None) => { - tracing::debug!("TODO: whatever i'm supposed to do") - } - Ok(Some((judge_round, czar_id))) => { - let users_tx = self.state.tx_user_handler.clone(); - tokio::spawn(async move { - if let Err(e) = users_tx - .send(DmUser(SendJudgeRound(judge_round), Id(czar_id))) - .await - { - tracing::error!("Could not send message: {}", e); - } - }); - } - }; - } else { - tracing::error!("Nonexistent player tried to submit move for game!"); - } + // Do the stuff + match this_game.player_move(&request, player_user_id.clone()) { + Err(err) => { + let message = ChatMessage { text: err }; + let users_tx = self.state.tx_user_handler.clone(); + tokio::spawn(async move { + if let Err(e) = users_tx + .send(UserHandlerMessage::DmUser( + SendChatMessage(message), + Id(player_user_id), + )) + .await + { + tracing::error!("Could not send message: {}", e); + } + }); + } + Ok(None) => { + tracing::debug!("TODO: whatever i'm supposed to do") + } + Ok(Some((judge_round, czar_id))) => { + let users_tx = self.state.tx_user_handler.clone(); + tokio::spawn(async move { + if let Err(e) = users_tx + .send(UserHandlerMessage::DmUser( + SendJudgeRound(judge_round), + Id(czar_id), + )) + .await + { + tracing::error!("Could not send message: {}", e); + } + }); + } + }; } else { tracing::error!("Player tried to submit move for nonexistent game!"); } @@ -146,34 +155,29 @@ impl GameHandler { } /// Puts a user in a game - async fn join_game(&mut self, game_id: String, addr: SocketAddr) { + async fn join_game(&mut self, game_id: String, this_user: Arc>) { if self.games.contains_key(&game_id) { - if let Some(this_user) = self.state.online_users.read().unwrap().get(&addr) { - let this_user_id = this_user.read().unwrap().uuid.clone(); + let this_user_id = this_user.read().unwrap().uuid.clone(); - // Register game to user - if !self.game_id_by_user_id.contains_key(&this_user_id) { - self.game_id_by_user_id - .insert(this_user_id.clone(), vec![game_id.clone()]); - } else if self.game_id_by_user_id.contains_key(&this_user_id) { - self.game_id_by_user_id - .get_mut(&this_user_id) - .unwrap() - .extend(vec![game_id.clone()]); - } - - // Create player - self.games - .get_mut(&game_id) + // Register game to user + if !self.game_id_by_user_id.contains_key(&this_user_id) { + self.game_id_by_user_id + .insert(this_user_id.clone(), vec![game_id.clone()]); + } else if self.game_id_by_user_id.contains_key(&this_user_id) { + self.game_id_by_user_id + .get_mut(&this_user_id) .unwrap() - .create_player(this_user.clone()); - - // Send cards - self.send_game_state_update_single(this_user_id, game_id.clone()) - } else { - tracing::error!("Tried to add a nonexistent user to game!"); - return; + .extend(vec![game_id.clone()]); } + + // Create player + self.games + .get_mut(&game_id) + .unwrap() + .create_player(this_user.clone()); + + // Send cards + self.send_game_state_update_single(this_user_id, game_id.clone()) } else { tracing::error!("User tried to join a nonexistent game!"); return; @@ -324,7 +328,7 @@ impl GameHandler { } /// Creates a new game - async fn create_new_game(&mut self, new_game: NewGameRequest, addr: SocketAddr) { + async fn create_new_game(&mut self, new_game: NewGameRequest, host: Arc>) { if new_game.game_packs.is_empty() { tracing::error!("New game cards are empty!"); return; @@ -333,58 +337,54 @@ impl GameHandler { return; } - if let Some(host) = self.state.online_users.read().unwrap().get(&addr) { - let new_game_name; - let max_game_name_len = 32; + let new_game_name; + let max_game_name_len = 32; - if new_game.game_name.len() > max_game_name_len { - new_game_name = new_game.game_name[..max_game_name_len].to_string() - } else { - new_game_name = new_game.game_name - } - - // Create manifest - let manifest = NewGameManifest { - name: new_game_name, - host: host.clone(), - packs: new_game - .game_packs - .into_iter() - .map(|pack| u8::from_str_radix(&pack, 10).unwrap()) - .collect(), - }; - - // Create game using manifest - let mut new_game_object = Game::new(&self.packs, manifest); - - // Don't forget to create the host player!!! - new_game_object.create_player(host.clone()); - - let game_id = new_game_object.uuid.to_string(); - - // Add game to active list - self.games - .insert(new_game_object.uuid.to_string(), new_game_object); - - // Register game to user - let user_id = host.read().unwrap().uuid.clone(); - if !self.game_id_by_user_id.contains_key(&user_id) { - self.game_id_by_user_id - .insert(user_id.clone(), vec![game_id.clone()]); - } else if self.game_id_by_user_id.contains_key(&user_id) { - self.game_id_by_user_id - .get_mut(&user_id) - .unwrap() - .extend(vec![game_id.clone()]); - } - self.send_game_state_update_all(vec![game_id.clone()]); - self.send_game_meta_update(vec![game_id]); - - self.broadcast_game_browser_update(); - self.broadcast_game_count(); + if new_game.game_name.len() > max_game_name_len { + new_game_name = new_game.game_name[..max_game_name_len].to_string() } else { - tracing::error!("Attempted to create game for nonexistent player!"); + new_game_name = new_game.game_name } + + // Create manifest + let manifest = NewGameManifest { + name: new_game_name, + host: host.clone(), + packs: new_game + .game_packs + .into_iter() + .map(|pack| u8::from_str_radix(&pack, 10).unwrap()) + .collect(), + }; + + // Create game using manifest + let mut new_game_object = Game::new(&self.packs, manifest); + + // Don't forget to create the host player!!! + new_game_object.create_player(host.clone()); + + let game_id = new_game_object.uuid.to_string(); + + // Add game to active list + self.games + .insert(new_game_object.uuid.to_string(), new_game_object); + + // Register game to user + let user_id = host.read().unwrap().uuid.clone(); + if !self.game_id_by_user_id.contains_key(&user_id) { + self.game_id_by_user_id + .insert(user_id.clone(), vec![game_id.clone()]); + } else if self.game_id_by_user_id.contains_key(&user_id) { + self.game_id_by_user_id + .get_mut(&user_id) + .unwrap() + .extend(vec![game_id.clone()]); + } + self.send_game_state_update_all(vec![game_id.clone()]); + self.send_game_meta_update(vec![game_id]); + + self.broadcast_game_browser_update(); + self.broadcast_game_count(); } /// Generate games list update diff --git a/server/src/incoming_message_handler.rs b/server/src/incoming_message_handler.rs index 2c35db6..d6b322e 100644 --- a/server/src/incoming_message_handler.rs +++ b/server/src/incoming_message_handler.rs @@ -1,7 +1,6 @@ use crate::user_handler::*; use crate::AppState; use crate::GameHandlerMessage::*; -use crate::OutgoingMessageHandlerMessage::*; use crate::UserHandlerMessage::*; use axum::extract::ws::{CloseFrame, Message}; use lib::*; @@ -26,54 +25,59 @@ impl IncomingMessageHandler { Message::Text(text) => match text { _chat_message if let Ok(chat_message) = from_str::(&text) => { // TODO: This should be delegated to user handler and an outgoing message and/or chat handler - let msg; + // let msg; if chat_message.text.len() > 1024 { - msg = format! {"{0}: {1}", self.state.online_users.read().unwrap().get(&addr).unwrap().read().unwrap().name, chat_message.text[..1024].to_string()}; - } else { - msg = format! {"{0}: {1}", self.state.online_users.read().unwrap().get(&addr).unwrap().read().unwrap().name, chat_message.text}; - } - - // Broadcast incoming chat message - let msg = ChatMessage { text: msg }; - let tx = self.state.tx_outgoing_message_handler.clone(); - tokio::spawn(async move { - if let Err(e) = tx - .send(Broadcast(ServerToClientMessage::ChatMessage(msg))) + if let Err(e) = self + .state + .tx_user_handler + .send(PassChatMessage(chat_message.text[..1024].to_string(), addr)) .await { - tracing::error!("Error broadcasting Chat message: {}", e) + tracing::error!("Error contacting outgoing message handler: {}", e); } - }); + } else { + if let Err(e) = self + .state + .tx_user_handler + .send(PassChatMessage(chat_message.text, addr)) + .await + { + tracing::error!("Error contacting outgoing message handler: {}", e); + } + } } _user_log_in_request if let Ok(user_log_in) = from_str::(&text) => { - let msg = UserLogIn(user_log_in, addr); - let tx = self.state.tx_user_handler.clone(); - tokio::spawn(async move { - if let Err(e) = tx.send(msg).await { - tracing::error!("Error sending user login: {}", e) - } - }); + if let Err(e) = self + .state + .tx_user_handler + .send(UserLogIn(user_log_in, addr)) + .await + { + tracing::error!("Error sending user login: {}", e) + } } _new_game_request if let Ok(new_game) = from_str::(&text) => { - let msg = NewGame(new_game, addr); - let tx = self.state.tx_game_handler.clone(); - tokio::spawn(async move { - if let Err(e) = tx.send(msg).await { - tracing::error!("Error requesting new game: {}", e) - } - }); + if let Err(e) = self + .state + .tx_user_handler + .send(GetUser(new_game, addr)) + .await + { + tracing::error!("Error requesting new game: {}", e) + } } _join_game_request if let Ok(join_request) = from_str::(&text) => { - let msg = JoinGame(join_request.id, addr); - let tx = self.state.tx_game_handler.clone(); - tokio::spawn(async move { - if let Err(e) = tx.send(msg).await { - tracing::error!("Error requesting game join: {}", e) - } - }); + if let Err(e) = self + .state + .tx_user_handler + .send(UserHandlerMessage::JoinGame(join_request.id, addr)) + .await + { + tracing::error!("Error requesting game join: {}", e) + } } _player_move_request @@ -87,40 +91,43 @@ impl IncomingMessageHandler { tracing::error!("Move request game_id is empty! Ignoring..."); return; } else { - let msg = MoveRequest(move_request, addr); - let tx = self.state.tx_game_handler.clone(); - tokio::spawn(async move { - if let Err(e) = tx.send(msg).await { - tracing::error!("Error sending move request: {}", e) - } - }); + if let Err(e) = self + .state + .tx_user_handler + .send(UserHandlerMessage::MoveRequest(move_request, addr)) + .await + { + tracing::error!("Error sending move request: {}", e) + } } } _judge_decision - if let Ok(judge_request) = from_str::(&text) => + if let Ok(judge_decision) = from_str::(&text) => { - if !judge_request.winning_cards.is_empty() { - let msg = JudgeDecision(judge_request, addr); - let tx = self.state.tx_game_handler.clone(); - tokio::spawn(async move { - if let Err(e) = tx.send(msg).await { - tracing::error!("Error sending Judge Decision: {}", e) - } - }); + if !judge_decision.winning_cards.is_empty() { + if let Err(e) = self + .state + .tx_user_handler + .send(GetPlayerUserId(judge_decision, addr)) + .await + { + tracing::error!("Error contacting user handler: {}", e) + } } else { tracing::error!("Judge request received with empty cards"); } } _delete_game if let Ok(delete_request) = from_str::(&text) => { - let msg = DeleteGame(delete_request); - let tx = self.state.tx_game_handler.clone(); - tokio::spawn(async move { - if let Err(e) = tx.send(msg).await { - tracing::error!("Error sending delete game: {}", e) - } - }); + if let Err(e) = self + .state + .tx_game_handler + .send(DeleteGame(delete_request)) + .await + { + tracing::error!("Error sending delete game: {}", e) + } } _ => tracing::error!( @@ -136,9 +143,7 @@ impl IncomingMessageHandler { data ), - Message::Close(close_frame) => { - self.handle_close(close_frame, addr); - } + Message::Close(close_frame) => self.handle_close(close_frame, addr).await, Message::Ping(ping) => { tracing::info!("Pong received with: {:?}", ping); @@ -151,11 +156,7 @@ impl IncomingMessageHandler { } /// This runs when a connection closes - fn handle_close(&self, close_frame: Option, addr: SocketAddr) { - let msg = UserHandlerMessage::Cleanup(addr); - let tx = self.state.tx_user_handler.clone(); - tokio::spawn(async move { tx.send(msg).await }); - + async fn handle_close(&self, close_frame: Option>, addr: SocketAddr) { // Process close frame if let Some(cf) = close_frame { tracing::info!( @@ -166,5 +167,14 @@ impl IncomingMessageHandler { } else { tracing::info!("close received without close frame") } + + if let Err(e) = self + .state + .tx_user_handler + .send(UserHandlerMessage::Cleanup(addr)) + .await + { + tracing::info!("Error contacting user handler: {}", e) + } } } diff --git a/server/src/lib.rs b/server/src/lib.rs index 3a76a12..258728b 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -4,11 +4,7 @@ use crate::outgoing_message_handler::*; use axum::extract::ws::Message; use lib::*; use std::fmt::Debug; -use std::{ - collections::HashMap, - net::SocketAddr, - sync::{Arc, RwLock}, -}; +use std::net::SocketAddr; use tokio::sync::mpsc; use user_handler::*; use uuid::Uuid; @@ -46,7 +42,6 @@ impl User { /// Shared state pub struct AppState { - pub online_users: RwLock>>>, pub tx_broadcast: tokio::sync::broadcast::Sender, pub tx_game_handler: mpsc::Sender, pub tx_incoming_message_handler: mpsc::Sender<(SocketAddr, Message)>, diff --git a/server/src/main.rs b/server/src/main.rs index 67bfc3f..c4686f7 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -6,12 +6,7 @@ use anyhow::{Context, Result}; use axum::{routing::get, Router}; use clap::{arg, command}; use server::*; -use std::{ - collections::HashMap, - net::SocketAddr, - sync::{Arc, RwLock}, - time::Duration, -}; +use std::{net::SocketAddr, sync::Arc, time::Duration}; use tokio::sync::{broadcast, mpsc}; use tower::ServiceBuilder; use tower_governor::{governor::GovernorConfigBuilder, GovernorLayer}; @@ -82,10 +77,8 @@ async fn main() -> Result<()> { let (tx_incoming_message_handler, mut rx_incoming_message_handler) = mpsc::channel(32); let (tx_outgoing_message_handler, mut rx_outgoing_message_handler) = mpsc::channel(32); let (tx_user_handler, mut rx_user_handler) = mpsc::channel(32); - let online_users = RwLock::new(HashMap::>>::new()); let app_state = Arc::new(AppState { - online_users, tx_broadcast: tx_broadcast.clone(), tx_game_handler, tx_incoming_message_handler, diff --git a/server/src/user_handler.rs b/server/src/user_handler.rs index b19c0a0..be9855b 100644 --- a/server/src/user_handler.rs +++ b/server/src/user_handler.rs @@ -2,6 +2,7 @@ use crate::name_generator::*; use crate::AppState; use crate::DmUserMethod::*; use crate::GameHandlerMessage; +use crate::OutgoingMessageHandlerMessage; use crate::OutgoingMessageHandlerMessage::*; use crate::SendUserMessage::*; use crate::User; @@ -17,16 +18,6 @@ use tokio::sync::mpsc::Sender; // TODO: clean up all this tx/msg mess -/// Handles users -pub struct UserHandler { - /// Pointer to global state - state: Arc, - users_by_id: HashMap>>, - name_generator: NameGenerator, - reserved_names: HashSet, - offline_users: HashMap>>, -} - pub enum DmUserMethod { Addr(SocketAddr), Id(String), @@ -38,6 +29,11 @@ pub enum UserHandlerMessage { UserLogIn(UserLogInRequest, SocketAddr), DmUser(SendUserMessage, DmUserMethod), Cleanup(SocketAddr), + PassChatMessage(String, SocketAddr), + GetPlayerUserId(JudgeDecisionRequest, SocketAddr), + GetUser(NewGameRequest, SocketAddr), + MoveRequest(PlayerMoveRequest, SocketAddr), + JoinGame(String, SocketAddr), } /// Types of messages that can be sent to a user as a DM @@ -48,12 +44,25 @@ pub enum SendUserMessage { SendJudgeRound(JudgeRound), } +/// Handles users +pub struct UserHandler { + /// Pointer to global state + state: Arc, + users_by_id: HashMap>>, + name_generator: NameGenerator, + reserved_names: HashSet, + offline_users: HashMap>>, + online_users: RwLock>>>, +} + impl UserHandler { /// Returns new UserHandler pub fn new(state: Arc) -> Self { let offline_users = HashMap::>>::new(); + let online_users = RwLock::new(HashMap::>>::new()); UserHandler { offline_users, + online_users, state, users_by_id: HashMap::>>::new(), reserved_names: HashSet::::new(), @@ -68,6 +77,107 @@ impl UserHandler { UserLogIn(request, addr) => self.login(request, addr).await, DmUser(message, method) => self.dm_user(message, method).await, Cleanup(addr) => self.user_cleanup(addr).await, + PassChatMessage(msg, addr) => self.pass_chat_message(msg, addr).await, + GetPlayerUserId(judge_decision, addr) => { + self.get_player_user_id(judge_decision, addr).await + } + GetUser(new_game, addr) => self.get_user(new_game, addr).await, + MoveRequest(move_request, addr) => self.move_request(move_request, addr).await, + JoinGame(game_id, addr) => self.join_game(game_id, addr).await, + } + } + + async fn join_game(&self, game_id: String, addr: SocketAddr) { + let user = self + .online_users + .read() + .unwrap() + .get(&addr) + .unwrap() + .clone(); + + if let Err(e) = self + .state + .tx_game_handler + .send(GameHandlerMessage::JoinGame(game_id, user)) + .await + { + tracing::error!("Error: {}", e) + } + } + + async fn move_request(&self, move_request: PlayerMoveRequest, addr: SocketAddr) { + let player_user = self + .online_users + .read() + .unwrap() + .get(&addr) + .unwrap() + .clone(); + if let Err(e) = self + .state + .tx_game_handler + .send(GameHandlerMessage::MoveRequest(move_request, player_user)) + .await + { + tracing::error!("Error: {}", e) + } + } + + async fn get_user(&self, new_game: NewGameRequest, addr: SocketAddr) { + let host = self + .online_users + .read() + .unwrap() + .get(&addr) + .unwrap() + .clone(); + if let Err(e) = self + .state + .tx_game_handler + .send(GameHandlerMessage::NewGame(new_game, host)) + .await + { + tracing::error!("Error requesting new game: {}", e) + } + } + + async fn get_player_user_id(&self, judge_decision: JudgeDecisionRequest, addr: SocketAddr) { + let player_user_id = self + .online_users + .read() + .unwrap() + .get(&addr) + .unwrap() + .read() + .unwrap() + .uuid + .clone(); + + if let Err(e) = self + .state + .tx_game_handler + .send(GameHandlerMessage::JudgeDecision( + judge_decision, + player_user_id, + )) + .await + { + tracing::error!("Error contacting game handler: {}", e) + } + } + + async fn pass_chat_message(&self, msg: String, addr: SocketAddr) { + let text = format! {"{0}: {1}", self.online_users.read().unwrap().get(&addr).unwrap().read().unwrap().name, msg}; + if let Err(e) = self + .state + .tx_outgoing_message_handler + .send(OutgoingMessageHandlerMessage::Broadcast( + ServerToClientMessage::ChatMessage(ChatMessage { text }), + )) + .await + { + tracing::error!("Can't contact outgoing message handler: {}", e); } } @@ -88,7 +198,7 @@ impl UserHandler { } } Addr(addr) => { - if let Some(user) = self.state.online_users.read().unwrap().get(&addr) { + if let Some(user) = self.online_users.read().unwrap().get(&addr) { tx = user.read().unwrap().tx.clone(); } else { tracing::error!("Attempted to send message to offline user!"); @@ -186,7 +296,7 @@ impl UserHandler { .state .tx_outgoing_message_handler .send(Broadcast(ServerToClientMessage::ChatMessage( - meta_announce_user_join(&self.state, &addr), + self.meta_announce_user_join(&addr), ))) .await { @@ -226,7 +336,7 @@ impl UserHandler { let old_name; - if let Some(user) = self.state.online_users.read().unwrap().get(&addr) { + if let Some(user) = self.online_users.read().unwrap().get(&addr) { old_name = user.read().unwrap().name.clone(); // User abandons current name by requesting a new one @@ -240,7 +350,7 @@ impl UserHandler { if self.offline_users.contains_key(&new_name) { let buf; // Copy over new tx - if let Some(online_user) = self.state.online_users.write().unwrap().remove(&addr) { + if let Some(online_user) = self.online_users.write().unwrap().remove(&addr) { if let Some(offline_user) = self.offline_users.remove(&new_name) { offline_user.write().unwrap().tx = online_user.write().unwrap().tx.clone(); buf = offline_user; @@ -254,7 +364,7 @@ impl UserHandler { } // Move offline user object to online - self.state.online_users.write().unwrap().insert(addr, buf); + self.online_users.write().unwrap().insert(addr, buf); // Send welcome back messages let msg = format! { @@ -296,7 +406,7 @@ impl UserHandler { self.reserved_names.insert(new_name.clone()); // Change user's name - if let Some(user) = self.state.online_users.write().unwrap().get_mut(&addr) { + if let Some(user) = self.online_users.write().unwrap().get_mut(&addr) { user.write().unwrap().change_name(new_name.clone()); } else { tracing::error!("Error updating username: Can't find user!"); @@ -331,7 +441,7 @@ impl UserHandler { ) .await; // Update games this user is in - if let Some(user) = &self.state.online_users.read().unwrap().get(&addr) { + if let Some(user) = &self.online_users.read().unwrap().get(&addr) { let user_id = user.read().unwrap().uuid.to_string(); let msg = GameHandlerMessage::SendGameUserUpdate(user_id); let tx = self.state.tx_game_handler.clone(); @@ -348,14 +458,7 @@ impl UserHandler { /// Broadcast updated user count fn broadcast_user_count(&self) { let tx = self.state.tx_outgoing_message_handler.clone(); - let online_users: u32 = self - .state - .online_users - .read() - .unwrap() - .len() - .try_into() - .unwrap(); + let online_users: u32 = self.online_users.read().unwrap().len().try_into().unwrap(); let msg = ServerToClientMessage::ServerOnlineUsers(ServerOnlineUsers { online_users }); tokio::spawn(async move { tx.send(Broadcast(msg)).await }); } @@ -363,7 +466,6 @@ impl UserHandler { /// Clean up after a user when they disconnect async fn user_cleanup(&mut self, addr: SocketAddr) { let user_name = self - .state .online_users .read() .unwrap() @@ -398,19 +500,14 @@ impl UserHandler { /// Set user status to online fn set_user_online(&self, addr: SocketAddr, user: Arc>) { - self.state.online_users.write().unwrap().insert(addr, user); + self.online_users.write().unwrap().insert(addr, user); } /// Set user status to offline fn set_user_offline(&mut self, user_name: String, addr: &SocketAddr) { self.offline_users.insert( user_name, - self.state - .online_users - .write() - .unwrap() - .remove(addr) - .unwrap(), + self.online_users.write().unwrap().remove(addr).unwrap(), ); } @@ -420,13 +517,39 @@ impl UserHandler { .state .tx_outgoing_message_handler .send(Broadcast(ServerToClientMessage::ChatUpdate( - generate_chat_update(&self.state), + self.generate_chat_update(), ))) .await { tracing::error!("Error contacting outgoing message handler: {}", e); } } + /// Generate chatroom join announcement + fn meta_announce_user_join(&self, addr: &SocketAddr) -> ChatMessage { + let msg = format!("{} joined.", { + if let Some(user) = self.online_users.read().unwrap().get(addr) { + user.read().unwrap().name.clone() + } else { + "Error!".to_string() + } + }); + + ChatMessage { text: msg } + } + /// Generate chatroom metadata update + fn generate_chat_update(&self) -> ChatUpdate { + // TODO: this may get expensive if there are many users + let mut names = vec![]; + + for user in self.online_users.read().unwrap().iter() { + names.push(user.1.read().unwrap().name.clone()); + } + + ChatUpdate { + room: "Lobby".to_string(), + users: names, + } + } } /// Generate message to notify client of user changes @@ -436,34 +559,6 @@ pub fn user_client_self_update(new_user: &Arc>) -> UserUpdate { } } -/// Generate chatroom metadata update -pub fn generate_chat_update(state: &Arc) -> ChatUpdate { - // TODO: this may get expensive if there are many users - let mut names = vec![]; - - for user in state.online_users.read().unwrap().iter() { - names.push(user.1.read().unwrap().name.clone()); - } - - ChatUpdate { - room: "Lobby".to_string(), - users: names, - } -} - -/// Generate chatroom join announcement -pub fn meta_announce_user_join(state: &Arc, addr: &SocketAddr) -> ChatMessage { - let msg = format!("{} joined.", { - if let Some(user) = state.online_users.read().unwrap().get(addr) { - user.read().unwrap().name.clone() - } else { - "Error!".to_string() - } - }); - - ChatMessage { text: msg } -} - /// Generate message-of-the-day server greeting pub fn meta_motd() -> ChatMessage { ChatMessage { diff --git a/server/src/websocket.rs b/server/src/websocket.rs index d863e81..55ab0b4 100644 --- a/server/src/websocket.rs +++ b/server/src/websocket.rs @@ -42,42 +42,48 @@ pub async fn websocket_on_connection(stream: WebSocket, state: Arc, ad let mut rx = state.tx_broadcast.subscribe(); // Send messages to this client - let mut send_task = tokio::spawn(async move { - let mut broadcast = None; - let mut dm: Option = None; - loop { - tokio::select! { - b = rx.recv() => broadcast = Some(b.unwrap()), - d = dm_rx.recv() => dm = d, - }; + let mut send_task = tokio::task::Builder::new() + .name("User send task") + .spawn(async move { + let mut broadcast = None; + let mut dm: Option = None; + loop { + tokio::select! { + b = rx.recv() => broadcast = Some(b.unwrap()), + d = dm_rx.recv() => dm = d, + }; - if let Some(msg) = &dm { - if sender.send(msg.to_owned()).await.is_err() { - break; - } else { - dm = Option::None; - } - } else if let Some(msg) = &broadcast { - if sender.send(msg.to_owned()).await.is_err() { - } else { - broadcast = Option::None; + if let Some(msg) = &dm { + if sender.send(msg.to_owned()).await.is_err() { + break; + } else { + dm = Option::None; + } + } else if let Some(msg) = &broadcast { + if sender.send(msg.to_owned()).await.is_err() { + } else { + broadcast = Option::None; + } } } - } - }); + }) + .expect("Can't start user send task"); // Receive messages from this client - let mut recv_task = tokio::spawn(async move { - while let Some(Ok(message)) = receiver.next().await { - if let Err(e) = state - .tx_incoming_message_handler - .send((addr.clone(), message.clone())) - .await - { - tracing::error!("Error relaying received message: {}", e) - }; - } - }); + let mut recv_task = tokio::task::Builder::new() + .name("User recv task") + .spawn(async move { + while let Some(Ok(message)) = receiver.next().await { + if let Err(e) = state + .tx_incoming_message_handler + .send((addr.clone(), message.clone())) + .await + { + tracing::error!("Error relaying received message: {}", e) + }; + } + }) + .expect("Can't start user recv task"); // If either task completes then abort the other tokio::select! {