client opened bidir streams work

This commit is contained in:
Maccesch 2023-08-31 20:12:48 +01:00
parent 9493b690ff
commit d2a91b4d6d
5 changed files with 145 additions and 15 deletions

View file

@ -0,0 +1,12 @@
use leptos::*;
#[component]
pub fn LogDisplay(#[prop(into)] log: Signal<Vec<String>>) -> impl IntoView {
view! {
<div>
<ul>
{move || log().iter().map(|l| view! { <li>{l}</li> }).collect::<Vec<_>>()}
</ul>
</div>
}
}

View file

@ -3,10 +3,20 @@ use leptos_use::core::ConnectionReadyState;
use leptos_use::docs::demo_or_body;
use leptos_use::{use_webtransport_with_options, UseWebTransportOptions};
mod log_display;
mod stream_bidir;
mod stream_send;
use log_display::*;
use stream_bidir::*;
use stream_send::*;
#[component]
fn Demo() -> impl IntoView {
let (datagrams_log, set_datagrams_log) = create_signal(vec![]);
let id = store_value(0);
let transport = use_webtransport_with_options(
"https://echo.webtransport.day",
UseWebTransportOptions::default()
@ -15,16 +25,15 @@ fn Demo() -> impl IntoView {
})
.on_close(move || {
set_datagrams_log.update(|log| log.push("Connection closed".to_string()))
})
.on_error(move |e| set_datagrams_log.update(|log| log.push(format!("Error: {:?}", e)))),
}),
);
let (text, set_text) = create_signal("".to_string());
let on_send = {
let on_send_datagrams = {
let transport = transport.clone();
move |e| {
move |_| {
set_datagrams_log.update(|log| log.push(format!("Sent datagram: '{}'", text())));
transport.send_datagrams(text().as_bytes());
@ -47,18 +56,48 @@ fn Demo() -> impl IntoView {
false,
);
let (bidir_streams, set_bidir_streams) = create_signal(vec![]);
let on_open_bidir_stream = {
let transport = transport.clone();
move |_| {
let transport = transport.clone();
spawn_local(async move {
match transport.create_bidir_stream().await {
Ok(bidir_stream) => {
let i = id.get_value();
id.set_value(i + 1);
set_bidir_streams.update(|s| s.push((i, bidir_stream)));
}
Err(e) => {
set_datagrams_log.update(|log| {
log.push(format!("Failed to open bidir stream: {:?}", e))
});
}
}
});
}
};
let ready_state = transport.ready_state;
view! {
<button on:click=on_open_bidir_stream>"Open Bidir Stream"</button>
<h2>Datagrams</h2>
<textarea on:change=move |e| set_text(event_target_value(&e)) prop:value=text />
<button on:click=on_send disabled=move || ready_state() != ConnectionReadyState::Open>"Send"</button>
<button on:click=on_send_datagrams disabled=move || ready_state() != ConnectionReadyState::Open>"Send"</button>
<div>
<ul>
{move || datagrams_log().iter().map(|l| view! { <li>{l}</li> }).collect::<Vec<_>>()}
</ul>
</div>
<LogDisplay log=datagrams_log />
<h2>Bidir Streams</h2>
<For
each=bidir_streams
key=|(i, _)| *i
view=move |(_, bidir_stream)| view! { <StreamBidir ready_state=ready_state stream=bidir_stream.clone() /> }
/>
}
}

View file

@ -0,0 +1,36 @@
use crate::{LogDisplay, StreamSend};
use leptos::*;
use leptos_use::core::ConnectionReadyState;
use leptos_use::BidirStream;
#[component]
pub fn StreamBidir(
#[prop(into)] ready_state: Signal<ConnectionReadyState>,
stream: BidirStream,
) -> impl IntoView {
let (log, set_log) = create_signal(vec![]);
let on_send = move |msg| {
set_log.update(|log| log.push(format!("Sent: '{}'", msg)));
};
let _ = watch(
stream.bytes,
move |bytes, _, _| {
if let Some(bytes) = bytes {
set_log.update(|log| {
log.push(format!(
"Received datagrams: '{}'",
String::from_utf8(bytes.clone()).expect("valid utf8")
))
});
}
},
false,
);
view! {
<StreamSend ready_state=ready_state send_stream=stream.clone() on_send=on_send />
<LogDisplay log=log />
}
}

View file

@ -0,0 +1,29 @@
use leptos::*;
use leptos_use::core::ConnectionReadyState;
use leptos_use::SendableStream;
#[component]
pub fn StreamSend<S, F>(
#[prop(into)] ready_state: Signal<ConnectionReadyState>,
send_stream: S,
on_send: F,
) -> impl IntoView
where
S: SendableStream + 'static,
F: Fn(String) + 'static,
{
let (text, set_text) = create_signal("".to_string());
let on_send = {
move |_| {
send_stream.send_bytes(text().as_bytes());
on_send(text());
set_text("".to_string());
}
};
view! {
<textarea on:change=move |e| set_text(event_target_value(&e)) prop:value=text />
<button on:click=on_send disabled=move || ready_state() != ConnectionReadyState::Open>"Send"</button>
}
}

View file

@ -6,7 +6,6 @@ use js_sys::Reflect;
use leptos::leptos_dom::helpers::TimeoutHandle;
use leptos::*;
use std::cell::{Cell, RefCell};
use std::future::Future;
use std::rc::Rc;
use std::time::Duration;
use wasm_bindgen::prelude::*;
@ -52,7 +51,7 @@ pub fn use_webtransport_with_options(
) -> UseWebTransportReturn {
let UseWebTransportOptions {
on_open,
on_error,
// on_error,
on_close,
on_receive_stream,
on_bidir_stream,
@ -344,7 +343,7 @@ pub struct UseWebTransportOptions {
on_open: Rc<dyn Fn()>,
/// Error callback.
on_error: Rc<dyn Fn(WebTransportError)>,
// TODO : ? on_error: Rc<dyn Fn(WebTransportError)>,
/// Callback when `WebTransport` is closed.
on_close: Rc<dyn Fn()>,
@ -371,7 +370,7 @@ impl Default for UseWebTransportOptions {
fn default() -> Self {
Self {
on_open: Rc::new(|| {}),
on_error: Rc::new(|_| {}),
// on_error: Rc::new(|_| {}),
on_close: Rc::new(|| {}),
on_receive_stream: Rc::new(|_| {}),
on_bidir_stream: Rc::new(|_| {}),
@ -392,6 +391,7 @@ pub trait CloseableStream {
#[async_trait(?Send)]
pub trait SendableStream {
fn writer(&self) -> &web_sys::WritableStreamDefaultWriter;
fn send_bytes(&self, data: &[u8]) {
let arr = js_sys::Uint8Array::from(data);
let _ = self.writer().write_with_chunk(&arr);
@ -422,6 +422,7 @@ pub trait SendableStream {
}
}
#[derive(Clone, Debug)]
pub struct SendStream {
pub writer: web_sys::WritableStreamDefaultWriter,
}
@ -449,12 +450,16 @@ impl CloseableStream for SendStream {
}
}
#[derive(Clone, Debug)]
pub struct ReceiveStream {
pub reader: web_sys::ReadableStreamDefaultReader,
pub message: Signal<Option<Vec<u8>>>,
pub close: Rc<dyn Fn()>,
// pub close: Rc<dyn Fn()>,
}
// TODO : implement ReceiveStream
#[derive(Clone, Debug)]
pub struct BidirStream {
pub writer: web_sys::WritableStreamDefaultWriter,
pub bytes: Signal<Option<Vec<u8>>>,
@ -495,6 +500,14 @@ impl CloseableStream for BidirStream {
}
}
#[async_trait(?Send)]
impl SendableStream for BidirStream {
#[inline(always)]
fn writer(&self) -> &web_sys::WritableStreamDefaultWriter {
&self.writer
}
}
/// Return type of [`use_webtransport`].
#[derive(Clone, Debug)]
pub struct UseWebTransportReturn {
@ -595,6 +608,7 @@ pub enum WebTransportError {
OnCloseReader(JsValue),
}
/// Error enum for [`SendStream::send`]
#[derive(Error, Debug)]
pub enum SendError {
#[error("Failed to write to stream")]