From f2f926b76c6f6f218a83ed89a22b8a530f954284 Mon Sep 17 00:00:00 2001 From: Maccesch Date: Fri, 11 Aug 2023 13:48:53 +0100 Subject: [PATCH] added datagrams rc signal and tx closure --- Cargo.toml | 7 +++ src/use_webtransport.rs | 132 ++++++++++++++++++++++++++++++++++++++-- 2 files changed, 134 insertions(+), 5 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 9d4dfd2..bfc4208 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -58,6 +58,10 @@ features = [ "Navigator", "NodeList", "PointerEvent", + "ReadableStream", + "ReadableStreamByobReader", + "ReadableStreamGetReaderOptions", + "ReadableStreamReaderMode", "ResizeObserver", "ResizeObserverBoxOptions", "ResizeObserverEntry", @@ -73,7 +77,10 @@ features = [ "WebSocket", "WebTransport", "WebTransportOptions", + "WebTransportDatagramDuplexStream", "Window", + "WritableStream", + "WritableStreamDefaultWriter", ] [features] diff --git a/src/use_webtransport.rs b/src/use_webtransport.rs index e928e90..f13139c 100644 --- a/src/use_webtransport.rs +++ b/src/use_webtransport.rs @@ -1,12 +1,15 @@ use crate::core::ConnectionReadyState; -use crate::utils::{CloneableFn, CloneableFnWithArg}; use default_struct_builder::DefaultBuilder; +use js_sys::{Reflect, Uint8Array}; use leptos::leptos_dom::helpers::TimeoutHandle; use leptos::*; use std::rc::Rc; use std::time::Duration; use wasm_bindgen::prelude::*; use wasm_bindgen_futures::JsFuture; +use web_sys::{ + ReadableStream, ReadableStreamReaderMode, WebTransport, WritableStreamDefaultWriter, +}; /// /// @@ -27,7 +30,7 @@ use wasm_bindgen_futures::JsFuture; /// # view! { } /// # } /// ``` -pub fn use_webtransport(url: &str) -> UseWebtransportReturn { +pub fn use_webtransport(url: &str) -> UseWebTransportReturn) + Clone + 'static> { use_webtransport_with_options(url, UseWebTransportOptions::default()) } @@ -35,7 +38,7 @@ pub fn use_webtransport(url: &str) -> UseWebtransportReturn { pub fn use_webtransport_with_options( url: &str, options: UseWebTransportOptions, -) -> UseWebtransportReturn { +) -> UseWebTransportReturn) + Clone + 'static> { let UseWebTransportOptions { on_open, on_error, @@ -48,6 +51,8 @@ pub fn use_webtransport_with_options( let (ready_state, set_ready_state) = create_signal(ConnectionReadyState::Closed); let transport_ref = store_value(None::); + let datagrams_reader_initialized_ref = store_value(false); + let datagrams_writer_ref = store_value(None::); let reconnect_timer_ref = store_value(None::); let reconnect_count_ref = store_value(0_u64); @@ -132,11 +137,121 @@ pub fn use_webtransport_with_options( } }; - UseWebtransportReturn { + let (datagrams_signal, set_datagrams) = create_signal(None::>); + + let datagrams = Signal::derive(move || { + // lazily create datagrams reader listener + if ready_state.get() == ConnectionReadyState::Open { + initialize_datagrams_reader( + transport_ref, + datagrams_reader_initialized_ref, + set_datagrams, + ); + } + + datagrams_signal.get() + }); + + let send_datagrams = move |data| { + if let Some(transport) = transport_ref.get_value() { + let writer = get_or_create_datagrams_writer(datagrams_writer_ref, transport); + + let _ = writer.write_with_chunk(&data.into()); + } + }; + + // TODO : reliable streams + + UseWebTransportReturn { ready_state: ready_state.into(), + datagrams, + send_datagrams, } } +fn get_or_create_datagrams_writer( + datagrams_writer_ref: StoredValue>, + transport: WebTransport, +) -> WritableStreamDefaultWriter { + if let Some(writer) = datagrams_writer_ref.get_value() { + writer + } else { + let writer = transport + .datagrams() + .writable() + .get_writer() + .expect("should be able to get the writer"); + datagrams_writer_ref.set_value(Some(writer.clone())); + writer + } +} + +fn initialize_datagrams_reader( + transport_ref: StoredValue>, + datagrams_reader_initialized_ref: StoredValue, + set_datagrams: WriteSignal>>, +) { + if !datagrams_reader_initialized_ref.get_value() { + if let Some(transport) = transport_ref.get_value() { + datagrams_reader_initialized_ref.set_value(true); + + listen_to_stream( + transport.datagrams().readable(), + move || datagrams_reader_initialized_ref.set_value(false), + set_datagrams, + ); + } + } +} + +fn listen_to_stream( + readable_stream: ReadableStream, + on_done: fn(), + set_signal: WriteSignal>>, +) { + let mut reader_options = web_sys::ReadableStreamGetReaderOptions::new(); + reader_options.mode(ReadableStreamReaderMode::Byob); + + let reader: web_sys::ReadableStreamByobReader = readable_stream + .get_reader_with_options(&reader_options) + .into(); + + spawn_local(async move { + // the length value 4000 is taken from the MDN example + // https://developer.mozilla.org/en-US/docs/Web/API/ReadableStreamBYOBReader/read#examples + let mut buffer = [0_u8; 4000]; + + loop { + let result = JsFuture::from(reader.read_with_u8_array(&mut buffer)).await; + match result { + Ok(result) => { + let done = Reflect::get(&result, &"done".into()) + .expect("done should always be there") + .as_bool() + .unwrap_or(true); + + if done { + // TODO : close connection? + break; + } + + let value: Uint8Array = Reflect::get(&result, &"value".into()) + .expect("if not done there should be a value") + .into(); + + set_signal.set(Some(value.to_vec())) + } + Err(..) => { + // TODO : error handling? + break; + } + } + } + + on_done(); + }); +} + /// Options for [`use_webtransport_with_options`]. #[derive(DefaultBuilder)] pub struct UseWebTransportOptions { @@ -176,9 +291,16 @@ impl From for web_sys::WebTransportOptions { } /// Return type of [`use_webtransport`]. -pub struct UseWebtransportReturn { +pub struct UseWebTransportReturn +where + SendGramsFn: Fn(Vec) + Clone + 'static, +{ /// The current state of the `WebTransport` connection. pub ready_state: Signal, + /// Latest datagrams message received + pub datagrams: Signal>>, + /// Sends binary data through the datagrams stream + pub send_datagrams: SendGramsFn, } /// Error enum for [`UseWebTransportOptions::on_error`]