start cleaning up

This commit is contained in:
Adam 2024-08-05 01:55:05 -04:00
parent d930583e12
commit 67e29f2a5d
6 changed files with 248 additions and 235 deletions

View file

@ -1,201 +0,0 @@
use crate::AppState;
use anyhow::Result;
use axum::{
extract::{
ws::{Message, WebSocket},
ConnectInfo, State, WebSocketUpgrade,
},
response::IntoResponse,
};
use futures::stream::SplitSink;
use futures::{SinkExt, StreamExt};
use lib::*;
use rand::seq::SliceRandom;
use serde_json::to_string;
use server::*;
use std::{
net::SocketAddr,
sync::{Arc, RwLock},
};
pub mod message_handler;
use crate::message_handler::*;
/// Establish the WebSocket connection
pub async fn websocket_connection_handler(
ws: WebSocketUpgrade,
// user_agent: Option<TypedHeader<headers::UserAgent>>,
ConnectInfo(addr): ConnectInfo<SocketAddr>,
State(state): State<Arc<AppState>>,
) -> impl IntoResponse {
tracing::debug!("New connection from {}", &addr);
ws.on_upgrade(move |socket| on_websocket_connection(socket, state, addr))
}
/// This runs right after a WebSocket connection is established
pub async fn on_websocket_connection(stream: WebSocket, state: Arc<AppState>, addr: SocketAddr) {
// Split channels to send and receive asynchronously.
let (mut sender, mut receiver) = stream.split();
// Set up new user
handle_new_user(&mut sender, &state, &addr)
.await
.expect("Error creating new user!");
// Subscribe to receive from global broadcast channel
let mut rx = state.tx.subscribe();
// Send messages to this client
let mut send_task = tokio::spawn(async move {
while let Ok(msg) = rx.recv().await {
if sender.send(Message::Text(msg)).await.is_err() {
break;
}
}
});
// Receive messages from this client
let mut recv_task = tokio::spawn(async move {
while let Some(Ok(message)) = receiver.next().await {
message_handler(state.clone(), addr, message)
.await
.expect("Message Handler exploded!")
}
});
// If either task completes then abort the other
tokio::select! {
_ = (&mut send_task) => recv_task.abort(),
_ = (&mut recv_task) => send_task.abort(),
};
}
/// Create, Register, and Hydrate new user
async fn handle_new_user(
sender: &mut SplitSink<WebSocket, Message>,
state: &Arc<AppState>,
addr: &SocketAddr,
) -> Result<()> {
// Create
let new_user = Arc::new(RwLock::new(generate_new_user(state)));
// Notify client of new username
sender
.send(Message::Text(client_self_user_update(&new_user)))
.await?;
// Register using `addr` as key until something longer lived exists
state.online_users.write().unwrap().insert(*addr, new_user);
// Hydrate client
// this should probably be combined and sent as one
sender.send(Message::Text(chat_meta_update(state))).await?;
sender.send(Message::Text(motd())).await?;
sender
.send(Message::Text(server_summary_update(state)))
.await?;
sender.send(Message::Text(games_update(state))).await?;
sender.send(Message::Text(cards_meta_update(state))).await?;
// Broadcast new user's existence
// this should probably be combined and sent as one
state.tx.send(announce_join(state, addr))?;
state.tx.send(server_summary_update(state))?;
state.tx.send(chat_meta_update(state))?;
Ok(())
}
/// Create a new user object from incoming data
fn generate_new_user(state: &Arc<AppState>) -> User {
User {
name: format!(
"{} {}",
state.first_names.choose(&mut rand::thread_rng()).unwrap(),
state.last_names.choose(&mut rand::thread_rng()).unwrap(),
),
}
}
/// Generate message to notify client of user changes
fn client_self_user_update(new_user: &Arc<RwLock<User>>) -> String {
to_string::<UserUpdate>(&UserUpdate {
username: new_user.read().unwrap().name.clone(),
})
.unwrap()
}
/// Generate chatroom metadata update
fn chat_meta_update(state: &Arc<AppState>) -> String {
// this may get expensive if there are many users
let mut names = vec![];
for user in state.online_users.read().unwrap().iter() {
names.push(user.1.read().unwrap().name.clone());
}
to_string::<ChatUpdate>(&ChatUpdate {
room: "Lobby".to_string(),
users: names,
})
.unwrap()
}
/// Generage cards meta message
fn cards_meta_update(state: &Arc<AppState>) -> String {
tracing::debug!("sending cards meta");
to_string::<CardPacksMeta>(&state.packs_meta).unwrap()
}
/// Generate message-of-the-day server greeting
fn motd() -> String {
to_string::<ChatMessage>(&ChatMessage {
text: "Greetings from the game server!".to_string(),
})
.unwrap()
}
/// Generate server summary update - mostly debug stuff
fn server_summary_update(state: &Arc<AppState>) -> String {
let online_users = state.online_users.read().unwrap().len();
let active_games = state.games.read().unwrap().len();
to_string::<ServerStateSummary>(&ServerStateSummary {
online_users,
active_games,
})
.unwrap()
}
/// Generate games list update
fn games_update(state: &Arc<AppState>) -> String {
// this may get expensive if there are many games
let mut names = vec![];
for game in state.games.read().unwrap().iter() {
names.push(format!(
"Name: {} Host: {}",
game.name,
game.host.read().unwrap().name
));
}
to_string::<GamesUpdate>(&GamesUpdate { games: names }).unwrap()
}
/// Generate chatroom join announcement
fn announce_join(state: &Arc<AppState>, addr: &SocketAddr) -> String {
let msg = format!(
"{} joined.",
state
.online_users
.read()
.unwrap()
.get(addr)
.unwrap()
.read()
.unwrap()
.name
);
tracing::debug!("{}", &msg);
to_string::<ChatMessage>(&ChatMessage { text: msg }).unwrap()
}

