diff --git a/server/src/api.rs b/server/src/api.rs deleted file mode 100644 index 9ed8c9f..0000000 --- a/server/src/api.rs +++ /dev/null @@ -1,201 +0,0 @@ -use crate::AppState; -use anyhow::Result; -use axum::{ - extract::{ - ws::{Message, WebSocket}, - ConnectInfo, State, WebSocketUpgrade, - }, - response::IntoResponse, -}; -use futures::stream::SplitSink; -use futures::{SinkExt, StreamExt}; -use lib::*; -use rand::seq::SliceRandom; -use serde_json::to_string; -use server::*; -use std::{ - net::SocketAddr, - sync::{Arc, RwLock}, -}; -pub mod message_handler; -use crate::message_handler::*; - -/// Establish the WebSocket connection -pub async fn websocket_connection_handler( - ws: WebSocketUpgrade, - // user_agent: Option>, - ConnectInfo(addr): ConnectInfo, - State(state): State>, -) -> impl IntoResponse { - tracing::debug!("New connection from {}", &addr); - ws.on_upgrade(move |socket| on_websocket_connection(socket, state, addr)) -} - -/// This runs right after a WebSocket connection is established -pub async fn on_websocket_connection(stream: WebSocket, state: Arc, addr: SocketAddr) { - // Split channels to send and receive asynchronously. - let (mut sender, mut receiver) = stream.split(); - - // Set up new user - handle_new_user(&mut sender, &state, &addr) - .await - .expect("Error creating new user!"); - - // Subscribe to receive from global broadcast channel - let mut rx = state.tx.subscribe(); - - // Send messages to this client - let mut send_task = tokio::spawn(async move { - while let Ok(msg) = rx.recv().await { - if sender.send(Message::Text(msg)).await.is_err() { - break; - } - } - }); - - // Receive messages from this client - let mut recv_task = tokio::spawn(async move { - while let Some(Ok(message)) = receiver.next().await { - message_handler(state.clone(), addr, message) - .await - .expect("Message Handler exploded!") - } - }); - - // If either task completes then abort the other - tokio::select! { - _ = (&mut send_task) => recv_task.abort(), - _ = (&mut recv_task) => send_task.abort(), - }; -} - -/// Create, Register, and Hydrate new user -async fn handle_new_user( - sender: &mut SplitSink, - state: &Arc, - addr: &SocketAddr, -) -> Result<()> { - // Create - let new_user = Arc::new(RwLock::new(generate_new_user(state))); - - // Notify client of new username - sender - .send(Message::Text(client_self_user_update(&new_user))) - .await?; - - // Register using `addr` as key until something longer lived exists - state.online_users.write().unwrap().insert(*addr, new_user); - - // Hydrate client - // this should probably be combined and sent as one - sender.send(Message::Text(chat_meta_update(state))).await?; - sender.send(Message::Text(motd())).await?; - sender - .send(Message::Text(server_summary_update(state))) - .await?; - sender.send(Message::Text(games_update(state))).await?; - sender.send(Message::Text(cards_meta_update(state))).await?; - - // Broadcast new user's existence - // this should probably be combined and sent as one - state.tx.send(announce_join(state, addr))?; - state.tx.send(server_summary_update(state))?; - state.tx.send(chat_meta_update(state))?; - - Ok(()) -} - -/// Create a new user object from incoming data -fn generate_new_user(state: &Arc) -> User { - User { - name: format!( - "{} {}", - state.first_names.choose(&mut rand::thread_rng()).unwrap(), - state.last_names.choose(&mut rand::thread_rng()).unwrap(), - ), - } -} - -/// Generate message to notify client of user changes -fn client_self_user_update(new_user: &Arc>) -> String { - to_string::(&UserUpdate { - username: new_user.read().unwrap().name.clone(), - }) - .unwrap() -} - -/// Generate chatroom metadata update -fn chat_meta_update(state: &Arc) -> String { - // this may get expensive if there are many users - let mut names = vec![]; - - for user in state.online_users.read().unwrap().iter() { - names.push(user.1.read().unwrap().name.clone()); - } - - to_string::(&ChatUpdate { - room: "Lobby".to_string(), - users: names, - }) - .unwrap() -} - -/// Generage cards meta message -fn cards_meta_update(state: &Arc) -> String { - tracing::debug!("sending cards meta"); - to_string::(&state.packs_meta).unwrap() -} - -/// Generate message-of-the-day server greeting -fn motd() -> String { - to_string::(&ChatMessage { - text: "Greetings from the game server!".to_string(), - }) - .unwrap() -} - -/// Generate server summary update - mostly debug stuff -fn server_summary_update(state: &Arc) -> String { - let online_users = state.online_users.read().unwrap().len(); - let active_games = state.games.read().unwrap().len(); - to_string::(&ServerStateSummary { - online_users, - active_games, - }) - .unwrap() -} - -/// Generate games list update -fn games_update(state: &Arc) -> String { - // this may get expensive if there are many games - let mut names = vec![]; - - for game in state.games.read().unwrap().iter() { - names.push(format!( - "Name: {} Host: {}", - game.name, - game.host.read().unwrap().name - )); - } - - to_string::(&GamesUpdate { games: names }).unwrap() -} - -/// Generate chatroom join announcement -fn announce_join(state: &Arc, addr: &SocketAddr) -> String { - let msg = format!( - "{} joined.", - state - .online_users - .read() - .unwrap() - .get(addr) - .unwrap() - .read() - .unwrap() - .name - ); - - tracing::debug!("{}", &msg); - to_string::(&ChatMessage { text: msg }).unwrap() -} diff --git a/server/src/lib.rs b/server/src/lib.rs index 280e46b..831e450 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -1,3 +1,5 @@ +#![feature(if_let_guard)] + use anyhow::{Context, Result}; use lib::*; use rand::prelude::IteratorRandom; @@ -11,6 +13,10 @@ use std::{ sync::{Arc, RwLock}, }; use tokio::sync::broadcast; +pub mod websocket; +pub mod meta; +pub mod user; +use crate::user::*; /// Card Set #[derive(Debug)] @@ -68,18 +74,6 @@ pub struct NewGameManifest { pub host: Arc>, } -/// User -#[derive(Default, Debug, Eq, PartialEq, Hash)] -pub struct User { - pub name: String, -} - -impl User { - pub fn change_name(&mut self, new_name: String) { - self.name = new_name; - } -} - /// A struct that represents a player #[derive(Debug)] pub struct Player { diff --git a/server/src/main.rs b/server/src/main.rs index cdf8e48..d8e5421 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -1,7 +1,8 @@ -#![feature(if_let_guard)] - +use crate::websocket::*; use anyhow::{Context, Result}; use axum::{routing::get, Router}; +use server::user::*; +use server::*; use std::{ collections::{HashMap, HashSet}, net::SocketAddr, @@ -10,9 +11,6 @@ use std::{ use tokio::sync::broadcast; use tower_http::services::ServeDir; use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; -pub mod api; -use crate::api::*; -use server::*; #[tokio::main] async fn main() -> Result<()> { diff --git a/server/src/meta.rs b/server/src/meta.rs new file mode 100644 index 0000000..88a4f76 --- /dev/null +++ b/server/src/meta.rs @@ -0,0 +1,80 @@ +use crate::AppState; +use lib::*; +use serde_json::to_string; +use std::{net::SocketAddr, sync::Arc}; + +/// Generate chatroom metadata update +pub fn meta_chat_update(state: &Arc) -> String { + // this may get expensive if there are many users + let mut names = vec![]; + + for user in state.online_users.read().unwrap().iter() { + names.push(user.1.read().unwrap().name.clone()); + } + + to_string::(&ChatUpdate { + room: "Lobby".to_string(), + users: names, + }) + .unwrap() +} + +/// Generage cards meta message +pub fn meta_new_game_card_packs(state: &Arc) -> String { + tracing::debug!("sending cards meta"); + to_string::(&state.packs_meta).unwrap() +} + +/// Generate message-of-the-day server greeting +pub fn meta_motd() -> String { + to_string::(&ChatMessage { + text: "Greetings from the game server!".to_string(), + }) + .unwrap() +} + +/// Generate server summary update - mostly debug stuff +pub fn meta_server_summary_update(state: &Arc) -> String { + let online_users = state.online_users.read().unwrap().len(); + let active_games = state.games.read().unwrap().len(); + to_string::(&ServerStateSummary { + online_users, + active_games, + }) + .unwrap() +} + +/// Generate games list update +pub fn meta_games_browser_update(state: &Arc) -> String { + // this may get expensive if there are many games + let mut names = vec![]; + + for game in state.games.read().unwrap().iter() { + names.push(format!( + "Name: {} Host: {}", + game.name, + game.host.read().unwrap().name + )); + } + + to_string::(&GamesUpdate { games: names }).unwrap() +} + +/// Generate chatroom join announcement +pub fn meta_announce_user_join(state: &Arc, addr: &SocketAddr) -> String { + let msg = format!( + "{} joined.", + state + .online_users + .read() + .unwrap() + .get(addr) + .unwrap() + .read() + .unwrap() + .name + ); + + tracing::debug!("{}", &msg); + to_string::(&ChatMessage { text: msg }).unwrap() +} diff --git a/server/src/user.rs b/server/src/user.rs new file mode 100644 index 0000000..734729c --- /dev/null +++ b/server/src/user.rs @@ -0,0 +1,37 @@ +use crate::AppState; +use lib::*; +use rand::prelude::SliceRandom; +use serde_json::to_string; +use std::sync::Arc; +use std::sync::RwLock; + +/// User +#[derive(Default, Debug, Eq, PartialEq, Hash)] +pub struct User { + pub name: String, +} + +impl User { + /// Create a new user object from incoming data + pub fn new(state: &Arc) -> User { + User { + name: format!( + "{} {}", + state.first_names.choose(&mut rand::thread_rng()).unwrap(), + state.last_names.choose(&mut rand::thread_rng()).unwrap(), + ), + } + } + + pub fn change_name(&mut self, new_name: String) { + self.name = new_name; + } +} + +/// Generate message to notify client of user changes +pub fn user_client_self_update(new_user: &Arc>) -> String { + to_string::(&UserUpdate { + username: new_user.read().unwrap().name.clone(), + }) + .unwrap() +} diff --git a/server/src/api/message_handler.rs b/server/src/websocket.rs similarity index 58% rename from server/src/api/message_handler.rs rename to server/src/websocket.rs index c5248b1..bfbf571 100644 --- a/server/src/api/message_handler.rs +++ b/server/src/websocket.rs @@ -1,13 +1,118 @@ -use crate::api::*; +use crate::meta::*; +use crate::user::*; use crate::AppState; -use crate::Arc; +use crate::Game; +use crate::NewGameManifest; use anyhow::Result; use axum::extract::ws::CloseFrame; +use axum::{ + extract::{ + ws::{Message, WebSocket}, + ConnectInfo, State, WebSocketUpgrade, + }, + response::IntoResponse, +}; +use futures::stream::SplitSink; +use futures::{SinkExt, StreamExt}; +use lib::*; use serde_json::{from_str, to_string}; +use std::{ + net::SocketAddr, + sync::{Arc, RwLock}, +}; use tokio::sync::broadcast::Sender; +/// Establish the WebSocket connection +pub async fn websocket_connection_handler( + ws: WebSocketUpgrade, + // user_agent: Option>, + ConnectInfo(addr): ConnectInfo, + State(state): State>, +) -> impl IntoResponse { + tracing::debug!("New connection from {}", &addr); + ws.on_upgrade(move |socket| websocket_on_connection(socket, state, addr)) +} + +/// This runs right after a WebSocket connection is established +pub async fn websocket_on_connection(stream: WebSocket, state: Arc, addr: SocketAddr) { + // Split channels to send and receive asynchronously. + let (mut sender, mut receiver) = stream.split(); + + // Set up new user + user_handle_new(&mut sender, &state, &addr) + .await + .expect("Error creating new user!"); + + // Subscribe to receive from global broadcast channel + let mut rx = state.tx.subscribe(); + + // Send messages to this client + let mut send_task = tokio::spawn(async move { + while let Ok(msg) = rx.recv().await { + if sender.send(Message::Text(msg)).await.is_err() { + break; + } + } + }); + + // Receive messages from this client + let mut recv_task = tokio::spawn(async move { + while let Some(Ok(message)) = receiver.next().await { + websocket_message_handler(state.clone(), addr, message) + .await + .expect("Message Handler exploded!") + } + }); + + // If either task completes then abort the other + tokio::select! { + _ = (&mut send_task) => recv_task.abort(), + _ = (&mut recv_task) => send_task.abort(), + }; +} + +/// Create, Register, and Hydrate new user +pub async fn user_handle_new( + sender: &mut SplitSink, + state: &Arc, + addr: &SocketAddr, +) -> Result<()> { + // Create + let new_user = Arc::new(RwLock::new(User::new(state))); + + // Notify client of new username + sender + .send(Message::Text(user_client_self_update(&new_user))) + .await?; + + // Register using `addr` as key until something longer lived exists + state.online_users.write().unwrap().insert(*addr, new_user); + + // Hydrate client + // this should probably be combined and sent as one + sender.send(Message::Text(meta_chat_update(state))).await?; + sender.send(Message::Text(meta_motd())).await?; + sender + .send(Message::Text(meta_server_summary_update(state))) + .await?; + sender + .send(Message::Text(meta_games_browser_update(state))) + .await?; + sender + .send(Message::Text(meta_new_game_card_packs(state))) + .await?; + + // Broadcast new user's existence + // this should probably be combined and sent as one + state.tx.send(meta_announce_user_join(state, addr))?; + state.tx.send(meta_server_summary_update(state))?; + state.tx.send(meta_chat_update(state))?; + + Ok(()) +} + /// Handle incoming messages over the WebSocket -pub async fn message_handler( +pub async fn websocket_message_handler( state: Arc, addr: SocketAddr, message: Message, @@ -18,13 +123,13 @@ pub async fn message_handler( Message::Text(text) => match text { _new_game if let Ok(_new_game) = from_str::(&text) => { tracing::debug!("New game request received."); - handle_new_game(_new_game, &state, tx, addr)?; + game_handle_new_game(_new_game, &state, tx, addr)?; } _chat_message if let Ok(_chat_message) = from_str::(&text) => { - handle_chat_message(_chat_message, &state, tx, addr)?; + websocket_handle_chat_message(_chat_message, &state, tx, addr)?; } _user_log_in if let Ok(_user_log_in) = from_str::(&text) => { - handle_user_log_in(_user_log_in, &state, tx, addr)?; + websocket_handle_user_log_in(_user_log_in, &state, tx, addr)?; } _ => { tracing::debug!("Unhandled text message: {}", &text); @@ -34,7 +139,7 @@ pub async fn message_handler( tracing::debug!("Binary: {:?}", data) } Message::Close(close_frame) => { - handle_close(close_frame, &state, tx, addr)?; + websocket_handle_close(close_frame, &state, tx, addr)?; } Message::Pong(ping) => { tracing::debug!("Pong received with: {:?}", ping); @@ -48,7 +153,7 @@ pub async fn message_handler( } /// This runs when a NewGameRequest is received -fn handle_new_game( +fn game_handle_new_game( new_game: NewGameRequest, state: &Arc, tx: &Sender, @@ -69,15 +174,15 @@ fn handle_new_game( // create game if let Ok(new_game_object) = Game::new(manifest) { state.games.write().unwrap().push(new_game_object); - tx.send(games_update(state))?; - tx.send(server_summary_update(state))?; + tx.send(meta_games_browser_update(state))?; + tx.send(meta_server_summary_update(state))?; } Ok(()) } /// This runs when a ChatMessage is received -fn handle_chat_message( +fn websocket_handle_chat_message( chat_message: ChatMessage, state: &Arc, tx: &Sender, @@ -91,7 +196,7 @@ fn handle_chat_message( } /// This runs when a UserLogIn is received -fn handle_user_log_in( +fn websocket_handle_user_log_in( user_log_in: UserLogIn, state: &Arc, tx: &Sender, @@ -168,8 +273,8 @@ fn handle_user_log_in( state.online_users.read().unwrap().len(), state.offline_users.read().unwrap().len() ); - tx.send(games_update(state))?; - tx.send(chat_meta_update(state))?; + tx.send(meta_games_browser_update(state))?; + tx.send(meta_chat_update(state))?; // send the user their new name @@ -177,7 +282,7 @@ fn handle_user_log_in( } /// This runs when a connection closes -fn handle_close( +fn websocket_handle_close( close_frame: Option, state: &Arc, tx: &Sender, @@ -235,8 +340,8 @@ fn handle_close( state.online_users.write().unwrap().remove(&addr).unwrap(), ); - tx.send(server_summary_update(state))?; - tx.send(chat_meta_update(state))?; + tx.send(meta_server_summary_update(state))?; + tx.send(meta_chat_update(state))?; Ok(()) }