From 58cf288a90611a6f461698ea0fce1178a1786a53 Mon Sep 17 00:00:00 2001 From: Adam <24621027+adoyle0@users.noreply.github.com> Date: Fri, 26 Apr 2024 23:12:42 -0400 Subject: [PATCH] clean up client and print server response --- examples/client.rs | 83 ++++++++++++++++++++++++++-------------------- 1 file changed, 47 insertions(+), 36 deletions(-) diff --git a/examples/client.rs b/examples/client.rs index b40326f..144696a 100644 --- a/examples/client.rs +++ b/examples/client.rs @@ -1,48 +1,59 @@ -use std::env; - use futures_util::{future, pin_mut, StreamExt}; -use tokio::io::{AsyncReadExt, AsyncWriteExt}; +use std::env; +use tokio::{ + io::{AsyncReadExt, AsyncWriteExt}, + time::sleep, + time::Duration, +}; use tokio_tungstenite::{connect_async, tungstenite::protocol::Message}; -#[tokio::main] -async fn main() { - let connect_addr = env::args() - .nth(1) - .unwrap_or_else(|| panic!("this program requires at least one argument")); - - let url = url::Url::parse(&connect_addr).unwrap(); - - let (stdin_tx, stdin_rx) = futures_channel::mpsc::unbounded(); - tokio::spawn(read_stdin(stdin_tx)); - - let (ws_stream, _) = connect_async(url).await.expect("Failed to connect"); - println!("WebSocket handshake has been successfully completed"); - - let (write, read) = ws_stream.split(); - - let stdin_to_ws = stdin_rx.map(Ok).forward(write); - let ws_to_stdout = { - read.for_each(|message| async { - let data = message.unwrap().into_data(); - tokio::io::stdout().write_all(&data).await.unwrap(); - }) - }; - - pin_mut!(stdin_to_ws, ws_to_stdout); - future::select(stdin_to_ws, ws_to_stdout).await; -} - // Our helper method which will read data from stdin and send it along the // sender provided. async fn read_stdin(tx: futures_channel::mpsc::UnboundedSender) { let mut stdin = tokio::io::stdin(); + loop { let mut buf = vec![0; 1024]; - let n = match stdin.read(&mut buf).await { - Err(_) | Ok(0) => break, - Ok(n) => n, - }; - buf.truncate(n); + let n = stdin.read(&mut buf).await; + buf.truncate(n.unwrap()); tx.unbounded_send(Message::binary(buf)).unwrap(); } } + +#[tokio::main] +async fn main() -> Result<(), Box> { + // sleep to avoid racing the server on startup when using cargo watch + sleep(Duration::from_millis(200)).await; + + if let Some(connect_addr) = env::args().nth(1) { + let url = url::Url::parse(&connect_addr)?; + + let (stdin_tx, stdin_rx) = futures_channel::mpsc::unbounded(); + tokio::spawn(read_stdin(stdin_tx)); + + let (ws_stream, res) = connect_async(url).await?; + + println!( + "WebSocket handshake has been successfully completed\n{:#?}", + res + ); + + let (sink, stream) = ws_stream.split(); + + let stdin_to_ws = stdin_rx.map(Ok).forward(sink); + + let ws_to_stdout = { + stream.for_each(|message| async { + let data = message.unwrap().into_data(); + tokio::io::stdout().write_all(&data).await.unwrap(); + }) + }; + + pin_mut!(stdin_to_ws, ws_to_stdout); + future::select(stdin_to_ws, ws_to_stdout).await; + } else { + eprintln!("This program requires at least one argument!") + } + + Ok(()) +}