single thread > lockup

This commit is contained in:
Adam 2024-10-11 00:26:24 -04:00
parent 062c85eb68
commit b2f9e622ef
4 changed files with 54 additions and 47 deletions

View file

@ -391,7 +391,6 @@ impl GameHandler {
// Register game to user
self.register_user_in_game(game_id.clone(), host.read().unwrap().uuid.clone());
self.send_game_state_update_all(vec![game_id.clone()]);
self.send_game_meta_update(vec![game_id]);

View file

@ -15,22 +15,23 @@ use tokio::sync::{broadcast, mpsc};
use tower::ServiceBuilder;
use tower_governor::{governor::GovernorConfigBuilder, GovernorLayer};
use tower_http::{compression::CompressionLayer, services::ServeDir};
// use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
use tracing_subscriber::prelude::*;
use user_handler::UserHandler;
#[tokio::main]
#[tokio::main(flavor = "current_thread")]
// #[tokio::main]
async fn main() -> Result<()> {
// // stuff for logging
// tracing_subscriber::registry()
// .with(
// tracing_subscriber::EnvFilter::try_from_default_env().unwrap_or_else(|_| {
// "server=trace,tower_http=trace,lib=trace,tokio=trace,runtime=trace".into()
// }),
// )
// .with(tracing_subscriber::fmt::layer())
// .init();
console_subscriber::init();
// stuff for logging
tracing_subscriber::registry()
.with(
tracing_subscriber::EnvFilter::try_from_default_env().unwrap_or_else(|_| {
"server=trace,tower_http=trace,lib=trace".into()
// "server=trace,tower_http=trace,lib=trace,tokio=trace,runtime=trace".into()
}),
)
// .with(console_subscriber::ConsoleLayer::builder().spawn())
.with(tracing_subscriber::fmt::layer())
.init();
// Handle command-line args
let matches = command!()
@ -67,10 +68,10 @@ async fn main() -> Result<()> {
});
// Set up state
let (broadcast_tx, _rx) = broadcast::channel(1000);
let (users_tx, mut users_rx) = mpsc::channel(1000);
let (messages_tx, mut messages_rx) = mpsc::channel(1000);
let (games_tx, mut games_rx) = mpsc::channel(1000);
let (broadcast_tx, _rx) = broadcast::channel(1000000);
let (users_tx, mut users_rx) = mpsc::channel(1000000);
let (messages_tx, mut messages_rx) = mpsc::channel(1000000);
let (games_tx, mut games_rx) = mpsc::channel(1000000);
let first_names = load_names("data/first.txt")?;
let last_names = load_names("data/last.txt")?;
let reserved_names = RwLock::new(HashSet::<String>::new());
@ -101,30 +102,39 @@ async fn main() -> Result<()> {
// Spawn task to handle incoming messages, also handles outging messages
let message_handler = MessageHandler::new(app_state.clone());
tokio::spawn(async move {
while let Some((addr, message)) = messages_rx.recv().await {
message_handler.handle(addr, message).await;
}
});
tokio::task::Builder::new()
.name("Message Handler")
.spawn(async move {
while let Some((addr, message)) = messages_rx.recv().await {
message_handler.handle(addr, message).await
}
})
.unwrap();
// TODO: Restart handler threads if they crash
// TODO: Make an outgoing message handler handler?
// Spawn task to handle User things
let user_handler = UserHandler::new(app_state.clone());
tokio::spawn(async move {
while let Some(message) = users_rx.recv().await {
user_handler.handle(message).await;
}
});
tokio::task::Builder::new()
.name("User Handler")
.spawn(async move {
while let Some(message) = users_rx.recv().await {
user_handler.handle(message).await
}
})
.unwrap();
// Spawn task to handle Game things
let game_handler = GameHandler::new(app_state.clone());
tokio::spawn(async move {
while let Some(message) = games_rx.recv().await {
game_handler.handle(message).await;
}
});
tokio::task::Builder::new()
.name("Game Handler")
.spawn(async move {
while let Some(message) = games_rx.recv().await {
game_handler.handle(message).await
}
})
.unwrap();
// Router
let app = Router::new()

View file

@ -376,17 +376,20 @@ pub fn meta_server_summary_update(state: &Arc<AppState>) -> String {
/// Generate games list update
pub fn meta_games_browser_update(state: &Arc<AppState>) -> String {
// TODO: this may get expensive if there are many games
let mut games = vec![];
for game in state.games.read().unwrap().values() {
games.push(GameBrowserMeta {
let games = state
.games
.read()
.unwrap()
.values()
.map(|game| GameBrowserMeta {
uuid: game.read().unwrap().uuid.to_string(),
name: game.read().unwrap().name.clone(),
host: game.read().unwrap().host.read().unwrap().name.clone(),
players: game.read().unwrap().players.len(),
packs: game.read().unwrap().packs.clone(),
});
}
})
.collect::<Vec<GameBrowserMeta>>();
to_string::<GamesUpdate>(&GamesUpdate { games }).unwrap()
}

View file

@ -31,7 +31,7 @@ pub async fn websocket_on_connection(stream: WebSocket, state: Arc<AppState>, ad
let (mut sender, mut receiver) = stream.split();
// Create channel for direct messages
let (dm_tx, mut dm_rx) = mpsc::channel(30);
let (dm_tx, mut dm_rx) = mpsc::channel(1000000);
let mut map = HashMap::new();
map.insert(addr, dm_tx.clone());
@ -51,14 +51,9 @@ pub async fn websocket_on_connection(stream: WebSocket, state: Arc<AppState>, ad
}
}
state
.users_tx
.send(UserHandlerMessage::NewUser(
User::new(username, dm_tx.clone()),
addr,
))
.await
.expect("User handler is down");
let tx = state.users_tx.clone();
let msg = UserHandlerMessage::NewUser(User::new(username, dm_tx.clone()), addr);
tokio::spawn(async move { tx.send(msg).await.expect("User handler is down") });
// Subscribe to receive from global broadcast channel
let mut rx = state.broadcast_tx.subscribe();