datagrams work!

This commit is contained in:
Maccesch 2023-08-31 18:26:03 +01:00
parent 6370694044
commit fb9dfc7b76
3 changed files with 75 additions and 21 deletions

View file

@ -62,7 +62,7 @@ features = [
"NodeList", "NodeList",
"PointerEvent", "PointerEvent",
"ReadableStream", "ReadableStream",
"ReadableStreamByobReader", "ReadableStreamDefaultReader",
"ReadableStreamGetReaderOptions", "ReadableStreamGetReaderOptions",
"ReadableStreamReaderMode", "ReadableStreamReaderMode",
"ResizeObserver", "ResizeObserver",

View file

@ -1,10 +1,17 @@
use leptos::*; use leptos::*;
use leptos_use::core::ConnectionReadyState;
use leptos_use::docs::demo_or_body; use leptos_use::docs::demo_or_body;
use leptos_use::use_webtransport; use leptos_use::{use_webtransport_with_options, UseWebTransportOptions};
#[component] #[component]
fn Demo() -> impl IntoView { fn Demo() -> impl IntoView {
let transport = use_webtransport("https://echo.webtransport.day"); let (log, set_log) = create_signal(vec![]);
let transport = use_webtransport_with_options(
"https://echo.webtransport.day",
UseWebTransportOptions::default()
.on_error(|e| set_log.update(|log| log.push(format!("Error: {:?}", e)))),
);
let (text, set_text) = create_signal("".to_string()); let (text, set_text) = create_signal("".to_string());
@ -12,13 +19,57 @@ fn Demo() -> impl IntoView {
let transport = transport.clone(); let transport = transport.clone();
move |e| { move |e| {
set_log.update(|log| log.push(format!("Sent datagram: '{}'", text())));
transport.send_datagrams(text().as_bytes()); transport.send_datagrams(text().as_bytes());
} }
}; };
let ready_state = transport.ready_state;
let _ = watch(
ready_state,
move |ready, prev_ready, _| {
if ready == &ConnectionReadyState::Open
&& prev_ready.unwrap_or(&ConnectionReadyState::Closed)
!= &ConnectionReadyState::Open
{
set_log.update(|log| log.push("Connection opened".to_string()));
} else if ready == &ConnectionReadyState::Closed
&& prev_ready.unwrap_or(&ConnectionReadyState::Open)
!= &ConnectionReadyState::Closed
{
set_log.update(|log| log.push("Connection closed".to_string()));
}
},
false,
);
let _ = watch(
transport.datagrams,
move |grams, _, _| {
if let Some(grams) = grams {
set_log.update(|log| {
log.push(format!(
"Received datagrams: '{}'",
String::from_utf8(grams.clone()).expect("valid utf8")
))
});
}
},
false,
);
view! { view! {
<h2>Datagrams</h2>
<textarea on:change=move |e| set_text(event_target_value(&e))/> <textarea on:change=move |e| set_text(event_target_value(&e))/>
<button on:click=on_send>"Send"</button> <button on:click=on_send disabled=move || ready_state() != ConnectionReadyState::Open>"Send"</button>
<div>
<ul>
{move || log().iter().map(|l| view! { <li>{l}</li> }).collect::<Vec<_>>()}
</ul>
</div>
} }
} }

View file

@ -106,23 +106,27 @@ pub fn use_webtransport_with_options(
let on_open = Rc::clone(&on_open); let on_open = Rc::clone(&on_open);
move || { move || {
let transport = transport.borrow();
reconnect_timer.set(None); reconnect_timer.set(None);
if let Some(transport) = transport.as_ref() { if let Some(transport) = transport.borrow().as_ref() {
transport.close(); transport.close();
} }
let options = web_sys::WebTransportOptions::new(); let options = web_sys::WebTransportOptions::new();
let transport = web_sys::WebTransport::new_with_options(&url, &options).unwrap_throw(); transport.replace(Some(
web_sys::WebTransport::new_with_options(&url, &options).unwrap_throw(),
));
set_ready_state.set(ConnectionReadyState::Connecting); set_ready_state.set(ConnectionReadyState::Connecting);
spawn_local({ spawn_local({
let on_open = Rc::clone(&on_open); let on_open = Rc::clone(&on_open);
let transport = Rc::clone(&transport);
async move { async move {
let transport = transport.borrow();
let transport = transport.as_ref().expect("Transport should be set");
match JsFuture::from(transport.ready()).await { match JsFuture::from(transport.ready()).await {
Ok(_) => { Ok(_) => {
set_ready_state.set(ConnectionReadyState::Open); set_ready_state.set(ConnectionReadyState::Open);
@ -229,6 +233,10 @@ pub fn use_webtransport_with_options(
}); });
} }
if immediate {
open();
}
UseWebTransportReturn { UseWebTransportReturn {
transport, transport,
ready_state, ready_state,
@ -241,7 +249,9 @@ fn get_or_create_datagrams_writer(
datagrams_writer: Rc<RefCell<Option<web_sys::WritableStreamDefaultWriter>>>, datagrams_writer: Rc<RefCell<Option<web_sys::WritableStreamDefaultWriter>>>,
transport: &web_sys::WebTransport, transport: &web_sys::WebTransport,
) -> web_sys::WritableStreamDefaultWriter { ) -> web_sys::WritableStreamDefaultWriter {
if let Some(writer) = datagrams_writer.borrow().clone() { let writer = datagrams_writer.borrow().clone();
if let Some(writer) = writer {
writer writer
} else { } else {
let writer = transport let writer = transport
@ -293,20 +303,12 @@ fn listen_to_stream(
on_value: impl Fn(JsValue) + 'static, on_value: impl Fn(JsValue) + 'static,
on_done: impl Fn() + 'static, on_done: impl Fn() + 'static,
) { ) {
let mut reader_options = web_sys::ReadableStreamGetReaderOptions::new(); let reader: web_sys::ReadableStreamDefaultReader =
reader_options.mode(web_sys::ReadableStreamReaderMode::Byob); readable_stream.get_reader().unchecked_into();
let reader: web_sys::ReadableStreamByobReader = readable_stream
.get_reader_with_options(&reader_options)
.unchecked_into();
spawn_local(async move { spawn_local(async move {
// the length value 4000 is taken from the MDN example
// https://developer.mozilla.org/en-US/docs/Web/API/ReadableStreamBYOBReader/read#examples
let mut buffer = [0_u8; 4000];
loop { loop {
let result = JsFuture::from(reader.read_with_u8_array(&mut buffer)).await; let result = JsFuture::from(reader.read()).await;
match result { match result {
Ok(result) => { Ok(result) => {
let done = Reflect::get(&result, &"done".into()) let done = Reflect::get(&result, &"done".into())
@ -448,7 +450,7 @@ impl CloseableStream for SendStream {
} }
pub struct ReceiveStream { pub struct ReceiveStream {
pub reader: web_sys::ReadableStreamByobReader, pub reader: web_sys::ReadableStreamDefaultReader,
pub message: Signal<Option<Vec<u8>>>, pub message: Signal<Option<Vec<u8>>>,
pub close: Rc<dyn Fn()>, pub close: Rc<dyn Fn()>,
} }
@ -581,6 +583,7 @@ impl UseWebTransportReturn {
} }
/// Error enum for [`UseWebTransportOptions::on_error`] /// Error enum for [`UseWebTransportOptions::on_error`]
#[derive(Debug)]
pub enum WebTransportError { pub enum WebTransportError {
/// The `WebTransport` is not connected yet. Call `open` first. /// The `WebTransport` is not connected yet. Call `open` first.
NotConnected, NotConnected,