cards/server/src/websocket.rs

179 lines
5.2 KiB
Rust
Raw Normal View History

2024-08-07 06:23:27 -04:00
use crate::user_handler::*;
2024-05-04 02:23:40 -04:00
use crate::AppState;
2024-08-05 01:55:05 -04:00
use crate::Game;
use crate::NewGameManifest;
use crate::User;
2024-07-30 03:22:32 -04:00
use anyhow::Result;
2024-08-05 01:55:05 -04:00
use axum::{
extract::{
ws::{Message, WebSocket},
ConnectInfo, State, WebSocketUpgrade,
},
response::IntoResponse,
};
use futures::{SinkExt, StreamExt};
use lib::*;
use rand::prelude::SliceRandom;
2024-08-08 04:59:29 -04:00
use serde_json::from_str;
use std::collections::HashMap;
2024-08-06 00:33:37 -04:00
use std::{
net::SocketAddr,
sync::{Arc, RwLock},
};
use tokio::sync::{broadcast::Sender, mpsc};
2024-07-30 03:22:32 -04:00
2024-08-05 01:55:05 -04:00
/// Axum handler that upgrades an incoming HTTP request to a WebSocket.
///
/// Logs the peer address and, once the upgrade completes, hands the socket
/// together with the shared application state and the peer address to
/// [`websocket_on_connection`].
pub async fn websocket_connection_handler(
    ws: WebSocketUpgrade,
    // user_agent: Option<TypedHeader<headers::UserAgent>>,
    ConnectInfo(addr): ConnectInfo<SocketAddr>,
    State(state): State<Arc<AppState>>,
) -> impl IntoResponse {
    tracing::debug!("New connection from {}", addr);

    // The callback owns `state` and `addr` so it can outlive this handler.
    ws.on_upgrade(move |upgraded| async move {
        websocket_on_connection(upgraded, state, addr).await
    })
}
/// This runs right after a WebSocket connection is established
pub async fn websocket_on_connection(stream: WebSocket, state: Arc<AppState>, addr: SocketAddr) {
// Split channels to send and receive asynchronously.
let (mut sender, mut receiver) = stream.split();
2024-08-06 02:26:57 -04:00
// Create channel for direct messages
let (dm_tx, mut dm_rx) = mpsc::channel(30);
let mut map = HashMap::new();
map.insert(addr, dm_tx.clone());
let _ = state
.users_tx
// add tx
.send(UserHandlerMessage::NewUser {
user: User::new(
format!(
"{} {}",
state.first_names.choose(&mut rand::thread_rng()).unwrap(),
state.last_names.choose(&mut rand::thread_rng()).unwrap(),
),
dm_tx.clone(),
),
addr,
})
.await;
2024-08-05 01:55:05 -04:00
// Subscribe to receive from global broadcast channel
let mut rx = state.broadcast_tx.subscribe();
2024-08-05 01:55:05 -04:00
// Send messages to this client
let mut send_task = tokio::spawn(async move {
2024-08-07 05:30:30 -04:00
let mut broadcast = None;
let mut dm = None;
loop {
2024-08-07 05:30:30 -04:00
tokio::select! {
b = rx.recv() => broadcast = Some(b.unwrap()),
d = dm_rx.recv() => dm = d,
};
if let Some(msg) = &dm {
if sender.send(Message::Text(msg.to_string())).await.is_err() {
break;
2024-08-07 05:30:30 -04:00
} else {
dm = Option::None;
}
2024-08-07 05:30:30 -04:00
} else if let Some(msg) = &broadcast {
if sender.send(Message::Text(msg.to_string())).await.is_err() {
} else {
broadcast = Option::None;
}
2024-08-05 01:55:05 -04:00
}
}
});
// Receive messages from this client
let mut recv_task = tokio::spawn(async move {
while let Some(Ok(message)) = receiver.next().await {
state
.messages_tx
.send((addr.clone(), message.clone()))
.await
.unwrap();
2024-08-05 01:55:05 -04:00
websocket_message_handler(state.clone(), addr, message)
.await
.expect("Message Handler exploded!")
}
});
// If either task completes then abort the other
tokio::select! {
_ = (&mut send_task) => recv_task.abort(),
_ = (&mut recv_task) => send_task.abort(),
};
}
2024-08-02 00:41:48 -04:00
/// Handle incoming messages over the WebSocket
2024-08-05 01:55:05 -04:00
pub async fn websocket_message_handler(
2024-08-02 00:41:48 -04:00
state: Arc<AppState>,
addr: SocketAddr,
message: Message,
) -> Result<()> {
let tx = &state.broadcast_tx;
2024-08-02 00:41:48 -04:00
match message {
Message::Text(text) => match text {
_new_game if let Ok(_new_game) = from_str::<NewGameRequest>(&text) => {
tracing::debug!("New game request received.");
2024-08-05 01:55:05 -04:00
game_handle_new_game(_new_game, &state, tx, addr)?;
2024-08-02 00:41:48 -04:00
}
_ => {
tracing::debug!("Unhandled text message: {}", &text);
}
},
Message::Binary(data) => {
tracing::debug!("Binary: {:?}", data)
}
2024-08-08 04:54:02 -04:00
Message::Close(_close_frame) => {
// websocket_handle_close(close_frame, &state, tx, addr)?;
2024-08-02 00:41:48 -04:00
}
Message::Pong(ping) => {
tracing::debug!("Pong received with: {:?}", ping);
}
Message::Ping(pong) => {
tracing::debug!("Pong received with: {:?}", pong);
}
}
Ok(())
}
2024-07-30 03:29:20 -04:00
/// This runs when a NewGameRequest is received
2024-08-05 01:55:05 -04:00
fn game_handle_new_game(
2024-07-30 03:22:32 -04:00
new_game: NewGameRequest,
state: &Arc<AppState>,
tx: &Sender<String>,
2024-08-01 23:47:28 -04:00
addr: SocketAddr,
2024-07-30 03:22:32 -04:00
) -> Result<()> {
2024-08-01 23:47:28 -04:00
let manifest = NewGameManifest {
name: new_game.name,
2024-08-02 02:35:31 -04:00
host: state
.online_users
2024-08-03 00:42:32 -04:00
.read()
2024-08-02 02:35:31 -04:00
.unwrap()
.get(&addr)
.unwrap()
.clone(),
2024-08-01 23:47:28 -04:00
};
2024-08-04 03:13:34 -04:00
tracing::debug!("Game Packs {:?}", new_game.packs);
2024-08-01 23:47:28 -04:00
// create game
2024-08-02 02:35:31 -04:00
if let Ok(new_game_object) = Game::new(manifest) {
2024-08-06 00:33:37 -04:00
state
.games
.write()
.unwrap()
.insert(new_game_object.name.clone(), RwLock::new(new_game_object));
2024-08-05 01:55:05 -04:00
tx.send(meta_games_browser_update(state))?;
tx.send(meta_server_summary_update(state))?;
2024-08-02 02:35:31 -04:00
}
2024-07-30 03:22:32 -04:00
Ok(())
}