This commit is contained in:
Adam 2024-07-30 01:52:03 -04:00
parent c2897d5ec4
commit 7c10ab9370
2 changed files with 82 additions and 75 deletions

View file

@ -1,4 +1,5 @@
use crate::AppState; use crate::AppState;
use anyhow::Result;
use axum::{ use axum::{
extract::{ extract::{
ws::{Message, WebSocket}, ws::{Message, WebSocket},
@ -6,6 +7,7 @@ use axum::{
}, },
response::IntoResponse, response::IntoResponse,
}; };
use futures::stream::SplitSink;
use futures::{SinkExt, StreamExt}; use futures::{SinkExt, StreamExt};
use lib::models::*; use lib::models::*;
use rand::seq::SliceRandom; use rand::seq::SliceRandom;
@ -15,111 +17,116 @@ use std::{net::SocketAddr, sync::Arc};
pub mod message_handler; pub mod message_handler;
use crate::message_handler::*; use crate::message_handler::*;
fn motd() -> ChatMessage { fn motd() -> String {
ChatMessage { to_string::<ChatMessage>(&ChatMessage {
text: "Greetings from the game server!".to_string(), text: "Greetings from the game server!".to_string(),
} })
.unwrap()
} }
fn server_sum_update(state: &Arc<AppState>) -> ServerStateSummary { fn server_summary_update(state: &Arc<AppState>) -> String {
ServerStateSummary { to_string::<ServerStateSummary>(&ServerStateSummary {
online_users: state.users.lock().unwrap().len(), online_users: state.users.lock().unwrap().len(),
active_games: state.games.lock().unwrap().len(), active_games: state.games.lock().unwrap().len(),
} })
.unwrap()
} }
fn generate_chat_update(state: &Arc<AppState>) -> ChatUpdate { fn chat_update(state: &Arc<AppState>) -> String {
let mut names = vec![]; let mut names = vec![];
for user in state.users.lock().unwrap().iter() { for user in state.users.lock().unwrap().iter() {
names.push(user.1.name.clone()); names.push(user.1.name.clone());
} }
ChatUpdate { to_string::<ChatUpdate>(&ChatUpdate {
room: "Lobby".to_string(), room: "Lobby".to_string(),
users: names, users: names,
} })
.unwrap()
} }
fn generate_games_update(state: &Arc<AppState>) -> GamesUpdate { fn games_update(state: &Arc<AppState>) -> String {
let mut names = vec![]; let mut names = vec![];
for game in state.games.lock().unwrap().iter() { for game in state.games.lock().unwrap().iter() {
names.push(game.name.clone()); names.push(game.name.clone());
} }
GamesUpdate { games: names } to_string::<GamesUpdate>(&GamesUpdate { games: names }).unwrap()
} }
pub async fn on_websocket_connection(stream: WebSocket, state: Arc<AppState>, addr: SocketAddr) { fn announce_join(state: &Arc<AppState>, addr: &SocketAddr) -> String {
// Add User to users let msg = format!(
let new_user = User { "{} joined.",
state.users.lock().unwrap().get(addr).unwrap().name
);
tracing::debug!("{}", &msg);
to_string::<ChatMessage>(&ChatMessage { text: msg }).unwrap()
}
fn generate_new_user(state: &Arc<AppState>) -> User {
User {
name: format!( name: format!(
"{} {}", "{} {}",
state.first_names.choose(&mut rand::thread_rng()).unwrap(), state.first_names.choose(&mut rand::thread_rng()).unwrap(),
state.last_names.choose(&mut rand::thread_rng()).unwrap(), state.last_names.choose(&mut rand::thread_rng()).unwrap(),
), ),
}; }
}
// By splitting, we can send and receive at the same time. fn client_self_user_update(new_user: &User) -> String {
to_string::<UserUpdate>(&UserUpdate {
username: new_user.name.clone(),
})
.unwrap()
}
/// Create, Register, and Hydrate new user
async fn handle_new_user(
sender: &mut SplitSink<WebSocket, Message>,
state: &Arc<AppState>,
addr: &SocketAddr,
) -> Result<()> {
// Create
let new_user = generate_new_user(state);
// Notify client of new username
sender
.send(Message::Text(client_self_user_update(&new_user)))
.await?;
// Register using `addr` as key until something longer lived exists
state.users.lock().unwrap().insert(*addr, new_user);
// Hydrate client
sender.send(Message::Text(chat_update(state))).await?;
sender.send(Message::Text(motd())).await?;
sender
.send(Message::Text(server_summary_update(state)))
.await?;
sender.send(Message::Text(games_update(state))).await?;
state.tx.send(announce_join(state, addr))?;
state.tx.send(server_summary_update(state))?;
state.tx.send(chat_update(state))?;
Ok(())
}
pub async fn on_websocket_connection(stream: WebSocket, state: Arc<AppState>, addr: SocketAddr) {
// Split channels to send and receive asynchronously.
let (mut sender, mut receiver) = stream.split(); let (mut sender, mut receiver) = stream.split();
let _ = &sender handle_new_user(&mut sender, &state, &addr)
.send(Message::Text( .await
to_string::<UserUpdate>(&UserUpdate { .expect("Error creating new user!");
username: new_user.name.to_string(),
})
.unwrap(),
))
.await;
// Add user to users using `addr` as key until something longer lived exists // Subscribe to global broadcast channel
state.users.lock().unwrap().insert(addr, new_user);
// hydrate user
let _ = &sender
.send(Message::Text(
to_string::<ChatUpdate>(&generate_chat_update(&state)).unwrap(),
))
.await;
let _ = &sender
.send(Message::Text(to_string::<ChatMessage>(&motd()).unwrap()))
.await;
let _ = &sender
.send(Message::Text(
to_string(&server_sum_update(&state)).unwrap(),
))
.await;
let _ = &sender
.send(Message::Text(
to_string(&generate_games_update(&state)).unwrap(),
))
.await;
// ANNOUNCE THY PRESENCE
let msg = ChatMessage {
text: format!(
"{} joined.",
state.users.lock().unwrap().get(&addr).unwrap().name
),
};
tracing::debug!("{}", msg.text);
let _ = &state.tx.send(to_string::<ChatMessage>(&msg).unwrap());
// Broadcast server state summary update
let _ = &state
.tx
.send(to_string(&server_sum_update(&state)).unwrap());
// Broadcast users update
let _ = &state
.tx
.send(to_string(&generate_chat_update(&state)).unwrap());
// subscribe to broadcast channel
let mut rx = state.tx.subscribe(); let mut rx = state.tx.subscribe();
// handle broadcasting further awesome messages // Submit new messages from this client to broadcast
let mut send_task = tokio::spawn(async move { let mut send_task = tokio::spawn(async move {
while let Ok(msg) = rx.recv().await { while let Ok(msg) = rx.recv().await {
if sender.send(Message::Text(msg)).await.is_err() { if sender.send(Message::Text(msg)).await.is_err() {
@ -128,14 +135,14 @@ pub async fn on_websocket_connection(stream: WebSocket, state: Arc<AppState>, ad
} }
}); });
// handle new incoming messages // Send messages from broadcast down to this client
let mut recv_task = tokio::spawn(async move { let mut recv_task = tokio::spawn(async move {
while let Some(Ok(message)) = receiver.next().await { while let Some(Ok(message)) = receiver.next().await {
message_handler(message, &state, addr).await message_handler(message, &state, addr).await
} }
}); });
// if either task completes then abort the other // If either task completes then abort the other
tokio::select! { tokio::select! {
_ = (&mut send_task) => recv_task.abort(), _ = (&mut send_task) => recv_task.abort(),
_ = (&mut recv_task) => send_task.abort(), _ = (&mut recv_task) => send_task.abort(),

View file

@ -21,8 +21,8 @@ pub async fn message_handler(message: Message, state: &Arc<AppState>, addr: Sock
tracing::error!("Failed to convert Game object to JSON.") tracing::error!("Failed to convert Game object to JSON.")
} }
state.games.lock().unwrap().push(new_game_object); state.games.lock().unwrap().push(new_game_object);
let _ = tx.send(to_string(&generate_games_update(state)).unwrap()); let _ = tx.send(games_update(state));
let _ = tx.send(to_string(&server_sum_update(state)).unwrap()); let _ = tx.send(server_summary_update(state));
// let _update = tx.send(motd()); // let _update = tx.send(motd());
} else { } else {
let _res = tx.send(String::from("error creating game")); let _res = tx.send(String::from("error creating game"));
@ -77,8 +77,8 @@ pub async fn message_handler(message: Message, state: &Arc<AppState>, addr: Sock
tracing::debug!("{}", msg.text); tracing::debug!("{}", msg.text);
let _ = tx.send(to_string::<ChatMessage>(&msg).unwrap()); let _ = tx.send(to_string::<ChatMessage>(&msg).unwrap());
state.users.lock().unwrap().remove(&addr).unwrap(); state.users.lock().unwrap().remove(&addr).unwrap();
let _ = tx.send(to_string(&server_sum_update(state)).unwrap()); let _ = tx.send(server_summary_update(state));
let _ = tx.send(to_string(&generate_chat_update(state)).unwrap()); let _ = tx.send(chat_update(state));
} }
Message::Pong(ping) => { Message::Pong(ping) => {