From 8cb04a4eb3644934f95f9ae91e54b50e875df13b Mon Sep 17 00:00:00 2001 From: Adam Doyle Date: Tue, 3 Dec 2024 19:40:15 -0500 Subject: [PATCH] stop cloning channels when we only really need two --- server/src/game_handler.rs | 14 ++++------- server/src/incoming_message_handler.rs | 3 +-- server/src/lib.rs | 4 ++-- server/src/main.rs | 4 ++-- server/src/outgoing_message_handler.rs | 12 ++++++---- server/src/user_handler.rs | 33 ++++++++++---------------- 6 files changed, 28 insertions(+), 42 deletions(-) diff --git a/server/src/game_handler.rs b/server/src/game_handler.rs index ba111f3..955871a 100644 --- a/server/src/game_handler.rs +++ b/server/src/game_handler.rs @@ -404,13 +404,11 @@ impl GameHandler { .collect::>(); let tx = self.state.tx_outgoing_message_handler.clone(); - let btx = self.state.tx_broadcast.clone(); tokio::spawn(async move { if let Err(e) = tx - .send(Broadcast(( - btx, - ServerToClientMessage::GamesUpdate(GamesUpdate { games }), - ))) + .send(Broadcast(ServerToClientMessage::GamesUpdate(GamesUpdate { + games, + }))) .await { tracing::error!("Error broadcasting games update: {}", e); @@ -421,15 +419,11 @@ impl GameHandler { /// Broadcast updated game count fn broadcast_game_count(&self) { let tx = self.state.tx_outgoing_message_handler.clone(); - let btx = self.state.tx_broadcast.clone(); let active_games: u32 = self.games.len().try_into().unwrap(); let msg = ServerActiveGames { active_games }; tokio::spawn(async move { if let Err(e) = tx - .send(Broadcast(( - btx, - ServerToClientMessage::ServerActiveGames(msg), - ))) + .send(Broadcast(ServerToClientMessage::ServerActiveGames(msg))) .await { tracing::error!("Error broadcasting game count: {}", e); diff --git a/server/src/incoming_message_handler.rs b/server/src/incoming_message_handler.rs index 0e2017d..2c35db6 100644 --- a/server/src/incoming_message_handler.rs +++ b/server/src/incoming_message_handler.rs @@ -36,10 +36,9 @@ impl IncomingMessageHandler { // Broadcast incoming chat message let msg = ChatMessage { text: msg }; let tx = self.state.tx_outgoing_message_handler.clone(); - let btx = self.state.tx_broadcast.clone(); tokio::spawn(async move { if let Err(e) = tx - .send(Broadcast((btx, ServerToClientMessage::ChatMessage(msg)))) + .send(Broadcast(ServerToClientMessage::ChatMessage(msg))) .await { tracing::error!("Error broadcasting Chat message: {}", e) diff --git a/server/src/lib.rs b/server/src/lib.rs index 92d8206..c3fb1a3 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -9,7 +9,7 @@ use std::{ net::SocketAddr, sync::{Arc, RwLock}, }; -use tokio::sync::{broadcast, mpsc}; +use tokio::sync::mpsc; use user_handler::*; use uuid::Uuid; pub mod card_loader; @@ -49,7 +49,7 @@ pub struct AppState { pub games_by_user: RwLock>>, pub offline_users: RwLock>>>, pub online_users: RwLock>>>, - pub tx_broadcast: broadcast::Sender, + pub tx_broadcast: tokio::sync::broadcast::Sender, pub tx_game_handler: mpsc::Sender, pub tx_incoming_message_handler: mpsc::Sender<(SocketAddr, Message)>, pub tx_outgoing_message_handler: diff --git a/server/src/main.rs b/server/src/main.rs index 8775f20..fdcc40f 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -82,7 +82,7 @@ async fn main() -> Result<()> { games_by_user, offline_users, online_users, - tx_broadcast, + tx_broadcast: tx_broadcast.clone(), tx_game_handler, tx_incoming_message_handler, tx_outgoing_message_handler, @@ -101,7 +101,7 @@ async fn main() -> Result<()> { .unwrap(); // Spawn task to handle outgoing messages - let outgoing_message_handler = OutgoingMessageHandler::new(); + let outgoing_message_handler = OutgoingMessageHandler::new(tx_broadcast); tokio::task::Builder::new() .name("Outgoing Message Handler") .spawn(async move { diff --git a/server/src/outgoing_message_handler.rs b/server/src/outgoing_message_handler.rs index f773787..418acee 100644 --- a/server/src/outgoing_message_handler.rs +++ b/server/src/outgoing_message_handler.rs @@ -7,14 +7,16 @@ use serde_json::to_string; /// For interacting with the outgoing message handler pub enum OutgoingMessageHandlerMessage { Unicast((tokio::sync::mpsc::Sender, T)), - Broadcast((tokio::sync::broadcast::Sender, T)), + Broadcast(T), } -pub struct OutgoingMessageHandler {} +pub struct OutgoingMessageHandler { + pub tx_broadcast: tokio::sync::broadcast::Sender, +} impl OutgoingMessageHandler { - pub fn new() -> Self { - OutgoingMessageHandler {} + pub fn new(tx_broadcast: tokio::sync::broadcast::Sender) -> Self { + OutgoingMessageHandler { tx_broadcast } } pub async fn handle(&self, message: OutgoingMessageHandlerMessage) { @@ -25,7 +27,7 @@ impl OutgoingMessageHandler { } } Broadcast(message) => { - if let Err(e) = message.0.send(self.serialize(message.1)) { + if let Err(e) = self.tx_broadcast.send(self.serialize(message)) { tracing::error!("Error sending message: {}", e) } } diff --git a/server/src/user_handler.rs b/server/src/user_handler.rs index 7ae24ee..0e4a80a 100644 --- a/server/src/user_handler.rs +++ b/server/src/user_handler.rs @@ -182,9 +182,8 @@ impl UserHandler { if let Err(e) = self .state .tx_outgoing_message_handler - .send(Broadcast(( - self.state.tx_broadcast.clone(), - ServerToClientMessage::ChatMessage(meta_announce_user_join(&self.state, &addr)), + .send(Broadcast(ServerToClientMessage::ChatMessage( + meta_announce_user_join(&self.state, &addr), ))) .await { @@ -214,7 +213,6 @@ impl UserHandler { /// Handle user login async fn login(&mut self, request: UserLogInRequest, addr: SocketAddr) { let username_max_len = 66; // This is the longest name the generator may produce right now - let broadcast_tx = self.state.tx_broadcast.clone(); let new_name; if request.username.len() > username_max_len { @@ -270,10 +268,9 @@ impl UserHandler { if let Err(e) = self .state .tx_outgoing_message_handler - .send(Broadcast(( - self.state.tx_broadcast.clone(), - ServerToClientMessage::ChatMessage(ChatMessage { text: msg }), - ))) + .send(Broadcast(ServerToClientMessage::ChatMessage(ChatMessage { + text: msg, + }))) .await { tracing::error!("Error contacting outgoing message handler: {}", e); @@ -317,10 +314,9 @@ impl UserHandler { if let Err(e) = self .state .tx_outgoing_message_handler - .send(Broadcast(( - broadcast_tx.clone(), - ServerToClientMessage::ChatMessage(ChatMessage { text: msg }), - ))) + .send(Broadcast(ServerToClientMessage::ChatMessage(ChatMessage { + text: msg, + }))) .await { tracing::error!("Error contacting outgoing message handler: {}", e); @@ -373,7 +369,6 @@ impl UserHandler { /// Broadcast updated user count fn broadcast_user_count(&self) { let tx = self.state.tx_outgoing_message_handler.clone(); - let btx = self.state.tx_broadcast.clone(); let online_users: u32 = self .state .online_users @@ -383,7 +378,7 @@ impl UserHandler { .try_into() .unwrap(); let msg = ServerToClientMessage::ServerOnlineUsers(ServerOnlineUsers { online_users }); - tokio::spawn(async move { tx.send(Broadcast((btx, msg))).await }); + tokio::spawn(async move { tx.send(Broadcast(msg)).await }); } /// Clean up after a user when they disconnect @@ -408,10 +403,7 @@ impl UserHandler { if let Err(e) = self .state .tx_outgoing_message_handler - .send(Broadcast(( - self.state.tx_broadcast.clone(), - ServerToClientMessage::ChatMessage(msg), - ))) + .send(Broadcast(ServerToClientMessage::ChatMessage(msg))) .await { tracing::error!("Error contacting outgoing message handler: {}", e); @@ -448,9 +440,8 @@ impl UserHandler { if let Err(e) = self .state .tx_outgoing_message_handler - .send(Broadcast(( - self.state.tx_broadcast.clone(), - ServerToClientMessage::ChatUpdate(generate_chat_update(&self.state)), + .send(Broadcast(ServerToClientMessage::ChatUpdate( + generate_chat_update(&self.state), ))) .await {