use crate::AppState; use axum::{ extract::{ ws::{Message, WebSocket}, ConnectInfo, State, WebSocketUpgrade, }, response::IntoResponse, }; use futures::{SinkExt, StreamExt}; use std::{net::SocketAddr, sync::Arc}; pub mod message_handler; use crate::message_handler::*; fn greeting(state: &Arc) -> String { format!( "{:#?} Card packs loaded\n\ {:#?} Current active games", state.all_cards.lock().unwrap().len(), state.games.lock().unwrap().len(), ) } pub struct User { name: String, addr: SocketAddr, } pub async fn websocket(stream: WebSocket, state: Arc, who: User) { // By splitting, we can send and receive at the same time. let (mut sender, mut receiver) = stream.split(); // sup let _greeting = sender.send(Message::Text(greeting(&state))).await; // subscribe to broadcast channel let mut rx = state.tx.subscribe(); // ANNOUNCE THY PRESENCE let msg = format!("{} joined.", who.name); tracing::debug!("{msg}"); let _ = state.tx.send(msg); // handle broadcasting further awesome messages let mut send_task = tokio::spawn(async move { while let Ok(msg) = rx.recv().await { if sender.send(Message::Text(msg)).await.is_err() { break; } } }); // handle new incoming messages let mut recv_task = tokio::spawn(async move { while let Some(Ok(message)) = receiver.next().await { message_handler(message, &state, &who).await } }); // if either task completes then abort the other tokio::select! { _ = (&mut send_task) => recv_task.abort(), _ = (&mut recv_task) => send_task.abort(), }; } pub async fn websocket_handler( ws: WebSocketUpgrade, // user_agent: Option>, ConnectInfo(addr): ConnectInfo, State(state): State>, ) -> impl IntoResponse { tracing::debug!("New connection from {addr}"); ws.on_upgrade(move |socket| { websocket( socket, state, User { name: "Anonymous".to_string(), addr, }, ) }) }