stop cloning channels when we only really need two

This commit is contained in:
Adam 2024-12-03 19:40:15 -05:00
parent 7b5269feb4
commit 8cb04a4eb3
6 changed files with 28 additions and 42 deletions

View file

@ -404,13 +404,11 @@ impl GameHandler {
.collect::<Vec<GameBrowserMeta>>(); .collect::<Vec<GameBrowserMeta>>();
let tx = self.state.tx_outgoing_message_handler.clone(); let tx = self.state.tx_outgoing_message_handler.clone();
let btx = self.state.tx_broadcast.clone();
tokio::spawn(async move { tokio::spawn(async move {
if let Err(e) = tx if let Err(e) = tx
.send(Broadcast(( .send(Broadcast(ServerToClientMessage::GamesUpdate(GamesUpdate {
btx, games,
ServerToClientMessage::GamesUpdate(GamesUpdate { games }), })))
)))
.await .await
{ {
tracing::error!("Error broadcasting games update: {}", e); tracing::error!("Error broadcasting games update: {}", e);
@ -421,15 +419,11 @@ impl GameHandler {
/// Broadcast updated game count /// Broadcast updated game count
fn broadcast_game_count(&self) { fn broadcast_game_count(&self) {
let tx = self.state.tx_outgoing_message_handler.clone(); let tx = self.state.tx_outgoing_message_handler.clone();
let btx = self.state.tx_broadcast.clone();
let active_games: u32 = self.games.len().try_into().unwrap(); let active_games: u32 = self.games.len().try_into().unwrap();
let msg = ServerActiveGames { active_games }; let msg = ServerActiveGames { active_games };
tokio::spawn(async move { tokio::spawn(async move {
if let Err(e) = tx if let Err(e) = tx
.send(Broadcast(( .send(Broadcast(ServerToClientMessage::ServerActiveGames(msg)))
btx,
ServerToClientMessage::ServerActiveGames(msg),
)))
.await .await
{ {
tracing::error!("Error broadcasting game count: {}", e); tracing::error!("Error broadcasting game count: {}", e);

View file

@ -36,10 +36,9 @@ impl IncomingMessageHandler {
// Broadcast incoming chat message // Broadcast incoming chat message
let msg = ChatMessage { text: msg }; let msg = ChatMessage { text: msg };
let tx = self.state.tx_outgoing_message_handler.clone(); let tx = self.state.tx_outgoing_message_handler.clone();
let btx = self.state.tx_broadcast.clone();
tokio::spawn(async move { tokio::spawn(async move {
if let Err(e) = tx if let Err(e) = tx
.send(Broadcast((btx, ServerToClientMessage::ChatMessage(msg)))) .send(Broadcast(ServerToClientMessage::ChatMessage(msg)))
.await .await
{ {
tracing::error!("Error broadcasting Chat message: {}", e) tracing::error!("Error broadcasting Chat message: {}", e)

View file

@ -9,7 +9,7 @@ use std::{
net::SocketAddr, net::SocketAddr,
sync::{Arc, RwLock}, sync::{Arc, RwLock},
}; };
use tokio::sync::{broadcast, mpsc}; use tokio::sync::mpsc;
use user_handler::*; use user_handler::*;
use uuid::Uuid; use uuid::Uuid;
pub mod card_loader; pub mod card_loader;
@ -49,7 +49,7 @@ pub struct AppState {
pub games_by_user: RwLock<HashMap<String, Vec<String>>>, pub games_by_user: RwLock<HashMap<String, Vec<String>>>,
pub offline_users: RwLock<HashMap<String, Arc<RwLock<User>>>>, pub offline_users: RwLock<HashMap<String, Arc<RwLock<User>>>>,
pub online_users: RwLock<HashMap<SocketAddr, Arc<RwLock<User>>>>, pub online_users: RwLock<HashMap<SocketAddr, Arc<RwLock<User>>>>,
pub tx_broadcast: broadcast::Sender<Message>, pub tx_broadcast: tokio::sync::broadcast::Sender<Message>,
pub tx_game_handler: mpsc::Sender<GameHandlerMessage>, pub tx_game_handler: mpsc::Sender<GameHandlerMessage>,
pub tx_incoming_message_handler: mpsc::Sender<(SocketAddr, Message)>, pub tx_incoming_message_handler: mpsc::Sender<(SocketAddr, Message)>,
pub tx_outgoing_message_handler: pub tx_outgoing_message_handler:

View file

@ -82,7 +82,7 @@ async fn main() -> Result<()> {
games_by_user, games_by_user,
offline_users, offline_users,
online_users, online_users,
tx_broadcast, tx_broadcast: tx_broadcast.clone(),
tx_game_handler, tx_game_handler,
tx_incoming_message_handler, tx_incoming_message_handler,
tx_outgoing_message_handler, tx_outgoing_message_handler,
@ -101,7 +101,7 @@ async fn main() -> Result<()> {
.unwrap(); .unwrap();
// Spawn task to handle outgoing messages // Spawn task to handle outgoing messages
let outgoing_message_handler = OutgoingMessageHandler::new(); let outgoing_message_handler = OutgoingMessageHandler::new(tx_broadcast);
tokio::task::Builder::new() tokio::task::Builder::new()
.name("Outgoing Message Handler") .name("Outgoing Message Handler")
.spawn(async move { .spawn(async move {

View file

@ -7,14 +7,16 @@ use serde_json::to_string;
/// For interacting with the outgoing message handler /// For interacting with the outgoing message handler
pub enum OutgoingMessageHandlerMessage<T: Serialize> { pub enum OutgoingMessageHandlerMessage<T: Serialize> {
Unicast((tokio::sync::mpsc::Sender<Message>, T)), Unicast((tokio::sync::mpsc::Sender<Message>, T)),
Broadcast((tokio::sync::broadcast::Sender<Message>, T)), Broadcast(T),
} }
pub struct OutgoingMessageHandler {} pub struct OutgoingMessageHandler {
pub tx_broadcast: tokio::sync::broadcast::Sender<Message>,
}
impl OutgoingMessageHandler { impl OutgoingMessageHandler {
pub fn new() -> Self { pub fn new(tx_broadcast: tokio::sync::broadcast::Sender<Message>) -> Self {
OutgoingMessageHandler {} OutgoingMessageHandler { tx_broadcast }
} }
pub async fn handle(&self, message: OutgoingMessageHandlerMessage<ServerToClientMessage>) { pub async fn handle(&self, message: OutgoingMessageHandlerMessage<ServerToClientMessage>) {
@ -25,7 +27,7 @@ impl OutgoingMessageHandler {
} }
} }
Broadcast(message) => { Broadcast(message) => {
if let Err(e) = message.0.send(self.serialize(message.1)) { if let Err(e) = self.tx_broadcast.send(self.serialize(message)) {
tracing::error!("Error sending message: {}", e) tracing::error!("Error sending message: {}", e)
} }
} }

View file

@ -182,9 +182,8 @@ impl UserHandler {
if let Err(e) = self if let Err(e) = self
.state .state
.tx_outgoing_message_handler .tx_outgoing_message_handler
.send(Broadcast(( .send(Broadcast(ServerToClientMessage::ChatMessage(
self.state.tx_broadcast.clone(), meta_announce_user_join(&self.state, &addr),
ServerToClientMessage::ChatMessage(meta_announce_user_join(&self.state, &addr)),
))) )))
.await .await
{ {
@ -214,7 +213,6 @@ impl UserHandler {
/// Handle user login /// Handle user login
async fn login(&mut self, request: UserLogInRequest, addr: SocketAddr) { async fn login(&mut self, request: UserLogInRequest, addr: SocketAddr) {
let username_max_len = 66; // This is the longest name the generator may produce right now let username_max_len = 66; // This is the longest name the generator may produce right now
let broadcast_tx = self.state.tx_broadcast.clone();
let new_name; let new_name;
if request.username.len() > username_max_len { if request.username.len() > username_max_len {
@ -270,10 +268,9 @@ impl UserHandler {
if let Err(e) = self if let Err(e) = self
.state .state
.tx_outgoing_message_handler .tx_outgoing_message_handler
.send(Broadcast(( .send(Broadcast(ServerToClientMessage::ChatMessage(ChatMessage {
self.state.tx_broadcast.clone(), text: msg,
ServerToClientMessage::ChatMessage(ChatMessage { text: msg }), })))
)))
.await .await
{ {
tracing::error!("Error contacting outgoing message handler: {}", e); tracing::error!("Error contacting outgoing message handler: {}", e);
@ -317,10 +314,9 @@ impl UserHandler {
if let Err(e) = self if let Err(e) = self
.state .state
.tx_outgoing_message_handler .tx_outgoing_message_handler
.send(Broadcast(( .send(Broadcast(ServerToClientMessage::ChatMessage(ChatMessage {
broadcast_tx.clone(), text: msg,
ServerToClientMessage::ChatMessage(ChatMessage { text: msg }), })))
)))
.await .await
{ {
tracing::error!("Error contacting outgoing message handler: {}", e); tracing::error!("Error contacting outgoing message handler: {}", e);
@ -373,7 +369,6 @@ impl UserHandler {
/// Broadcast updated user count /// Broadcast updated user count
fn broadcast_user_count(&self) { fn broadcast_user_count(&self) {
let tx = self.state.tx_outgoing_message_handler.clone(); let tx = self.state.tx_outgoing_message_handler.clone();
let btx = self.state.tx_broadcast.clone();
let online_users: u32 = self let online_users: u32 = self
.state .state
.online_users .online_users
@ -383,7 +378,7 @@ impl UserHandler {
.try_into() .try_into()
.unwrap(); .unwrap();
let msg = ServerToClientMessage::ServerOnlineUsers(ServerOnlineUsers { online_users }); let msg = ServerToClientMessage::ServerOnlineUsers(ServerOnlineUsers { online_users });
tokio::spawn(async move { tx.send(Broadcast((btx, msg))).await }); tokio::spawn(async move { tx.send(Broadcast(msg)).await });
} }
/// Clean up after a user when they disconnect /// Clean up after a user when they disconnect
@ -408,10 +403,7 @@ impl UserHandler {
if let Err(e) = self if let Err(e) = self
.state .state
.tx_outgoing_message_handler .tx_outgoing_message_handler
.send(Broadcast(( .send(Broadcast(ServerToClientMessage::ChatMessage(msg)))
self.state.tx_broadcast.clone(),
ServerToClientMessage::ChatMessage(msg),
)))
.await .await
{ {
tracing::error!("Error contacting outgoing message handler: {}", e); tracing::error!("Error contacting outgoing message handler: {}", e);
@ -448,9 +440,8 @@ impl UserHandler {
if let Err(e) = self if let Err(e) = self
.state .state
.tx_outgoing_message_handler .tx_outgoing_message_handler
.send(Broadcast(( .send(Broadcast(ServerToClientMessage::ChatUpdate(
self.state.tx_broadcast.clone(), generate_chat_update(&self.state),
ServerToClientMessage::ChatUpdate(generate_chat_update(&self.state)),
))) )))
.await .await
{ {