From 25590582e3632ecb49f07dde927c31991afaab85 Mon Sep 17 00:00:00 2001 From: Maccesch Date: Tue, 22 Aug 2023 14:34:04 +0100 Subject: [PATCH] reliable streams --- Cargo.toml | 9 +- src/lib.rs | 1 - src/use_webtransport.rs | 375 +++++++++++++++++++++++++++++++++------- 3 files changed, 323 insertions(+), 62 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index bfc4208..ec16722 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -24,6 +24,9 @@ paste = "1" lazy_static = "1" cfg-if = "1" wasm-bindgen-futures = "0.4" +async-trait = "0.1" +rmp-serde = { version = "1.1", optional = true } +thiserror = "1" [dependencies.web-sys] version = "0.3" @@ -78,7 +81,10 @@ features = [ "WebTransport", "WebTransportOptions", "WebTransportDatagramDuplexStream", + "WebTransportBidirectionalStream", "Window", + "WebTransportReceiveStream", + "WebTransportSendStream", "WritableStream", "WritableStreamDefaultWriter", ] @@ -86,8 +92,9 @@ features = [ [features] docs = [] math = ["num"] -storage = ["serde", "serde_json", "web-sys/StorageEvent"] +storage = ["dep:serde", "dep:serde_json", "web-sys/StorageEvent"] ssr = [] +msgpack = ["dep:rmp-serde", "dep:serde"] [package.metadata.docs.rs] all-features = true diff --git a/src/lib.rs b/src/lib.rs index 3e944ac..c322d6f 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -57,7 +57,6 @@ mod use_supported; mod use_throttle_fn; mod use_to_string; mod use_websocket; -mod use_webtransport; mod use_window_focus; mod use_window_scroll; mod watch_debounced; diff --git a/src/use_webtransport.rs b/src/use_webtransport.rs index c856e8d..2202082 100644 --- a/src/use_webtransport.rs +++ b/src/use_webtransport.rs @@ -1,14 +1,27 @@ use crate::core::ConnectionReadyState; +use async_trait::async_trait; +use cfg_if::cfg_if; use default_struct_builder::DefaultBuilder; -use js_sys::{Reflect, Uint8Array}; +use js_sys::Reflect; use leptos::leptos_dom::helpers::TimeoutHandle; use leptos::*; use std::cell::{Cell, RefCell}; +use std::future::Future; use std::rc::Rc; use std::time::Duration; use wasm_bindgen::prelude::*; use wasm_bindgen_futures::JsFuture; +#[cfg(any(feature = "bincode", feature = "msgpack"))] +use serde::{Deserialize, Serialize}; + +#[cfg(feature = "msgpack")] +use rmp_serde::{from_slice, to_vec}; +use thiserror::Error; + +#[cfg(feature = "bincode")] +use bincode::serde::{decode_from_slice as from_slice, encode_to_vec as to_vec}; + /// /// /// ## Demo @@ -28,7 +41,7 @@ use wasm_bindgen_futures::JsFuture; /// # view! { } /// # } /// ``` -pub fn use_webtransport(url: &str) -> UseWebTransportReturn) + Clone + 'static> { +pub fn use_webtransport(url: &str) -> UseWebTransportReturn { use_webtransport_with_options(url, UseWebTransportOptions::default()) } @@ -36,17 +49,21 @@ pub fn use_webtransport(url: &str) -> UseWebTransportReturn) + C pub fn use_webtransport_with_options( url: &str, options: UseWebTransportOptions, -) -> UseWebTransportReturn) + Clone + 'static> { +) -> UseWebTransportReturn { let UseWebTransportOptions { on_open, on_error, on_close, + on_receive_stream, + on_bidir_stream, reconnect_limit, reconnect_interval, immediate, } = options; + let url = url.to_string(); let (ready_state, set_ready_state) = create_signal(ConnectionReadyState::Closed); + let ready_state: Signal<_> = ready_state.into(); let transport = Rc::new(RefCell::new(None::)); let datagrams_reader_initialized = Rc::new(Cell::new(false)); @@ -86,6 +103,7 @@ pub fn use_webtransport_with_options( connect_ref.set_value(Some(Rc::new({ let transport = Rc::clone(&transport); let reconnect_timer = Rc::clone(&reconnect_timer); + let on_open = Rc::clone(&on_open); move || { let transport = transport.borrow(); @@ -96,20 +114,24 @@ pub fn use_webtransport_with_options( transport.close(); } - let transport = - web_sys::WebTransport::new_with_options(url, &options.into()).unwrap_throw(); + let options = web_sys::WebTransportOptions::new(); + let transport = web_sys::WebTransport::new_with_options(&url, &options).unwrap_throw(); set_ready_state.set(ConnectionReadyState::Connecting); - spawn_local(async move { - match JsFuture::from(transport.ready()).await { - Ok(_) => { - set_ready_state.set(ConnectionReadyState::Open); - on_open(); - } - Err(e) => { - // TODO : handle error? - set_ready_state.set(ConnectionReadyState::Closed); + spawn_local({ + let on_open = Rc::clone(&on_open); + + async move { + match JsFuture::from(transport.ready()).await { + Ok(_) => { + set_ready_state.set(ConnectionReadyState::Open); + on_open(); + } + Err(e) => { + // TODO : handle error? + set_ready_state.set(ConnectionReadyState::Closed); + } } } }); @@ -162,10 +184,19 @@ pub fn use_webtransport_with_options( let datagrams_reader_initialized = Rc::clone(&datagrams_reader_initialized); move || { - lazy_initialize_reader( + let transport = Rc::clone(&transport); + + lazy_initialize_u8_reader( ready_state, - transport, - datagrams_reader_initialized, + Rc::clone(&datagrams_reader_initialized), + move || { + transport + .borrow() + .as_ref() + .expect("transport should be set a this point") + .datagrams() + .readable() + }, set_datagrams, ); @@ -173,19 +204,6 @@ pub fn use_webtransport_with_options( } }); - let send_datagrams = { - let transport = Rc::clone(&transport); - let datagrams_writer = Rc::clone(&datagrams_writer); - - move |data| { - if let Some(transport) = transport.borrow().as_ref() { - let writer = get_or_create_datagrams_writer(datagrams_writer, transport); - - let _ = writer.write_with_chunk(&data.into()); - } - } - }; - // TODO : reliable streams on_cleanup(move || { @@ -194,9 +212,10 @@ pub fn use_webtransport_with_options( }); UseWebTransportReturn { - ready_state: ready_state.into(), + transport, + ready_state, datagrams, - send_datagrams, + datagrams_writer, } } @@ -217,38 +236,53 @@ fn get_or_create_datagrams_writer( } } -fn lazy_initialize_reader( - ready_state: ReadSignal, - transport: Rc>>, +fn lazy_initialize_u8_reader( + ready_state: Signal, initialized: Rc>, - set_datagrams: WriteSignal>>, + get_readable_stream: impl Fn() -> web_sys::ReadableStream, + set_signal: WriteSignal>>, +) { + lazy_initialize_reader( + ready_state, + initialized, + get_readable_stream, + move |value| { + let value: js_sys::Uint8Array = value.into(); + set_signal.set(Some(value.to_vec())); + }, + ); +} + +fn lazy_initialize_reader( + ready_state: Signal, + initialized: Rc>, + get_readable_stream: impl Fn() -> web_sys::ReadableStream, + on_value: impl Fn(JsValue) + 'static, ) { if ready_state.get() == ConnectionReadyState::Open { if !initialized.get() { - if let Some(transport) = transport.borrow().as_ref() { - initialized.set(true); + initialized.set(true); - listen_to_stream( - transport.datagrams().readable(), - move || initialized.set(false), - set_datagrams, - ); - } + listen_to_stream( + get_readable_stream(), + move || initialized.set(false), + on_value, + ); } } } fn listen_to_stream( readable_stream: web_sys::ReadableStream, - on_done: fn(), - set_signal: WriteSignal>>, + on_done: impl Fn() + 'static, + on_value: impl Fn(JsValue) + 'static, ) { let mut reader_options = web_sys::ReadableStreamGetReaderOptions::new(); reader_options.mode(web_sys::ReadableStreamReaderMode::Byob); let reader: web_sys::ReadableStreamByobReader = readable_stream .get_reader_with_options(&reader_options) - .into(); + .unchecked_into(); spawn_local(async move { // the length value 4000 is taken from the MDN example @@ -269,11 +303,10 @@ fn listen_to_stream( break; } - let value: Uint8Array = Reflect::get(&result, &"value".into()) - .expect("if not done there should be a value") - .into(); + let value = Reflect::get(&result, &"value".into()) + .expect("if not done there should be a value"); - set_signal.set(Some(value.to_vec())) + on_value(value); } Err(..) => { // TODO : error handling? @@ -298,6 +331,12 @@ pub struct UseWebTransportOptions { /// Callback when `WebTransport` is closed. on_close: Rc, + /// Callback when the server opens a one-way stream. + on_receive_stream: Rc, + + /// Callback when the server opens a bidirectional stream. + on_bidir_stream: Rc, + /// Retry times. Defaults to 3. reconnect_limit: u64, @@ -316,6 +355,8 @@ impl Default for UseWebTransportOptions { on_open: Rc::new(|| {}), on_error: Rc::new(|_| {}), on_close: Rc::new(|| {}), + on_receive_stream: Rc::new(|_| {}), + on_bidir_stream: Rc::new(|_| {}), reconnect_limit: 3, reconnect_interval: 3000, immediate: true, @@ -323,24 +364,238 @@ impl Default for UseWebTransportOptions { } } -impl From for web_sys::WebTransportOptions { - fn from(options: UseWebTransportOptions) -> Self { - web_sys::WebTransportOptions::new() +#[async_trait(?Send)] +pub trait CloseableStream { + fn close(&self); + + async fn close_async(&self) -> Result<(), WebTransportError>; +} + +#[async_trait(?Send)] +pub trait SendableStream { + fn writer(&self) -> &web_sys::WritableStreamDefaultWriter; + fn send_bytes(&self, data: &[u8]) { + let arr = js_sys::Uint8Array::from(data); + let _ = self.writer().write_with_chunk(&arr); + } + + async fn send_bytes_async(&self, data: &[u8]) -> Result<(), SendError> { + let arr = js_sys::Uint8Array::from(data); + let _ = JsFuture::from(self.writer().write_with_chunk(&arr)) + .await + .map_err(|e| SendError::FailedToWrite(e)); + + Ok(()) + } + + #[cfg(any(feature = "msgpack", feature = "bincode"))] + fn send(&self, data: &T) { + self.send_bytes( + to_vec(data) + .expect("Serialization should not fail") + .as_slice(), + ); + } + + #[cfg(any(feature = "msgpack", feature = "bincode"))] + async fn send_async(&self, data: &T) -> Result<(), SendError> { + let serialized = to_vec(data)?; + self.send_bytes_async(&serialized).await + } +} + +pub struct SendStream { + pub writer: web_sys::WritableStreamDefaultWriter, +} + +#[async_trait(?Send)] +impl SendableStream for SendStream { + #[inline(always)] + fn writer(&self) -> &web_sys::WritableStreamDefaultWriter { + &self.writer + } +} + +#[async_trait(?Send)] +impl CloseableStream for SendStream { + fn close(&self) { + let _ = self.writer.close(); + } + + async fn close_async(&self) -> Result<(), WebTransportError> { + let _ = JsFuture::from(self.writer.close()) + .await + .map_err(|e| WebTransportError::OnCloseWriter(e))?; + + Ok(()) + } +} + +pub struct ReceiveStream { + pub reader: web_sys::ReadableStreamByobReader, + pub message: Signal>>, + pub close: Rc, +} + +pub struct BidirStream { + pub writer: web_sys::WritableStreamDefaultWriter, + pub bytes: Signal>>, +} + +cfg_if! { if #[cfg(any(feature = "msgpack", feature = "bincode"))] { + impl BidirStream { + pub fn receive Deserialize<'a>>(&self) -> Signal> { + let bytes = self.bytes; + + Signal::derive(move || { + bytes.get().map(|bytes| from_slice(bytes.as_slice()).expect("Deserialization should not fail")) + }) + } + + pub fn try_receive Deserialize<'a>>(&self) -> Signal>> { + let bytes = self.bytes; + + Signal::derive(move || { + bytes.get().map(|bytes| Ok(from_slice(bytes.as_slice())?)) + }) + } + } +}} + +#[async_trait(?Send)] +impl CloseableStream for BidirStream { + fn close(&self) { + let _ = self.writer.close(); + } + + async fn close_async(&self) -> Result<(), WebTransportError> { + let _ = JsFuture::from(self.writer.close()) + .await + .map_err(|e| WebTransportError::OnCloseWriter(e))?; + + Ok(()) } } /// Return type of [`use_webtransport`]. -pub struct UseWebTransportReturn -where - SendGramsFn: Fn(Vec) + Clone + 'static, -{ +pub struct UseWebTransportReturn { + transport: Rc>>, + datagrams_writer: Rc>>, + /// The current state of the `WebTransport` connection. pub ready_state: Signal, /// Latest datagrams message received pub datagrams: Signal>>, +} + +impl UseWebTransportReturn { + /// Access to the underlying `WebTransport` + pub async fn transport(&self) -> Option { + self.transport.borrow().clone() + } + /// Sends binary data through the datagrams stream - pub send_datagrams: SendGramsFn, + pub fn send_datagrams(&self, data: &[u8]) { + if let Some(transport) = self.transport.borrow().as_ref() { + let writer = + get_or_create_datagrams_writer(Rc::clone(&self.datagrams_writer), transport); + + let arr = js_sys::Uint8Array::from(data); + let _ = writer.write_with_chunk(&arr); + } + } + + // TODO : send_datagrams_async + + /// Create a unidirectional send stream + pub async fn create_send_stream(&self) -> Result { + if let Some(transport) = self.transport.borrow().as_ref() { + let result = JsFuture::from(transport.create_unidirectional_stream()) + .await + .map_err(|e| WebTransportError::FailedToOpenStream(e))?; + let stream: web_sys::WritableStream = result.unchecked_into(); + let writer = stream + .get_writer() + .map_err(|e| WebTransportError::FailedToOpenWriter(e))?; + + Ok(SendStream { writer }) + } else { + Err(WebTransportError::NotConnected) + } + } + + /// Create a bidirectional stream + pub async fn create_bidir_stream(&self) -> Result { + if let Some(transport) = self.transport.borrow().as_ref() { + let result = JsFuture::from(transport.create_bidirectional_stream()) + .await + .map_err(|e| WebTransportError::FailedToOpenStream(e))?; + let stream: web_sys::WebTransportBidirectionalStream = result.unchecked_into(); + let writer = stream + .writable() + .get_writer() + .map_err(|e| WebTransportError::FailedToOpenWriter(e))?; + + let bytes = Signal::derive({ + let reader_initialized = Rc::new(Cell::new(false)); + let ready_state = self.ready_state; + let (message_signal, set_message) = create_signal(None::>); + let stream = stream.clone(); + + move || { + let stream = stream.clone(); + + lazy_initialize_u8_reader( + ready_state, + Rc::clone(&reader_initialized), + move || stream.readable().unchecked_into(), + set_message, + ); + + message_signal.get() + } + }); + + Ok(BidirStream { writer, bytes }) + } else { + Err(WebTransportError::NotConnected) + } + } } /// Error enum for [`UseWebTransportOptions::on_error`] -pub enum WebTransportError {} +pub enum WebTransportError { + /// The `WebTransport` is not connected yet. Call `open` first. + NotConnected, + FailedToOpenStream(JsValue), + FailedToOpenWriter(JsValue), + FailedToOpenReader(JsValue), + FailedToRead(JsValue), + OnCloseWriter(JsValue), + OnCloseReader(JsValue), +} + +#[derive(Error, Debug)] +pub enum SendError { + #[error("Failed to write to stream")] + FailedToWrite(JsValue), + + #[cfg(feature = "bincode")] + #[error("Serialization failed: {0}")] + SerializationFailed(#[from] bincode::Error), + + #[cfg(feature = "msgpack")] + #[error("Serialization failed: {0}")] + SerializationFailed(#[from] rmp_serde::encode::Error), +} + +#[derive(Error, Debug)] +pub enum ReceiveError { + #[cfg(feature = "bincode")] + #[error("Serialization failed: {0}")] + SerializationFailed(#[from] bincode::Error), + + #[cfg(feature = "msgpack")] + #[error("Serialization failed: {0}")] + DeserializationFailed(#[from] rmp_serde::decode::Error), +}