use futures_util::{SinkExt, StreamExt, TryFutureExt}; use std::{ collections::HashMap, error::Error, fs, result::Result, sync::{ atomic::{AtomicUsize, Ordering}, Arc, }, }; use tokio::sync::{mpsc, RwLock}; use tokio_stream::wrappers::UnboundedReceiverStream; use warp::{ ws::{Message, WebSocket}, Filter, }; #[allow(non_snake_case)] pub mod CAHd_game; use crate::CAHd_game::*; /// Parse json for card data fn load_json(path: &str) -> Result, Box> { let data: String = fs::read_to_string(path).expect("Error reading file"); let jayson: Vec = serde_json::from_str(&data)?; Ok(jayson) } fn test() -> Result<(), Box> { // choose decks let cards_input_path: &str = "data/cah-cards-full.json"; // TODO: this should be a master card database and pointers // to the cards should be passed to the game instead of actual cards let chosen_packs: Vec = load_json(cards_input_path)?; println!("{}", &chosen_packs.len()); let test_player0 = CAHPlayer { player_name: "Adam".to_string(), role: PlayerRole::Host, white: vec![], black: vec![], }; let test_player1 = CAHPlayer { player_name: "Ferris".to_string(), role: PlayerRole::Player, white: vec![], black: vec![], }; // make some games // use hashmap? let mut games: Vec = vec![]; // create game with/for player 0 let test_game0 = NewGameRequest { name: "Test0".to_string(), host: test_player0, packs: chosen_packs, }; games.push(CAHGame::new(test_game0)?); // a new game request struct but this player is a player games[0].create_player(test_player1)?; // start round games[0].game_start()?; println!("----------------------"); for card in &games[0].players[0].white { println!("{}", card.text); } Ok(()) } /// Our global unique user id counter. static NEXT_USER_ID: AtomicUsize = AtomicUsize::new(1); /// Our state of currently connected users. /// /// - Key is their id /// - Value is a sender of `warp::ws::Message` type Users = Arc>>>; #[tokio::main] async fn main() -> Result<(), Box> { pretty_env_logger::init(); test()?; // Keep track of all connected users, key is usize, value // is a websocket sender. let users = Users::default(); // Turn our "state" into a new Filter... let users = warp::any().map(move || users.clone()); // GET /chat -> websocket upgrade let chat = warp::path("chat") // The `ws()` filter will prepare Websocket handshake... .and(warp::ws()) .and(users) .map(|ws: warp::ws::Ws, users| { // This will call our function if the handshake succeeds. ws.on_upgrade(move |socket| user_connected(socket, users)) }); // GET / -> index html let index = warp::path::end().map(|| warp::reply::html(INDEX_HTML)); let routes = index.or(chat); warp::serve(routes).run(([127, 0, 0, 1], 3030)).await; Ok(()) } async fn user_connected(ws: WebSocket, users: Users) { // Use a counter to assign a new unique ID for this user. let my_id = NEXT_USER_ID.fetch_add(1, Ordering::Relaxed); eprintln!("User {} connected!", my_id); // Split the socket into a sender and receive of messages. let (mut user_ws_tx, mut user_ws_rx) = ws.split(); // Use an unbounded channel to handle buffering and flushing of messages // to the websocket... let (tx, rx) = mpsc::unbounded_channel(); let mut rx = UnboundedReceiverStream::new(rx); let _ = user_ws_tx .send(Message::text(format!( "Server Message: Welcome User {}", my_id ))) .await; tokio::task::spawn(async move { while let Some(message) = rx.next().await { user_ws_tx .send(message) .unwrap_or_else(|e| { eprintln!("websocket send error: {}", e); }) .await; } }); // Save the sender in our list of connected users. users.write().await.insert(my_id, tx); // Return a `Future` that is basically a state machine managing // this specific user's connection. // Every time the user sends a message, broadcast it to // all other users... while let Some(result) = user_ws_rx.next().await { let msg = match result { Ok(msg) => msg, Err(e) => { eprintln!("websocket error(uid={}): {}", my_id, e); break; } }; user_message(my_id, msg, &users).await; } // user_ws_rx stream will keep processing as long as the user stays // connected. Once they disconnect, then... user_disconnected(my_id, &users).await; } async fn user_message(my_id: usize, msg: Message, users: &Users) { // Skip any non-Text messages... let msg = if let Ok(s) = msg.to_str() { s } else { return; }; let new_msg = format!("User {}: {}", my_id, msg); // New message from this user, send it to everyone else (except same uid)... for (&uid, tx) in users.read().await.iter() { if my_id != uid { if let Err(_disconnected) = tx.send(Message::text(new_msg.clone())) { // The tx is disconnected, our `user_disconnected` code // should be happening in another task, nothing more to // do here. } } } } async fn user_disconnected(my_id: usize, users: &Users) { eprintln!("User {} left.", my_id); // Stream closed up, so remove from the user list users.write().await.remove(&my_id); } static INDEX_HTML: &str = r#" Cards For Humanity Test Client

Cards

Connecting...

"#;