diff --git a/src/api.rs b/src/api.rs index 2fd8cd9..d68356d 100644 --- a/src/api.rs +++ b/src/api.rs @@ -1,5 +1,6 @@ -use crate::AppState; use crate::gamemaster::*; +use crate::AppState; +use axum::extract::ConnectInfo; use axum::{ extract::{ ws::{Message, WebSocket, WebSocketUpgrade}, @@ -9,7 +10,7 @@ use axum::{ }; use futures::{sink::SinkExt, stream::StreamExt}; use serde::Deserialize; -use std::sync::Arc; +use std::{net::SocketAddr, sync::Arc}; /// New game request structure #[derive(Debug, Deserialize)] @@ -34,66 +35,100 @@ pub struct GameJoinRequest { pub async fn websocket_handler( ws: WebSocketUpgrade, + // user_agent: Option>, + ConnectInfo(addr): ConnectInfo, State(state): State>, ) -> impl IntoResponse { - ws.on_upgrade(|socket| websocket(socket, state)) + tracing::debug!("New connection from {addr}"); + ws.on_upgrade(move |socket| websocket(socket, state, addr)) } -pub async fn websocket(stream: WebSocket, state: Arc) { +fn greeting(state: &Arc) -> String { + format!( + "{:#?} Card packs loaded\n\ + {:#?} Current active games", + state.all_cards.lock().unwrap().len(), + state.games.lock().unwrap().len(), + ) +} +pub async fn websocket(stream: WebSocket, state: Arc, who: SocketAddr) { // By splitting, we can send and receive at the same time. let (mut sender, mut receiver) = stream.split(); - let _greeting = sender - .send(Message::Text(format!( - "Greetings! \n\ - {:#?} Card packs loaded\n\ - {:#?} Current active games", - state.all_cards.lock().unwrap().len(), - state.games.lock().unwrap().len(), - ))) - .await; + let _greeting = sender.send(Message::Text(greeting(&state))).await; - // Loop until a text message is found. - while let Some(Ok(message)) = receiver.next().await { - match message { - Message::Text(text) => { - tracing::debug!("Text: {}", text); + // subscribe to channel + let mut rx = state.tx.subscribe(); - if let Ok(new_game) = serde_json::from_str::(&text) { - tracing::debug!("{:#?}", &new_game); - // create game - if let Ok(new_game_object) = CAHGame::new(new_game) { - state.games.lock().unwrap().push(new_game_object); - } else { - let _res = sender - .send(Message::Text(format!("error creating game"))) - .await; - } - } else { - // just echo - let _res = sender.send(Message::Text(text)).await; - } - } - Message::Binary(data) => { - tracing::debug!("Binary: {:?}", data) - } - Message::Close(c) => { - if let Some(cf) = c { - tracing::debug!( - "Close received with code: {} and reason: {}", - cf.code, - cf.reason - ) - } else { - tracing::debug!("close received without close frame") - } - } - Message::Pong(ping) => { - tracing::debug!("Pong received with: {:?}", ping); - } - Message::Ping(pong) => { - tracing::debug!("Pong received with: {:?}", pong); + // broadcast join + let msg = format!("{who} is here bitches"); + tracing::debug!("{msg}"); + let _ = state.tx.send(msg); + + let mut send_task = tokio::spawn(async move { + while let Ok(msg) = rx.recv().await { + if sender.send(Message::Text(msg)).await.is_err() { + break; } } - } + }); + + // clone things for receiving task + let tx = state.tx.clone(); + let name = who.clone(); + + let mut recv_task = tokio::spawn(async move { + while let Some(Ok(message)) = receiver.next().await { + match message { + Message::Text(text) => { + tracing::debug!("{who}: {}", text); + + if let Ok(new_game) = serde_json::from_str::(&text) { + tracing::debug!("{:#?}", &new_game); + // create game + if let Ok(new_game_object) = CAHGame::new(new_game) { + state.games.lock().unwrap().push(new_game_object); + let _update = tx.send(greeting(&state)); + } else { + let _res = tx.send(format!("error creating game")); + } + } else { + // just echo + let msg = format!{"{who}: {text}"}; + tracing::debug!("{msg}"); + let _res = tx.send(msg); + } + } + Message::Binary(data) => { + tracing::debug!("Binary: {:?}", data) + } + Message::Close(c) => { + if let Some(cf) = c { + tracing::debug!( + "Close received from {who} with code: {} and reason: {}", + cf.code, + cf.reason + ) + } else { + tracing::debug!("close received without close frame") + } + let msg = format!("{name} left."); + tracing::debug!("{msg}"); + let _ = tx.send(msg); + } + Message::Pong(ping) => { + tracing::debug!("Pong received with: {:?}", ping); + } + Message::Ping(pong) => { + tracing::debug!("Pong received with: {:?}", pong); + } + } + } + }); + + // if either task completes then abort the other + tokio::select! { + _ = (&mut send_task) => recv_task.abort(), + _ = (&mut recv_task) => send_task.abort(), + }; } diff --git a/src/main.rs b/src/main.rs index 82a0c6a..e104e61 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,6 +1,7 @@ -use axum::{response::Html, routing::get, Router}; +use axum::{response::Html, routing::get, Router, ServiceExt}; use std::{ collections::HashSet, + net::SocketAddr, sync::{Arc, Mutex}, }; use std::{error::Error, fs, result::Result}; @@ -120,7 +121,11 @@ async fn main() -> Result<(), Box> { let listener = tokio::net::TcpListener::bind("0.0.0.0:3030").await?; tracing::debug!("listening on {}", listener.local_addr()?); - axum::serve(listener, app).await?; + axum::serve( + listener, + app.into_make_service_with_connect_info::(), + ) + .await?; Ok(()) } diff --git a/test_client.html b/test_client.html index c0c0b57..2068b0a 100644 --- a/test_client.html +++ b/test_client.html @@ -37,7 +37,7 @@
- +

@@ -61,7 +61,6 @@