use crate::AppState; use axum::{ extract::{ ws::{Message, WebSocket}, ConnectInfo, State, WebSocketUpgrade, }, response::IntoResponse, }; use futures::{SinkExt, StreamExt}; use lib::models::*; use rand::seq::SliceRandom; use serde_json::to_string; use std::{net::SocketAddr, sync::Arc}; pub mod message_handler; use crate::message_handler::*; fn motd() -> ChatMessage { ChatMessage { text: "Greetings from the game server!".to_string(), } } fn server_sum_update(state: &Arc) -> ServerStateSummary { ServerStateSummary { online_users: state.users.lock().unwrap().len(), active_games: state.games.lock().unwrap().len(), } } pub async fn on_websocket_connection(stream: WebSocket, state: Arc, addr: SocketAddr) { // Add User to users let new_user = User { name: format!( "{} {}", state.first_names.choose(&mut rand::thread_rng()).unwrap(), state.last_names.choose(&mut rand::thread_rng()).unwrap(), ), }; // By splitting, we can send and receive at the same time. let (mut sender, mut receiver) = stream.split(); let _ = &sender .send(Message::Text( to_string::(&UserUpdate { username: new_user.name.to_string(), }) .unwrap(), )) .await; // Add user to users using `addr` until something longer lived exists state.users.lock().unwrap().insert(addr, new_user); // hydrate user let _ = &sender .send(Message::Text(to_string::(&motd()).unwrap())) .await; let _ = &sender .send(Message::Text( to_string(&server_sum_update(&state)).unwrap(), )) .await; // ANNOUNCE THY PRESENCE let msg = ChatMessage { text: format!( "{} joined.", state.users.lock().unwrap().get(&addr).unwrap().name ), }; tracing::debug!("{}", msg.text); let _ = &state.tx.send(to_string::(&msg).unwrap()); // Broadcast server state summary update let _ = &state .tx .send(to_string(&server_sum_update(&state)).unwrap()); // subscribe to broadcast channel let mut rx = state.tx.subscribe(); // handle broadcasting further awesome messages let mut send_task = tokio::spawn(async move { while let Ok(msg) = rx.recv().await { if sender.send(Message::Text(msg)).await.is_err() { break; } } }); // handle new incoming messages let mut recv_task = tokio::spawn(async move { while let Some(Ok(message)) = receiver.next().await { message_handler(message, &state, addr).await } }); // if either task completes then abort the other tokio::select! { _ = (&mut send_task) => recv_task.abort(), _ = (&mut recv_task) => send_task.abort(), }; } pub async fn websocket_connection_handler( ws: WebSocketUpgrade, // user_agent: Option>, ConnectInfo(addr): ConnectInfo, State(state): State>, ) -> impl IntoResponse { tracing::debug!("New connection from {}", &addr); ws.on_upgrade(move |socket| on_websocket_connection(socket, state, addr)) }