View file

@ -1,3 +1,5 @@
#![feature(if_let_guard)]
use anyhow::{Context, Result}; use anyhow::{Context, Result};
use lib::*; use lib::*;
use rand::prelude::IteratorRandom; use rand::prelude::IteratorRandom;
@ -11,6 +13,10 @@ use std::{
sync::{Arc, RwLock}, sync::{Arc, RwLock},
}; };
use tokio::sync::broadcast; use tokio::sync::broadcast;
pub mod websocket;
pub mod meta;
pub mod user;
use crate::user::*;
/// Card Set /// Card Set
#[derive(Debug)] #[derive(Debug)]
@ -68,18 +74,6 @@ pub struct NewGameManifest {
pub host: Arc<RwLock<User>>, pub host: Arc<RwLock<User>>,
} }
/// User
#[derive(Default, Debug, Eq, PartialEq, Hash)]
pub struct User {
pub name: String,
}
impl User {
pub fn change_name(&mut self, new_name: String) {
self.name = new_name;
}
}
/// A struct that represents a player /// A struct that represents a player
#[derive(Debug)] #[derive(Debug)]
pub struct Player { pub struct Player {

View file

@ -1,7 +1,8 @@
#![feature(if_let_guard)] use crate::websocket::*;
use anyhow::{Context, Result}; use anyhow::{Context, Result};
use axum::{routing::get, Router}; use axum::{routing::get, Router};
use server::user::*;
use server::*;
use std::{ use std::{
collections::{HashMap, HashSet}, collections::{HashMap, HashSet},
net::SocketAddr, net::SocketAddr,
@ -10,9 +11,6 @@ use std::{
use tokio::sync::broadcast; use tokio::sync::broadcast;
use tower_http::services::ServeDir; use tower_http::services::ServeDir;
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
pub mod api;
use crate::api::*;
use server::*;
#[tokio::main] #[tokio::main]
async fn main() -> Result<()> { async fn main() -> Result<()> {

80
server/src/meta.rs Normal file
View file

@ -0,0 +1,80 @@
use crate::AppState;
use lib::*;
use serde_json::to_string;
use std::{net::SocketAddr, sync::Arc};
/// Generate chatroom metadata update
pub fn meta_chat_update(state: &Arc<AppState>) -> String {
// this may get expensive if there are many users
let mut names = vec![];
for user in state.online_users.read().unwrap().iter() {
names.push(user.1.read().unwrap().name.clone());
}
to_string::<ChatUpdate>(&ChatUpdate {
room: "Lobby".to_string(),
users: names,
})
.unwrap()
}
/// Generage cards meta message
pub fn meta_new_game_card_packs(state: &Arc<AppState>) -> String {
tracing::debug!("sending cards meta");
to_string::<CardPacksMeta>(&state.packs_meta).unwrap()
}
/// Generate message-of-the-day server greeting
pub fn meta_motd() -> String {
to_string::<ChatMessage>(&ChatMessage {
text: "Greetings from the game server!".to_string(),
})
.unwrap()
}
/// Generate server summary update - mostly debug stuff
pub fn meta_server_summary_update(state: &Arc<AppState>) -> String {
let online_users = state.online_users.read().unwrap().len();
let active_games = state.games.read().unwrap().len();
to_string::<ServerStateSummary>(&ServerStateSummary {
online_users,
active_games,
})
.unwrap()
}
/// Generate games list update
pub fn meta_games_browser_update(state: &Arc<AppState>) -> String {
// this may get expensive if there are many games
let mut names = vec![];
for game in state.games.read().unwrap().iter() {
names.push(format!(
"Name: {} Host: {}",
game.name,
game.host.read().unwrap().name
));
}
to_string::<GamesUpdate>(&GamesUpdate { games: names }).unwrap()
}
/// Generate chatroom join announcement
pub fn meta_announce_user_join(state: &Arc<AppState>, addr: &SocketAddr) -> String {
let msg = format!(
"{} joined.",
state
.online_users
.read()
.unwrap()
.get(addr)
.unwrap()
.read()
.unwrap()
.name
);
tracing::debug!("{}", &msg);
to_string::<ChatMessage>(&ChatMessage { text: msg }).unwrap()
}

37
server/src/user.rs Normal file
View file

@ -0,0 +1,37 @@
use crate::AppState;
use lib::*;
use rand::prelude::SliceRandom;
use serde_json::to_string;
use std::sync::Arc;
use std::sync::RwLock;
/// User
#[derive(Default, Debug, Eq, PartialEq, Hash)]
pub struct User {
pub name: String,
}
impl User {
/// Create a new user object from incoming data
pub fn new(state: &Arc<AppState>) -> User {
User {
name: format!(
"{} {}",
state.first_names.choose(&mut rand::thread_rng()).unwrap(),
state.last_names.choose(&mut rand::thread_rng()).unwrap(),
),
}
}
pub fn change_name(&mut self, new_name: String) {
self.name = new_name;
}
}
/// Generate message to notify client of user changes
pub fn user_client_self_update(new_user: &Arc<RwLock<User>>) -> String {
to_string::<UserUpdate>(&UserUpdate {
username: new_user.read().unwrap().name.clone(),
})
.unwrap()
}

View file

@ -1,13 +1,118 @@
use crate::api::*; use crate::meta::*;
use crate::user::*;
use crate::AppState; use crate::AppState;
use crate::Arc; use crate::Game;
use crate::NewGameManifest;
use anyhow::Result; use anyhow::Result;
use axum::extract::ws::CloseFrame; use axum::extract::ws::CloseFrame;
use axum::{
extract::{
ws::{Message, WebSocket},
ConnectInfo, State, WebSocketUpgrade,
},
response::IntoResponse,
};
use futures::stream::SplitSink;
use futures::{SinkExt, StreamExt};
use lib::*;
use serde_json::{from_str, to_string}; use serde_json::{from_str, to_string};
use std::{
net::SocketAddr,
sync::{Arc, RwLock},
};
use tokio::sync::broadcast::Sender; use tokio::sync::broadcast::Sender;
/// Establish the WebSocket connection
pub async fn websocket_connection_handler(
ws: WebSocketUpgrade,
// user_agent: Option<TypedHeader<headers::UserAgent>>,
ConnectInfo(addr): ConnectInfo<SocketAddr>,
State(state): State<Arc<AppState>>,
) -> impl IntoResponse {
tracing::debug!("New connection from {}", &addr);
ws.on_upgrade(move |socket| websocket_on_connection(socket, state, addr))
}
/// This runs right after a WebSocket connection is established
pub async fn websocket_on_connection(stream: WebSocket, state: Arc<AppState>, addr: SocketAddr) {
// Split channels to send and receive asynchronously.
let (mut sender, mut receiver) = stream.split();
// Set up new user
user_handle_new(&mut sender, &state, &addr)
.await
.expect("Error creating new user!");
// Subscribe to receive from global broadcast channel
let mut rx = state.tx.subscribe();
// Send messages to this client
let mut send_task = tokio::spawn(async move {
while let Ok(msg) = rx.recv().await {
if sender.send(Message::Text(msg)).await.is_err() {
break;
}
}
});
// Receive messages from this client
let mut recv_task = tokio::spawn(async move {
while let Some(Ok(message)) = receiver.next().await {
websocket_message_handler(state.clone(), addr, message)
.await
.expect("Message Handler exploded!")
}
});
// If either task completes then abort the other
tokio::select! {
_ = (&mut send_task) => recv_task.abort(),
_ = (&mut recv_task) => send_task.abort(),
};
}
/// Create, Register, and Hydrate new user
pub async fn user_handle_new(
sender: &mut SplitSink<WebSocket, Message>,
state: &Arc<AppState>,
addr: &SocketAddr,
) -> Result<()> {
// Create
let new_user = Arc::new(RwLock::new(User::new(state)));
// Notify client of new username
sender
.send(Message::Text(user_client_self_update(&new_user)))
.await?;
// Register using `addr` as key until something longer lived exists
state.online_users.write().unwrap().insert(*addr, new_user);
// Hydrate client
// this should probably be combined and sent as one
sender.send(Message::Text(meta_chat_update(state))).await?;
sender.send(Message::Text(meta_motd())).await?;
sender
.send(Message::Text(meta_server_summary_update(state)))
.await?;
sender
.send(Message::Text(meta_games_browser_update(state)))
.await?;
sender
.send(Message::Text(meta_new_game_card_packs(state)))
.await?;
// Broadcast new user's existence
// this should probably be combined and sent as one
state.tx.send(meta_announce_user_join(state, addr))?;
state.tx.send(meta_server_summary_update(state))?;
state.tx.send(meta_chat_update(state))?;
Ok(())
}
/// Handle incoming messages over the WebSocket /// Handle incoming messages over the WebSocket
pub async fn message_handler( pub async fn websocket_message_handler(
state: Arc<AppState>, state: Arc<AppState>,
addr: SocketAddr, addr: SocketAddr,
message: Message, message: Message,
@ -18,13 +123,13 @@ pub async fn message_handler(
Message::Text(text) => match text { Message::Text(text) => match text {
_new_game if let Ok(_new_game) = from_str::<NewGameRequest>(&text) => { _new_game if let Ok(_new_game) = from_str::<NewGameRequest>(&text) => {
tracing::debug!("New game request received."); tracing::debug!("New game request received.");
handle_new_game(_new_game, &state, tx, addr)?; game_handle_new_game(_new_game, &state, tx, addr)?;
} }
_chat_message if let Ok(_chat_message) = from_str::<ChatMessage>(&text) => { _chat_message if let Ok(_chat_message) = from_str::<ChatMessage>(&text) => {
handle_chat_message(_chat_message, &state, tx, addr)?; websocket_handle_chat_message(_chat_message, &state, tx, addr)?;
} }
_user_log_in if let Ok(_user_log_in) = from_str::<UserLogIn>(&text) => { _user_log_in if let Ok(_user_log_in) = from_str::<UserLogIn>(&text) => {
handle_user_log_in(_user_log_in, &state, tx, addr)?; websocket_handle_user_log_in(_user_log_in, &state, tx, addr)?;
} }
_ => { _ => {
tracing::debug!("Unhandled text message: {}", &text); tracing::debug!("Unhandled text message: {}", &text);
@ -34,7 +139,7 @@ pub async fn message_handler(
tracing::debug!("Binary: {:?}", data) tracing::debug!("Binary: {:?}", data)
} }
Message::Close(close_frame) => { Message::Close(close_frame) => {
handle_close(close_frame, &state, tx, addr)?; websocket_handle_close(close_frame, &state, tx, addr)?;
} }
Message::Pong(ping) => { Message::Pong(ping) => {
tracing::debug!("Pong received with: {:?}", ping); tracing::debug!("Pong received with: {:?}", ping);
@ -48,7 +153,7 @@ pub async fn message_handler(
} }
/// This runs when a NewGameRequest is received /// This runs when a NewGameRequest is received
fn handle_new_game( fn game_handle_new_game(
new_game: NewGameRequest, new_game: NewGameRequest,
state: &Arc<AppState>, state: &Arc<AppState>,
tx: &Sender<String>, tx: &Sender<String>,
@ -69,15 +174,15 @@ fn handle_new_game(
// create game // create game
if let Ok(new_game_object) = Game::new(manifest) { if let Ok(new_game_object) = Game::new(manifest) {
state.games.write().unwrap().push(new_game_object); state.games.write().unwrap().push(new_game_object);
tx.send(games_update(state))?; tx.send(meta_games_browser_update(state))?;
tx.send(server_summary_update(state))?; tx.send(meta_server_summary_update(state))?;
} }
Ok(()) Ok(())
} }
/// This runs when a ChatMessage is received /// This runs when a ChatMessage is received
fn handle_chat_message( fn websocket_handle_chat_message(
chat_message: ChatMessage, chat_message: ChatMessage,
state: &Arc<AppState>, state: &Arc<AppState>,
tx: &Sender<String>, tx: &Sender<String>,
@ -91,7 +196,7 @@ fn handle_chat_message(
} }
/// This runs when a UserLogIn is received /// This runs when a UserLogIn is received
fn handle_user_log_in( fn websocket_handle_user_log_in(
user_log_in: UserLogIn, user_log_in: UserLogIn,
state: &Arc<AppState>, state: &Arc<AppState>,
tx: &Sender<String>, tx: &Sender<String>,
@ -168,8 +273,8 @@ fn handle_user_log_in(
state.online_users.read().unwrap().len(), state.online_users.read().unwrap().len(),
state.offline_users.read().unwrap().len() state.offline_users.read().unwrap().len()
); );
tx.send(games_update(state))?; tx.send(meta_games_browser_update(state))?;
tx.send(chat_meta_update(state))?; tx.send(meta_chat_update(state))?;
// send the user their new name // send the user their new name
@ -177,7 +282,7 @@ fn handle_user_log_in(
} }
/// This runs when a connection closes /// This runs when a connection closes
fn handle_close( fn websocket_handle_close(
close_frame: Option<CloseFrame>, close_frame: Option<CloseFrame>,
state: &Arc<AppState>, state: &Arc<AppState>,
tx: &Sender<String>, tx: &Sender<String>,
@ -235,8 +340,8 @@ fn handle_close(
state.online_users.write().unwrap().remove(&addr).unwrap(), state.online_users.write().unwrap().remove(&addr).unwrap(),
); );
tx.send(server_summary_update(state))?; tx.send(meta_server_summary_update(state))?;
tx.send(chat_meta_update(state))?; tx.send(meta_chat_update(state))?;
Ok(()) Ok(())
} }