added datagrams rc signal and tx closure

This commit is contained in:
Maccesch 2023-08-11 13:48:53 +01:00
parent 40c9b7b92c
commit f2f926b76c
2 changed files with 134 additions and 5 deletions

View file

@ -58,6 +58,10 @@ features = [
"Navigator",
"NodeList",
"PointerEvent",
"ReadableStream",
"ReadableStreamByobReader",
"ReadableStreamGetReaderOptions",
"ReadableStreamReaderMode",
"ResizeObserver",
"ResizeObserverBoxOptions",
"ResizeObserverEntry",
@ -73,7 +77,10 @@ features = [
"WebSocket",
"WebTransport",
"WebTransportOptions",
"WebTransportDatagramDuplexStream",
"Window",
"WritableStream",
"WritableStreamDefaultWriter",
]
[features]

View file

@ -1,12 +1,15 @@
use crate::core::ConnectionReadyState;
use crate::utils::{CloneableFn, CloneableFnWithArg};
use default_struct_builder::DefaultBuilder;
use js_sys::{Reflect, Uint8Array};
use leptos::leptos_dom::helpers::TimeoutHandle;
use leptos::*;
use std::rc::Rc;
use std::time::Duration;
use wasm_bindgen::prelude::*;
use wasm_bindgen_futures::JsFuture;
use web_sys::{
ReadableStream, ReadableStreamReaderMode, WebTransport, WritableStreamDefaultWriter,
};
///
///
@ -27,7 +30,7 @@ use wasm_bindgen_futures::JsFuture;
/// # view! { }
/// # }
/// ```
pub fn use_webtransport(url: &str) -> UseWebtransportReturn {
pub fn use_webtransport(url: &str) -> UseWebTransportReturn<impl Fn(Vec<u8>) + Clone + 'static> {
use_webtransport_with_options(url, UseWebTransportOptions::default())
}
@ -35,7 +38,7 @@ pub fn use_webtransport(url: &str) -> UseWebtransportReturn {
pub fn use_webtransport_with_options(
url: &str,
options: UseWebTransportOptions,
) -> UseWebtransportReturn {
) -> UseWebTransportReturn<impl Fn(Vec<u8>) + Clone + 'static> {
let UseWebTransportOptions {
on_open,
on_error,
@ -48,6 +51,8 @@ pub fn use_webtransport_with_options(
let (ready_state, set_ready_state) = create_signal(ConnectionReadyState::Closed);
let transport_ref = store_value(None::<web_sys::WebTransport>);
let datagrams_reader_initialized_ref = store_value(false);
let datagrams_writer_ref = store_value(None::<web_sys::WritableStreamDefaultWriter>);
let reconnect_timer_ref = store_value(None::<TimeoutHandle>);
let reconnect_count_ref = store_value(0_u64);
@ -132,11 +137,121 @@ pub fn use_webtransport_with_options(
}
};
UseWebtransportReturn {
let (datagrams_signal, set_datagrams) = create_signal(None::<Vec<u8>>);
let datagrams = Signal::derive(move || {
// lazily create datagrams reader listener
if ready_state.get() == ConnectionReadyState::Open {
initialize_datagrams_reader(
transport_ref,
datagrams_reader_initialized_ref,
set_datagrams,
);
}
datagrams_signal.get()
});
let send_datagrams = move |data| {
if let Some(transport) = transport_ref.get_value() {
let writer = get_or_create_datagrams_writer(datagrams_writer_ref, transport);
let _ = writer.write_with_chunk(&data.into());
}
};
// TODO : reliable streams
UseWebTransportReturn {
ready_state: ready_state.into(),
datagrams,
send_datagrams,
}
}
fn get_or_create_datagrams_writer(
datagrams_writer_ref: StoredValue<Option<WritableStreamDefaultWriter>>,
transport: WebTransport,
) -> WritableStreamDefaultWriter {
if let Some(writer) = datagrams_writer_ref.get_value() {
writer
} else {
let writer = transport
.datagrams()
.writable()
.get_writer()
.expect("should be able to get the writer");
datagrams_writer_ref.set_value(Some(writer.clone()));
writer
}
}
fn initialize_datagrams_reader(
transport_ref: StoredValue<Option<web_sys::WebTransport>>,
datagrams_reader_initialized_ref: StoredValue<bool>,
set_datagrams: WriteSignal<Option<Vec<u8>>>,
) {
if !datagrams_reader_initialized_ref.get_value() {
if let Some(transport) = transport_ref.get_value() {
datagrams_reader_initialized_ref.set_value(true);
listen_to_stream(
transport.datagrams().readable(),
move || datagrams_reader_initialized_ref.set_value(false),
set_datagrams,
);
}
}
}
fn listen_to_stream(
readable_stream: ReadableStream,
on_done: fn(),
set_signal: WriteSignal<Option<Vec<u8>>>,
) {
let mut reader_options = web_sys::ReadableStreamGetReaderOptions::new();
reader_options.mode(ReadableStreamReaderMode::Byob);
let reader: web_sys::ReadableStreamByobReader = readable_stream
.get_reader_with_options(&reader_options)
.into();
spawn_local(async move {
// the length value 4000 is taken from the MDN example
// https://developer.mozilla.org/en-US/docs/Web/API/ReadableStreamBYOBReader/read#examples
let mut buffer = [0_u8; 4000];
loop {
let result = JsFuture::from(reader.read_with_u8_array(&mut buffer)).await;
match result {
Ok(result) => {
let done = Reflect::get(&result, &"done".into())
.expect("done should always be there")
.as_bool()
.unwrap_or(true);
if done {
// TODO : close connection?
break;
}
let value: Uint8Array = Reflect::get(&result, &"value".into())
.expect("if not done there should be a value")
.into();
set_signal.set(Some(value.to_vec()))
}
Err(..) => {
// TODO : error handling?
break;
}
}
}
on_done();
});
}
/// Options for [`use_webtransport_with_options`].
#[derive(DefaultBuilder)]
pub struct UseWebTransportOptions {
@ -176,9 +291,16 @@ impl From<UseWebTransportOptions> for web_sys::WebTransportOptions {
}
/// Return type of [`use_webtransport`].
pub struct UseWebtransportReturn {
pub struct UseWebTransportReturn<SendGramsFn>
where
SendGramsFn: Fn(Vec<u8>) + Clone + 'static,
{
/// The current state of the `WebTransport` connection.
pub ready_state: Signal<ConnectionReadyState>,
/// Latest datagrams message received
pub datagrams: Signal<Option<Vec<u8>>>,
/// Sends binary data through the datagrams stream
pub send_datagrams: SendGramsFn,
}
/// Error enum for [`UseWebTransportOptions::on_error`]