use crate::user_handler::*; use crate::AppState; use crate::Game; use crate::NewGameManifest; use crate::NewUser; use anyhow::Result; use axum::extract::ws::CloseFrame; use axum::{ extract::{ ws::{Message, WebSocket}, ConnectInfo, State, WebSocketUpgrade, }, response::IntoResponse, }; use futures::{SinkExt, StreamExt}; use lib::*; use serde_json::{from_str, to_string}; use std::{ net::SocketAddr, sync::{Arc, RwLock}, }; use tokio::sync::{broadcast::Sender, mpsc}; /// Establish the WebSocket connection pub async fn websocket_connection_handler( ws: WebSocketUpgrade, // user_agent: Option>, ConnectInfo(addr): ConnectInfo, State(state): State>, ) -> impl IntoResponse { tracing::debug!("New connection from {}", &addr); ws.on_upgrade(move |socket| websocket_on_connection(socket, state, addr)) } /// This runs right after a WebSocket connection is established pub async fn websocket_on_connection(stream: WebSocket, state: Arc, addr: SocketAddr) { // Split channels to send and receive asynchronously. let (mut sender, mut receiver) = stream.split(); // Create channel for direct messages let (dm_tx, mut dm_rx) = mpsc::channel(30); let _ = state .users_tx .send(NewUser { sender: dm_tx, addr, }) .await; // Subscribe to receive from global broadcast channel let mut rx = state.broadcast_channel.subscribe(); // Send messages to this client let mut send_task = tokio::spawn(async move { let mut broadcast = None; let mut dm = None; loop { tokio::select! { b = rx.recv() => broadcast = Some(b.unwrap()), d = dm_rx.recv() => dm = d, }; if let Some(msg) = &dm { if sender.send(Message::Text(msg.to_string())).await.is_err() { break; } else { dm = Option::None; } } else if let Some(msg) = &broadcast { if sender.send(Message::Text(msg.to_string())).await.is_err() { } else { broadcast = Option::None; } } } }); // Receive messages from this client let mut recv_task = tokio::spawn(async move { while let Some(Ok(message)) = receiver.next().await { websocket_message_handler(state.clone(), addr, message) .await .expect("Message Handler exploded!") } }); // If either task completes then abort the other tokio::select! { _ = (&mut send_task) => recv_task.abort(), _ = (&mut recv_task) => send_task.abort(), }; } /// Handle incoming messages over the WebSocket pub async fn websocket_message_handler( state: Arc, addr: SocketAddr, message: Message, ) -> Result<()> { let tx = &state.broadcast_channel; match message { Message::Text(text) => match text { _new_game if let Ok(_new_game) = from_str::(&text) => { tracing::debug!("New game request received."); game_handle_new_game(_new_game, &state, tx, addr)?; } _chat_message if let Ok(_chat_message) = from_str::(&text) => { websocket_handle_chat_message(_chat_message, &state, tx, addr)?; } _user_log_in if let Ok(_user_log_in) = from_str::(&text) => { websocket_handle_user_log_in(_user_log_in, &state, tx, addr)?; } _ => { tracing::debug!("Unhandled text message: {}", &text); } }, Message::Binary(data) => { tracing::debug!("Binary: {:?}", data) } Message::Close(close_frame) => { websocket_handle_close(close_frame, &state, tx, addr)?; } Message::Pong(ping) => { tracing::debug!("Pong received with: {:?}", ping); } Message::Ping(pong) => { tracing::debug!("Pong received with: {:?}", pong); } } Ok(()) } /// This runs when a NewGameRequest is received fn game_handle_new_game( new_game: NewGameRequest, state: &Arc, tx: &Sender, addr: SocketAddr, ) -> Result<()> { let manifest = NewGameManifest { name: new_game.name, host: state .online_users .read() .unwrap() .get(&addr) .unwrap() .clone(), }; tracing::debug!("Game Packs {:?}", new_game.packs); // create game if let Ok(new_game_object) = Game::new(manifest) { state .games .write() .unwrap() .insert(new_game_object.name.clone(), RwLock::new(new_game_object)); tx.send(meta_games_browser_update(state))?; tx.send(meta_server_summary_update(state))?; } Ok(()) } /// This runs when a ChatMessage is received fn websocket_handle_chat_message( chat_message: ChatMessage, state: &Arc, tx: &Sender, addr: SocketAddr, ) -> Result<()> { let msg = format! {"{0}: {1}", state.online_users.read().unwrap().get(&addr).unwrap().read().unwrap().name, chat_message.text}; tracing::debug!("{msg}"); tx.send(to_string::(&ChatMessage { text: msg })?)?; Ok(()) } /// This runs when a UserLogIn is received fn websocket_handle_user_log_in( user_log_in: UserLogIn, state: &Arc, tx: &Sender, addr: SocketAddr, ) -> Result<()> { let old_name = state .online_users .read() .unwrap() .get(&addr) .unwrap() .read() .unwrap() .name .clone(); let new_name = user_log_in.username.clone(); if state.offline_users.read().unwrap().contains_key(&new_name) { state .online_users .write() .unwrap() .insert( addr, state .offline_users .write() .unwrap() .remove(&new_name) .unwrap(), ) .unwrap(); let msg = format! { "{0} changed name to {1}. Welcome back!", old_name, new_name }; tracing::debug!("{msg}"); } else if state.reserved_names.read().unwrap().contains(&new_name) { tracing::debug!("name is taken"); } else { state .online_users .write() .unwrap() .get_mut(&addr) .unwrap() .write() .unwrap() .change_name(user_log_in.username); let msg = format! { "{0} changed name to {1}.", old_name, new_name }; // Reserve name state .reserved_names .write() .unwrap() .insert(new_name.clone()); tracing::debug!("{msg}"); tx.send(to_string::(&ChatMessage { text: msg })?)?; tracing::debug!("Name {} reserved.", &new_name); } tracing::debug!( "Online Users: {} Offline Users: {}", state.online_users.read().unwrap().len(), state.offline_users.read().unwrap().len() ); tx.send(meta_games_browser_update(state))?; tx.send(meta_chat_update(state))?; // send the user their new name Ok(()) } /// This runs when a connection closes fn websocket_handle_close( close_frame: Option, state: &Arc, tx: &Sender, addr: SocketAddr, ) -> Result<()> { if let Some(cf) = close_frame { tracing::debug!( "Close received from {0} with code: {1} and reason: {2}", state .online_users .read() .unwrap() .get(&addr) .unwrap() .read() .unwrap() .name, cf.code, cf.reason ) } else { tracing::debug!("close received without close frame") } let msg = ChatMessage { text: format!( "{0} left.", state .online_users .read() .unwrap() .get(&addr) .unwrap() .read() .unwrap() .name ), }; tracing::debug!("{}", msg.text); tx.send(to_string::(&msg)?)?; let name = state .online_users .read() .unwrap() .get(&addr) .unwrap() .read() .unwrap() .name .clone(); state.offline_users.write().unwrap().insert( name.clone(), state.online_users.write().unwrap().remove(&addr).unwrap(), ); tx.send(meta_server_summary_update(state))?; tx.send(meta_chat_update(state))?; Ok(()) }