shitty user direct messaging and user handler task

This commit is contained in:
Adam 2024-08-06 02:23:31 -04:00
parent cc332c63ae
commit 5ca0c8250f
4 changed files with 106 additions and 58 deletions

View file

@ -1,6 +1,8 @@
#![feature(if_let_guard)] #![feature(if_let_guard)]
use crate::mpsc::Sender;
use anyhow::{Context, Result}; use anyhow::{Context, Result};
use axum::extract::ws::Message;
use lib::*; use lib::*;
use rand::prelude::IteratorRandom; use rand::prelude::IteratorRandom;
use rand::prelude::SliceRandom; use rand::prelude::SliceRandom;
@ -13,7 +15,7 @@ use std::{
net::SocketAddr, net::SocketAddr,
sync::{Arc, RwLock}, sync::{Arc, RwLock},
}; };
use tokio::sync::broadcast; use tokio::sync::{broadcast, mpsc};
pub mod websocket; pub mod websocket;
/// User /// User
@ -351,9 +353,16 @@ pub fn load_names(path: &str) -> Result<Vec<String>> {
Ok(buf) Ok(buf)
} }
#[derive(Debug)]
pub struct NewUser {
pub sender: Sender<Message>,
pub addr: SocketAddr,
}
// Our shared state // Our shared state
pub struct AppState { pub struct AppState {
pub tx: broadcast::Sender<String>, pub tx: broadcast::Sender<String>,
pub users_tx: mpsc::Sender<NewUser>,
pub first_names: Vec<String>, pub first_names: Vec<String>,
pub last_names: Vec<String>, pub last_names: Vec<String>,
pub reserved_names: RwLock<HashSet<String>>, pub reserved_names: RwLock<HashSet<String>>,

View file

@ -1,5 +1,8 @@
use crate::meta::*;
use crate::user::*;
use crate::websocket::*; use crate::websocket::*;
use anyhow::{Context, Result}; use anyhow::{Context, Result};
use axum::extract::ws::Message;
use axum::{routing::get, Router}; use axum::{routing::get, Router};
use server::*; use server::*;
use std::{ use std::{
@ -7,7 +10,7 @@ use std::{
net::SocketAddr, net::SocketAddr,
sync::{Arc, RwLock}, sync::{Arc, RwLock},
}; };
use tokio::sync::broadcast; use tokio::sync::{broadcast, mpsc};
use tower_http::services::ServeDir; use tower_http::services::ServeDir;
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
@ -24,6 +27,7 @@ async fn main() -> Result<()> {
// Set up state // Set up state
let (tx, _rx) = broadcast::channel(100); let (tx, _rx) = broadcast::channel(100);
let (users_tx, mut users_rx) = mpsc::channel(100);
let first_names = load_names("data/first.txt")?; let first_names = load_names("data/first.txt")?;
let last_names = load_names("data/last.txt")?; let last_names = load_names("data/last.txt")?;
let reserved_names = RwLock::new(HashSet::<String>::new()); let reserved_names = RwLock::new(HashSet::<String>::new());
@ -34,6 +38,7 @@ async fn main() -> Result<()> {
let app_state = Arc::new(AppState { let app_state = Arc::new(AppState {
tx, tx,
users_tx,
first_names, first_names,
last_names, last_names,
reserved_names, reserved_names,
@ -44,6 +49,75 @@ async fn main() -> Result<()> {
games, games,
}); });
let cloned_state = app_state.clone();
let _user_handler = tokio::spawn(async move {
while let Some(message) = users_rx.recv().await {
//
// Create, Register, and Hydrate new user
//
// Create
let new_user = Arc::new(RwLock::new(User::new(&cloned_state)));
// Notify client of new username
message
.sender
.send(Message::Text(user_client_self_update(&new_user)))
.await
.unwrap();
// Register using `addr` as key until something longer lived exists
cloned_state
.online_users
.write()
.unwrap()
.insert(message.addr, new_user.clone());
// Hydrate client
// this should probably be combined and sent as one
message
.sender
.send(Message::Text(meta_chat_update(&cloned_state)))
.await
.unwrap();
message
.sender
.send(Message::Text(meta_motd()))
.await
.unwrap();
message
.sender
.send(Message::Text(meta_server_summary_update(&cloned_state)))
.await
.unwrap();
message
.sender
.send(Message::Text(meta_games_browser_update(&cloned_state)))
.await
.unwrap();
message
.sender
.send(Message::Text(meta_new_game_card_packs(&cloned_state)))
.await
.unwrap();
// Broadcast new user's existence
// this should probably be combined and sent as one
let _ = &cloned_state
.tx
.send(meta_announce_user_join(&cloned_state, &message.addr))
.unwrap();
let _ = &cloned_state
.tx
.send(meta_server_summary_update(&cloned_state))
.unwrap();
let _ = &cloned_state
.tx
.send(meta_chat_update(&cloned_state))
.unwrap();
}
});
// Router // Router
let app = Router::new() let app = Router::new()
.route("/websocket", get(websocket_connection_handler)) .route("/websocket", get(websocket_connection_handler))

View file

@ -1,8 +1,8 @@
use crate::websocket::meta::*; use crate::websocket::meta::*;
use crate::websocket::user::*;
use crate::AppState; use crate::AppState;
use crate::Game; use crate::Game;
use crate::NewGameManifest; use crate::NewGameManifest;
use crate::NewUser;
use anyhow::Result; use anyhow::Result;
use axum::extract::ws::CloseFrame; use axum::extract::ws::CloseFrame;
use axum::{ use axum::{
@ -19,7 +19,7 @@ use std::{
net::SocketAddr, net::SocketAddr,
sync::{Arc, RwLock}, sync::{Arc, RwLock},
}; };
use tokio::sync::broadcast::Sender; use tokio::sync::{broadcast::Sender, mpsc};
pub mod meta; pub mod meta;
pub mod user; pub mod user;
@ -39,19 +39,31 @@ pub async fn websocket_on_connection(stream: WebSocket, state: Arc<AppState>, ad
// Split channels to send and receive asynchronously. // Split channels to send and receive asynchronously.
let (mut sender, mut receiver) = stream.split(); let (mut sender, mut receiver) = stream.split();
// Set up new user let (dm_tx, mut dm_rx) = mpsc::channel(30);
user_handle_new(&mut sender, &state, &addr)
.await let _ = state
.expect("Error creating new user!"); .users_tx
.send(NewUser {
sender: dm_tx,
addr,
})
.await;
// Subscribe to receive from global broadcast channel // Subscribe to receive from global broadcast channel
let mut rx = state.tx.subscribe(); let mut rx = state.tx.subscribe();
// Send messages to this client // Send messages to this client
let mut send_task = tokio::spawn(async move { let mut send_task = tokio::spawn(async move {
while let Ok(msg) = rx.recv().await { loop {
if sender.send(Message::Text(msg)).await.is_err() { while let Ok(msg) = rx.recv().await {
break; if sender.send(Message::Text(msg)).await.is_err() {
break;
}
}
while let Some(message) = dm_rx.recv().await {
if sender.send(message).await.is_err() {
break;
}
} }
} }
}); });

View file

@ -1,13 +1,6 @@
use crate::websocket::meta::*;
use crate::AppState;
use crate::User; use crate::User;
use anyhow::Result;
use axum::extract::ws::{Message, WebSocket};
use futures::stream::SplitSink;
use futures::SinkExt;
use lib::*; use lib::*;
use serde_json::to_string; use serde_json::to_string;
use std::net::SocketAddr;
use std::sync::Arc; use std::sync::Arc;
use std::sync::RwLock; use std::sync::RwLock;
@ -18,43 +11,3 @@ pub fn user_client_self_update(new_user: &Arc<RwLock<User>>) -> String {
}) })
.unwrap() .unwrap()
} }
/// Create, Register, and Hydrate new user
pub async fn user_handle_new(
sender: &mut SplitSink<WebSocket, Message>,
state: &Arc<AppState>,
addr: &SocketAddr,
) -> Result<()> {
// Create
let new_user = Arc::new(RwLock::new(User::new(state)));
// Notify client of new username
sender
.send(Message::Text(user_client_self_update(&new_user)))
.await?;
// Register using `addr` as key until something longer lived exists
state.online_users.write().unwrap().insert(*addr, new_user);
// Hydrate client
// this should probably be combined and sent as one
sender.send(Message::Text(meta_chat_update(state))).await?;
sender.send(Message::Text(meta_motd())).await?;
sender
.send(Message::Text(meta_server_summary_update(state)))
.await?;
sender
.send(Message::Text(meta_games_browser_update(state)))
.await?;
sender
.send(Message::Text(meta_new_game_card_packs(state)))
.await?;
// Broadcast new user's existence
// this should probably be combined and sent as one
state.tx.send(meta_announce_user_join(state, addr))?;
state.tx.send(meta_server_summary_update(state))?;
state.tx.send(meta_chat_update(state))?;
Ok(())
}