diff --git a/server/src/lib.rs b/server/src/lib.rs index a2acd4f..6a4a0b3 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -2,7 +2,6 @@ use crate::mpsc::Sender; use anyhow::{Context, Result}; -use axum::extract::ws::Message; use lib::*; use rand::prelude::IteratorRandom; use rand::prelude::SliceRandom; @@ -17,6 +16,7 @@ use std::{ }; use tokio::sync::{broadcast, mpsc}; pub mod websocket; +pub mod user_handler; /// User #[derive(Default, Debug, Eq, PartialEq, Hash)] @@ -355,13 +355,13 @@ pub fn load_names(path: &str) -> Result> { #[derive(Debug)] pub struct NewUser { - pub sender: Sender, + pub sender: Sender, pub addr: SocketAddr, } // Our shared state pub struct AppState { - pub tx: broadcast::Sender, + pub broadcast_channel: broadcast::Sender, pub users_tx: mpsc::Sender, pub first_names: Vec, pub last_names: Vec, diff --git a/server/src/main.rs b/server/src/main.rs index 8b380e6..07ab021 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -1,8 +1,5 @@ -use crate::meta::*; -use crate::user::*; use crate::websocket::*; use anyhow::{Context, Result}; -use axum::extract::ws::Message; use axum::{routing::get, Router}; use server::*; use std::{ @@ -13,6 +10,7 @@ use std::{ use tokio::sync::{broadcast, mpsc}; use tower_http::services::ServeDir; use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; +use user_handler::UserHandler; #[tokio::main] async fn main() -> Result<()> { @@ -26,7 +24,7 @@ async fn main() -> Result<()> { .init(); // Set up state - let (tx, _rx) = broadcast::channel(100); + let (broadcast_channel, _rx) = broadcast::channel(100); let (users_tx, mut users_rx) = mpsc::channel(100); let first_names = load_names("data/first.txt")?; let last_names = load_names("data/last.txt")?; @@ -37,7 +35,7 @@ async fn main() -> Result<()> { let games = RwLock::new(HashMap::new()); let app_state = Arc::new(AppState { - tx, + broadcast_channel, users_tx, first_names, last_names, @@ -49,72 +47,11 @@ async fn main() -> Result<()> { games, }); - let cloned_state = app_state.clone(); + let user_handler = UserHandler::new(app_state.clone()); let _user_handler = tokio::spawn(async move { while let Some(message) = users_rx.recv().await { - // - // Create, Register, and Hydrate new user - // - // Create - let new_user = Arc::new(RwLock::new(User::new(&cloned_state))); - - // Notify client of new username - message - .sender - .send(Message::Text(user_client_self_update(&new_user))) - .await - .unwrap(); - - // Register using `addr` as key until something longer lived exists - cloned_state - .online_users - .write() - .unwrap() - .insert(message.addr, new_user.clone()); - - // Hydrate client - // this should probably be combined and sent as one - message - .sender - .send(Message::Text(meta_chat_update(&cloned_state))) - .await - .unwrap(); - message - .sender - .send(Message::Text(meta_motd())) - .await - .unwrap(); - message - .sender - .send(Message::Text(meta_server_summary_update(&cloned_state))) - .await - .unwrap(); - message - .sender - .send(Message::Text(meta_games_browser_update(&cloned_state))) - .await - .unwrap(); - message - .sender - .send(Message::Text(meta_new_game_card_packs(&cloned_state))) - .await - .unwrap(); - - // Broadcast new user's existence - // this should probably be combined and sent as one - let _ = &cloned_state - .tx - .send(meta_announce_user_join(&cloned_state, &message.addr)) - .unwrap(); - let _ = &cloned_state - .tx - .send(meta_server_summary_update(&cloned_state)) - .unwrap(); - let _ = &cloned_state - .tx - .send(meta_chat_update(&cloned_state)) - .unwrap(); + user_handler.process(message).await.unwrap(); } }); diff --git a/server/src/user_handler.rs b/server/src/user_handler.rs new file mode 100644 index 0000000..0eebe2c --- /dev/null +++ b/server/src/user_handler.rs @@ -0,0 +1,85 @@ +use crate::websocket::meta::*; +use crate::websocket::user::*; +use crate::AppState; +use crate::NewUser; +use crate::User; +use anyhow::Result; +use std::sync::{Arc, RwLock}; + +pub struct UserHandler { + state: Arc, +} + +impl UserHandler { + pub fn new(state: Arc) -> Self { + UserHandler { state } + } + + pub async fn process(&self, message: NewUser) -> Result<()> { + // + // Create, Register, and Hydrate new user + // + let new_user = Arc::new(RwLock::new(User::new(&self.state))); + + // Notify client of new username + message + .sender + .send(user_client_self_update(&new_user)) + .await + .unwrap(); + + // Register using `addr` as key until something longer lived exists + self.state + .online_users + .write() + .unwrap() + .insert(message.addr, new_user.clone()); + + // Hydrate client + // this should probably be combined and sent as one + message + .sender + .send(meta_chat_update(&self.state)) + .await + .unwrap(); + message + .sender + .send(meta_server_summary_update(&self.state)) + .await + .unwrap(); + message + .sender + .send(meta_games_browser_update(&self.state)) + .await + .unwrap(); + message + .sender + .send(meta_new_game_card_packs(&self.state)) + .await + .unwrap(); + + // Broadcast new user's existence + // this should probably be combined and sent as one + let _ = &self + .state + .broadcast_channel + .send(meta_announce_user_join(&self.state, &message.addr)) + .unwrap(); + let _ = &self + .state + .broadcast_channel + .send(meta_server_summary_update(&self.state)) + .unwrap(); + let _ = &self + .state + .broadcast_channel + .send(meta_chat_update(&self.state)) + .unwrap(); + + // this races the broadcasts but if it's done last it'll probably show up + // last + message.sender.send(meta_motd()).await.unwrap(); + + Ok(()) + } +} diff --git a/server/src/websocket.rs b/server/src/websocket.rs index 39f7898..e6e7b6d 100644 --- a/server/src/websocket.rs +++ b/server/src/websocket.rs @@ -51,21 +51,28 @@ pub async fn websocket_on_connection(stream: WebSocket, state: Arc, ad .await; // Subscribe to receive from global broadcast channel - let mut rx = state.tx.subscribe(); + let mut rx = state.broadcast_channel.subscribe(); // Send messages to this client let mut send_task = tokio::spawn(async move { + let mut broadcast = None; + let mut dm = None; loop { - // Global broadcast channel - while let Ok(msg) = rx.recv().await { - if sender.send(Message::Text(msg)).await.is_err() { + tokio::select! { + b = rx.recv() => broadcast = Some(b.unwrap()), + d = dm_rx.recv() => dm = d, + }; + + if let Some(msg) = &dm { + if sender.send(Message::Text(msg.to_string())).await.is_err() { break; + } else { + dm = Option::None; } - } - // Direct message channel - while let Some(msg) = dm_rx.recv().await { - if sender.send(msg).await.is_err() { - break; + } else if let Some(msg) = &broadcast { + if sender.send(Message::Text(msg.to_string())).await.is_err() { + } else { + broadcast = Option::None; } } } @@ -93,7 +100,7 @@ pub async fn websocket_message_handler( addr: SocketAddr, message: Message, ) -> Result<()> { - let tx = &state.tx; + let tx = &state.broadcast_channel; match message { Message::Text(text) => match text { diff --git a/server/src/websocket/meta.rs b/server/src/websocket/meta.rs index 6cefeb7..6ceab34 100644 --- a/server/src/websocket/meta.rs +++ b/server/src/websocket/meta.rs @@ -21,7 +21,6 @@ pub fn meta_chat_update(state: &Arc) -> String { /// Generage cards meta message pub fn meta_new_game_card_packs(state: &Arc) -> String { - tracing::debug!("sending cards meta"); to_string::(&state.packs_meta).unwrap() }