This commit is contained in:
Adam 2024-08-07 05:30:30 -04:00
parent 165adddc8d
commit 1f51d9d504
5 changed files with 110 additions and 82 deletions

View file

@ -2,7 +2,6 @@
use crate::mpsc::Sender; use crate::mpsc::Sender;
use anyhow::{Context, Result}; use anyhow::{Context, Result};
use axum::extract::ws::Message;
use lib::*; use lib::*;
use rand::prelude::IteratorRandom; use rand::prelude::IteratorRandom;
use rand::prelude::SliceRandom; use rand::prelude::SliceRandom;
@ -17,6 +16,7 @@ use std::{
}; };
use tokio::sync::{broadcast, mpsc}; use tokio::sync::{broadcast, mpsc};
pub mod websocket; pub mod websocket;
pub mod user_handler;
/// User /// User
#[derive(Default, Debug, Eq, PartialEq, Hash)] #[derive(Default, Debug, Eq, PartialEq, Hash)]
@ -355,13 +355,13 @@ pub fn load_names(path: &str) -> Result<Vec<String>> {
#[derive(Debug)] #[derive(Debug)]
pub struct NewUser { pub struct NewUser {
pub sender: Sender<Message>, pub sender: Sender<String>,
pub addr: SocketAddr, pub addr: SocketAddr,
} }
// Our shared state // Our shared state
pub struct AppState { pub struct AppState {
pub tx: broadcast::Sender<String>, pub broadcast_channel: broadcast::Sender<String>,
pub users_tx: mpsc::Sender<NewUser>, pub users_tx: mpsc::Sender<NewUser>,
pub first_names: Vec<String>, pub first_names: Vec<String>,
pub last_names: Vec<String>, pub last_names: Vec<String>,

View file

@ -1,8 +1,5 @@
use crate::meta::*;
use crate::user::*;
use crate::websocket::*; use crate::websocket::*;
use anyhow::{Context, Result}; use anyhow::{Context, Result};
use axum::extract::ws::Message;
use axum::{routing::get, Router}; use axum::{routing::get, Router};
use server::*; use server::*;
use std::{ use std::{
@ -13,6 +10,7 @@ use std::{
use tokio::sync::{broadcast, mpsc}; use tokio::sync::{broadcast, mpsc};
use tower_http::services::ServeDir; use tower_http::services::ServeDir;
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
use user_handler::UserHandler;
#[tokio::main] #[tokio::main]
async fn main() -> Result<()> { async fn main() -> Result<()> {
@ -26,7 +24,7 @@ async fn main() -> Result<()> {
.init(); .init();
// Set up state // Set up state
let (tx, _rx) = broadcast::channel(100); let (broadcast_channel, _rx) = broadcast::channel(100);
let (users_tx, mut users_rx) = mpsc::channel(100); let (users_tx, mut users_rx) = mpsc::channel(100);
let first_names = load_names("data/first.txt")?; let first_names = load_names("data/first.txt")?;
let last_names = load_names("data/last.txt")?; let last_names = load_names("data/last.txt")?;
@ -37,7 +35,7 @@ async fn main() -> Result<()> {
let games = RwLock::new(HashMap::new()); let games = RwLock::new(HashMap::new());
let app_state = Arc::new(AppState { let app_state = Arc::new(AppState {
tx, broadcast_channel,
users_tx, users_tx,
first_names, first_names,
last_names, last_names,
@ -49,72 +47,11 @@ async fn main() -> Result<()> {
games, games,
}); });
let cloned_state = app_state.clone(); let user_handler = UserHandler::new(app_state.clone());
let _user_handler = tokio::spawn(async move { let _user_handler = tokio::spawn(async move {
while let Some(message) = users_rx.recv().await { while let Some(message) = users_rx.recv().await {
// user_handler.process(message).await.unwrap();
// Create, Register, and Hydrate new user
//
// Create
let new_user = Arc::new(RwLock::new(User::new(&cloned_state)));
// Notify client of new username
message
.sender
.send(Message::Text(user_client_self_update(&new_user)))
.await
.unwrap();
// Register using `addr` as key until something longer lived exists
cloned_state
.online_users
.write()
.unwrap()
.insert(message.addr, new_user.clone());
// Hydrate client
// this should probably be combined and sent as one
message
.sender
.send(Message::Text(meta_chat_update(&cloned_state)))
.await
.unwrap();
message
.sender
.send(Message::Text(meta_motd()))
.await
.unwrap();
message
.sender
.send(Message::Text(meta_server_summary_update(&cloned_state)))
.await
.unwrap();
message
.sender
.send(Message::Text(meta_games_browser_update(&cloned_state)))
.await
.unwrap();
message
.sender
.send(Message::Text(meta_new_game_card_packs(&cloned_state)))
.await
.unwrap();
// Broadcast new user's existence
// this should probably be combined and sent as one
let _ = &cloned_state
.tx
.send(meta_announce_user_join(&cloned_state, &message.addr))
.unwrap();
let _ = &cloned_state
.tx
.send(meta_server_summary_update(&cloned_state))
.unwrap();
let _ = &cloned_state
.tx
.send(meta_chat_update(&cloned_state))
.unwrap();
} }
}); });

View file

@ -0,0 +1,85 @@
use crate::websocket::meta::*;
use crate::websocket::user::*;
use crate::AppState;
use crate::NewUser;
use crate::User;
use anyhow::Result;
use std::sync::{Arc, RwLock};
pub struct UserHandler {
state: Arc<AppState>,
}
impl UserHandler {
pub fn new(state: Arc<AppState>) -> Self {
UserHandler { state }
}
pub async fn process(&self, message: NewUser) -> Result<()> {
//
// Create, Register, and Hydrate new user
//
let new_user = Arc::new(RwLock::new(User::new(&self.state)));
// Notify client of new username
message
.sender
.send(user_client_self_update(&new_user))
.await
.unwrap();
// Register using `addr` as key until something longer lived exists
self.state
.online_users
.write()
.unwrap()
.insert(message.addr, new_user.clone());
// Hydrate client
// this should probably be combined and sent as one
message
.sender
.send(meta_chat_update(&self.state))
.await
.unwrap();
message
.sender
.send(meta_server_summary_update(&self.state))
.await
.unwrap();
message
.sender
.send(meta_games_browser_update(&self.state))
.await
.unwrap();
message
.sender
.send(meta_new_game_card_packs(&self.state))
.await
.unwrap();
// Broadcast new user's existence
// this should probably be combined and sent as one
let _ = &self
.state
.broadcast_channel
.send(meta_announce_user_join(&self.state, &message.addr))
.unwrap();
let _ = &self
.state
.broadcast_channel
.send(meta_server_summary_update(&self.state))
.unwrap();
let _ = &self
.state
.broadcast_channel
.send(meta_chat_update(&self.state))
.unwrap();
// this races the broadcasts but if it's done last it'll probably show up
// last
message.sender.send(meta_motd()).await.unwrap();
Ok(())
}
}

View file

@ -51,21 +51,28 @@ pub async fn websocket_on_connection(stream: WebSocket, state: Arc<AppState>, ad
.await; .await;
// Subscribe to receive from global broadcast channel // Subscribe to receive from global broadcast channel
let mut rx = state.tx.subscribe(); let mut rx = state.broadcast_channel.subscribe();
// Send messages to this client // Send messages to this client
let mut send_task = tokio::spawn(async move { let mut send_task = tokio::spawn(async move {
let mut broadcast = None;
let mut dm = None;
loop { loop {
// Global broadcast channel tokio::select! {
while let Ok(msg) = rx.recv().await { b = rx.recv() => broadcast = Some(b.unwrap()),
if sender.send(Message::Text(msg)).await.is_err() { d = dm_rx.recv() => dm = d,
};
if let Some(msg) = &dm {
if sender.send(Message::Text(msg.to_string())).await.is_err() {
break; break;
} else {
dm = Option::None;
} }
} } else if let Some(msg) = &broadcast {
// Direct message channel if sender.send(Message::Text(msg.to_string())).await.is_err() {
while let Some(msg) = dm_rx.recv().await { } else {
if sender.send(msg).await.is_err() { broadcast = Option::None;
break;
} }
} }
} }
@ -93,7 +100,7 @@ pub async fn websocket_message_handler(
addr: SocketAddr, addr: SocketAddr,
message: Message, message: Message,
) -> Result<()> { ) -> Result<()> {
let tx = &state.tx; let tx = &state.broadcast_channel;
match message { match message {
Message::Text(text) => match text { Message::Text(text) => match text {

View file

@ -21,7 +21,6 @@ pub fn meta_chat_update(state: &Arc<AppState>) -> String {
/// Generage cards meta message /// Generage cards meta message
pub fn meta_new_game_card_packs(state: &Arc<AppState>) -> String { pub fn meta_new_game_card_packs(state: &Arc<AppState>) -> String {
tracing::debug!("sending cards meta");
to_string::<CardPacksMeta>(&state.packs_meta).unwrap() to_string::<CardPacksMeta>(&state.packs_meta).unwrap()
} }