diff --git a/Cargo.lock b/Cargo.lock index f8dd092..0090758 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -17,6 +17,15 @@ version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe" +[[package]] +name = "aho-corasick" +version = "1.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8e60d3430d3a69478ad0993f19238d2df97c507009a52b3c10addcd7f6bcb916" +dependencies = [ + "memchr", +] + [[package]] name = "autocfg" version = "1.2.0" @@ -38,6 +47,12 @@ dependencies = [ "rustc-demangle", ] +[[package]] +name = "base64" +version = "0.21.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9d297deb1925b89f2ccc13d7635fa0714f12c87adce1c75356b39ca9b7178567" + [[package]] name = "bitflags" version = "2.5.0" @@ -69,15 +84,16 @@ checksum = "514de17de45fdb8dc022b1a7975556c53c86f9f0aa5f534b98977b171857c2c9" name = "cards" version = "0.1.0" dependencies = [ - "futures", "futures-channel", "futures-util", + "pretty_env_logger", "rand", "serde", "serde_json", "tokio", - "tokio-tungstenite", + "tokio-stream", "url", + "warp", ] [[package]] @@ -127,6 +143,34 @@ dependencies = [ "crypto-common", ] +[[package]] +name = "encoding_rs" +version = "0.8.34" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b45de904aa0b010bce2ab45264d0631681847fa7b6f2eaa7dab7619943bc4f59" +dependencies = [ + "cfg-if", +] + +[[package]] +name = "env_logger" +version = "0.10.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4cd405aab171cb85d6735e5c8d9db038c17d3ca007a4d2c25f337935c3d90580" +dependencies = [ + "humantime", + "is-terminal", + "log", + "regex", + "termcolor", +] + +[[package]] +name = "equivalent" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5443807d6dff69373d433ab9ef5378ad8df50ca6298caf15de6e52e24aaf54d5" + [[package]] name = "fnv" version = "1.0.7" @@ -142,21 +186,6 @@ dependencies = [ "percent-encoding", ] -[[package]] -name = "futures" -version = "0.3.30" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "645c6916888f6cb6350d2550b80fb63e734897a8498abe35cfb732b6487804b0" -dependencies = [ - "futures-channel", - "futures-core", - "futures-executor", - "futures-io", - "futures-sink", - "futures-task", - "futures-util", -] - [[package]] name = "futures-channel" version = "0.3.30" @@ -173,23 +202,6 @@ version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dfc6580bb841c5a68e9ef15c77ccc837b40a7504914d52e47b8b0e9bbda25a1d" -[[package]] -name = "futures-executor" -version = "0.3.30" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a576fc72ae164fca6b9db127eaa9a9dda0d61316034f33a0a0d4eda41f02b01d" -dependencies = [ - "futures-core", - "futures-task", - "futures-util", -] - -[[package]] -name = "futures-io" -version = "0.3.30" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a44623e20b9681a318efdd71c299b6b222ed6f231972bfe2f224ebad6311f0c1" - [[package]] name = "futures-macro" version = "0.3.30" @@ -219,13 +231,10 @@ version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3d6401deb83407ab3da39eba7e33987a73c3df0c82b4bb5813ee871c19c41d48" dependencies = [ - "futures-channel", "futures-core", - "futures-io", "futures-macro", "futures-sink", "futures-task", - "memchr", "pin-project-lite", "pin-utils", "slab", @@ -258,12 +267,72 @@ version = "0.28.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4271d37baee1b8c7e4b708028c57d816cf9d2434acb33a549475f78c181f6253" +[[package]] +name = "h2" +version = "0.3.26" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "81fe527a889e1532da5c525686d96d4c2e74cdd345badf8dfef9f6b39dd5f5e8" +dependencies = [ + "bytes", + "fnv", + "futures-core", + "futures-sink", + "futures-util", + "http 0.2.12", + "indexmap", + "slab", + "tokio", + "tokio-util", + "tracing", +] + +[[package]] +name = "hashbrown" +version = "0.14.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "290f1a1d9242c78d09ce40a5e87e7554ee637af1351968159f4952f028f75604" + +[[package]] +name = "headers" +version = "0.3.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "06683b93020a07e3dbcf5f8c0f6d40080d725bea7936fc01ad345c01b97dc270" +dependencies = [ + "base64", + "bytes", + "headers-core", + "http 0.2.12", + "httpdate", + "mime", + "sha1", +] + +[[package]] +name = "headers-core" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e7f66481bfee273957b1f20485a4ff3362987f85b2c236580d81b4eb7a326429" +dependencies = [ + "http 0.2.12", +] + [[package]] name = "hermit-abi" version = "0.3.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d231dfb89cfffdbc30e7fc41579ed6066ad03abda9e567ccafae602b97ec5024" +[[package]] +name = "http" +version = "0.2.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "601cbb57e577e2f5ef5be8e7b83f0f63994f25aa94d673e54a92d5c516d101f1" +dependencies = [ + "bytes", + "fnv", + "itoa", +] + [[package]] name = "http" version = "1.1.0" @@ -275,12 +344,59 @@ dependencies = [ "itoa", ] +[[package]] +name = "http-body" +version = "0.4.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7ceab25649e9960c0311ea418d17bee82c0dcec1bd053b5f9a66e265a693bed2" +dependencies = [ + "bytes", + "http 0.2.12", + "pin-project-lite", +] + [[package]] name = "httparse" version = "1.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d897f394bad6a705d5f4104762e116a75639e470d80901eed05a860a95cb1904" +[[package]] +name = "httpdate" +version = "1.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "df3b46402a9d5adb4c86a0cf463f42e19994e3ee891101b1841f30a545cb49a9" + +[[package]] +name = "humantime" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4" + +[[package]] +name = "hyper" +version = "0.14.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bf96e135eb83a2a8ddf766e426a841d8ddd7449d5f00d34ea02b41d2f19eef80" +dependencies = [ + "bytes", + "futures-channel", + "futures-core", + "futures-util", + "h2", + "http 0.2.12", + "http-body", + "httparse", + "httpdate", + "itoa", + "pin-project-lite", + "socket2", + "tokio", + "tower-service", + "tracing", + "want", +] + [[package]] name = "idna" version = "0.5.0" @@ -291,6 +407,27 @@ dependencies = [ "unicode-normalization", ] +[[package]] +name = "indexmap" +version = "2.2.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "168fb715dda47215e360912c096649d23d58bf392ac62f73919e831745e40f26" +dependencies = [ + "equivalent", + "hashbrown", +] + +[[package]] +name = "is-terminal" +version = "0.4.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f23ff5ef2b80d608d61efee834934d862cd92461afc0560dedf493e4c033738b" +dependencies = [ + "hermit-abi", + "libc", + "windows-sys 0.52.0", +] + [[package]] name = "itoa" version = "1.0.11" @@ -325,6 +462,22 @@ version = "2.7.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6c8640c5d730cb13ebd907d8d04b52f55ac9a2eec55b440c8892f40d56c76c1d" +[[package]] +name = "mime" +version = "0.3.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6877bb514081ee2a7ff5ef9de3281f14a4dd4bceac4c09388074a6b5df8a139a" + +[[package]] +name = "mime_guess" +version = "2.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4192263c238a5f0d0c6bfd21f336a313a4ce1c450542449ca191bb657b4642ef" +dependencies = [ + "mime", + "unicase", +] + [[package]] name = "miniz_oxide" version = "0.7.2" @@ -345,6 +498,24 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "multer" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "01acbdc23469fd8fe07ab135923371d5f5a422fbf9c522158677c8eb15bc51c2" +dependencies = [ + "bytes", + "encoding_rs", + "futures-util", + "http 0.2.12", + "httparse", + "log", + "memchr", + "mime", + "spin", + "version_check", +] + [[package]] name = "num_cpus" version = "1.16.0" @@ -364,6 +535,12 @@ dependencies = [ "memchr", ] +[[package]] +name = "once_cell" +version = "1.19.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3fdb12b2476b595f9358c5161aa467c2438859caa136dec86c26fdd2efe17b92" + [[package]] name = "parking_lot" version = "0.12.2" @@ -393,6 +570,26 @@ version = "2.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e3148f5046208a5d56bcfc03053e3ca6334e51da8dfb19b6cdc8b306fae3283e" +[[package]] +name = "pin-project" +version = "1.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b6bf43b791c5b9e34c3d182969b4abb522f9343702850a2e57f460d00d09b4b3" +dependencies = [ + "pin-project-internal", +] + +[[package]] +name = "pin-project-internal" +version = "1.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2f38a4412a78282e09a2cf38d195ea5420d15ba0602cb375210efbc877243965" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "pin-project-lite" version = "0.2.14" @@ -411,6 +608,16 @@ version = "0.2.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de" +[[package]] +name = "pretty_env_logger" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "865724d4dbe39d9f3dd3b52b88d859d66bcb2d6a0acfd5ea68a65fb66d4bdc1c" +dependencies = [ + "env_logger", + "log", +] + [[package]] name = "proc-macro2" version = "1.0.81" @@ -468,6 +675,35 @@ dependencies = [ "bitflags", ] +[[package]] +name = "regex" +version = "1.10.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c117dbdfde9c8308975b6a18d71f3f385c89461f7b3fb054288ecf2a2058ba4c" +dependencies = [ + "aho-corasick", + "memchr", + "regex-automata", + "regex-syntax", +] + +[[package]] +name = "regex-automata" +version = "0.4.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "86b83b8b9847f9bf95ef68afb0b8e6cdb80f498442f5179a29fad448fcc1eaea" +dependencies = [ + "aho-corasick", + "memchr", + "regex-syntax", +] + +[[package]] +name = "regex-syntax" +version = "0.8.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "adad44e29e4c806119491a7f06f03de4d1af22c3a680dd47f1e6e179439d1f56" + [[package]] name = "rustc-demangle" version = "0.1.23" @@ -480,6 +716,12 @@ version = "1.0.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e86697c916019a8588c99b5fac3cead74ec0b4b819707a682fd4d23fa0ce1ba1" +[[package]] +name = "scoped-tls" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e1cf6437eb19a8f4a6cc0f7dca544973b0b78843adbfeb3683d1a94a0024a294" + [[package]] name = "scopeguard" version = "1.2.0" @@ -517,6 +759,18 @@ dependencies = [ "serde", ] +[[package]] +name = "serde_urlencoded" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d3491c14715ca2294c4d6a88f15e84739788c1d030eed8c110436aafdaa2f3fd" +dependencies = [ + "form_urlencoded", + "itoa", + "ryu", + "serde", +] + [[package]] name = "sha1" version = "0.10.6" @@ -562,6 +816,12 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "spin" +version = "0.9.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67" + [[package]] name = "syn" version = "2.0.60" @@ -573,6 +833,15 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "termcolor" +version = "1.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "06794f8f6c5c898b3275aebefa6b8a1cb24cd2c6c79397ab15774837a0bc5755" +dependencies = [ + "winapi-util", +] + [[package]] name = "thiserror" version = "1.0.59" @@ -638,6 +907,17 @@ dependencies = [ "syn", ] +[[package]] +name = "tokio-stream" +version = "0.1.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "267ac89e0bec6e691e5813911606935d77c476ff49024f98abcea3e7b15e37af" +dependencies = [ + "futures-core", + "pin-project-lite", + "tokio", +] + [[package]] name = "tokio-tungstenite" version = "0.21.0" @@ -650,6 +930,52 @@ dependencies = [ "tungstenite", ] +[[package]] +name = "tokio-util" +version = "0.7.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5419f34732d9eb6ee4c3578b7989078579b7f039cbbb9ca2c4da015749371e15" +dependencies = [ + "bytes", + "futures-core", + "futures-sink", + "pin-project-lite", + "tokio", + "tracing", +] + +[[package]] +name = "tower-service" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b6bc1c9ce2b5135ac7f93c72918fc37feb872bdc6a5533a8b85eb4b86bfdae52" + +[[package]] +name = "tracing" +version = "0.1.40" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c3523ab5a71916ccf420eebdf5521fcef02141234bbc0b8a49f2fdc4544364ef" +dependencies = [ + "log", + "pin-project-lite", + "tracing-core", +] + +[[package]] +name = "tracing-core" +version = "0.1.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c06d3da6113f116aaee68e4d601191614c9053067f9ab7f6edbcb161237daa54" +dependencies = [ + "once_cell", +] + +[[package]] +name = "try-lock" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b" + [[package]] name = "tungstenite" version = "0.21.0" @@ -659,7 +985,7 @@ dependencies = [ "byteorder", "bytes", "data-encoding", - "http", + "http 1.1.0", "httparse", "log", "rand", @@ -675,6 +1001,15 @@ version = "1.17.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "42ff0bf0c66b8238c6f3b578df37d0b7848e55df8577b3f74f92a69acceeb825" +[[package]] +name = "unicase" +version = "2.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f7d2d4dafb69621809a81864c9c1b864479e1235c0dd4e199924b9742439ed89" +dependencies = [ + "version_check", +] + [[package]] name = "unicode-bidi" version = "0.3.15" @@ -719,12 +1054,59 @@ version = "0.9.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f" +[[package]] +name = "want" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bfa7760aed19e106de2c7c0b581b509f2f25d3dacaf737cb82ac61bc6d760b0e" +dependencies = [ + "try-lock", +] + +[[package]] +name = "warp" +version = "0.3.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4378d202ff965b011c64817db11d5829506d3404edeadb61f190d111da3f231c" +dependencies = [ + "bytes", + "futures-channel", + "futures-util", + "headers", + "http 0.2.12", + "hyper", + "log", + "mime", + "mime_guess", + "multer", + "percent-encoding", + "pin-project", + "scoped-tls", + "serde", + "serde_json", + "serde_urlencoded", + "tokio", + "tokio-tungstenite", + "tokio-util", + "tower-service", + "tracing", +] + [[package]] name = "wasi" version = "0.11.0+wasi-snapshot-preview1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" +[[package]] +name = "winapi-util" +version = "0.1.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4d4cc384e1e73b93bafa6fb4f1df8c41695c8a91cf9c4c64358067d15a7b6c6b" +dependencies = [ + "windows-sys 0.52.0", +] + [[package]] name = "windows-sys" version = "0.48.0" diff --git a/Cargo.toml b/Cargo.toml index 631091e..8cce217 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,11 +8,9 @@ futures-channel = "0.3.30" rand = "0.8.5" serde = { version = "1.0.197", features = ["derive"] } serde_json = "1.0.115" -futures-util = { version = "0.3.30", default-features = false, features = [ - "sink", - "std", -] } -tokio = { version = "1.37.0", features = ["full"] } -tokio-tungstenite = "*" +tokio = { version = "1", features = ["full"] } +warp = "0.3" url = "2.5.0" -futures = "0.3.30" +pretty_env_logger = "0.5.0" +tokio-stream = "0.1.15" +futures-util = "0.3.30" diff --git a/examples/client.rs b/examples/client.rs deleted file mode 100644 index 8d457b4..0000000 --- a/examples/client.rs +++ /dev/null @@ -1,58 +0,0 @@ -use futures_util::{future, pin_mut, StreamExt}; -use std::env; -use tokio::{ - io::{AsyncReadExt, AsyncWriteExt}, - time::sleep, - time::Duration, -}; -use tokio_tungstenite::{connect_async, tungstenite::protocol::Message}; - -// Our helper method which will read data from stdin and send it along the -// sender provided. -async fn read_stdin(tx: futures_channel::mpsc::UnboundedSender) { - let mut stdin = tokio::io::stdin(); - - loop { - let mut buf = vec![0; 1024]; - let n = stdin.read(&mut buf).await; - buf.truncate(n.unwrap()); - tx.unbounded_send(Message::binary(buf)).unwrap(); - } -} - -#[tokio::main] -async fn main() -> Result<(), Box> { - // sleep to avoid racing the server on startup when using cargo watch - sleep(Duration::from_millis(1000)).await; - - let connect_addr = env::args() - .nth(1) - .unwrap_or_else(|| "ws://127.0.0.1:8080".to_string()); - let url = url::Url::parse(&connect_addr)?; - - let (stdin_tx, stdin_rx) = futures_channel::mpsc::unbounded(); - tokio::spawn(read_stdin(stdin_tx)); - - let (ws_stream, res) = connect_async(url).await?; - - println!( - "WebSocket handshake has been successfully completed\n{:#?}", - res - ); - - let (sink, stream) = ws_stream.split(); - - let stdin_to_ws = stdin_rx.map(Ok).forward(sink); - - let ws_to_stdout = { - stream.for_each(|message| async { - let data = message.unwrap().into_data(); - tokio::io::stdout().write_all(&data).await.unwrap(); - }) - }; - - pin_mut!(stdin_to_ws, ws_to_stdout); - future::select(stdin_to_ws, ws_to_stdout).await; - - Ok(()) -} diff --git a/readme.md b/readme.md index c2bc6bb..4ea83ee 100644 --- a/readme.md +++ b/readme.md @@ -8,7 +8,8 @@ This started as a problem trying to play games with friends who are all on diffe ## Dev stuff: `cargo run` to run server -`cargo run --example client` to spawn a test client + +open `localhost:3030` in your browser to run test client or connect to `ws://localhost:3030/chat` with a custom client ## TODO: diff --git a/src/main.rs b/src/main.rs index 0afff49..38ef780 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,19 +1,14 @@ -use futures_channel::mpsc::{unbounded, UnboundedSender}; -use futures_util::{future, pin_mut, stream::TryStreamExt, SinkExt, StreamExt}; -use std::{ - collections::HashMap, - env, - error::Error, - fs, - net::SocketAddr, - result::Result, - sync::{Arc, Mutex}, +use futures_util::{SinkExt, StreamExt, TryFutureExt}; +use std::collections::HashMap; +use std::sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, }; -use tokio::net::{TcpListener, TcpStream}; -use tokio_tungstenite::tungstenite::protocol::Message; - -type Tx = UnboundedSender; -type PeerMap = Arc>>; +use std::{error::Error, fs, result::Result}; +use tokio::sync::{mpsc, RwLock}; +use tokio_stream::wrappers::UnboundedReceiverStream; +use warp::ws::{Message, WebSocket}; +use warp::Filter; #[allow(non_snake_case)] pub mod CAHd_game; @@ -76,77 +71,169 @@ fn test() -> Result<(), Box> { Ok(()) } -async fn handle_connection( - peer_map: PeerMap, - raw_stream: TcpStream, - addr: SocketAddr, -) -> Result<(), tokio_tungstenite::tungstenite::Error> { - println!("Incoming TCP connection from: {}", addr); +/// Our global unique user id counter. +static NEXT_USER_ID: AtomicUsize = AtomicUsize::new(1); - let ws_stream = tokio_tungstenite::accept_async(raw_stream).await?; - println!("WebSocket connection established: {}", addr); - - // Insert the write part of this peer to the peer map. - let (tx, rx) = unbounded(); - - peer_map.lock().unwrap().insert(addr, tx); - - let (mut sink, stream) = ws_stream.split(); - - sink.send(Message::Text("hey".to_string())).await?; - - let broadcast_incoming = stream.try_for_each(|msg| { - println!( - "Received a message from {}: {}", - addr, - msg.to_text().unwrap() - ); - let peers = peer_map.lock().unwrap(); - - // We want to broadcast the message to everyone except ourselves. - let broadcast_recipients = peers - .iter() - .filter(|(peer_addr, _)| peer_addr != &&addr) - .map(|(_, ws_sink)| ws_sink); - - for recp in broadcast_recipients { - recp.unbounded_send(msg.clone()).unwrap(); - } - - future::ok(()) - }); - - let receive_from_others = rx.map(Ok).forward(sink); - - pin_mut!(broadcast_incoming, receive_from_others); - future::select(broadcast_incoming, receive_from_others).await; - - println!("{} disconnected", &addr); - peer_map.lock().unwrap().remove(&addr); - - Ok(()) -} +/// Our state of currently connected users. +/// +/// - Key is their id +/// - Value is a sender of `warp::ws::Message` +type Users = Arc>>>; #[tokio::main] async fn main() -> Result<(), Box> { + pretty_env_logger::init(); test()?; - let addr = env::args() - .nth(1) - .unwrap_or_else(|| "127.0.0.1:8080".to_string()); + // Keep track of all connected users, key is usize, value + // is a websocket sender. + let users = Users::default(); + // Turn our "state" into a new Filter... + let users = warp::any().map(move || users.clone()); - let state = PeerMap::new(Mutex::new(HashMap::new())); + // GET /chat -> websocket upgrade + let chat = warp::path("chat") + // The `ws()` filter will prepare Websocket handshake... + .and(warp::ws()) + .and(users) + .map(|ws: warp::ws::Ws, users| { + // This will call our function if the handshake succeeds. + ws.on_upgrade(move |socket| user_connected(socket, users)) + }); - // Create the event loop and TCP listener we'll accept connections on. - let try_socket = TcpListener::bind(&addr).await; - let listener = try_socket?; + // GET / -> index html + let index = warp::path::end().map(|| warp::reply::html(INDEX_HTML)); - println!("\nListening on: {}", addr); + let routes = index.or(chat); - // Let's spawn the handling of each connection in a separate task. - while let Ok((stream, addr)) = listener.accept().await { - tokio::spawn(handle_connection(state.clone(), stream, addr)); - } + warp::serve(routes).run(([127, 0, 0, 1], 3030)).await; Ok(()) } + +async fn user_connected(ws: WebSocket, users: Users) { + // Use a counter to assign a new unique ID for this user. + let my_id = NEXT_USER_ID.fetch_add(1, Ordering::Relaxed); + + eprintln!("User {} connected!", my_id); + + // Split the socket into a sender and receive of messages. + let (mut user_ws_tx, mut user_ws_rx) = ws.split(); + + // Use an unbounded channel to handle buffering and flushing of messages + // to the websocket... + let (tx, rx) = mpsc::unbounded_channel(); + let mut rx = UnboundedReceiverStream::new(rx); + + let _ = user_ws_tx.send(Message::text(format!("Server Message: Welcome User {}",my_id))).await; + tokio::task::spawn(async move { + while let Some(message) = rx.next().await { + user_ws_tx + .send(message) + .unwrap_or_else(|e| { + eprintln!("websocket send error: {}", e); + }) + .await; + } + }); + + // Save the sender in our list of connected users. + users.write().await.insert(my_id, tx); + + // Return a `Future` that is basically a state machine managing + // this specific user's connection. + + // Every time the user sends a message, broadcast it to + // all other users... + while let Some(result) = user_ws_rx.next().await { + let msg = match result { + Ok(msg) => msg, + Err(e) => { + eprintln!("websocket error(uid={}): {}", my_id, e); + break; + } + }; + user_message(my_id, msg, &users).await; + } + + // user_ws_rx stream will keep processing as long as the user stays + // connected. Once they disconnect, then... + user_disconnected(my_id, &users).await; +} + +async fn user_message(my_id: usize, msg: Message, users: &Users) { + // Skip any non-Text messages... + let msg = if let Ok(s) = msg.to_str() { + s + } else { + return; + }; + + let new_msg = format!(": {}", my_id, msg); + + // New message from this user, send it to everyone else (except same uid)... + for (&uid, tx) in users.read().await.iter() { + if my_id != uid { + if let Err(_disconnected) = tx.send(Message::text(new_msg.clone())) { + // The tx is disconnected, our `user_disconnected` code + // should be happening in another task, nothing more to + // do here. + } + } + } +} + +async fn user_disconnected(my_id: usize, users: &Users) { + eprintln!("User {} left.", my_id); + + // Stream closed up, so remove from the user list + users.write().await.remove(&my_id); +} + +static INDEX_HTML: &str = r#" + + + Warp Chat + + +

Cards

+
+

Connecting...

+
+ + + + + +"#;