use crate::AppState; use anyhow::Result; use axum::{ extract::{ ws::{Message, WebSocket}, ConnectInfo, State, WebSocketUpgrade, }, response::IntoResponse, }; use futures::stream::SplitSink; use futures::{SinkExt, StreamExt}; use lib::models::*; use rand::seq::SliceRandom; use serde_json::to_string; use std::{net::SocketAddr, sync::Arc}; pub mod message_handler; use crate::message_handler::*; /// Generate message-of-the-day server greeting fn motd() -> String { to_string::(&ChatMessage { text: "Greetings from the game server!".to_string(), }) .unwrap() } /// Generate server summary update - mostly debug stuff fn server_summary_update(state: &Arc) -> String { to_string::(&ServerStateSummary { online_users: state.users.lock().unwrap().len(), active_games: state.games.lock().unwrap().len(), }) .unwrap() } /// Generate chatroom metadata update fn chat_update(state: &Arc) -> String { let mut names = vec![]; for user in state.users.lock().unwrap().iter() { names.push(user.1.name.clone()); } to_string::(&ChatUpdate { room: "Lobby".to_string(), users: names, }) .unwrap() } /// Generate games list update fn games_update(state: &Arc) -> String { let mut names = vec![]; for game in state.games.lock().unwrap().iter() { names.push(game.name.clone()); } to_string::(&GamesUpdate { games: names }).unwrap() } /// Generate chatroom join announcement fn announce_join(state: &Arc, addr: &SocketAddr) -> String { let msg = format!( "{} joined.", state.users.lock().unwrap().get(addr).unwrap().name ); tracing::debug!("{}", &msg); to_string::(&ChatMessage { text: msg }).unwrap() } /// Create a new user object from incoming data fn generate_new_user(state: &Arc) -> User { User { name: format!( "{} {}", state.first_names.choose(&mut rand::thread_rng()).unwrap(), state.last_names.choose(&mut rand::thread_rng()).unwrap(), ), } } /// Generate message to notify client of user changes fn client_self_user_update(new_user: &User) -> String { to_string::(&UserUpdate { username: new_user.name.clone(), }) .unwrap() } /// Create, Register, and Hydrate new user async fn handle_new_user( sender: &mut SplitSink, state: &Arc, addr: &SocketAddr, ) -> Result<()> { // Create let new_user = generate_new_user(state); // Notify client of new username sender .send(Message::Text(client_self_user_update(&new_user))) .await?; // Register using `addr` as key until something longer lived exists state.users.lock().unwrap().insert(*addr, new_user); // Hydrate client sender.send(Message::Text(chat_update(state))).await?; sender.send(Message::Text(motd())).await?; sender .send(Message::Text(server_summary_update(state))) .await?; sender.send(Message::Text(games_update(state))).await?; state.tx.send(announce_join(state, addr))?; state.tx.send(server_summary_update(state))?; state.tx.send(chat_update(state))?; Ok(()) } /// This runs right after a WebSocket connection is established pub async fn on_websocket_connection(stream: WebSocket, state: Arc, addr: SocketAddr) { // Split channels to send and receive asynchronously. let (mut sender, mut receiver) = stream.split(); handle_new_user(&mut sender, &state, &addr) .await .expect("Error creating new user!"); // Subscribe to receive from global broadcast channel let mut rx = state.tx.subscribe(); // Submit new messages from this client to broadcast let mut send_task = tokio::spawn(async move { while let Ok(msg) = rx.recv().await { if sender.send(Message::Text(msg)).await.is_err() { break; } } }); // Pass messages from broadcast down to this client let mut recv_task = tokio::spawn(async move { while let Some(Ok(message)) = receiver.next().await { message_handler(&state, addr, message) .await .expect("Message Handler exploded!") } }); // If either task completes then abort the other tokio::select! { _ = (&mut send_task) => recv_task.abort(), _ = (&mut recv_task) => send_task.abort(), }; } /// Establish the WebSocket connection pub async fn websocket_connection_handler( ws: WebSocketUpgrade, // user_agent: Option>, ConnectInfo(addr): ConnectInfo, State(state): State>, ) -> impl IntoResponse { tracing::debug!("New connection from {}", &addr); ws.on_upgrade(move |socket| on_websocket_connection(socket, state, addr)) }