mostly move message handler and update new user name to client

This commit is contained in:
Adam 2024-08-08 04:07:23 -04:00
parent 7a278cda42
commit 0e30551221
6 changed files with 270 additions and 164 deletions

View file

@ -1,3 +1,5 @@
use std::net::SocketAddr;
use serde::{Deserialize, Serialize};
/// Card Pack Meta

View file

@ -1,10 +1,9 @@
#![feature(if_let_guard)]
use crate::mpsc::Sender;
use anyhow::{Context, Result};
use axum::extract::ws::Message;
use lib::*;
use rand::prelude::IteratorRandom;
use rand::prelude::SliceRandom;
use rand::thread_rng;
use serde::Deserialize;
use std::{
@ -14,26 +13,24 @@ use std::{
net::SocketAddr,
sync::{Arc, RwLock},
};
use tokio::sync::mpsc::Sender;
use tokio::sync::{broadcast, mpsc};
pub mod websocket;
use user_handler::UserHandlerMessage;
pub mod message_handler;
pub mod user_handler;
pub mod websocket;
/// User
#[derive(Default, Debug, Eq, PartialEq, Hash)]
#[derive(Debug)]
pub struct User {
pub name: String,
pub tx: Sender<String>,
}
impl User {
/// Create a new user object from incoming data
pub fn new(state: &Arc<AppState>) -> User {
User {
name: format!(
"{} {}",
state.first_names.choose(&mut rand::thread_rng()).unwrap(),
state.last_names.choose(&mut rand::thread_rng()).unwrap(),
),
}
pub fn new(name: String, tx: Sender<String>) -> User {
User { name, tx }
}
pub fn change_name(&mut self, new_name: String) {
@ -109,7 +106,7 @@ pub struct Player {
}
/// The game master
#[derive(Default, Debug)]
#[derive(Debug)]
pub struct Game {
/// The name of the game
pub name: String,
@ -151,7 +148,15 @@ impl Game {
pub fn new(request: NewGameManifest) -> Result<Self> {
let mut game = Game {
..Default::default()
name: request.host.read().unwrap().name.clone(),
host: request.host.clone(),
white: vec![],
black: vec![],
white_discard: vec![],
black_discard: vec![],
game_active: false,
players: vec![],
current_black: Option::None,
};
tracing::debug!(
"Creating game {} with {} as host",
@ -353,16 +358,11 @@ pub fn load_names(path: &str) -> Result<Vec<String>> {
Ok(buf)
}
#[derive(Debug)]
pub struct NewUser {
pub sender: Sender<String>,
pub addr: SocketAddr,
}
// Our shared state
pub struct AppState {
pub broadcast_channel: broadcast::Sender<String>,
pub users_tx: mpsc::Sender<NewUser>,
pub broadcast_tx: broadcast::Sender<String>,
pub users_tx: mpsc::Sender<UserHandlerMessage>,
pub messages_tx: mpsc::Sender<(SocketAddr, Message)>,
pub first_names: Vec<String>,
pub last_names: Vec<String>,
pub reserved_names: RwLock<HashSet<String>>,

View file

@ -1,3 +1,4 @@
use crate::message_handler::*;
use crate::websocket::*;
use anyhow::{Context, Result};
use axum::{routing::get, Router};
@ -24,8 +25,9 @@ async fn main() -> Result<()> {
.init();
// Set up state
let (broadcast_channel, _rx) = broadcast::channel(100);
let (broadcast_tx, _rx) = broadcast::channel(100);
let (users_tx, mut users_rx) = mpsc::channel(100);
let (messages_tx, mut messages_rx) = mpsc::channel(100);
let first_names = load_names("data/first.txt")?;
let last_names = load_names("data/last.txt")?;
let reserved_names = RwLock::new(HashSet::<String>::new());
@ -35,8 +37,9 @@ async fn main() -> Result<()> {
let games = RwLock::new(HashMap::new());
let app_state = Arc::new(AppState {
broadcast_channel,
broadcast_tx,
users_tx,
messages_tx,
first_names,
last_names,
reserved_names,
@ -47,10 +50,17 @@ async fn main() -> Result<()> {
games,
});
let message_handler = MessageHandler::new(app_state.clone());
tokio::spawn(async move {
while let Some((addr, message)) = messages_rx.recv().await {
message_handler.handle(addr, message).await;
}
});
let user_handler = UserHandler::new(app_state.clone());
tokio::spawn(async move {
while let Some(message) = users_rx.recv().await {
user_handler.process(message).await;
user_handler.handle(message).await;
}
});

View file

@ -0,0 +1,59 @@
use crate::user_handler::*;
use crate::AppState;
use axum::extract::ws::Message;
use lib::*;
use serde_json::{from_str, to_string};
use std::net::SocketAddr;
use std::sync::Arc;
/// Handle incoming messages over the WebSocket
pub struct MessageHandler {
state: Arc<AppState>,
}
impl MessageHandler {
pub fn new(state: Arc<AppState>) -> Self {
MessageHandler { state }
}
pub async fn handle(&self, addr: SocketAddr, message: Message) {
match message {
Message::Text(text) => match text {
_chat_message if let Ok(chat_message) = from_str::<ChatMessage>(&text) => {
// This should be delegated to user handler and an outgoing message and/or chat handler
let msg = format! {"{0}: {1}", self.state.online_users.read().unwrap().get(&addr).unwrap().read().unwrap().name, chat_message.text};
tracing::debug!("{msg}");
self.state
.broadcast_tx
.send(to_string::<ChatMessage>(&ChatMessage { text: msg }).unwrap())
.unwrap();
}
_user_log_in if let Ok(user_log_in) = from_str::<UserLogIn>(&text) => {
self.state
.users_tx
.send(UserHandlerMessage::UserLogIn {
username: user_log_in.username,
addr,
})
.await
.unwrap();
tracing::debug!("passed login to user handler");
}
_ => tracing::debug!("Unhandled text from {}", addr),
},
Message::Binary(data) => tracing::debug!("{} sent binary: {:?}", addr, data),
Message::Close(_close_frame) => {
// make this
// websocket_handle_close(close_frame, &state, tx, addr)?;
}
Message::Pong(ping) => {
tracing::debug!("Pong received with: {:?}", ping);
}
Message::Ping(pong) => {
tracing::debug!("Pong received with: {:?}", pong);
}
}
}
}

View file

@ -1,11 +1,15 @@
use crate::AppState;
use crate::NewUser;
use crate::User;
use lib::*;
use serde_json::to_string;
use std::net::SocketAddr;
use std::sync::{Arc, RwLock};
pub enum UserHandlerMessage {
NewUser { user: User, addr: SocketAddr },
UserLogIn { username: String, addr: SocketAddr },
}
pub struct UserHandler {
state: Arc<AppState>,
}
@ -15,43 +19,38 @@ impl UserHandler {
UserHandler { state }
}
pub async fn process(&self, message: NewUser) {
pub async fn handle(&self, message: UserHandlerMessage) {
match message {
UserHandlerMessage::NewUser { user, addr } => {
// make this not async
self.set_up_new_user(user, addr).await
}
UserHandlerMessage::UserLogIn { username, addr } => self.login(username, addr).await,
}
}
async fn set_up_new_user(&self, user: User, addr: SocketAddr) {
//
// Create, Register, and Hydrate new user
//
let new_user = Arc::new(RwLock::new(User::new(&self.state)));
let tx = user.tx.clone();
let new_user = Arc::new(RwLock::new(user));
// Notify client of new username
message
.sender
.send(user_client_self_update(&new_user))
.await
.unwrap();
tx.send(user_client_self_update(&new_user)).await.unwrap();
// Register using `addr` as key until something longer lived exists
self.state
.online_users
.write()
.unwrap()
.insert(message.addr, new_user.clone());
.insert(addr, new_user.clone());
// Hydrate client
// this should probably be combined and sent as one
//
// message.sender.send(meta_chat_update(&self.state)).await;
// message
// .sender
// .send(meta_server_summary_update(&self.state))
// .await
// .unwrap();
message
.sender
.send(meta_games_browser_update(&self.state))
tx.send(meta_games_browser_update(&self.state))
.await
.unwrap();
message
.sender
.send(meta_new_game_card_packs(&self.state))
tx.send(meta_new_game_card_packs(&self.state))
.await
.unwrap();
@ -59,23 +58,145 @@ impl UserHandler {
// this should probably be combined and sent as one
let _ = &self
.state
.broadcast_channel
.send(meta_announce_user_join(&self.state, &message.addr))
.broadcast_tx
.send(meta_announce_user_join(&self.state, &addr))
.unwrap();
let _ = &self
.state
.broadcast_channel
.broadcast_tx
.send(meta_server_summary_update(&self.state))
.unwrap();
let _ = &self
.state
.broadcast_channel
.broadcast_tx
.send(meta_chat_update(&self.state))
.unwrap();
// this races the broadcasts but if it's done last it'll probably show up
// last
message.sender.send(meta_motd()).await.unwrap();
tx.send(meta_motd()).await.unwrap();
}
async fn login(&self, username: String, addr: SocketAddr) {
// User's DM channel
let dm = self
.state
.online_users
.read()
.unwrap()
.get(&addr)
.unwrap()
.read()
.unwrap()
.tx
.clone();
let broadcast = self.state.broadcast_tx.clone();
let old_name = self
.state
.online_users
.read()
.unwrap()
.get(&addr)
.unwrap()
.read()
.unwrap()
.name
.clone();
let new_name = username.clone();
if self
.state
.offline_users
.read()
.unwrap()
.contains_key(&new_name)
{
self.state
.online_users
.write()
.unwrap()
.insert(
addr,
self.state
.offline_users
.write()
.unwrap()
.remove(&new_name)
.unwrap(),
)
.unwrap();
let msg = format! {
"{0} changed name to {1}. Welcome back!",
old_name,
new_name
};
tracing::debug!("{msg}");
} else if self
.state
.reserved_names
.read()
.unwrap()
.contains(&new_name)
{
tracing::debug!("name is taken");
} else {
self.state
.online_users
.write()
.unwrap()
.get_mut(&addr)
.unwrap()
.write()
.unwrap()
.change_name(username);
let msg = format! {
"{0} changed name to {1}.",
old_name,
new_name
};
// Reserve name
self.state
.reserved_names
.write()
.unwrap()
.insert(new_name.clone());
tracing::debug!("{msg}");
broadcast
.send(to_string::<ChatMessage>(&ChatMessage { text: msg }).unwrap())
.unwrap();
tracing::debug!("Name {} reserved.", &new_name);
}
tracing::debug!(
"Online Users: {} Offline Users: {}",
self.state.online_users.read().unwrap().len(),
self.state.offline_users.read().unwrap().len()
);
broadcast
.send(meta_games_browser_update(&self.state))
.unwrap();
broadcast.send(meta_chat_update(&self.state)).unwrap();
// send the user their new name
dm.send(
to_string::<UserUpdate>(&UserUpdate {
username: new_name.clone(),
})
.unwrap(),
)
.await
.unwrap();
tracing::debug!(" HI! login received {} {} {}", addr, new_name, old_name)
}
}

View file

@ -2,7 +2,7 @@ use crate::user_handler::*;
use crate::AppState;
use crate::Game;
use crate::NewGameManifest;
use crate::NewUser;
use crate::User;
use anyhow::Result;
use axum::extract::ws::CloseFrame;
use axum::{
@ -14,7 +14,9 @@ use axum::{
};
use futures::{SinkExt, StreamExt};
use lib::*;
use rand::prelude::SliceRandom;
use serde_json::{from_str, to_string};
use std::collections::HashMap;
use std::{
net::SocketAddr,
sync::{Arc, RwLock},
@ -40,16 +42,27 @@ pub async fn websocket_on_connection(stream: WebSocket, state: Arc<AppState>, ad
// Create channel for direct messages
let (dm_tx, mut dm_rx) = mpsc::channel(30);
let mut map = HashMap::new();
map.insert(addr, dm_tx.clone());
let _ = state
.users_tx
.send(NewUser {
sender: dm_tx,
// add tx
.send(UserHandlerMessage::NewUser {
user: User::new(
format!(
"{} {}",
state.first_names.choose(&mut rand::thread_rng()).unwrap(),
state.last_names.choose(&mut rand::thread_rng()).unwrap(),
),
dm_tx.clone(),
),
addr,
})
.await;
// Subscribe to receive from global broadcast channel
let mut rx = state.broadcast_channel.subscribe();
let mut rx = state.broadcast_tx.subscribe();
// Send messages to this client
let mut send_task = tokio::spawn(async move {
@ -79,6 +92,11 @@ pub async fn websocket_on_connection(stream: WebSocket, state: Arc<AppState>, ad
// Receive messages from this client
let mut recv_task = tokio::spawn(async move {
while let Some(Ok(message)) = receiver.next().await {
state
.messages_tx
.send((addr.clone(), message.clone()))
.await
.unwrap();
websocket_message_handler(state.clone(), addr, message)
.await
.expect("Message Handler exploded!")
@ -98,7 +116,7 @@ pub async fn websocket_message_handler(
addr: SocketAddr,
message: Message,
) -> Result<()> {
let tx = &state.broadcast_channel;
let tx = &state.broadcast_tx;
match message {
Message::Text(text) => match text {
@ -106,12 +124,6 @@ pub async fn websocket_message_handler(
tracing::debug!("New game request received.");
game_handle_new_game(_new_game, &state, tx, addr)?;
}
_chat_message if let Ok(_chat_message) = from_str::<ChatMessage>(&text) => {
websocket_handle_chat_message(_chat_message, &state, tx, addr)?;
}
_user_log_in if let Ok(_user_log_in) = from_str::<UserLogIn>(&text) => {
websocket_handle_user_log_in(_user_log_in, &state, tx, addr)?;
}
_ => {
tracing::debug!("Unhandled text message: {}", &text);
}
@ -166,106 +178,6 @@ fn game_handle_new_game(
Ok(())
}
/// This runs when a ChatMessage is received
fn websocket_handle_chat_message(
chat_message: ChatMessage,
state: &Arc<AppState>,
tx: &Sender<String>,
addr: SocketAddr,
) -> Result<()> {
let msg = format! {"{0}: {1}", state.online_users.read().unwrap().get(&addr).unwrap().read().unwrap().name, chat_message.text};
tracing::debug!("{msg}");
tx.send(to_string::<ChatMessage>(&ChatMessage { text: msg })?)?;
Ok(())
}
/// This runs when a UserLogIn is received
fn websocket_handle_user_log_in(
user_log_in: UserLogIn,
state: &Arc<AppState>,
tx: &Sender<String>,
addr: SocketAddr,
) -> Result<()> {
let old_name = state
.online_users
.read()
.unwrap()
.get(&addr)
.unwrap()
.read()
.unwrap()
.name
.clone();
let new_name = user_log_in.username.clone();
if state.offline_users.read().unwrap().contains_key(&new_name) {
state
.online_users
.write()
.unwrap()
.insert(
addr,
state
.offline_users
.write()
.unwrap()
.remove(&new_name)
.unwrap(),
)
.unwrap();
let msg = format! {
"{0} changed name to {1}. Welcome back!",
old_name,
new_name
};
tracing::debug!("{msg}");
} else if state.reserved_names.read().unwrap().contains(&new_name) {
tracing::debug!("name is taken");
} else {
state
.online_users
.write()
.unwrap()
.get_mut(&addr)
.unwrap()
.write()
.unwrap()
.change_name(user_log_in.username);
let msg = format! {
"{0} changed name to {1}.",
old_name,
new_name
};
// Reserve name
state
.reserved_names
.write()
.unwrap()
.insert(new_name.clone());
tracing::debug!("{msg}");
tx.send(to_string::<ChatMessage>(&ChatMessage { text: msg })?)?;
tracing::debug!("Name {} reserved.", &new_name);
}
tracing::debug!(
"Online Users: {} Offline Users: {}",
state.online_users.read().unwrap().len(),
state.offline_users.read().unwrap().len()
);
tx.send(meta_games_browser_update(state))?;
tx.send(meta_chat_update(state))?;
// send the user their new name
Ok(())
}
/// This runs when a connection closes
fn websocket_handle_close(
close_frame: Option<CloseFrame>,
@ -309,6 +221,8 @@ fn websocket_handle_close(
tracing::debug!("{}", msg.text);
tx.send(to_string::<ChatMessage>(&msg)?)?;
// Move user to offline
let name = state
.online_users
.read()