diff --git a/server/src/api.rs b/server/src/api.rs index e68679b..8085fd0 100644 --- a/server/src/api.rs +++ b/server/src/api.rs @@ -1,4 +1,5 @@ use crate::AppState; +use anyhow::Result; use axum::{ extract::{ ws::{Message, WebSocket}, @@ -6,6 +7,7 @@ use axum::{ }, response::IntoResponse, }; +use futures::stream::SplitSink; use futures::{SinkExt, StreamExt}; use lib::models::*; use rand::seq::SliceRandom; @@ -15,111 +17,116 @@ use std::{net::SocketAddr, sync::Arc}; pub mod message_handler; use crate::message_handler::*; -fn motd() -> ChatMessage { - ChatMessage { +fn motd() -> String { + to_string::(&ChatMessage { text: "Greetings from the game server!".to_string(), - } + }) + .unwrap() } -fn server_sum_update(state: &Arc) -> ServerStateSummary { - ServerStateSummary { +fn server_summary_update(state: &Arc) -> String { + to_string::(&ServerStateSummary { online_users: state.users.lock().unwrap().len(), active_games: state.games.lock().unwrap().len(), - } + }) + .unwrap() } -fn generate_chat_update(state: &Arc) -> ChatUpdate { +fn chat_update(state: &Arc) -> String { let mut names = vec![]; for user in state.users.lock().unwrap().iter() { names.push(user.1.name.clone()); } - ChatUpdate { + to_string::(&ChatUpdate { room: "Lobby".to_string(), users: names, - } + }) + .unwrap() } -fn generate_games_update(state: &Arc) -> GamesUpdate { +fn games_update(state: &Arc) -> String { let mut names = vec![]; for game in state.games.lock().unwrap().iter() { names.push(game.name.clone()); } - GamesUpdate { games: names } + to_string::(&GamesUpdate { games: names }).unwrap() } -pub async fn on_websocket_connection(stream: WebSocket, state: Arc, addr: SocketAddr) { - // Add User to users - let new_user = User { +fn announce_join(state: &Arc, addr: &SocketAddr) -> String { + let msg = format!( + "{} joined.", + state.users.lock().unwrap().get(addr).unwrap().name + ); + + tracing::debug!("{}", &msg); + + to_string::(&ChatMessage { text: msg }).unwrap() +} + +fn generate_new_user(state: &Arc) -> User { + User { name: format!( "{} {}", state.first_names.choose(&mut rand::thread_rng()).unwrap(), state.last_names.choose(&mut rand::thread_rng()).unwrap(), ), - }; + } +} - // By splitting, we can send and receive at the same time. +fn client_self_user_update(new_user: &User) -> String { + to_string::(&UserUpdate { + username: new_user.name.clone(), + }) + .unwrap() +} + +/// Create, Register, and Hydrate new user +async fn handle_new_user( + sender: &mut SplitSink, + state: &Arc, + addr: &SocketAddr, +) -> Result<()> { + // Create + let new_user = generate_new_user(state); + + // Notify client of new username + sender + .send(Message::Text(client_self_user_update(&new_user))) + .await?; + + // Register using `addr` as key until something longer lived exists + state.users.lock().unwrap().insert(*addr, new_user); + + // Hydrate client + sender.send(Message::Text(chat_update(state))).await?; + sender.send(Message::Text(motd())).await?; + sender + .send(Message::Text(server_summary_update(state))) + .await?; + sender.send(Message::Text(games_update(state))).await?; + state.tx.send(announce_join(state, addr))?; + state.tx.send(server_summary_update(state))?; + state.tx.send(chat_update(state))?; + + Ok(()) +} + +pub async fn on_websocket_connection(stream: WebSocket, state: Arc, addr: SocketAddr) { + // Split channels to send and receive asynchronously. let (mut sender, mut receiver) = stream.split(); - let _ = &sender - .send(Message::Text( - to_string::(&UserUpdate { - username: new_user.name.to_string(), - }) - .unwrap(), - )) - .await; + handle_new_user(&mut sender, &state, &addr) + .await + .expect("Error creating new user!"); - // Add user to users using `addr` as key until something longer lived exists - state.users.lock().unwrap().insert(addr, new_user); - - // hydrate user - let _ = &sender - .send(Message::Text( - to_string::(&generate_chat_update(&state)).unwrap(), - )) - .await; - let _ = &sender - .send(Message::Text(to_string::(&motd()).unwrap())) - .await; - let _ = &sender - .send(Message::Text( - to_string(&server_sum_update(&state)).unwrap(), - )) - .await; - let _ = &sender - .send(Message::Text( - to_string(&generate_games_update(&state)).unwrap(), - )) - .await; - - // ANNOUNCE THY PRESENCE - let msg = ChatMessage { - text: format!( - "{} joined.", - state.users.lock().unwrap().get(&addr).unwrap().name - ), - }; - tracing::debug!("{}", msg.text); - let _ = &state.tx.send(to_string::(&msg).unwrap()); - - // Broadcast server state summary update - let _ = &state - .tx - .send(to_string(&server_sum_update(&state)).unwrap()); - - // Broadcast users update - let _ = &state - .tx - .send(to_string(&generate_chat_update(&state)).unwrap()); - - // subscribe to broadcast channel + // Subscribe to global broadcast channel let mut rx = state.tx.subscribe(); - // handle broadcasting further awesome messages + // Submit new messages from this client to broadcast let mut send_task = tokio::spawn(async move { while let Ok(msg) = rx.recv().await { if sender.send(Message::Text(msg)).await.is_err() { @@ -128,14 +135,14 @@ pub async fn on_websocket_connection(stream: WebSocket, state: Arc, ad } }); - // handle new incoming messages + // Send messages from broadcast down to this client let mut recv_task = tokio::spawn(async move { while let Some(Ok(message)) = receiver.next().await { message_handler(message, &state, addr).await } }); - // if either task completes then abort the other + // If either task completes then abort the other tokio::select! { _ = (&mut send_task) => recv_task.abort(), _ = (&mut recv_task) => send_task.abort(), diff --git a/server/src/api/message_handler.rs b/server/src/api/message_handler.rs index 58b1dc3..6dc4222 100644 --- a/server/src/api/message_handler.rs +++ b/server/src/api/message_handler.rs @@ -21,8 +21,8 @@ pub async fn message_handler(message: Message, state: &Arc, addr: Sock tracing::error!("Failed to convert Game object to JSON.") } state.games.lock().unwrap().push(new_game_object); - let _ = tx.send(to_string(&generate_games_update(state)).unwrap()); - let _ = tx.send(to_string(&server_sum_update(state)).unwrap()); + let _ = tx.send(games_update(state)); + let _ = tx.send(server_summary_update(state)); // let _update = tx.send(motd()); } else { let _res = tx.send(String::from("error creating game")); @@ -77,8 +77,8 @@ pub async fn message_handler(message: Message, state: &Arc, addr: Sock tracing::debug!("{}", msg.text); let _ = tx.send(to_string::(&msg).unwrap()); state.users.lock().unwrap().remove(&addr).unwrap(); - let _ = tx.send(to_string(&server_sum_update(state)).unwrap()); - let _ = tx.send(to_string(&generate_chat_update(state)).unwrap()); + let _ = tx.send(server_summary_update(state)); + let _ = tx.send(chat_update(state)); } Message::Pong(ping) => {