diff --git a/Cargo.lock b/Cargo.lock index 0090758..36c8faa 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -730,18 +730,18 @@ checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" [[package]] name = "serde" -version = "1.0.198" +version = "1.0.199" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9846a40c979031340571da2545a4e5b7c4163bdae79b301d5f86d03979451fcc" +checksum = "0c9f6e76df036c77cd94996771fb40db98187f096dd0b9af39c6c6e452ba966a" dependencies = [ "serde_derive", ] [[package]] name = "serde_derive" -version = "1.0.198" +version = "1.0.199" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e88edab869b01783ba905e7d0153f9fc1a6505a96e4ad3018011eedb838566d9" +checksum = "11bd257a6541e141e42ca6d24ae26f7714887b47e89aa739099104c7e4d3b7fc" dependencies = [ "proc-macro2", "quote", diff --git a/src/api.rs b/src/api.rs new file mode 100644 index 0000000..88048cd --- /dev/null +++ b/src/api.rs @@ -0,0 +1,102 @@ +use futures_util::{SinkExt, StreamExt, TryFutureExt}; +use std::{ + collections::HashMap, + sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, + }, +}; +use tokio::sync::{mpsc, RwLock}; +use tokio_stream::wrappers::UnboundedReceiverStream; +use warp::ws::{Message, WebSocket}; + +/// Our global unique user id counter. +static NEXT_USER_ID: AtomicUsize = AtomicUsize::new(1); + +/// Our state of currently connected users. +/// +/// - Key is their id +/// - Value is a sender of `warp::ws::Message` +pub type Users = Arc>>>; + +pub async fn on_user_connected(ws: WebSocket, users: Users) { + // Use a counter to assign a new unique ID for this user. + let my_id = NEXT_USER_ID.fetch_add(1, Ordering::Relaxed); + + eprintln!("User {} connected!", my_id); + + // Split the socket into a sender and receive of messages. + let (mut user_ws_tx, mut user_ws_rx) = ws.split(); + + // Use an unbounded channel to handle buffering and flushing of messages + // to the websocket... + let (tx, rx) = mpsc::unbounded_channel(); + let mut rx = UnboundedReceiverStream::new(rx); + + let _ = user_ws_tx + .send(Message::text(format!("Server: Welcome User {}", my_id))) + .await; + + tokio::task::spawn(async move { + while let Some(message) = rx.next().await { + user_ws_tx + .send(message) + .unwrap_or_else(|e| { + eprintln!("websocket send error: {}", e); + }) + .await; + } + }); + + // Save the sender in our list of connected users. + users.write().await.insert(my_id, tx); + + // Return a `Future` that is basically a state machine managing + // this specific user's connection. + + // Every time the user sends a message, broadcast it to + // all other users... + while let Some(result) = user_ws_rx.next().await { + let msg = match result { + Ok(msg) => msg, + Err(e) => { + eprintln!("websocket error(uid={}): {}", my_id, e); + break; + } + }; + handle_incoming_message(my_id, msg, &users).await; + } + + // user_ws_rx stream will keep processing as long as the user stays + // connected. Once they disconnect, then... + on_user_disconnected(my_id, &users).await; +} + +pub async fn handle_incoming_message(my_id: usize, msg: Message, users: &Users) { + // Skip any non-Text messages... + let msg = if let Ok(s) = msg.to_str() { + s + } else { + return; + }; + + let new_msg = format!("User {}: {}", my_id, msg); + + // New message from this user, send it to everyone else (except same uid)... + for (&uid, tx) in users.read().await.iter() { + if my_id != uid { + if let Err(_disconnected) = tx.send(Message::text(new_msg.clone())) { + // The tx is disconnected, our `on_user_disconnected` code + // should be happening in another task, nothing more to + // do here. + } + } + } +} + +pub async fn on_user_disconnected(my_id: usize, users: &Users) { + eprintln!("User {} left.", my_id); + + // Stream closed up, so remove from the user list + users.write().await.remove(&my_id); +} diff --git a/src/main.rs b/src/main.rs index c1691c9..84d2202 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,25 +1,14 @@ -use futures_util::{SinkExt, StreamExt, TryFutureExt}; -use std::{ - collections::HashMap, - error::Error, - fs, - result::Result, - sync::{ - atomic::{AtomicUsize, Ordering}, - Arc, - }, -}; -use tokio::sync::{mpsc, RwLock}; -use tokio_stream::wrappers::UnboundedReceiverStream; -use warp::{ - ws::{Message, WebSocket}, - Filter, -}; +use std::fs::{read, read_to_string}; +use std::{error::Error, fs, result::Result}; +use warp::Filter; #[allow(non_snake_case)] pub mod CAHd_game; use crate::CAHd_game::*; +pub mod api; +use crate::api::*; + /// Parse json for card data fn load_json(path: &str) -> Result, Box> { let data: String = fs::read_to_string(path).expect("Error reading file"); @@ -77,15 +66,6 @@ fn test() -> Result<(), Box> { Ok(()) } -/// Our global unique user id counter. -static NEXT_USER_ID: AtomicUsize = AtomicUsize::new(1); - -/// Our state of currently connected users. -/// -/// - Key is their id -/// - Value is a sender of `warp::ws::Message` -type Users = Arc>>>; - #[tokio::main] async fn main() -> Result<(), Box> { pretty_env_logger::init(); @@ -93,7 +73,7 @@ async fn main() -> Result<(), Box> { // Keep track of all connected users, key is usize, value // is a websocket sender. - let users = Users::default(); + let users = api::Users::default(); // Turn our "state" into a new Filter... let users = warp::any().map(move || users.clone()); @@ -104,153 +84,17 @@ async fn main() -> Result<(), Box> { .and(users) .map(|ws: warp::ws::Ws, users| { // This will call our function if the handshake succeeds. - ws.on_upgrade(move |socket| user_connected(socket, users)) + ws.on_upgrade(move |socket| on_user_connected(socket, users)) }); // GET / -> index html - let index = warp::path::end().map(|| warp::reply::html(INDEX_HTML)); + let index = warp::get() + .and(warp::path::end()) + .and(warp::fs::file("./test_client.html")); let routes = index.or(chat); - warp::serve(routes).run(([127, 0, 0, 1], 3030)).await; + warp::serve(routes).run(([0, 0, 0, 0], 3030)).await; Ok(()) } - -async fn user_connected(ws: WebSocket, users: Users) { - // Use a counter to assign a new unique ID for this user. - let my_id = NEXT_USER_ID.fetch_add(1, Ordering::Relaxed); - - eprintln!("User {} connected!", my_id); - - // Split the socket into a sender and receive of messages. - let (mut user_ws_tx, mut user_ws_rx) = ws.split(); - - // Use an unbounded channel to handle buffering and flushing of messages - // to the websocket... - let (tx, rx) = mpsc::unbounded_channel(); - let mut rx = UnboundedReceiverStream::new(rx); - - let _ = user_ws_tx - .send(Message::text(format!( - "Server: Welcome User {}", - my_id - ))) - .await; - - tokio::task::spawn(async move { - while let Some(message) = rx.next().await { - user_ws_tx - .send(message) - .unwrap_or_else(|e| { - eprintln!("websocket send error: {}", e); - }) - .await; - } - }); - - // Save the sender in our list of connected users. - users.write().await.insert(my_id, tx); - - // Return a `Future` that is basically a state machine managing - // this specific user's connection. - - // Every time the user sends a message, broadcast it to - // all other users... - while let Some(result) = user_ws_rx.next().await { - let msg = match result { - Ok(msg) => msg, - Err(e) => { - eprintln!("websocket error(uid={}): {}", my_id, e); - break; - } - }; - user_message(my_id, msg, &users).await; - } - - // user_ws_rx stream will keep processing as long as the user stays - // connected. Once they disconnect, then... - user_disconnected(my_id, &users).await; -} - -async fn user_message(my_id: usize, msg: Message, users: &Users) { - // Skip any non-Text messages... - let msg = if let Ok(s) = msg.to_str() { - s - } else { - return; - }; - - let new_msg = format!("User {}: {}", my_id, msg); - - // New message from this user, send it to everyone else (except same uid)... - for (&uid, tx) in users.read().await.iter() { - if my_id != uid { - if let Err(_disconnected) = tx.send(Message::text(new_msg.clone())) { - // The tx is disconnected, our `user_disconnected` code - // should be happening in another task, nothing more to - // do here. - } - } - } -} - -async fn user_disconnected(my_id: usize, users: &Users) { - eprintln!("User {} left.", my_id); - - // Stream closed up, so remove from the user list - users.write().await.remove(&my_id); -} - -static INDEX_HTML: &str = r#" - - - Cards For Humanity Test Client - - -

Cards

-
-

Connecting...

-
- Chat: -
- -
- -
- -
- - - -"#; diff --git a/test_client.html b/test_client.html new file mode 100644 index 0000000..53508c8 --- /dev/null +++ b/test_client.html @@ -0,0 +1,51 @@ + + + + Cards For Humanity Test Client + + +

Cards

+
+

Connecting...

+
+ Chat: +
+ +
+ +
+ +
+ + +