let user handler own user data

This commit is contained in:
Adam 2024-12-13 04:38:14 -05:00
parent 3b2862b675
commit e1c0cf185c
6 changed files with 403 additions and 304 deletions

View file

@ -5,18 +5,22 @@ use crate::DmUserMethod::*;
use crate::GameHandlerMessage::*; use crate::GameHandlerMessage::*;
use crate::OutgoingMessageHandlerMessage::*; use crate::OutgoingMessageHandlerMessage::*;
use crate::SendUserMessage::*; use crate::SendUserMessage::*;
use crate::UserHandlerMessage::*; use crate::User;
use crate::UserHandlerMessage;
use axum::extract::ws::Message; use axum::extract::ws::Message;
use lib::*; use lib::*;
use std::{collections::HashMap, net::SocketAddr, sync::Arc}; use std::{
collections::HashMap,
sync::{Arc, RwLock},
};
use tokio::sync::mpsc::Sender; use tokio::sync::mpsc::Sender;
/// For interacting with the game handler /// For interacting with the game handler
pub enum GameHandlerMessage { pub enum GameHandlerMessage {
NewGame(NewGameRequest, SocketAddr), NewGame(NewGameRequest, Arc<RwLock<User>>),
JoinGame(String, SocketAddr), JoinGame(String, Arc<RwLock<User>>),
MoveRequest(PlayerMoveRequest, SocketAddr), MoveRequest(PlayerMoveRequest, Arc<RwLock<User>>),
JudgeDecision(JudgeDecisionRequest, SocketAddr), JudgeDecision(JudgeDecisionRequest, String),
DeleteGame(GameDeleteRequest), DeleteGame(GameDeleteRequest),
SendGameStateUpdate(Vec<String>), SendGameStateUpdate(Vec<String>),
SendGameMetaUpdate(Vec<String>), SendGameMetaUpdate(Vec<String>),
@ -54,10 +58,14 @@ impl GameHandler {
/// Handles incoming messages /// Handles incoming messages
pub async fn handle(&mut self, message: GameHandlerMessage) { pub async fn handle(&mut self, message: GameHandlerMessage) {
match message { match message {
NewGame(request, addr) => self.create_new_game(request, addr).await, NewGame(request, host) => self.create_new_game(request, host).await,
JoinGame(request, addr) => self.join_game(request, addr).await, JoinGame(request, user) => self.join_game(request, user).await,
MoveRequest(request, addr) => self.handle_player_move(request, addr).await, MoveRequest(move_request, player_user) => {
JudgeDecision(request, addr) => self.handle_judging(request, addr).await, self.handle_player_move(move_request, player_user).await
}
JudgeDecision(request, player_user_id) => {
self.handle_judging(request, player_user_id).await
}
DeleteGame(request) => self.delete_game(request).await, DeleteGame(request) => self.delete_game(request).await,
SendGameStateUpdate(game_ids) => self.send_game_state_update_all(game_ids), SendGameStateUpdate(game_ids) => self.send_game_state_update_all(game_ids),
SendGameMetaUpdate(game_ids) => self.send_game_meta_update(game_ids), SendGameMetaUpdate(game_ids) => self.send_game_meta_update(game_ids),
@ -78,10 +86,8 @@ impl GameHandler {
} }
/// Process judging /// Process judging
async fn handle_judging(&mut self, request: JudgeDecisionRequest, addr: SocketAddr) { async fn handle_judging(&mut self, request: JudgeDecisionRequest, player_user_id: String) {
if let Some(this_game) = self.games.get_mut(&request.game_id) { if let Some(this_game) = self.games.get_mut(&request.game_id) {
if let Some(player_user) = self.state.online_users.read().unwrap().get(&addr) {
let player_user_id = player_user.read().unwrap().uuid.to_string();
let game_id = request.game_id.to_string(); let game_id = request.game_id.to_string();
// Send to game // Send to game
@ -89,9 +95,6 @@ impl GameHandler {
self.send_game_state_update_all(vec![game_id.clone()]); self.send_game_state_update_all(vec![game_id.clone()]);
self.send_game_meta_update(vec![game_id]); self.send_game_meta_update(vec![game_id]);
} else {
tracing::error!("Received judge request for nonexistent judge player!");
}
} else { } else {
tracing::error!("Received judge request for nonexistent game!"); tracing::error!("Received judge request for nonexistent game!");
} }
@ -102,19 +105,25 @@ impl GameHandler {
} }
/// Process player move request /// Process player move request
async fn handle_player_move(&mut self, request: PlayerMoveRequest, addr: SocketAddr) { async fn handle_player_move(
&mut self,
request: PlayerMoveRequest,
player_user: Arc<RwLock<User>>,
) {
if let Some(this_game) = self.games.get_mut(&request.game_id) { if let Some(this_game) = self.games.get_mut(&request.game_id) {
if let Some(player_user) = self.state.online_users.read().unwrap().get(&addr) {
let player_user_id = player_user.read().unwrap().uuid.to_string(); let player_user_id = player_user.read().unwrap().uuid.to_string();
// Do the stuff // Do the stuff
match this_game.player_move(&request, player_user_id) { match this_game.player_move(&request, player_user_id.clone()) {
Err(err) => { Err(err) => {
let message = ChatMessage { text: err }; let message = ChatMessage { text: err };
let users_tx = self.state.tx_user_handler.clone(); let users_tx = self.state.tx_user_handler.clone();
tokio::spawn(async move { tokio::spawn(async move {
if let Err(e) = users_tx if let Err(e) = users_tx
.send(DmUser(SendChatMessage(message), Addr(addr))) .send(UserHandlerMessage::DmUser(
SendChatMessage(message),
Id(player_user_id),
))
.await .await
{ {
tracing::error!("Could not send message: {}", e); tracing::error!("Could not send message: {}", e);
@ -128,7 +137,10 @@ impl GameHandler {
let users_tx = self.state.tx_user_handler.clone(); let users_tx = self.state.tx_user_handler.clone();
tokio::spawn(async move { tokio::spawn(async move {
if let Err(e) = users_tx if let Err(e) = users_tx
.send(DmUser(SendJudgeRound(judge_round), Id(czar_id))) .send(UserHandlerMessage::DmUser(
SendJudgeRound(judge_round),
Id(czar_id),
))
.await .await
{ {
tracing::error!("Could not send message: {}", e); tracing::error!("Could not send message: {}", e);
@ -136,9 +148,6 @@ impl GameHandler {
}); });
} }
}; };
} else {
tracing::error!("Nonexistent player tried to submit move for game!");
}
} else { } else {
tracing::error!("Player tried to submit move for nonexistent game!"); tracing::error!("Player tried to submit move for nonexistent game!");
} }
@ -146,9 +155,8 @@ impl GameHandler {
} }
/// Puts a user in a game /// Puts a user in a game
async fn join_game(&mut self, game_id: String, addr: SocketAddr) { async fn join_game(&mut self, game_id: String, this_user: Arc<RwLock<User>>) {
if self.games.contains_key(&game_id) { if self.games.contains_key(&game_id) {
if let Some(this_user) = self.state.online_users.read().unwrap().get(&addr) {
let this_user_id = this_user.read().unwrap().uuid.clone(); let this_user_id = this_user.read().unwrap().uuid.clone();
// Register game to user // Register game to user
@ -170,10 +178,6 @@ impl GameHandler {
// Send cards // Send cards
self.send_game_state_update_single(this_user_id, game_id.clone()) self.send_game_state_update_single(this_user_id, game_id.clone())
} else {
tracing::error!("Tried to add a nonexistent user to game!");
return;
}
} else { } else {
tracing::error!("User tried to join a nonexistent game!"); tracing::error!("User tried to join a nonexistent game!");
return; return;
@ -324,7 +328,7 @@ impl GameHandler {
} }
/// Creates a new game /// Creates a new game
async fn create_new_game(&mut self, new_game: NewGameRequest, addr: SocketAddr) { async fn create_new_game(&mut self, new_game: NewGameRequest, host: Arc<RwLock<User>>) {
if new_game.game_packs.is_empty() { if new_game.game_packs.is_empty() {
tracing::error!("New game cards are empty!"); tracing::error!("New game cards are empty!");
return; return;
@ -333,7 +337,6 @@ impl GameHandler {
return; return;
} }
if let Some(host) = self.state.online_users.read().unwrap().get(&addr) {
let new_game_name; let new_game_name;
let max_game_name_len = 32; let max_game_name_len = 32;
@ -382,9 +385,6 @@ impl GameHandler {
self.broadcast_game_browser_update(); self.broadcast_game_browser_update();
self.broadcast_game_count(); self.broadcast_game_count();
} else {
tracing::error!("Attempted to create game for nonexistent player!");
}
} }
/// Generate games list update /// Generate games list update

View file

@ -1,7 +1,6 @@
use crate::user_handler::*; use crate::user_handler::*;
use crate::AppState; use crate::AppState;
use crate::GameHandlerMessage::*; use crate::GameHandlerMessage::*;
use crate::OutgoingMessageHandlerMessage::*;
use crate::UserHandlerMessage::*; use crate::UserHandlerMessage::*;
use axum::extract::ws::{CloseFrame, Message}; use axum::extract::ws::{CloseFrame, Message};
use lib::*; use lib::*;
@ -26,54 +25,59 @@ impl IncomingMessageHandler {
Message::Text(text) => match text { Message::Text(text) => match text {
_chat_message if let Ok(chat_message) = from_str::<ChatMessage>(&text) => { _chat_message if let Ok(chat_message) = from_str::<ChatMessage>(&text) => {
// TODO: This should be delegated to user handler and an outgoing message and/or chat handler // TODO: This should be delegated to user handler and an outgoing message and/or chat handler
let msg; // let msg;
if chat_message.text.len() > 1024 { if chat_message.text.len() > 1024 {
msg = format! {"{0}: {1}", self.state.online_users.read().unwrap().get(&addr).unwrap().read().unwrap().name, chat_message.text[..1024].to_string()}; if let Err(e) = self
} else { .state
msg = format! {"{0}: {1}", self.state.online_users.read().unwrap().get(&addr).unwrap().read().unwrap().name, chat_message.text}; .tx_user_handler
} .send(PassChatMessage(chat_message.text[..1024].to_string(), addr))
// Broadcast incoming chat message
let msg = ChatMessage { text: msg };
let tx = self.state.tx_outgoing_message_handler.clone();
tokio::spawn(async move {
if let Err(e) = tx
.send(Broadcast(ServerToClientMessage::ChatMessage(msg)))
.await .await
{ {
tracing::error!("Error broadcasting Chat message: {}", e) tracing::error!("Error contacting outgoing message handler: {}", e);
}
} else {
if let Err(e) = self
.state
.tx_user_handler
.send(PassChatMessage(chat_message.text, addr))
.await
{
tracing::error!("Error contacting outgoing message handler: {}", e);
}
} }
});
} }
_user_log_in_request _user_log_in_request
if let Ok(user_log_in) = from_str::<UserLogInRequest>(&text) => if let Ok(user_log_in) = from_str::<UserLogInRequest>(&text) =>
{ {
let msg = UserLogIn(user_log_in, addr); if let Err(e) = self
let tx = self.state.tx_user_handler.clone(); .state
tokio::spawn(async move { .tx_user_handler
if let Err(e) = tx.send(msg).await { .send(UserLogIn(user_log_in, addr))
.await
{
tracing::error!("Error sending user login: {}", e) tracing::error!("Error sending user login: {}", e)
} }
});
} }
_new_game_request if let Ok(new_game) = from_str::<NewGameRequest>(&text) => { _new_game_request if let Ok(new_game) = from_str::<NewGameRequest>(&text) => {
let msg = NewGame(new_game, addr); if let Err(e) = self
let tx = self.state.tx_game_handler.clone(); .state
tokio::spawn(async move { .tx_user_handler
if let Err(e) = tx.send(msg).await { .send(GetUser(new_game, addr))
.await
{
tracing::error!("Error requesting new game: {}", e) tracing::error!("Error requesting new game: {}", e)
} }
});
} }
_join_game_request if let Ok(join_request) = from_str::<GameJoinRequest>(&text) => { _join_game_request if let Ok(join_request) = from_str::<GameJoinRequest>(&text) => {
let msg = JoinGame(join_request.id, addr); if let Err(e) = self
let tx = self.state.tx_game_handler.clone(); .state
tokio::spawn(async move { .tx_user_handler
if let Err(e) = tx.send(msg).await { .send(UserHandlerMessage::JoinGame(join_request.id, addr))
.await
{
tracing::error!("Error requesting game join: {}", e) tracing::error!("Error requesting game join: {}", e)
} }
});
} }
_player_move_request _player_move_request
@ -87,40 +91,43 @@ impl IncomingMessageHandler {
tracing::error!("Move request game_id is empty! Ignoring..."); tracing::error!("Move request game_id is empty! Ignoring...");
return; return;
} else { } else {
let msg = MoveRequest(move_request, addr); if let Err(e) = self
let tx = self.state.tx_game_handler.clone(); .state
tokio::spawn(async move { .tx_user_handler
if let Err(e) = tx.send(msg).await { .send(UserHandlerMessage::MoveRequest(move_request, addr))
.await
{
tracing::error!("Error sending move request: {}", e) tracing::error!("Error sending move request: {}", e)
} }
});
} }
} }
_judge_decision _judge_decision
if let Ok(judge_request) = from_str::<JudgeDecisionRequest>(&text) => if let Ok(judge_decision) = from_str::<JudgeDecisionRequest>(&text) =>
{ {
if !judge_request.winning_cards.is_empty() { if !judge_decision.winning_cards.is_empty() {
let msg = JudgeDecision(judge_request, addr); if let Err(e) = self
let tx = self.state.tx_game_handler.clone(); .state
tokio::spawn(async move { .tx_user_handler
if let Err(e) = tx.send(msg).await { .send(GetPlayerUserId(judge_decision, addr))
tracing::error!("Error sending Judge Decision: {}", e) .await
{
tracing::error!("Error contacting user handler: {}", e)
} }
});
} else { } else {
tracing::error!("Judge request received with empty cards"); tracing::error!("Judge request received with empty cards");
} }
} }
_delete_game if let Ok(delete_request) = from_str::<GameDeleteRequest>(&text) => { _delete_game if let Ok(delete_request) = from_str::<GameDeleteRequest>(&text) => {
let msg = DeleteGame(delete_request); if let Err(e) = self
let tx = self.state.tx_game_handler.clone(); .state
tokio::spawn(async move { .tx_game_handler
if let Err(e) = tx.send(msg).await { .send(DeleteGame(delete_request))
.await
{
tracing::error!("Error sending delete game: {}", e) tracing::error!("Error sending delete game: {}", e)
} }
});
} }
_ => tracing::error!( _ => tracing::error!(
@ -136,9 +143,7 @@ impl IncomingMessageHandler {
data data
), ),
Message::Close(close_frame) => { Message::Close(close_frame) => self.handle_close(close_frame, addr).await,
self.handle_close(close_frame, addr);
}
Message::Ping(ping) => { Message::Ping(ping) => {
tracing::info!("Pong received with: {:?}", ping); tracing::info!("Pong received with: {:?}", ping);
@ -151,11 +156,7 @@ impl IncomingMessageHandler {
} }
/// This runs when a connection closes /// This runs when a connection closes
fn handle_close(&self, close_frame: Option<CloseFrame>, addr: SocketAddr) { async fn handle_close(&self, close_frame: Option<CloseFrame<'static>>, addr: SocketAddr) {
let msg = UserHandlerMessage::Cleanup(addr);
let tx = self.state.tx_user_handler.clone();
tokio::spawn(async move { tx.send(msg).await });
// Process close frame // Process close frame
if let Some(cf) = close_frame { if let Some(cf) = close_frame {
tracing::info!( tracing::info!(
@ -166,5 +167,14 @@ impl IncomingMessageHandler {
} else { } else {
tracing::info!("close received without close frame") tracing::info!("close received without close frame")
} }
if let Err(e) = self
.state
.tx_user_handler
.send(UserHandlerMessage::Cleanup(addr))
.await
{
tracing::info!("Error contacting user handler: {}", e)
}
} }
} }

View file

@ -4,11 +4,7 @@ use crate::outgoing_message_handler::*;
use axum::extract::ws::Message; use axum::extract::ws::Message;
use lib::*; use lib::*;
use std::fmt::Debug; use std::fmt::Debug;
use std::{ use std::net::SocketAddr;
collections::HashMap,
net::SocketAddr,
sync::{Arc, RwLock},
};
use tokio::sync::mpsc; use tokio::sync::mpsc;
use user_handler::*; use user_handler::*;
use uuid::Uuid; use uuid::Uuid;
@ -46,7 +42,6 @@ impl User {
/// Shared state /// Shared state
pub struct AppState { pub struct AppState {
pub online_users: RwLock<HashMap<SocketAddr, Arc<RwLock<User>>>>,
pub tx_broadcast: tokio::sync::broadcast::Sender<Message>, pub tx_broadcast: tokio::sync::broadcast::Sender<Message>,
pub tx_game_handler: mpsc::Sender<GameHandlerMessage>, pub tx_game_handler: mpsc::Sender<GameHandlerMessage>,
pub tx_incoming_message_handler: mpsc::Sender<(SocketAddr, Message)>, pub tx_incoming_message_handler: mpsc::Sender<(SocketAddr, Message)>,

View file

@ -6,12 +6,7 @@ use anyhow::{Context, Result};
use axum::{routing::get, Router}; use axum::{routing::get, Router};
use clap::{arg, command}; use clap::{arg, command};
use server::*; use server::*;
use std::{ use std::{net::SocketAddr, sync::Arc, time::Duration};
collections::HashMap,
net::SocketAddr,
sync::{Arc, RwLock},
time::Duration,
};
use tokio::sync::{broadcast, mpsc}; use tokio::sync::{broadcast, mpsc};
use tower::ServiceBuilder; use tower::ServiceBuilder;
use tower_governor::{governor::GovernorConfigBuilder, GovernorLayer}; use tower_governor::{governor::GovernorConfigBuilder, GovernorLayer};
@ -82,10 +77,8 @@ async fn main() -> Result<()> {
let (tx_incoming_message_handler, mut rx_incoming_message_handler) = mpsc::channel(32); let (tx_incoming_message_handler, mut rx_incoming_message_handler) = mpsc::channel(32);
let (tx_outgoing_message_handler, mut rx_outgoing_message_handler) = mpsc::channel(32); let (tx_outgoing_message_handler, mut rx_outgoing_message_handler) = mpsc::channel(32);
let (tx_user_handler, mut rx_user_handler) = mpsc::channel(32); let (tx_user_handler, mut rx_user_handler) = mpsc::channel(32);
let online_users = RwLock::new(HashMap::<SocketAddr, Arc<RwLock<User>>>::new());
let app_state = Arc::new(AppState { let app_state = Arc::new(AppState {
online_users,
tx_broadcast: tx_broadcast.clone(), tx_broadcast: tx_broadcast.clone(),
tx_game_handler, tx_game_handler,
tx_incoming_message_handler, tx_incoming_message_handler,

View file

@ -2,6 +2,7 @@ use crate::name_generator::*;
use crate::AppState; use crate::AppState;
use crate::DmUserMethod::*; use crate::DmUserMethod::*;
use crate::GameHandlerMessage; use crate::GameHandlerMessage;
use crate::OutgoingMessageHandlerMessage;
use crate::OutgoingMessageHandlerMessage::*; use crate::OutgoingMessageHandlerMessage::*;
use crate::SendUserMessage::*; use crate::SendUserMessage::*;
use crate::User; use crate::User;
@ -17,16 +18,6 @@ use tokio::sync::mpsc::Sender;
// TODO: clean up all this tx/msg mess // TODO: clean up all this tx/msg mess
/// Handles users
pub struct UserHandler {
/// Pointer to global state
state: Arc<AppState>,
users_by_id: HashMap<String, Arc<RwLock<User>>>,
name_generator: NameGenerator,
reserved_names: HashSet<String>,
offline_users: HashMap<String, Arc<RwLock<User>>>,
}
pub enum DmUserMethod { pub enum DmUserMethod {
Addr(SocketAddr), Addr(SocketAddr),
Id(String), Id(String),
@ -38,6 +29,11 @@ pub enum UserHandlerMessage {
UserLogIn(UserLogInRequest, SocketAddr), UserLogIn(UserLogInRequest, SocketAddr),
DmUser(SendUserMessage, DmUserMethod), DmUser(SendUserMessage, DmUserMethod),
Cleanup(SocketAddr), Cleanup(SocketAddr),
PassChatMessage(String, SocketAddr),
GetPlayerUserId(JudgeDecisionRequest, SocketAddr),
GetUser(NewGameRequest, SocketAddr),
MoveRequest(PlayerMoveRequest, SocketAddr),
JoinGame(String, SocketAddr),
} }
/// Types of messages that can be sent to a user as a DM /// Types of messages that can be sent to a user as a DM
@ -48,12 +44,25 @@ pub enum SendUserMessage {
SendJudgeRound(JudgeRound), SendJudgeRound(JudgeRound),
} }
/// Handles users
pub struct UserHandler {
/// Pointer to global state
state: Arc<AppState>,
users_by_id: HashMap<String, Arc<RwLock<User>>>,
name_generator: NameGenerator,
reserved_names: HashSet<String>,
offline_users: HashMap<String, Arc<RwLock<User>>>,
online_users: RwLock<HashMap<SocketAddr, Arc<RwLock<User>>>>,
}
impl UserHandler { impl UserHandler {
/// Returns new UserHandler /// Returns new UserHandler
pub fn new(state: Arc<AppState>) -> Self { pub fn new(state: Arc<AppState>) -> Self {
let offline_users = HashMap::<String, Arc<RwLock<User>>>::new(); let offline_users = HashMap::<String, Arc<RwLock<User>>>::new();
let online_users = RwLock::new(HashMap::<SocketAddr, Arc<RwLock<User>>>::new());
UserHandler { UserHandler {
offline_users, offline_users,
online_users,
state, state,
users_by_id: HashMap::<String, Arc<RwLock<User>>>::new(), users_by_id: HashMap::<String, Arc<RwLock<User>>>::new(),
reserved_names: HashSet::<String>::new(), reserved_names: HashSet::<String>::new(),
@ -68,6 +77,107 @@ impl UserHandler {
UserLogIn(request, addr) => self.login(request, addr).await, UserLogIn(request, addr) => self.login(request, addr).await,
DmUser(message, method) => self.dm_user(message, method).await, DmUser(message, method) => self.dm_user(message, method).await,
Cleanup(addr) => self.user_cleanup(addr).await, Cleanup(addr) => self.user_cleanup(addr).await,
PassChatMessage(msg, addr) => self.pass_chat_message(msg, addr).await,
GetPlayerUserId(judge_decision, addr) => {
self.get_player_user_id(judge_decision, addr).await
}
GetUser(new_game, addr) => self.get_user(new_game, addr).await,
MoveRequest(move_request, addr) => self.move_request(move_request, addr).await,
JoinGame(game_id, addr) => self.join_game(game_id, addr).await,
}
}
async fn join_game(&self, game_id: String, addr: SocketAddr) {
let user = self
.online_users
.read()
.unwrap()
.get(&addr)
.unwrap()
.clone();
if let Err(e) = self
.state
.tx_game_handler
.send(GameHandlerMessage::JoinGame(game_id, user))
.await
{
tracing::error!("Error: {}", e)
}
}
async fn move_request(&self, move_request: PlayerMoveRequest, addr: SocketAddr) {
let player_user = self
.online_users
.read()
.unwrap()
.get(&addr)
.unwrap()
.clone();
if let Err(e) = self
.state
.tx_game_handler
.send(GameHandlerMessage::MoveRequest(move_request, player_user))
.await
{
tracing::error!("Error: {}", e)
}
}
async fn get_user(&self, new_game: NewGameRequest, addr: SocketAddr) {
let host = self
.online_users
.read()
.unwrap()
.get(&addr)
.unwrap()
.clone();
if let Err(e) = self
.state
.tx_game_handler
.send(GameHandlerMessage::NewGame(new_game, host))
.await
{
tracing::error!("Error requesting new game: {}", e)
}
}
async fn get_player_user_id(&self, judge_decision: JudgeDecisionRequest, addr: SocketAddr) {
let player_user_id = self
.online_users
.read()
.unwrap()
.get(&addr)
.unwrap()
.read()
.unwrap()
.uuid
.clone();
if let Err(e) = self
.state
.tx_game_handler
.send(GameHandlerMessage::JudgeDecision(
judge_decision,
player_user_id,
))
.await
{
tracing::error!("Error contacting game handler: {}", e)
}
}
async fn pass_chat_message(&self, msg: String, addr: SocketAddr) {
let text = format! {"{0}: {1}", self.online_users.read().unwrap().get(&addr).unwrap().read().unwrap().name, msg};
if let Err(e) = self
.state
.tx_outgoing_message_handler
.send(OutgoingMessageHandlerMessage::Broadcast(
ServerToClientMessage::ChatMessage(ChatMessage { text }),
))
.await
{
tracing::error!("Can't contact outgoing message handler: {}", e);
} }
} }
@ -88,7 +198,7 @@ impl UserHandler {
} }
} }
Addr(addr) => { Addr(addr) => {
if let Some(user) = self.state.online_users.read().unwrap().get(&addr) { if let Some(user) = self.online_users.read().unwrap().get(&addr) {
tx = user.read().unwrap().tx.clone(); tx = user.read().unwrap().tx.clone();
} else { } else {
tracing::error!("Attempted to send message to offline user!"); tracing::error!("Attempted to send message to offline user!");
@ -186,7 +296,7 @@ impl UserHandler {
.state .state
.tx_outgoing_message_handler .tx_outgoing_message_handler
.send(Broadcast(ServerToClientMessage::ChatMessage( .send(Broadcast(ServerToClientMessage::ChatMessage(
meta_announce_user_join(&self.state, &addr), self.meta_announce_user_join(&addr),
))) )))
.await .await
{ {
@ -226,7 +336,7 @@ impl UserHandler {
let old_name; let old_name;
if let Some(user) = self.state.online_users.read().unwrap().get(&addr) { if let Some(user) = self.online_users.read().unwrap().get(&addr) {
old_name = user.read().unwrap().name.clone(); old_name = user.read().unwrap().name.clone();
// User abandons current name by requesting a new one // User abandons current name by requesting a new one
@ -240,7 +350,7 @@ impl UserHandler {
if self.offline_users.contains_key(&new_name) { if self.offline_users.contains_key(&new_name) {
let buf; let buf;
// Copy over new tx // Copy over new tx
if let Some(online_user) = self.state.online_users.write().unwrap().remove(&addr) { if let Some(online_user) = self.online_users.write().unwrap().remove(&addr) {
if let Some(offline_user) = self.offline_users.remove(&new_name) { if let Some(offline_user) = self.offline_users.remove(&new_name) {
offline_user.write().unwrap().tx = online_user.write().unwrap().tx.clone(); offline_user.write().unwrap().tx = online_user.write().unwrap().tx.clone();
buf = offline_user; buf = offline_user;
@ -254,7 +364,7 @@ impl UserHandler {
} }
// Move offline user object to online // Move offline user object to online
self.state.online_users.write().unwrap().insert(addr, buf); self.online_users.write().unwrap().insert(addr, buf);
// Send welcome back messages // Send welcome back messages
let msg = format! { let msg = format! {
@ -296,7 +406,7 @@ impl UserHandler {
self.reserved_names.insert(new_name.clone()); self.reserved_names.insert(new_name.clone());
// Change user's name // Change user's name
if let Some(user) = self.state.online_users.write().unwrap().get_mut(&addr) { if let Some(user) = self.online_users.write().unwrap().get_mut(&addr) {
user.write().unwrap().change_name(new_name.clone()); user.write().unwrap().change_name(new_name.clone());
} else { } else {
tracing::error!("Error updating username: Can't find user!"); tracing::error!("Error updating username: Can't find user!");
@ -331,7 +441,7 @@ impl UserHandler {
) )
.await; .await;
// Update games this user is in // Update games this user is in
if let Some(user) = &self.state.online_users.read().unwrap().get(&addr) { if let Some(user) = &self.online_users.read().unwrap().get(&addr) {
let user_id = user.read().unwrap().uuid.to_string(); let user_id = user.read().unwrap().uuid.to_string();
let msg = GameHandlerMessage::SendGameUserUpdate(user_id); let msg = GameHandlerMessage::SendGameUserUpdate(user_id);
let tx = self.state.tx_game_handler.clone(); let tx = self.state.tx_game_handler.clone();
@ -348,14 +458,7 @@ impl UserHandler {
/// Broadcast updated user count /// Broadcast updated user count
fn broadcast_user_count(&self) { fn broadcast_user_count(&self) {
let tx = self.state.tx_outgoing_message_handler.clone(); let tx = self.state.tx_outgoing_message_handler.clone();
let online_users: u32 = self let online_users: u32 = self.online_users.read().unwrap().len().try_into().unwrap();
.state
.online_users
.read()
.unwrap()
.len()
.try_into()
.unwrap();
let msg = ServerToClientMessage::ServerOnlineUsers(ServerOnlineUsers { online_users }); let msg = ServerToClientMessage::ServerOnlineUsers(ServerOnlineUsers { online_users });
tokio::spawn(async move { tx.send(Broadcast(msg)).await }); tokio::spawn(async move { tx.send(Broadcast(msg)).await });
} }
@ -363,7 +466,6 @@ impl UserHandler {
/// Clean up after a user when they disconnect /// Clean up after a user when they disconnect
async fn user_cleanup(&mut self, addr: SocketAddr) { async fn user_cleanup(&mut self, addr: SocketAddr) {
let user_name = self let user_name = self
.state
.online_users .online_users
.read() .read()
.unwrap() .unwrap()
@ -398,19 +500,14 @@ impl UserHandler {
/// Set user status to online /// Set user status to online
fn set_user_online(&self, addr: SocketAddr, user: Arc<RwLock<User>>) { fn set_user_online(&self, addr: SocketAddr, user: Arc<RwLock<User>>) {
self.state.online_users.write().unwrap().insert(addr, user); self.online_users.write().unwrap().insert(addr, user);
} }
/// Set user status to offline /// Set user status to offline
fn set_user_offline(&mut self, user_name: String, addr: &SocketAddr) { fn set_user_offline(&mut self, user_name: String, addr: &SocketAddr) {
self.offline_users.insert( self.offline_users.insert(
user_name, user_name,
self.state self.online_users.write().unwrap().remove(addr).unwrap(),
.online_users
.write()
.unwrap()
.remove(addr)
.unwrap(),
); );
} }
@ -420,13 +517,39 @@ impl UserHandler {
.state .state
.tx_outgoing_message_handler .tx_outgoing_message_handler
.send(Broadcast(ServerToClientMessage::ChatUpdate( .send(Broadcast(ServerToClientMessage::ChatUpdate(
generate_chat_update(&self.state), self.generate_chat_update(),
))) )))
.await .await
{ {
tracing::error!("Error contacting outgoing message handler: {}", e); tracing::error!("Error contacting outgoing message handler: {}", e);
} }
} }
/// Generate chatroom join announcement
fn meta_announce_user_join(&self, addr: &SocketAddr) -> ChatMessage {
let msg = format!("{} joined.", {
if let Some(user) = self.online_users.read().unwrap().get(addr) {
user.read().unwrap().name.clone()
} else {
"Error!".to_string()
}
});
ChatMessage { text: msg }
}
/// Generate chatroom metadata update
fn generate_chat_update(&self) -> ChatUpdate {
// TODO: this may get expensive if there are many users
let mut names = vec![];
for user in self.online_users.read().unwrap().iter() {
names.push(user.1.read().unwrap().name.clone());
}
ChatUpdate {
room: "Lobby".to_string(),
users: names,
}
}
} }
/// Generate message to notify client of user changes /// Generate message to notify client of user changes
@ -436,34 +559,6 @@ pub fn user_client_self_update(new_user: &Arc<RwLock<User>>) -> UserUpdate {
} }
} }
/// Generate chatroom metadata update
pub fn generate_chat_update(state: &Arc<AppState>) -> ChatUpdate {
// TODO: this may get expensive if there are many users
let mut names = vec![];
for user in state.online_users.read().unwrap().iter() {
names.push(user.1.read().unwrap().name.clone());
}
ChatUpdate {
room: "Lobby".to_string(),
users: names,
}
}
/// Generate chatroom join announcement
pub fn meta_announce_user_join(state: &Arc<AppState>, addr: &SocketAddr) -> ChatMessage {
let msg = format!("{} joined.", {
if let Some(user) = state.online_users.read().unwrap().get(addr) {
user.read().unwrap().name.clone()
} else {
"Error!".to_string()
}
});
ChatMessage { text: msg }
}
/// Generate message-of-the-day server greeting /// Generate message-of-the-day server greeting
pub fn meta_motd() -> ChatMessage { pub fn meta_motd() -> ChatMessage {
ChatMessage { ChatMessage {

View file

@ -42,7 +42,9 @@ pub async fn websocket_on_connection(stream: WebSocket, state: Arc<AppState>, ad
let mut rx = state.tx_broadcast.subscribe(); let mut rx = state.tx_broadcast.subscribe();
// Send messages to this client // Send messages to this client
let mut send_task = tokio::spawn(async move { let mut send_task = tokio::task::Builder::new()
.name("User send task")
.spawn(async move {
let mut broadcast = None; let mut broadcast = None;
let mut dm: Option<Message> = None; let mut dm: Option<Message> = None;
loop { loop {
@ -64,10 +66,13 @@ pub async fn websocket_on_connection(stream: WebSocket, state: Arc<AppState>, ad
} }
} }
} }
}); })
.expect("Can't start user send task");
// Receive messages from this client // Receive messages from this client
let mut recv_task = tokio::spawn(async move { let mut recv_task = tokio::task::Builder::new()
.name("User recv task")
.spawn(async move {
while let Some(Ok(message)) = receiver.next().await { while let Some(Ok(message)) = receiver.next().await {
if let Err(e) = state if let Err(e) = state
.tx_incoming_message_handler .tx_incoming_message_handler
@ -77,7 +82,8 @@ pub async fn websocket_on_connection(stream: WebSocket, state: Arc<AppState>, ad
tracing::error!("Error relaying received message: {}", e) tracing::error!("Error relaying received message: {}", e)
}; };
} }
}); })
.expect("Can't start user recv task");
// If either task completes then abort the other // If either task completes then abort the other
tokio::select! { tokio::select! {