diff --git a/server/src/lib.rs b/server/src/lib.rs index fa5e268..a2acd4f 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -1,6 +1,8 @@ #![feature(if_let_guard)] +use crate::mpsc::Sender; use anyhow::{Context, Result}; +use axum::extract::ws::Message; use lib::*; use rand::prelude::IteratorRandom; use rand::prelude::SliceRandom; @@ -13,7 +15,7 @@ use std::{ net::SocketAddr, sync::{Arc, RwLock}, }; -use tokio::sync::broadcast; +use tokio::sync::{broadcast, mpsc}; pub mod websocket; /// User @@ -351,9 +353,16 @@ pub fn load_names(path: &str) -> Result> { Ok(buf) } +#[derive(Debug)] +pub struct NewUser { + pub sender: Sender, + pub addr: SocketAddr, +} + // Our shared state pub struct AppState { pub tx: broadcast::Sender, + pub users_tx: mpsc::Sender, pub first_names: Vec, pub last_names: Vec, pub reserved_names: RwLock>, diff --git a/server/src/main.rs b/server/src/main.rs index 2e17cbd..8b380e6 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -1,5 +1,8 @@ +use crate::meta::*; +use crate::user::*; use crate::websocket::*; use anyhow::{Context, Result}; +use axum::extract::ws::Message; use axum::{routing::get, Router}; use server::*; use std::{ @@ -7,7 +10,7 @@ use std::{ net::SocketAddr, sync::{Arc, RwLock}, }; -use tokio::sync::broadcast; +use tokio::sync::{broadcast, mpsc}; use tower_http::services::ServeDir; use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; @@ -24,6 +27,7 @@ async fn main() -> Result<()> { // Set up state let (tx, _rx) = broadcast::channel(100); + let (users_tx, mut users_rx) = mpsc::channel(100); let first_names = load_names("data/first.txt")?; let last_names = load_names("data/last.txt")?; let reserved_names = RwLock::new(HashSet::::new()); @@ -34,6 +38,7 @@ async fn main() -> Result<()> { let app_state = Arc::new(AppState { tx, + users_tx, first_names, last_names, reserved_names, @@ -44,6 +49,75 @@ async fn main() -> Result<()> { games, }); + let cloned_state = app_state.clone(); + + let _user_handler = tokio::spawn(async move { + while let Some(message) = users_rx.recv().await { + // + // Create, Register, and Hydrate new user + // + // Create + let new_user = Arc::new(RwLock::new(User::new(&cloned_state))); + + // Notify client of new username + message + .sender + .send(Message::Text(user_client_self_update(&new_user))) + .await + .unwrap(); + + // Register using `addr` as key until something longer lived exists + cloned_state + .online_users + .write() + .unwrap() + .insert(message.addr, new_user.clone()); + + // Hydrate client + // this should probably be combined and sent as one + message + .sender + .send(Message::Text(meta_chat_update(&cloned_state))) + .await + .unwrap(); + message + .sender + .send(Message::Text(meta_motd())) + .await + .unwrap(); + message + .sender + .send(Message::Text(meta_server_summary_update(&cloned_state))) + .await + .unwrap(); + message + .sender + .send(Message::Text(meta_games_browser_update(&cloned_state))) + .await + .unwrap(); + message + .sender + .send(Message::Text(meta_new_game_card_packs(&cloned_state))) + .await + .unwrap(); + + // Broadcast new user's existence + // this should probably be combined and sent as one + let _ = &cloned_state + .tx + .send(meta_announce_user_join(&cloned_state, &message.addr)) + .unwrap(); + let _ = &cloned_state + .tx + .send(meta_server_summary_update(&cloned_state)) + .unwrap(); + let _ = &cloned_state + .tx + .send(meta_chat_update(&cloned_state)) + .unwrap(); + } + }); + // Router let app = Router::new() .route("/websocket", get(websocket_connection_handler)) diff --git a/server/src/websocket.rs b/server/src/websocket.rs index 9876ff5..b549df7 100644 --- a/server/src/websocket.rs +++ b/server/src/websocket.rs @@ -1,8 +1,8 @@ use crate::websocket::meta::*; -use crate::websocket::user::*; use crate::AppState; use crate::Game; use crate::NewGameManifest; +use crate::NewUser; use anyhow::Result; use axum::extract::ws::CloseFrame; use axum::{ @@ -19,7 +19,7 @@ use std::{ net::SocketAddr, sync::{Arc, RwLock}, }; -use tokio::sync::broadcast::Sender; +use tokio::sync::{broadcast::Sender, mpsc}; pub mod meta; pub mod user; @@ -39,19 +39,31 @@ pub async fn websocket_on_connection(stream: WebSocket, state: Arc, ad // Split channels to send and receive asynchronously. let (mut sender, mut receiver) = stream.split(); - // Set up new user - user_handle_new(&mut sender, &state, &addr) - .await - .expect("Error creating new user!"); + let (dm_tx, mut dm_rx) = mpsc::channel(30); + + let _ = state + .users_tx + .send(NewUser { + sender: dm_tx, + addr, + }) + .await; // Subscribe to receive from global broadcast channel let mut rx = state.tx.subscribe(); // Send messages to this client let mut send_task = tokio::spawn(async move { - while let Ok(msg) = rx.recv().await { - if sender.send(Message::Text(msg)).await.is_err() { - break; + loop { + while let Ok(msg) = rx.recv().await { + if sender.send(Message::Text(msg)).await.is_err() { + break; + } + } + while let Some(message) = dm_rx.recv().await { + if sender.send(message).await.is_err() { + break; + } } } }); diff --git a/server/src/websocket/user.rs b/server/src/websocket/user.rs index de7509f..17b57e2 100644 --- a/server/src/websocket/user.rs +++ b/server/src/websocket/user.rs @@ -1,13 +1,6 @@ -use crate::websocket::meta::*; -use crate::AppState; use crate::User; -use anyhow::Result; -use axum::extract::ws::{Message, WebSocket}; -use futures::stream::SplitSink; -use futures::SinkExt; use lib::*; use serde_json::to_string; -use std::net::SocketAddr; use std::sync::Arc; use std::sync::RwLock; @@ -18,43 +11,3 @@ pub fn user_client_self_update(new_user: &Arc>) -> String { }) .unwrap() } - -/// Create, Register, and Hydrate new user -pub async fn user_handle_new( - sender: &mut SplitSink, - state: &Arc, - addr: &SocketAddr, -) -> Result<()> { - // Create - let new_user = Arc::new(RwLock::new(User::new(state))); - - // Notify client of new username - sender - .send(Message::Text(user_client_self_update(&new_user))) - .await?; - - // Register using `addr` as key until something longer lived exists - state.online_users.write().unwrap().insert(*addr, new_user); - - // Hydrate client - // this should probably be combined and sent as one - sender.send(Message::Text(meta_chat_update(state))).await?; - sender.send(Message::Text(meta_motd())).await?; - sender - .send(Message::Text(meta_server_summary_update(state))) - .await?; - sender - .send(Message::Text(meta_games_browser_update(state))) - .await?; - sender - .send(Message::Text(meta_new_game_card_packs(state))) - .await?; - - // Broadcast new user's existence - // this should probably be combined and sent as one - state.tx.send(meta_announce_user_join(state, addr))?; - state.tx.send(meta_server_summary_update(state))?; - state.tx.send(meta_chat_update(state))?; - - Ok(()) -}