diff --git a/src/main.rs b/src/main.rs index e568da8..0afff49 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,101 +1,33 @@ -use serde_json::Result; -use std::fs; +use futures_channel::mpsc::{unbounded, UnboundedSender}; +use futures_util::{future, pin_mut, stream::TryStreamExt, SinkExt, StreamExt}; use std::{ collections::HashMap, env, + error::Error, + fs, net::SocketAddr, + result::Result, sync::{Arc, Mutex}, }; - -use futures_channel::mpsc::{unbounded, UnboundedSender}; -use futures_util::{future, pin_mut, stream::TryStreamExt, StreamExt}; - use tokio::net::{TcpListener, TcpStream}; use tokio_tungstenite::tungstenite::protocol::Message; type Tx = UnboundedSender; type PeerMap = Arc>>; -async fn handle_connection(peer_map: PeerMap, raw_stream: TcpStream, addr: SocketAddr) { - println!("Incoming TCP connection from: {}", addr); - - let ws_stream = tokio_tungstenite::accept_async(raw_stream) - .await - .expect("Error during the websocket handshake occurred"); - println!("WebSocket connection established: {}", addr); - - // Insert the write part of this peer to the peer map. - let (tx, rx) = unbounded(); - peer_map.lock().unwrap().insert(addr, tx); - - let (outgoing, incoming) = ws_stream.split(); - - let broadcast_incoming = incoming.try_for_each(|msg| { - println!( - "Received a message from {}: {}", - addr, - msg.to_text().unwrap() - ); - let peers = peer_map.lock().unwrap(); - - // We want to broadcast the message to everyone except ourselves. - let broadcast_recipients = peers - .iter() - .filter(|(peer_addr, _)| peer_addr != &&addr) - .map(|(_, ws_sink)| ws_sink); - - for recp in broadcast_recipients { - recp.unbounded_send(msg.clone()).unwrap(); - } - - future::ok(()) - }); - - let receive_from_others = rx.map(Ok).forward(outgoing); - - pin_mut!(broadcast_incoming, receive_from_others); - future::select(broadcast_incoming, receive_from_others).await; - - println!("{} disconnected", &addr); - peer_map.lock().unwrap().remove(&addr); -} - -#[tokio::main] -async fn main() -> Result<()> { - test()?; - - let addr = env::args() - .nth(1) - .unwrap_or_else(|| "127.0.0.1:8080".to_string()); - - let state = PeerMap::new(Mutex::new(HashMap::new())); - - // Create the event loop and TCP listener we'll accept connections on. - let try_socket = TcpListener::bind(&addr).await; - let listener = try_socket.expect("Failed to bind"); - println!("\nListening on: {}", addr); - - // Let's spawn the handling of each connection in a separate task. - while let Ok((stream, addr)) = listener.accept().await { - tokio::spawn(handle_connection(state.clone(), stream, addr)); - } - - Ok(()) -} - #[allow(non_snake_case)] pub mod CAHd_game; use crate::CAHd_game::*; /// Parse json for card data -fn load_json(path: &str) -> Result> { +fn load_json(path: &str) -> Result, Box> { let data: String = fs::read_to_string(path).expect("Error reading file"); let jayson: Vec = serde_json::from_str(&data)?; Ok(jayson) } -fn test() -> Result<()> { +fn test() -> Result<(), Box> { // choose decks let cards_input_path: &str = "data/cah-cards-full.json"; @@ -140,6 +72,81 @@ fn test() -> Result<()> { println!("----------------------"); for card in &games[0].players[0].white { println!("{}", card.text); - }; + } + Ok(()) +} + +async fn handle_connection( + peer_map: PeerMap, + raw_stream: TcpStream, + addr: SocketAddr, +) -> Result<(), tokio_tungstenite::tungstenite::Error> { + println!("Incoming TCP connection from: {}", addr); + + let ws_stream = tokio_tungstenite::accept_async(raw_stream).await?; + println!("WebSocket connection established: {}", addr); + + // Insert the write part of this peer to the peer map. + let (tx, rx) = unbounded(); + + peer_map.lock().unwrap().insert(addr, tx); + + let (mut sink, stream) = ws_stream.split(); + + sink.send(Message::Text("hey".to_string())).await?; + + let broadcast_incoming = stream.try_for_each(|msg| { + println!( + "Received a message from {}: {}", + addr, + msg.to_text().unwrap() + ); + let peers = peer_map.lock().unwrap(); + + // We want to broadcast the message to everyone except ourselves. + let broadcast_recipients = peers + .iter() + .filter(|(peer_addr, _)| peer_addr != &&addr) + .map(|(_, ws_sink)| ws_sink); + + for recp in broadcast_recipients { + recp.unbounded_send(msg.clone()).unwrap(); + } + + future::ok(()) + }); + + let receive_from_others = rx.map(Ok).forward(sink); + + pin_mut!(broadcast_incoming, receive_from_others); + future::select(broadcast_incoming, receive_from_others).await; + + println!("{} disconnected", &addr); + peer_map.lock().unwrap().remove(&addr); + + Ok(()) +} + +#[tokio::main] +async fn main() -> Result<(), Box> { + test()?; + + let addr = env::args() + .nth(1) + .unwrap_or_else(|| "127.0.0.1:8080".to_string()); + + let state = PeerMap::new(Mutex::new(HashMap::new())); + + // Create the event loop and TCP listener we'll accept connections on. + let try_socket = TcpListener::bind(&addr).await; + let listener = try_socket?; + + println!("\nListening on: {}", addr); + + // Let's spawn the handling of each connection in a separate task. + while let Ok((stream, addr)) = listener.accept().await { + tokio::spawn(handle_connection(state.clone(), stream, addr)); + } + Ok(()) }