use futures_util::{future, pin_mut, StreamExt}; use std::env; use tokio::{ io::{AsyncReadExt, AsyncWriteExt}, time::sleep, time::Duration, }; use tokio_tungstenite::{connect_async, tungstenite::protocol::Message}; // Our helper method which will read data from stdin and send it along the // sender provided. async fn read_stdin(tx: futures_channel::mpsc::UnboundedSender) { let mut stdin = tokio::io::stdin(); loop { let mut buf = vec![0; 1024]; let n = stdin.read(&mut buf).await; buf.truncate(n.unwrap()); tx.unbounded_send(Message::binary(buf)).unwrap(); } } #[tokio::main] async fn main() -> Result<(), Box> { // sleep to avoid racing the server on startup when using cargo watch sleep(Duration::from_millis(200)).await; if let Some(connect_addr) = env::args().nth(1) { let url = url::Url::parse(&connect_addr)?; let (stdin_tx, stdin_rx) = futures_channel::mpsc::unbounded(); tokio::spawn(read_stdin(stdin_tx)); let (ws_stream, res) = connect_async(url).await?; println!( "WebSocket handshake has been successfully completed\n{:#?}", res ); let (sink, stream) = ws_stream.split(); let stdin_to_ws = stdin_rx.map(Ok).forward(sink); let ws_to_stdout = { stream.for_each(|message| async { let data = message.unwrap().into_data(); tokio::io::stdout().write_all(&data).await.unwrap(); }) }; pin_mut!(stdin_to_ws, ws_to_stdout); future::select(stdin_to_ws, ws_to_stdout).await; } else { eprintln!("This program requires at least one argument!") } Ok(()) }