added incoming stream handlers

This commit is contained in:
Maccesch 2023-09-14 15:24:54 +01:00
parent 901bfca95a
commit 7cabae75b6
14 changed files with 760 additions and 116 deletions

View file

@ -65,7 +65,7 @@ fn Demo() -> impl IntoView {
let transport = transport.clone();
spawn_local(async move {
match transport.create_bidir_stream().await {
match transport.open_bidir_stream().await {
Ok(bidir_stream) => {
let i = id.get_value();
id.set_value(i + 1);

View file

@ -0,0 +1,16 @@
[package]
name = "use_webtransport"
version = "0.1.0"
edition = "2021"
[dependencies]
leptos = { version = "0.5.0-beta2", features = ["nightly", "csr"] }
console_error_panic_hook = "0.1"
console_log = "1"
log = "0.4"
leptos-use = { path = "../..", features = ["docs"] }
web-sys = "0.3"
[dev-dependencies]
wasm-bindgen = "0.2"
wasm-bindgen-test = "0.3.0"

View file

@ -0,0 +1,23 @@
A simple example for `use_webtransport`.
If you don't have it installed already, install [Trunk](https://trunkrs.dev/) and [Tailwind](https://tailwindcss.com/docs/installation)
as well as the nightly toolchain for Rust and the wasm32-unknown-unknown target:
```bash
cargo install trunk
npm install -D tailwindcss @tailwindcss/forms
rustup toolchain install nightly
rustup target add wasm32-unknown-unknown
```
Then, open two terminals. In the first one, run:
```
npx tailwindcss -i ./input.css -o ./style/output.css --watch
```
In the second one, run:
```bash
trunk serve --open
```

View file

@ -0,0 +1,2 @@
[build]
public_url = "/demo/"

View file

@ -0,0 +1,7 @@
<!DOCTYPE html>
<html>
<head>
<link data-trunk rel="css" href="style/output.css">
</head>
<body></body>
</html>

View file

@ -0,0 +1,3 @@
@tailwind base;
@tailwind components;
@tailwind utilities;

View file

@ -0,0 +1,2 @@
[toolchain]
channel = "nightly"

View file

@ -0,0 +1,12 @@
use leptos::*;
#[component]
pub fn LogDisplay(#[prop(into)] log: Signal<Vec<String>>) -> impl IntoView {
view! {
<div>
<ul>
{move || log().iter().map(|l| view! { <li>{l}</li> }).collect::<Vec<_>>()}
</ul>
</div>
}
}

View file

@ -0,0 +1,111 @@
use leptos::*;
use leptos_use::core::ConnectionReadyState;
use leptos_use::docs::demo_or_body;
use leptos_use::{use_webtransport_with_options, UseWebTransportOptions};
mod log_display;
mod stream_bidir;
mod stream_send;
use log_display::*;
use stream_bidir::*;
use stream_send::*;
#[component]
fn Demo() -> impl IntoView {
let (datagrams_log, set_datagrams_log) = create_signal(vec![]);
let id = store_value(0);
let transport = use_webtransport_with_options(
"https://echo.webtransport.day",
UseWebTransportOptions::default()
.on_open(move || {
set_datagrams_log.update(|log| log.push("Connection opened".to_string()))
})
.on_close(move || {
set_datagrams_log.update(|log| log.push("Connection closed".to_string()))
}),
);
let (text, set_text) = create_signal("".to_string());
let on_send_datagrams = {
let transport = transport.clone();
move |_| {
set_datagrams_log.update(|log| log.push(format!("Sent datagram: '{}'", text())));
transport.send_datagrams(text().as_bytes());
set_text("".to_string());
}
};
let _ = watch(
transport.datagrams,
move |grams, _, _| {
if let Some(grams) = grams {
set_datagrams_log.update(|log| {
log.push(format!(
"Received datagrams: '{}'",
String::from_utf8(grams.clone()).expect("valid utf8")
))
});
}
},
false,
);
let (bidir_streams, set_bidir_streams) = create_signal(vec![]);
let on_open_bidir_stream = {
let transport = transport.clone();
move |_| {
let transport = transport.clone();
spawn_local(async move {
match transport.open_bidir_stream().await {
Ok(bidir_stream) => {
let i = id.get_value();
id.set_value(i + 1);
set_bidir_streams.update(|s| s.push((i, bidir_stream)));
}
Err(e) => {
set_datagrams_log.update(|log| {
log.push(format!("Failed to open bidir stream: {:?}", e))
});
}
}
});
}
};
let ready_state = transport.ready_state;
view! {
<button on:click=on_open_bidir_stream>"Open Bidir Stream"</button>
<h2>Datagrams</h2>
<textarea on:change=move |e| set_text(event_target_value(&e)) prop:value=text />
<button on:click=on_send_datagrams disabled=move || ready_state() != ConnectionReadyState::Open>"Send"</button>
<LogDisplay log=datagrams_log />
<h2>Bidir Streams</h2>
<For
each=bidir_streams
key=|(i, _)| *i
view=move |(_, bidir_stream)| view! { <StreamBidir ready_state=ready_state stream=bidir_stream.clone() /> }
/>
}
}
fn main() {
_ = console_log::init_with_level(log::Level::Debug);
console_error_panic_hook::set_once();
mount_to(demo_or_body(), || {
view! { <Demo/> }
})
}

View file

@ -0,0 +1,36 @@
use crate::{LogDisplay, StreamSend};
use leptos::*;
use leptos_use::core::ConnectionReadyState;
use leptos_use::BidirStream;
#[component]
pub fn StreamBidir(
#[prop(into)] ready_state: Signal<ConnectionReadyState>,
stream: BidirStream,
) -> impl IntoView {
let (log, set_log) = create_signal(vec![]);
let on_send = move |msg| {
set_log.update(|log| log.push(format!("Sent: '{}'", msg)));
};
let _ = watch(
stream.bytes,
move |bytes, _, _| {
if let Some(bytes) = bytes {
set_log.update(|log| {
log.push(format!(
"Received datagrams: '{}'",
String::from_utf8(bytes.clone()).expect("valid utf8")
))
});
}
},
false,
);
view! {
<StreamSend ready_state=ready_state send_stream=stream.clone() on_send=on_send />
<LogDisplay log=log />
}
}

View file

@ -0,0 +1,29 @@
use leptos::*;
use leptos_use::core::ConnectionReadyState;
use leptos_use::SendableStream;
#[component]
pub fn StreamSend<S, F>(
#[prop(into)] ready_state: Signal<ConnectionReadyState>,
send_stream: S,
on_send: F,
) -> impl IntoView
where
S: SendableStream + 'static,
F: Fn(String) + 'static,
{
let (text, set_text) = create_signal("".to_string());
let on_send = {
move |_| {
send_stream.send_bytes(text().as_bytes());
on_send(text());
set_text("".to_string());
}
};
view! {
<textarea on:change=move |e| set_text(event_target_value(&e)) prop:value=text />
<button on:click=on_send disabled=move || ready_state() != ConnectionReadyState::Open>"Send"</button>
}
}

View file

@ -0,0 +1,289 @@
[type='text'],[type='email'],[type='url'],[type='password'],[type='number'],[type='date'],[type='datetime-local'],[type='month'],[type='search'],[type='tel'],[type='time'],[type='week'],[multiple],textarea,select {
-webkit-appearance: none;
-moz-appearance: none;
appearance: none;
background-color: #fff;
border-color: #6b7280;
border-width: 1px;
border-radius: 0px;
padding-top: 0.5rem;
padding-right: 0.75rem;
padding-bottom: 0.5rem;
padding-left: 0.75rem;
font-size: 1rem;
line-height: 1.5rem;
--tw-shadow: 0 0 #0000;
}
[type='text']:focus, [type='email']:focus, [type='url']:focus, [type='password']:focus, [type='number']:focus, [type='date']:focus, [type='datetime-local']:focus, [type='month']:focus, [type='search']:focus, [type='tel']:focus, [type='time']:focus, [type='week']:focus, [multiple]:focus, textarea:focus, select:focus {
outline: 2px solid transparent;
outline-offset: 2px;
--tw-ring-inset: var(--tw-empty,/*!*/ /*!*/);
--tw-ring-offset-width: 0px;
--tw-ring-offset-color: #fff;
--tw-ring-color: #2563eb;
--tw-ring-offset-shadow: var(--tw-ring-inset) 0 0 0 var(--tw-ring-offset-width) var(--tw-ring-offset-color);
--tw-ring-shadow: var(--tw-ring-inset) 0 0 0 calc(1px + var(--tw-ring-offset-width)) var(--tw-ring-color);
box-shadow: var(--tw-ring-offset-shadow), var(--tw-ring-shadow), var(--tw-shadow);
border-color: #2563eb;
}
input::-moz-placeholder, textarea::-moz-placeholder {
color: #6b7280;
opacity: 1;
}
input::placeholder,textarea::placeholder {
color: #6b7280;
opacity: 1;
}
::-webkit-datetime-edit-fields-wrapper {
padding: 0;
}
::-webkit-date-and-time-value {
min-height: 1.5em;
}
::-webkit-datetime-edit,::-webkit-datetime-edit-year-field,::-webkit-datetime-edit-month-field,::-webkit-datetime-edit-day-field,::-webkit-datetime-edit-hour-field,::-webkit-datetime-edit-minute-field,::-webkit-datetime-edit-second-field,::-webkit-datetime-edit-millisecond-field,::-webkit-datetime-edit-meridiem-field {
padding-top: 0;
padding-bottom: 0;
}
select {
background-image: url("data:image/svg+xml,%3csvg xmlns='http://www.w3.org/2000/svg' fill='none' viewBox='0 0 20 20'%3e%3cpath stroke='%236b7280' stroke-linecap='round' stroke-linejoin='round' stroke-width='1.5' d='M6 8l4 4 4-4'/%3e%3c/svg%3e");
background-position: right 0.5rem center;
background-repeat: no-repeat;
background-size: 1.5em 1.5em;
padding-right: 2.5rem;
-webkit-print-color-adjust: exact;
print-color-adjust: exact;
}
[multiple] {
background-image: initial;
background-position: initial;
background-repeat: unset;
background-size: initial;
padding-right: 0.75rem;
-webkit-print-color-adjust: unset;
print-color-adjust: unset;
}
[type='checkbox'],[type='radio'] {
-webkit-appearance: none;
-moz-appearance: none;
appearance: none;
padding: 0;
-webkit-print-color-adjust: exact;
print-color-adjust: exact;
display: inline-block;
vertical-align: middle;
background-origin: border-box;
-webkit-user-select: none;
-moz-user-select: none;
user-select: none;
flex-shrink: 0;
height: 1rem;
width: 1rem;
color: #2563eb;
background-color: #fff;
border-color: #6b7280;
border-width: 1px;
--tw-shadow: 0 0 #0000;
}
[type='checkbox'] {
border-radius: 0px;
}
[type='radio'] {
border-radius: 100%;
}
[type='checkbox']:focus,[type='radio']:focus {
outline: 2px solid transparent;
outline-offset: 2px;
--tw-ring-inset: var(--tw-empty,/*!*/ /*!*/);
--tw-ring-offset-width: 2px;
--tw-ring-offset-color: #fff;
--tw-ring-color: #2563eb;
--tw-ring-offset-shadow: var(--tw-ring-inset) 0 0 0 var(--tw-ring-offset-width) var(--tw-ring-offset-color);
--tw-ring-shadow: var(--tw-ring-inset) 0 0 0 calc(2px + var(--tw-ring-offset-width)) var(--tw-ring-color);
box-shadow: var(--tw-ring-offset-shadow), var(--tw-ring-shadow), var(--tw-shadow);
}
[type='checkbox']:checked,[type='radio']:checked {
border-color: transparent;
background-color: currentColor;
background-size: 100% 100%;
background-position: center;
background-repeat: no-repeat;
}
[type='checkbox']:checked {
background-image: url("data:image/svg+xml,%3csvg viewBox='0 0 16 16' fill='white' xmlns='http://www.w3.org/2000/svg'%3e%3cpath d='M12.207 4.793a1 1 0 010 1.414l-5 5a1 1 0 01-1.414 0l-2-2a1 1 0 011.414-1.414L6.5 9.086l4.293-4.293a1 1 0 011.414 0z'/%3e%3c/svg%3e");
}
[type='radio']:checked {
background-image: url("data:image/svg+xml,%3csvg viewBox='0 0 16 16' fill='white' xmlns='http://www.w3.org/2000/svg'%3e%3ccircle cx='8' cy='8' r='3'/%3e%3c/svg%3e");
}
[type='checkbox']:checked:hover,[type='checkbox']:checked:focus,[type='radio']:checked:hover,[type='radio']:checked:focus {
border-color: transparent;
background-color: currentColor;
}
[type='checkbox']:indeterminate {
background-image: url("data:image/svg+xml,%3csvg xmlns='http://www.w3.org/2000/svg' fill='none' viewBox='0 0 16 16'%3e%3cpath stroke='white' stroke-linecap='round' stroke-linejoin='round' stroke-width='2' d='M4 8h8'/%3e%3c/svg%3e");
border-color: transparent;
background-color: currentColor;
background-size: 100% 100%;
background-position: center;
background-repeat: no-repeat;
}
[type='checkbox']:indeterminate:hover,[type='checkbox']:indeterminate:focus {
border-color: transparent;
background-color: currentColor;
}
[type='file'] {
background: unset;
border-color: inherit;
border-width: 0;
border-radius: 0;
padding: 0;
font-size: unset;
line-height: inherit;
}
[type='file']:focus {
outline: 1px solid ButtonText;
outline: 1px auto -webkit-focus-ring-color;
}
*, ::before, ::after {
--tw-border-spacing-x: 0;
--tw-border-spacing-y: 0;
--tw-translate-x: 0;
--tw-translate-y: 0;
--tw-rotate: 0;
--tw-skew-x: 0;
--tw-skew-y: 0;
--tw-scale-x: 1;
--tw-scale-y: 1;
--tw-pan-x: ;
--tw-pan-y: ;
--tw-pinch-zoom: ;
--tw-scroll-snap-strictness: proximity;
--tw-gradient-from-position: ;
--tw-gradient-via-position: ;
--tw-gradient-to-position: ;
--tw-ordinal: ;
--tw-slashed-zero: ;
--tw-numeric-figure: ;
--tw-numeric-spacing: ;
--tw-numeric-fraction: ;
--tw-ring-inset: ;
--tw-ring-offset-width: 0px;
--tw-ring-offset-color: #fff;
--tw-ring-color: rgb(59 130 246 / 0.5);
--tw-ring-offset-shadow: 0 0 #0000;
--tw-ring-shadow: 0 0 #0000;
--tw-shadow: 0 0 #0000;
--tw-shadow-colored: 0 0 #0000;
--tw-blur: ;
--tw-brightness: ;
--tw-contrast: ;
--tw-grayscale: ;
--tw-hue-rotate: ;
--tw-invert: ;
--tw-saturate: ;
--tw-sepia: ;
--tw-drop-shadow: ;
--tw-backdrop-blur: ;
--tw-backdrop-brightness: ;
--tw-backdrop-contrast: ;
--tw-backdrop-grayscale: ;
--tw-backdrop-hue-rotate: ;
--tw-backdrop-invert: ;
--tw-backdrop-opacity: ;
--tw-backdrop-saturate: ;
--tw-backdrop-sepia: ;
}
::backdrop {
--tw-border-spacing-x: 0;
--tw-border-spacing-y: 0;
--tw-translate-x: 0;
--tw-translate-y: 0;
--tw-rotate: 0;
--tw-skew-x: 0;
--tw-skew-y: 0;
--tw-scale-x: 1;
--tw-scale-y: 1;
--tw-pan-x: ;
--tw-pan-y: ;
--tw-pinch-zoom: ;
--tw-scroll-snap-strictness: proximity;
--tw-gradient-from-position: ;
--tw-gradient-via-position: ;
--tw-gradient-to-position: ;
--tw-ordinal: ;
--tw-slashed-zero: ;
--tw-numeric-figure: ;
--tw-numeric-spacing: ;
--tw-numeric-fraction: ;
--tw-ring-inset: ;
--tw-ring-offset-width: 0px;
--tw-ring-offset-color: #fff;
--tw-ring-color: rgb(59 130 246 / 0.5);
--tw-ring-offset-shadow: 0 0 #0000;
--tw-ring-shadow: 0 0 #0000;
--tw-shadow: 0 0 #0000;
--tw-shadow-colored: 0 0 #0000;
--tw-blur: ;
--tw-brightness: ;
--tw-contrast: ;
--tw-grayscale: ;
--tw-hue-rotate: ;
--tw-invert: ;
--tw-saturate: ;
--tw-sepia: ;
--tw-drop-shadow: ;
--tw-backdrop-blur: ;
--tw-backdrop-brightness: ;
--tw-backdrop-contrast: ;
--tw-backdrop-grayscale: ;
--tw-backdrop-hue-rotate: ;
--tw-backdrop-invert: ;
--tw-backdrop-opacity: ;
--tw-backdrop-saturate: ;
--tw-backdrop-sepia: ;
}
.block {
display: block;
}
.text-\[--brand-color\] {
color: var(--brand-color);
}
.text-green-600 {
--tw-text-opacity: 1;
color: rgb(22 163 74 / var(--tw-text-opacity));
}
.opacity-75 {
opacity: 0.75;
}
@media (prefers-color-scheme: dark) {
.dark\:text-green-500 {
--tw-text-opacity: 1;
color: rgb(34 197 94 / var(--tw-text-opacity));
}
}

View file

@ -0,0 +1,15 @@
/** @type {import('tailwindcss').Config} */
module.exports = {
content: {
files: ["*.html", "./src/**/*.rs", "../../src/docs/**/*.rs"],
},
theme: {
extend: {},
},
corePlugins: {
preflight: false,
},
plugins: [
require('@tailwindcss/forms'),
],
}

View file

@ -1,6 +1,5 @@
use crate::core::ConnectionReadyState;
use async_trait::async_trait;
use cfg_if::cfg_if;
use default_struct_builder::DefaultBuilder;
use js_sys::Reflect;
use leptos::leptos_dom::helpers::TimeoutHandle;
@ -17,6 +16,7 @@ use serde::{Deserialize, Serialize};
#[cfg(feature = "msgpack")]
use rmp_serde::{from_slice, to_vec};
use thiserror::Error;
use web_sys::WebTransportBidirectionalStream;
#[cfg(feature = "bincode")]
use bincode::serde::{decode_from_slice as from_slice, encode_to_vec as to_vec};
@ -103,6 +103,8 @@ pub fn use_webtransport_with_options(
let transport = Rc::clone(&transport);
let reconnect_timer = Rc::clone(&reconnect_timer);
let on_open = Rc::clone(&on_open);
let on_bidir_stream = Rc::clone(&on_bidir_stream);
let on_receive_stream = Rc::clone(&on_receive_stream);
move || {
reconnect_timer.set(None);
@ -119,8 +121,10 @@ pub fn use_webtransport_with_options(
set_ready_state.set(ConnectionReadyState::Connecting);
spawn_local({
let on_open = Rc::clone(&on_open);
let transport = Rc::clone(&transport);
let on_open = Rc::clone(&on_open);
let on_bidir_stream = Rc::clone(&on_bidir_stream);
let on_receive_stream = Rc::clone(&on_receive_stream);
async move {
let transport = transport.borrow();
@ -130,6 +134,37 @@ pub fn use_webtransport_with_options(
Ok(_) => {
set_ready_state.set(ConnectionReadyState::Open);
on_open();
listen_to_stream(
transport.incoming_bidirectional_streams(),
move |value| {
let stream: web_sys::WebTransportBidirectionalStream =
value.unchecked_into();
if let Ok(stream) = create_bidir_stream(stream, ready_state) {
on_bidir_stream(stream);
}
},
|| {},
);
listen_to_stream(
transport.incoming_unidirectional_streams(),
move |value| {
let stream: web_sys::ReadableStream = value.unchecked_into();
let (state, set_state, bytes) = create_state_and_bytes_signal(
stream.unchecked_into(),
ready_state,
);
on_receive_stream(ReceiveStream {
bytes,
state,
set_state,
});
},
|| {},
);
}
Err(e) => {
// TODO : handle error?
@ -181,10 +216,10 @@ pub fn use_webtransport_with_options(
let result = JsFuture::from(transport.closed()).await;
set_ready_state.set(ConnectionReadyState::Closed);
on_closed();
match result {
Ok(_) => {
on_close();
}
Ok(_) => {}
Err(e) => {
// TODO : handle error?
}
@ -215,14 +250,13 @@ pub fn use_webtransport_with_options(
.readable()
},
set_datagrams,
|| {},
);
datagrams_signal.get()
}
});
// TODO : reliable streams
{
let unmounted = Rc::clone(&unmounted);
@ -268,6 +302,7 @@ fn lazy_initialize_u8_reader(
initialized: Rc<Cell<bool>>,
get_readable_stream: impl Fn() -> web_sys::ReadableStream,
set_signal: WriteSignal<Option<Vec<u8>>>,
on_done: impl Fn() + 'static,
) {
lazy_initialize_reader(
ready_state,
@ -277,6 +312,7 @@ fn lazy_initialize_u8_reader(
let value: js_sys::Uint8Array = value.into();
set_signal.set(Some(value.to_vec()));
},
on_done,
);
}
@ -285,13 +321,15 @@ fn lazy_initialize_reader(
initialized: Rc<Cell<bool>>,
get_readable_stream: impl Fn() -> web_sys::ReadableStream,
on_value: impl Fn(JsValue) + 'static,
on_done: impl Fn() + 'static,
) {
if ready_state.get() == ConnectionReadyState::Open {
if !initialized.get() {
initialized.set(true);
listen_to_stream(get_readable_stream(), on_value, move || {
initialized.set(false)
initialized.set(false);
on_done();
});
}
}
@ -382,6 +420,7 @@ impl Default for UseWebTransportOptions {
}
/// Wether the stream is open or closed
#[derive(Copy, Clone, Eq, PartialEq, Debug)]
pub enum StreamState {
Open,
Closed,
@ -402,18 +441,24 @@ pub trait CloseableStream {
#[async_trait(?Send)]
/// Trait to send data in a stream
pub trait SendableStream {
pub trait SendableStream: CloseableStream {
/// Getter for the stream writer
fn writer(&self) -> &web_sys::WritableStreamDefaultWriter;
/// Send data in the form of bytes ignoring potential errors
fn send_bytes(&self, data: &[u8]) {
let arr = js_sys::Uint8Array::from(data);
let _ = self.writer().write_with_chunk(&arr);
if self.state().get() == StreamState::Open {
let arr = js_sys::Uint8Array::from(data);
let _ = self.writer().write_with_chunk(&arr);
}
}
/// Send data in the form of bytes asynchronously with a result providing potential errors
async fn send_bytes_async(&self, data: &[u8]) -> Result<(), SendError> {
if self.state().get() != StreamState::Open {
return Err(SendError::StreamNotOpen);
}
let arr = js_sys::Uint8Array::from(data);
let _ = JsFuture::from(self.writer().write_with_chunk(&arr))
.await
@ -442,6 +487,18 @@ pub trait SendableStream {
}
}
#[async_trait(?Send)]
/// Trait to receive data in a stream
pub trait ReceivableStream: CloseableStream {
#[cfg(any(feature = "msgpack", feature = "bincode"))]
/// Receive data in the form of a serializable object ignoring potential errors
fn receive<T: for<'a> Deserialize<'a>>(&self) -> Signal<Option<T>>;
#[cfg(any(feature = "msgpack", feature = "bincode"))]
/// Receive data in the form of a serializable object asynchronously with a result providing potential errors
fn try_receive<T: for<'a> Deserialize<'a>>(&self) -> Signal<Option<Result<T, ReceiveError>>>;
}
#[derive(Clone, Debug)]
/// Stream for sending data
pub struct SendStream {
@ -450,41 +507,11 @@ pub struct SendStream {
set_state: WriteSignal<StreamState>,
}
#[async_trait(?Send)]
impl SendableStream for SendStream {
#[inline(always)]
fn writer(&self) -> &web_sys::WritableStreamDefaultWriter {
&self.writer
}
}
#[async_trait(?Send)]
impl CloseableStream for SendStream {
#[inline(always)]
fn state(&self) -> Signal<StreamState> {
self.state
}
#[inline(always)]
fn close(&self) {
let _ = self.writer.close();
}
async fn close_async(&self) -> Result<(), WebTransportError> {
let _ = JsFuture::from(self.writer.close())
.await
.map_err(|e| WebTransportError::OnCloseWriter(e))?;
Ok(())
}
}
#[derive(Clone, Debug)]
/// Stream for receiving data
#[allow(dead_code)]
pub struct ReceiveStream {
pub reader: web_sys::ReadableStreamDefaultReader,
pub message: Signal<Option<Vec<u8>>>,
// pub close: Rc<dyn Fn()>,
pub bytes: Signal<Option<Vec<u8>>>,
state: Signal<StreamState>,
set_state: WriteSignal<StreamState>,
}
@ -500,55 +527,89 @@ pub struct BidirStream {
set_state: WriteSignal<StreamState>,
}
cfg_if! { if #[cfg(any(feature = "msgpack", feature = "bincode"))] {
impl BidirStream {
pub fn receive<T: for <'a> Deserialize<'a>>(&self) -> Signal<Option<T>> {
let bytes = self.bytes;
macro_rules! impl_receivable_stream {
($ty:ty) => {
impl BidirStream {
#[cfg(any(feature = "msgpack", feature = "bincode"))]
pub fn receive<T: for<'a> Deserialize<'a>>(&self) -> Signal<Option<T>> {
let bytes = self.bytes;
Signal::derive(move || {
bytes.get().map(|bytes| from_slice(bytes.as_slice()).expect("Deserialization should not fail"))
})
Signal::derive(move || {
if self.state.get() != StreamState::Open {
None
} else {
bytes
.get()
.and_then(|bytes| from_slice(bytes.as_slice()).ok())
}
})
}
#[cfg(any(feature = "msgpack", feature = "bincode"))]
pub fn try_receive<T: for<'a> Deserialize<'a>>(
&self,
) -> Signal<Option<Result<T, ReceiveError>>> {
let bytes = self.bytes;
Signal::derive(move || {
if self.state.get() != StreamState::Open {
None
} else {
bytes.get().map(|bytes| Ok(from_slice(bytes.as_slice())?))
}
})
}
}
pub fn try_receive<T: for <'a> Deserialize<'a>>(&self) -> Signal<Option<Result<T, ReceiveError>>> {
let bytes = self.bytes;
Signal::derive(move || {
bytes.get().map(|bytes| Ok(from_slice(bytes.as_slice())?))
})
}
}
}}
#[async_trait(?Send)]
impl CloseableStream for BidirStream {
#[inline(always)]
fn state(&self) -> Signal<StreamState> {
self.state
}
#[inline(always)]
fn close(&self) {
let _ = self.writer.close();
}
async fn close_async(&self) -> Result<(), WebTransportError> {
let _ = JsFuture::from(self.writer.close())
.await
.map_err(|e| WebTransportError::OnCloseWriter(e))?;
Ok(())
}
};
}
#[async_trait(?Send)]
impl SendableStream for BidirStream {
#[inline(always)]
fn writer(&self) -> &web_sys::WritableStreamDefaultWriter {
&self.writer
}
impl_receivable_stream!(ReceiveStream);
impl_receivable_stream!(BidirStream);
macro_rules! impl_sendable_stream {
($ty:ty) => {
#[async_trait(?Send)]
impl SendableStream for $ty {
#[inline(always)]
fn writer(&self) -> &web_sys::WritableStreamDefaultWriter {
&self.writer
}
}
};
}
impl_sendable_stream!(SendStream);
impl_sendable_stream!(BidirStream);
macro_rules! impl_closable_stream {
($ty:ty) => {
#[async_trait(?Send)]
impl CloseableStream for $ty {
#[inline(always)]
fn state(&self) -> Signal<StreamState> {
self.state
}
#[inline(always)]
fn close(&self) {
let _ = self.writer.close();
self.set_state.set(StreamState::Closed);
}
async fn close_async(&self) -> Result<(), WebTransportError> {
let _ = JsFuture::from(self.writer.close())
.await
.map_err(|e| WebTransportError::OnCloseWriter(e))?;
Ok(())
}
}
};
}
impl_closable_stream!(SendStream);
impl_closable_stream!(BidirStream);
/// Return type of [`use_webtransport`].
#[derive(Clone, Debug)]
pub struct UseWebTransportReturn {
@ -581,8 +642,8 @@ impl UseWebTransportReturn {
// TODO : send_datagrams_async
/// Create a unidirectional send stream
pub async fn create_send_stream(&self) -> Result<SendStream, WebTransportError> {
/// Open a unidirectional send stream
pub async fn open_send_stream(&self) -> Result<SendStream, WebTransportError> {
if let Some(transport) = self.transport.borrow().as_ref() {
let result = JsFuture::from(transport.create_unidirectional_stream())
.await
@ -604,45 +665,80 @@ impl UseWebTransportReturn {
}
}
/// Create a bidirectional stream
pub async fn create_bidir_stream(&self) -> Result<BidirStream, WebTransportError> {
/// Open a bidirectional stream
pub async fn open_bidir_stream(&self) -> Result<BidirStream, WebTransportError> {
if let Some(transport) = self.transport.borrow().as_ref() {
let result = JsFuture::from(transport.create_bidirectional_stream())
.await
.map_err(|e| WebTransportError::FailedToOpenStream(e))?;
let stream: web_sys::WebTransportBidirectionalStream = result.unchecked_into();
let writer = stream
.writable()
.get_writer()
.map_err(|e| WebTransportError::FailedToOpenWriter(e))?;
let ready_state = self.ready_state;
let bytes = Signal::derive({
let reader_initialized = Rc::new(Cell::new(false));
let ready_state = self.ready_state;
let (message_signal, set_message) = create_signal(None::<Vec<u8>>);
let stream = stream.clone();
move || {
let stream = stream.clone();
lazy_initialize_u8_reader(
ready_state,
Rc::clone(&reader_initialized),
move || stream.readable().unchecked_into(),
set_message,
);
message_signal.get()
}
});
Ok(BidirStream { writer, bytes })
create_bidir_stream(stream, ready_state)
} else {
Err(WebTransportError::NotConnected)
}
}
}
fn create_state_and_bytes_signal(
stream: web_sys::ReadableStream,
ready_state: Signal<ConnectionReadyState>,
) -> (
Signal<StreamState>,
WriteSignal<StreamState>,
Signal<Option<Vec<u8>>>,
) {
let (state, set_state) = create_signal(StreamState::Open);
let bytes = Signal::derive({
let reader_initialized = Rc::new(Cell::new(false));
let (message_signal, set_message) = create_signal(None::<Vec<u8>>);
let stream = stream.clone();
move || {
let stream = stream.clone();
lazy_initialize_u8_reader(
ready_state,
Rc::clone(&reader_initialized),
move || stream.clone().unchecked_into(),
set_message,
move || {
set_state.set(StreamState::Closed);
},
);
message_signal.get()
}
});
(state.into(), set_state, bytes)
}
fn create_bidir_stream(
stream: WebTransportBidirectionalStream,
ready_state: Signal<ConnectionReadyState>,
) -> Result<BidirStream, WebTransportError> {
let writer = stream
.writable()
.get_writer()
.map_err(|e| WebTransportError::FailedToOpenWriter(e))?;
let (state, set_state, bytes) =
create_state_and_bytes_signal(stream.readable().unchecked_into(), ready_state);
let bidir_stream = BidirStream {
writer,
bytes,
state,
set_state,
};
Ok(bidir_stream)
}
/// Error enum for [`UseWebTransportOptions::on_error`]
#[derive(Debug)]
pub enum WebTransportError {
@ -659,6 +755,9 @@ pub enum WebTransportError {
/// Error enum for [`SendStream::send`]
#[derive(Error, Debug)]
pub enum SendError {
#[error("Stream is not open")]
StreamNotOpen,
#[error("Failed to write to stream")]
FailedToWrite(JsValue),