From 0e30551221c636f9679e9d8fe0e476c0025f8495 Mon Sep 17 00:00:00 2001 From: Adam <24621027+adoyle0@users.noreply.github.com> Date: Thu, 8 Aug 2024 04:07:23 -0400 Subject: [PATCH] mostly move message handler and update new user name to client --- lib/src/lib.rs | 2 + server/src/lib.rs | 44 ++++----- server/src/main.rs | 16 ++- server/src/message_handler.rs | 59 ++++++++++++ server/src/user_handler.rs | 177 ++++++++++++++++++++++++++++------ server/src/websocket.rs | 136 +++++--------------------- 6 files changed, 270 insertions(+), 164 deletions(-) create mode 100644 server/src/message_handler.rs diff --git a/lib/src/lib.rs b/lib/src/lib.rs index 07a7242..c40e053 100644 --- a/lib/src/lib.rs +++ b/lib/src/lib.rs @@ -1,3 +1,5 @@ +use std::net::SocketAddr; + use serde::{Deserialize, Serialize}; /// Card Pack Meta diff --git a/server/src/lib.rs b/server/src/lib.rs index 6a4a0b3..8fcdf79 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -1,10 +1,9 @@ #![feature(if_let_guard)] -use crate::mpsc::Sender; use anyhow::{Context, Result}; +use axum::extract::ws::Message; use lib::*; use rand::prelude::IteratorRandom; -use rand::prelude::SliceRandom; use rand::thread_rng; use serde::Deserialize; use std::{ @@ -14,26 +13,24 @@ use std::{ net::SocketAddr, sync::{Arc, RwLock}, }; +use tokio::sync::mpsc::Sender; use tokio::sync::{broadcast, mpsc}; -pub mod websocket; +use user_handler::UserHandlerMessage; +pub mod message_handler; pub mod user_handler; +pub mod websocket; /// User -#[derive(Default, Debug, Eq, PartialEq, Hash)] +#[derive(Debug)] pub struct User { pub name: String, + pub tx: Sender, } impl User { /// Create a new user object from incoming data - pub fn new(state: &Arc) -> User { - User { - name: format!( - "{} {}", - state.first_names.choose(&mut rand::thread_rng()).unwrap(), - state.last_names.choose(&mut rand::thread_rng()).unwrap(), - ), - } + pub fn new(name: String, tx: Sender) -> User { + User { name, tx } } pub fn change_name(&mut self, new_name: String) { @@ -109,7 +106,7 @@ pub struct Player { } /// The game master -#[derive(Default, Debug)] +#[derive(Debug)] pub struct Game { /// The name of the game pub name: String, @@ -151,7 +148,15 @@ impl Game { pub fn new(request: NewGameManifest) -> Result { let mut game = Game { - ..Default::default() + name: request.host.read().unwrap().name.clone(), + host: request.host.clone(), + white: vec![], + black: vec![], + white_discard: vec![], + black_discard: vec![], + game_active: false, + players: vec![], + current_black: Option::None, }; tracing::debug!( "Creating game {} with {} as host", @@ -353,16 +358,11 @@ pub fn load_names(path: &str) -> Result> { Ok(buf) } -#[derive(Debug)] -pub struct NewUser { - pub sender: Sender, - pub addr: SocketAddr, -} - // Our shared state pub struct AppState { - pub broadcast_channel: broadcast::Sender, - pub users_tx: mpsc::Sender, + pub broadcast_tx: broadcast::Sender, + pub users_tx: mpsc::Sender, + pub messages_tx: mpsc::Sender<(SocketAddr, Message)>, pub first_names: Vec, pub last_names: Vec, pub reserved_names: RwLock>, diff --git a/server/src/main.rs b/server/src/main.rs index 44b16ec..9222b9e 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -1,3 +1,4 @@ +use crate::message_handler::*; use crate::websocket::*; use anyhow::{Context, Result}; use axum::{routing::get, Router}; @@ -24,8 +25,9 @@ async fn main() -> Result<()> { .init(); // Set up state - let (broadcast_channel, _rx) = broadcast::channel(100); + let (broadcast_tx, _rx) = broadcast::channel(100); let (users_tx, mut users_rx) = mpsc::channel(100); + let (messages_tx, mut messages_rx) = mpsc::channel(100); let first_names = load_names("data/first.txt")?; let last_names = load_names("data/last.txt")?; let reserved_names = RwLock::new(HashSet::::new()); @@ -35,8 +37,9 @@ async fn main() -> Result<()> { let games = RwLock::new(HashMap::new()); let app_state = Arc::new(AppState { - broadcast_channel, + broadcast_tx, users_tx, + messages_tx, first_names, last_names, reserved_names, @@ -47,10 +50,17 @@ async fn main() -> Result<()> { games, }); + let message_handler = MessageHandler::new(app_state.clone()); + tokio::spawn(async move { + while let Some((addr, message)) = messages_rx.recv().await { + message_handler.handle(addr, message).await; + } + }); + let user_handler = UserHandler::new(app_state.clone()); tokio::spawn(async move { while let Some(message) = users_rx.recv().await { - user_handler.process(message).await; + user_handler.handle(message).await; } }); diff --git a/server/src/message_handler.rs b/server/src/message_handler.rs new file mode 100644 index 0000000..b3fe3c9 --- /dev/null +++ b/server/src/message_handler.rs @@ -0,0 +1,59 @@ +use crate::user_handler::*; +use crate::AppState; +use axum::extract::ws::Message; +use lib::*; +use serde_json::{from_str, to_string}; +use std::net::SocketAddr; +use std::sync::Arc; + +/// Handle incoming messages over the WebSocket + +pub struct MessageHandler { + state: Arc, +} + +impl MessageHandler { + pub fn new(state: Arc) -> Self { + MessageHandler { state } + } + + pub async fn handle(&self, addr: SocketAddr, message: Message) { + match message { + Message::Text(text) => match text { + _chat_message if let Ok(chat_message) = from_str::(&text) => { + // This should be delegated to user handler and an outgoing message and/or chat handler + let msg = format! {"{0}: {1}", self.state.online_users.read().unwrap().get(&addr).unwrap().read().unwrap().name, chat_message.text}; + tracing::debug!("{msg}"); + self.state + .broadcast_tx + .send(to_string::(&ChatMessage { text: msg }).unwrap()) + .unwrap(); + } + _user_log_in if let Ok(user_log_in) = from_str::(&text) => { + self.state + .users_tx + .send(UserHandlerMessage::UserLogIn { + username: user_log_in.username, + addr, + }) + .await + .unwrap(); + tracing::debug!("passed login to user handler"); + } + + _ => tracing::debug!("Unhandled text from {}", addr), + }, + Message::Binary(data) => tracing::debug!("{} sent binary: {:?}", addr, data), + Message::Close(_close_frame) => { + // make this + // websocket_handle_close(close_frame, &state, tx, addr)?; + } + Message::Pong(ping) => { + tracing::debug!("Pong received with: {:?}", ping); + } + Message::Ping(pong) => { + tracing::debug!("Pong received with: {:?}", pong); + } + } + } +} diff --git a/server/src/user_handler.rs b/server/src/user_handler.rs index 9c685e4..7444823 100644 --- a/server/src/user_handler.rs +++ b/server/src/user_handler.rs @@ -1,11 +1,15 @@ use crate::AppState; -use crate::NewUser; use crate::User; use lib::*; use serde_json::to_string; use std::net::SocketAddr; use std::sync::{Arc, RwLock}; +pub enum UserHandlerMessage { + NewUser { user: User, addr: SocketAddr }, + UserLogIn { username: String, addr: SocketAddr }, +} + pub struct UserHandler { state: Arc, } @@ -15,43 +19,38 @@ impl UserHandler { UserHandler { state } } - pub async fn process(&self, message: NewUser) { + pub async fn handle(&self, message: UserHandlerMessage) { + match message { + UserHandlerMessage::NewUser { user, addr } => { + // make this not async + self.set_up_new_user(user, addr).await + } + UserHandlerMessage::UserLogIn { username, addr } => self.login(username, addr).await, + } + } + + async fn set_up_new_user(&self, user: User, addr: SocketAddr) { // // Create, Register, and Hydrate new user // - let new_user = Arc::new(RwLock::new(User::new(&self.state))); + let tx = user.tx.clone(); + let new_user = Arc::new(RwLock::new(user)); // Notify client of new username - message - .sender - .send(user_client_self_update(&new_user)) - .await - .unwrap(); + tx.send(user_client_self_update(&new_user)).await.unwrap(); // Register using `addr` as key until something longer lived exists self.state .online_users .write() .unwrap() - .insert(message.addr, new_user.clone()); + .insert(addr, new_user.clone()); // Hydrate client - // this should probably be combined and sent as one - // - // message.sender.send(meta_chat_update(&self.state)).await; - // message - // .sender - // .send(meta_server_summary_update(&self.state)) - // .await - // .unwrap(); - message - .sender - .send(meta_games_browser_update(&self.state)) + tx.send(meta_games_browser_update(&self.state)) .await .unwrap(); - message - .sender - .send(meta_new_game_card_packs(&self.state)) + tx.send(meta_new_game_card_packs(&self.state)) .await .unwrap(); @@ -59,23 +58,145 @@ impl UserHandler { // this should probably be combined and sent as one let _ = &self .state - .broadcast_channel - .send(meta_announce_user_join(&self.state, &message.addr)) + .broadcast_tx + .send(meta_announce_user_join(&self.state, &addr)) .unwrap(); let _ = &self .state - .broadcast_channel + .broadcast_tx .send(meta_server_summary_update(&self.state)) .unwrap(); let _ = &self .state - .broadcast_channel + .broadcast_tx .send(meta_chat_update(&self.state)) .unwrap(); // this races the broadcasts but if it's done last it'll probably show up // last - message.sender.send(meta_motd()).await.unwrap(); + tx.send(meta_motd()).await.unwrap(); + } + + async fn login(&self, username: String, addr: SocketAddr) { + // User's DM channel + let dm = self + .state + .online_users + .read() + .unwrap() + .get(&addr) + .unwrap() + .read() + .unwrap() + .tx + .clone(); + + let broadcast = self.state.broadcast_tx.clone(); + + let old_name = self + .state + .online_users + .read() + .unwrap() + .get(&addr) + .unwrap() + .read() + .unwrap() + .name + .clone(); + + let new_name = username.clone(); + + if self + .state + .offline_users + .read() + .unwrap() + .contains_key(&new_name) + { + self.state + .online_users + .write() + .unwrap() + .insert( + addr, + self.state + .offline_users + .write() + .unwrap() + .remove(&new_name) + .unwrap(), + ) + .unwrap(); + + let msg = format! { + "{0} changed name to {1}. Welcome back!", + old_name, + new_name + }; + + tracing::debug!("{msg}"); + } else if self + .state + .reserved_names + .read() + .unwrap() + .contains(&new_name) + { + tracing::debug!("name is taken"); + } else { + self.state + .online_users + .write() + .unwrap() + .get_mut(&addr) + .unwrap() + .write() + .unwrap() + .change_name(username); + + let msg = format! { + "{0} changed name to {1}.", + old_name, + new_name + }; + + // Reserve name + self.state + .reserved_names + .write() + .unwrap() + .insert(new_name.clone()); + + tracing::debug!("{msg}"); + broadcast + .send(to_string::(&ChatMessage { text: msg }).unwrap()) + .unwrap(); + tracing::debug!("Name {} reserved.", &new_name); + } + + tracing::debug!( + "Online Users: {} Offline Users: {}", + self.state.online_users.read().unwrap().len(), + self.state.offline_users.read().unwrap().len() + ); + + broadcast + .send(meta_games_browser_update(&self.state)) + .unwrap(); + broadcast.send(meta_chat_update(&self.state)).unwrap(); + + // send the user their new name + dm.send( + to_string::(&UserUpdate { + username: new_name.clone(), + }) + .unwrap(), + ) + .await + .unwrap(); + + tracing::debug!(" HI! login received {} {} {}", addr, new_name, old_name) } } diff --git a/server/src/websocket.rs b/server/src/websocket.rs index 15211a6..ea61916 100644 --- a/server/src/websocket.rs +++ b/server/src/websocket.rs @@ -2,7 +2,7 @@ use crate::user_handler::*; use crate::AppState; use crate::Game; use crate::NewGameManifest; -use crate::NewUser; +use crate::User; use anyhow::Result; use axum::extract::ws::CloseFrame; use axum::{ @@ -14,7 +14,9 @@ use axum::{ }; use futures::{SinkExt, StreamExt}; use lib::*; +use rand::prelude::SliceRandom; use serde_json::{from_str, to_string}; +use std::collections::HashMap; use std::{ net::SocketAddr, sync::{Arc, RwLock}, @@ -40,16 +42,27 @@ pub async fn websocket_on_connection(stream: WebSocket, state: Arc, ad // Create channel for direct messages let (dm_tx, mut dm_rx) = mpsc::channel(30); + let mut map = HashMap::new(); + map.insert(addr, dm_tx.clone()); + let _ = state .users_tx - .send(NewUser { - sender: dm_tx, + // add tx + .send(UserHandlerMessage::NewUser { + user: User::new( + format!( + "{} {}", + state.first_names.choose(&mut rand::thread_rng()).unwrap(), + state.last_names.choose(&mut rand::thread_rng()).unwrap(), + ), + dm_tx.clone(), + ), addr, }) .await; // Subscribe to receive from global broadcast channel - let mut rx = state.broadcast_channel.subscribe(); + let mut rx = state.broadcast_tx.subscribe(); // Send messages to this client let mut send_task = tokio::spawn(async move { @@ -79,6 +92,11 @@ pub async fn websocket_on_connection(stream: WebSocket, state: Arc, ad // Receive messages from this client let mut recv_task = tokio::spawn(async move { while let Some(Ok(message)) = receiver.next().await { + state + .messages_tx + .send((addr.clone(), message.clone())) + .await + .unwrap(); websocket_message_handler(state.clone(), addr, message) .await .expect("Message Handler exploded!") @@ -98,7 +116,7 @@ pub async fn websocket_message_handler( addr: SocketAddr, message: Message, ) -> Result<()> { - let tx = &state.broadcast_channel; + let tx = &state.broadcast_tx; match message { Message::Text(text) => match text { @@ -106,12 +124,6 @@ pub async fn websocket_message_handler( tracing::debug!("New game request received."); game_handle_new_game(_new_game, &state, tx, addr)?; } - _chat_message if let Ok(_chat_message) = from_str::(&text) => { - websocket_handle_chat_message(_chat_message, &state, tx, addr)?; - } - _user_log_in if let Ok(_user_log_in) = from_str::(&text) => { - websocket_handle_user_log_in(_user_log_in, &state, tx, addr)?; - } _ => { tracing::debug!("Unhandled text message: {}", &text); } @@ -166,106 +178,6 @@ fn game_handle_new_game( Ok(()) } -/// This runs when a ChatMessage is received -fn websocket_handle_chat_message( - chat_message: ChatMessage, - state: &Arc, - tx: &Sender, - addr: SocketAddr, -) -> Result<()> { - let msg = format! {"{0}: {1}", state.online_users.read().unwrap().get(&addr).unwrap().read().unwrap().name, chat_message.text}; - tracing::debug!("{msg}"); - tx.send(to_string::(&ChatMessage { text: msg })?)?; - - Ok(()) -} - -/// This runs when a UserLogIn is received -fn websocket_handle_user_log_in( - user_log_in: UserLogIn, - state: &Arc, - tx: &Sender, - addr: SocketAddr, -) -> Result<()> { - let old_name = state - .online_users - .read() - .unwrap() - .get(&addr) - .unwrap() - .read() - .unwrap() - .name - .clone(); - let new_name = user_log_in.username.clone(); - - if state.offline_users.read().unwrap().contains_key(&new_name) { - state - .online_users - .write() - .unwrap() - .insert( - addr, - state - .offline_users - .write() - .unwrap() - .remove(&new_name) - .unwrap(), - ) - .unwrap(); - - let msg = format! { - "{0} changed name to {1}. Welcome back!", - old_name, - new_name - }; - - tracing::debug!("{msg}"); - } else if state.reserved_names.read().unwrap().contains(&new_name) { - tracing::debug!("name is taken"); - } else { - state - .online_users - .write() - .unwrap() - .get_mut(&addr) - .unwrap() - .write() - .unwrap() - .change_name(user_log_in.username); - - let msg = format! { - "{0} changed name to {1}.", - old_name, - new_name - }; - - // Reserve name - state - .reserved_names - .write() - .unwrap() - .insert(new_name.clone()); - - tracing::debug!("{msg}"); - tx.send(to_string::(&ChatMessage { text: msg })?)?; - tracing::debug!("Name {} reserved.", &new_name); - } - - tracing::debug!( - "Online Users: {} Offline Users: {}", - state.online_users.read().unwrap().len(), - state.offline_users.read().unwrap().len() - ); - tx.send(meta_games_browser_update(state))?; - tx.send(meta_chat_update(state))?; - - // send the user their new name - - Ok(()) -} - /// This runs when a connection closes fn websocket_handle_close( close_frame: Option, @@ -309,6 +221,8 @@ fn websocket_handle_close( tracing::debug!("{}", msg.text); tx.send(to_string::(&msg)?)?; + + // Move user to offline let name = state .online_users .read()