mirror of
https://github.com/adoyle0/leptos-use.git
synced 2025-03-13 09:09:48 -04:00
reliable streams
This commit is contained in:
parent
b80553751b
commit
25590582e3
3 changed files with 323 additions and 62 deletions
|
@ -24,6 +24,9 @@ paste = "1"
|
||||||
lazy_static = "1"
|
lazy_static = "1"
|
||||||
cfg-if = "1"
|
cfg-if = "1"
|
||||||
wasm-bindgen-futures = "0.4"
|
wasm-bindgen-futures = "0.4"
|
||||||
|
async-trait = "0.1"
|
||||||
|
rmp-serde = { version = "1.1", optional = true }
|
||||||
|
thiserror = "1"
|
||||||
|
|
||||||
[dependencies.web-sys]
|
[dependencies.web-sys]
|
||||||
version = "0.3"
|
version = "0.3"
|
||||||
|
@ -78,7 +81,10 @@ features = [
|
||||||
"WebTransport",
|
"WebTransport",
|
||||||
"WebTransportOptions",
|
"WebTransportOptions",
|
||||||
"WebTransportDatagramDuplexStream",
|
"WebTransportDatagramDuplexStream",
|
||||||
|
"WebTransportBidirectionalStream",
|
||||||
"Window",
|
"Window",
|
||||||
|
"WebTransportReceiveStream",
|
||||||
|
"WebTransportSendStream",
|
||||||
"WritableStream",
|
"WritableStream",
|
||||||
"WritableStreamDefaultWriter",
|
"WritableStreamDefaultWriter",
|
||||||
]
|
]
|
||||||
|
@ -86,8 +92,9 @@ features = [
|
||||||
[features]
|
[features]
|
||||||
docs = []
|
docs = []
|
||||||
math = ["num"]
|
math = ["num"]
|
||||||
storage = ["serde", "serde_json", "web-sys/StorageEvent"]
|
storage = ["dep:serde", "dep:serde_json", "web-sys/StorageEvent"]
|
||||||
ssr = []
|
ssr = []
|
||||||
|
msgpack = ["dep:rmp-serde", "dep:serde"]
|
||||||
|
|
||||||
[package.metadata.docs.rs]
|
[package.metadata.docs.rs]
|
||||||
all-features = true
|
all-features = true
|
||||||
|
|
|
@ -57,7 +57,6 @@ mod use_supported;
|
||||||
mod use_throttle_fn;
|
mod use_throttle_fn;
|
||||||
mod use_to_string;
|
mod use_to_string;
|
||||||
mod use_websocket;
|
mod use_websocket;
|
||||||
mod use_webtransport;
|
|
||||||
mod use_window_focus;
|
mod use_window_focus;
|
||||||
mod use_window_scroll;
|
mod use_window_scroll;
|
||||||
mod watch_debounced;
|
mod watch_debounced;
|
||||||
|
|
|
@ -1,14 +1,27 @@
|
||||||
use crate::core::ConnectionReadyState;
|
use crate::core::ConnectionReadyState;
|
||||||
|
use async_trait::async_trait;
|
||||||
|
use cfg_if::cfg_if;
|
||||||
use default_struct_builder::DefaultBuilder;
|
use default_struct_builder::DefaultBuilder;
|
||||||
use js_sys::{Reflect, Uint8Array};
|
use js_sys::Reflect;
|
||||||
use leptos::leptos_dom::helpers::TimeoutHandle;
|
use leptos::leptos_dom::helpers::TimeoutHandle;
|
||||||
use leptos::*;
|
use leptos::*;
|
||||||
use std::cell::{Cell, RefCell};
|
use std::cell::{Cell, RefCell};
|
||||||
|
use std::future::Future;
|
||||||
use std::rc::Rc;
|
use std::rc::Rc;
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
use wasm_bindgen::prelude::*;
|
use wasm_bindgen::prelude::*;
|
||||||
use wasm_bindgen_futures::JsFuture;
|
use wasm_bindgen_futures::JsFuture;
|
||||||
|
|
||||||
|
#[cfg(any(feature = "bincode", feature = "msgpack"))]
|
||||||
|
use serde::{Deserialize, Serialize};
|
||||||
|
|
||||||
|
#[cfg(feature = "msgpack")]
|
||||||
|
use rmp_serde::{from_slice, to_vec};
|
||||||
|
use thiserror::Error;
|
||||||
|
|
||||||
|
#[cfg(feature = "bincode")]
|
||||||
|
use bincode::serde::{decode_from_slice as from_slice, encode_to_vec as to_vec};
|
||||||
|
|
||||||
///
|
///
|
||||||
///
|
///
|
||||||
/// ## Demo
|
/// ## Demo
|
||||||
|
@ -28,7 +41,7 @@ use wasm_bindgen_futures::JsFuture;
|
||||||
/// # view! { }
|
/// # view! { }
|
||||||
/// # }
|
/// # }
|
||||||
/// ```
|
/// ```
|
||||||
pub fn use_webtransport(url: &str) -> UseWebTransportReturn<impl Fn(Vec<u8>) + Clone + 'static> {
|
pub fn use_webtransport(url: &str) -> UseWebTransportReturn {
|
||||||
use_webtransport_with_options(url, UseWebTransportOptions::default())
|
use_webtransport_with_options(url, UseWebTransportOptions::default())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -36,17 +49,21 @@ pub fn use_webtransport(url: &str) -> UseWebTransportReturn<impl Fn(Vec<u8>) + C
|
||||||
pub fn use_webtransport_with_options(
|
pub fn use_webtransport_with_options(
|
||||||
url: &str,
|
url: &str,
|
||||||
options: UseWebTransportOptions,
|
options: UseWebTransportOptions,
|
||||||
) -> UseWebTransportReturn<impl Fn(Vec<u8>) + Clone + 'static> {
|
) -> UseWebTransportReturn {
|
||||||
let UseWebTransportOptions {
|
let UseWebTransportOptions {
|
||||||
on_open,
|
on_open,
|
||||||
on_error,
|
on_error,
|
||||||
on_close,
|
on_close,
|
||||||
|
on_receive_stream,
|
||||||
|
on_bidir_stream,
|
||||||
reconnect_limit,
|
reconnect_limit,
|
||||||
reconnect_interval,
|
reconnect_interval,
|
||||||
immediate,
|
immediate,
|
||||||
} = options;
|
} = options;
|
||||||
|
let url = url.to_string();
|
||||||
|
|
||||||
let (ready_state, set_ready_state) = create_signal(ConnectionReadyState::Closed);
|
let (ready_state, set_ready_state) = create_signal(ConnectionReadyState::Closed);
|
||||||
|
let ready_state: Signal<_> = ready_state.into();
|
||||||
|
|
||||||
let transport = Rc::new(RefCell::new(None::<web_sys::WebTransport>));
|
let transport = Rc::new(RefCell::new(None::<web_sys::WebTransport>));
|
||||||
let datagrams_reader_initialized = Rc::new(Cell::new(false));
|
let datagrams_reader_initialized = Rc::new(Cell::new(false));
|
||||||
|
@ -86,6 +103,7 @@ pub fn use_webtransport_with_options(
|
||||||
connect_ref.set_value(Some(Rc::new({
|
connect_ref.set_value(Some(Rc::new({
|
||||||
let transport = Rc::clone(&transport);
|
let transport = Rc::clone(&transport);
|
||||||
let reconnect_timer = Rc::clone(&reconnect_timer);
|
let reconnect_timer = Rc::clone(&reconnect_timer);
|
||||||
|
let on_open = Rc::clone(&on_open);
|
||||||
|
|
||||||
move || {
|
move || {
|
||||||
let transport = transport.borrow();
|
let transport = transport.borrow();
|
||||||
|
@ -96,12 +114,15 @@ pub fn use_webtransport_with_options(
|
||||||
transport.close();
|
transport.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
let transport =
|
let options = web_sys::WebTransportOptions::new();
|
||||||
web_sys::WebTransport::new_with_options(url, &options.into()).unwrap_throw();
|
let transport = web_sys::WebTransport::new_with_options(&url, &options).unwrap_throw();
|
||||||
|
|
||||||
set_ready_state.set(ConnectionReadyState::Connecting);
|
set_ready_state.set(ConnectionReadyState::Connecting);
|
||||||
|
|
||||||
spawn_local(async move {
|
spawn_local({
|
||||||
|
let on_open = Rc::clone(&on_open);
|
||||||
|
|
||||||
|
async move {
|
||||||
match JsFuture::from(transport.ready()).await {
|
match JsFuture::from(transport.ready()).await {
|
||||||
Ok(_) => {
|
Ok(_) => {
|
||||||
set_ready_state.set(ConnectionReadyState::Open);
|
set_ready_state.set(ConnectionReadyState::Open);
|
||||||
|
@ -112,6 +133,7 @@ pub fn use_webtransport_with_options(
|
||||||
set_ready_state.set(ConnectionReadyState::Closed);
|
set_ready_state.set(ConnectionReadyState::Closed);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
})));
|
})));
|
||||||
|
@ -162,10 +184,19 @@ pub fn use_webtransport_with_options(
|
||||||
let datagrams_reader_initialized = Rc::clone(&datagrams_reader_initialized);
|
let datagrams_reader_initialized = Rc::clone(&datagrams_reader_initialized);
|
||||||
|
|
||||||
move || {
|
move || {
|
||||||
lazy_initialize_reader(
|
let transport = Rc::clone(&transport);
|
||||||
|
|
||||||
|
lazy_initialize_u8_reader(
|
||||||
ready_state,
|
ready_state,
|
||||||
transport,
|
Rc::clone(&datagrams_reader_initialized),
|
||||||
datagrams_reader_initialized,
|
move || {
|
||||||
|
transport
|
||||||
|
.borrow()
|
||||||
|
.as_ref()
|
||||||
|
.expect("transport should be set a this point")
|
||||||
|
.datagrams()
|
||||||
|
.readable()
|
||||||
|
},
|
||||||
set_datagrams,
|
set_datagrams,
|
||||||
);
|
);
|
||||||
|
|
||||||
|
@ -173,19 +204,6 @@ pub fn use_webtransport_with_options(
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
let send_datagrams = {
|
|
||||||
let transport = Rc::clone(&transport);
|
|
||||||
let datagrams_writer = Rc::clone(&datagrams_writer);
|
|
||||||
|
|
||||||
move |data| {
|
|
||||||
if let Some(transport) = transport.borrow().as_ref() {
|
|
||||||
let writer = get_or_create_datagrams_writer(datagrams_writer, transport);
|
|
||||||
|
|
||||||
let _ = writer.write_with_chunk(&data.into());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
// TODO : reliable streams
|
// TODO : reliable streams
|
||||||
|
|
||||||
on_cleanup(move || {
|
on_cleanup(move || {
|
||||||
|
@ -194,9 +212,10 @@ pub fn use_webtransport_with_options(
|
||||||
});
|
});
|
||||||
|
|
||||||
UseWebTransportReturn {
|
UseWebTransportReturn {
|
||||||
ready_state: ready_state.into(),
|
transport,
|
||||||
|
ready_state,
|
||||||
datagrams,
|
datagrams,
|
||||||
send_datagrams,
|
datagrams_writer,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -217,38 +236,53 @@ fn get_or_create_datagrams_writer(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn lazy_initialize_reader(
|
fn lazy_initialize_u8_reader(
|
||||||
ready_state: ReadSignal<ConnectionReadyState>,
|
ready_state: Signal<ConnectionReadyState>,
|
||||||
transport: Rc<RefCell<Option<web_sys::WebTransport>>>,
|
|
||||||
initialized: Rc<Cell<bool>>,
|
initialized: Rc<Cell<bool>>,
|
||||||
set_datagrams: WriteSignal<Option<Vec<u8>>>,
|
get_readable_stream: impl Fn() -> web_sys::ReadableStream,
|
||||||
|
set_signal: WriteSignal<Option<Vec<u8>>>,
|
||||||
|
) {
|
||||||
|
lazy_initialize_reader(
|
||||||
|
ready_state,
|
||||||
|
initialized,
|
||||||
|
get_readable_stream,
|
||||||
|
move |value| {
|
||||||
|
let value: js_sys::Uint8Array = value.into();
|
||||||
|
set_signal.set(Some(value.to_vec()));
|
||||||
|
},
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
fn lazy_initialize_reader(
|
||||||
|
ready_state: Signal<ConnectionReadyState>,
|
||||||
|
initialized: Rc<Cell<bool>>,
|
||||||
|
get_readable_stream: impl Fn() -> web_sys::ReadableStream,
|
||||||
|
on_value: impl Fn(JsValue) + 'static,
|
||||||
) {
|
) {
|
||||||
if ready_state.get() == ConnectionReadyState::Open {
|
if ready_state.get() == ConnectionReadyState::Open {
|
||||||
if !initialized.get() {
|
if !initialized.get() {
|
||||||
if let Some(transport) = transport.borrow().as_ref() {
|
|
||||||
initialized.set(true);
|
initialized.set(true);
|
||||||
|
|
||||||
listen_to_stream(
|
listen_to_stream(
|
||||||
transport.datagrams().readable(),
|
get_readable_stream(),
|
||||||
move || initialized.set(false),
|
move || initialized.set(false),
|
||||||
set_datagrams,
|
on_value,
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
fn listen_to_stream(
|
fn listen_to_stream(
|
||||||
readable_stream: web_sys::ReadableStream,
|
readable_stream: web_sys::ReadableStream,
|
||||||
on_done: fn(),
|
on_done: impl Fn() + 'static,
|
||||||
set_signal: WriteSignal<Option<Vec<u8>>>,
|
on_value: impl Fn(JsValue) + 'static,
|
||||||
) {
|
) {
|
||||||
let mut reader_options = web_sys::ReadableStreamGetReaderOptions::new();
|
let mut reader_options = web_sys::ReadableStreamGetReaderOptions::new();
|
||||||
reader_options.mode(web_sys::ReadableStreamReaderMode::Byob);
|
reader_options.mode(web_sys::ReadableStreamReaderMode::Byob);
|
||||||
|
|
||||||
let reader: web_sys::ReadableStreamByobReader = readable_stream
|
let reader: web_sys::ReadableStreamByobReader = readable_stream
|
||||||
.get_reader_with_options(&reader_options)
|
.get_reader_with_options(&reader_options)
|
||||||
.into();
|
.unchecked_into();
|
||||||
|
|
||||||
spawn_local(async move {
|
spawn_local(async move {
|
||||||
// the length value 4000 is taken from the MDN example
|
// the length value 4000 is taken from the MDN example
|
||||||
|
@ -269,11 +303,10 @@ fn listen_to_stream(
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
let value: Uint8Array = Reflect::get(&result, &"value".into())
|
let value = Reflect::get(&result, &"value".into())
|
||||||
.expect("if not done there should be a value")
|
.expect("if not done there should be a value");
|
||||||
.into();
|
|
||||||
|
|
||||||
set_signal.set(Some(value.to_vec()))
|
on_value(value);
|
||||||
}
|
}
|
||||||
Err(..) => {
|
Err(..) => {
|
||||||
// TODO : error handling?
|
// TODO : error handling?
|
||||||
|
@ -298,6 +331,12 @@ pub struct UseWebTransportOptions {
|
||||||
/// Callback when `WebTransport` is closed.
|
/// Callback when `WebTransport` is closed.
|
||||||
on_close: Rc<dyn Fn()>,
|
on_close: Rc<dyn Fn()>,
|
||||||
|
|
||||||
|
/// Callback when the server opens a one-way stream.
|
||||||
|
on_receive_stream: Rc<dyn Fn(ReceiveStream)>,
|
||||||
|
|
||||||
|
/// Callback when the server opens a bidirectional stream.
|
||||||
|
on_bidir_stream: Rc<dyn Fn(BidirStream)>,
|
||||||
|
|
||||||
/// Retry times. Defaults to 3.
|
/// Retry times. Defaults to 3.
|
||||||
reconnect_limit: u64,
|
reconnect_limit: u64,
|
||||||
|
|
||||||
|
@ -316,6 +355,8 @@ impl Default for UseWebTransportOptions {
|
||||||
on_open: Rc::new(|| {}),
|
on_open: Rc::new(|| {}),
|
||||||
on_error: Rc::new(|_| {}),
|
on_error: Rc::new(|_| {}),
|
||||||
on_close: Rc::new(|| {}),
|
on_close: Rc::new(|| {}),
|
||||||
|
on_receive_stream: Rc::new(|_| {}),
|
||||||
|
on_bidir_stream: Rc::new(|_| {}),
|
||||||
reconnect_limit: 3,
|
reconnect_limit: 3,
|
||||||
reconnect_interval: 3000,
|
reconnect_interval: 3000,
|
||||||
immediate: true,
|
immediate: true,
|
||||||
|
@ -323,24 +364,238 @@ impl Default for UseWebTransportOptions {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl From<UseWebTransportOptions> for web_sys::WebTransportOptions {
|
#[async_trait(?Send)]
|
||||||
fn from(options: UseWebTransportOptions) -> Self {
|
pub trait CloseableStream {
|
||||||
web_sys::WebTransportOptions::new()
|
fn close(&self);
|
||||||
|
|
||||||
|
async fn close_async(&self) -> Result<(), WebTransportError>;
|
||||||
|
}
|
||||||
|
|
||||||
|
#[async_trait(?Send)]
|
||||||
|
pub trait SendableStream {
|
||||||
|
fn writer(&self) -> &web_sys::WritableStreamDefaultWriter;
|
||||||
|
fn send_bytes(&self, data: &[u8]) {
|
||||||
|
let arr = js_sys::Uint8Array::from(data);
|
||||||
|
let _ = self.writer().write_with_chunk(&arr);
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn send_bytes_async(&self, data: &[u8]) -> Result<(), SendError> {
|
||||||
|
let arr = js_sys::Uint8Array::from(data);
|
||||||
|
let _ = JsFuture::from(self.writer().write_with_chunk(&arr))
|
||||||
|
.await
|
||||||
|
.map_err(|e| SendError::FailedToWrite(e));
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(any(feature = "msgpack", feature = "bincode"))]
|
||||||
|
fn send<T: Serialize>(&self, data: &T) {
|
||||||
|
self.send_bytes(
|
||||||
|
to_vec(data)
|
||||||
|
.expect("Serialization should not fail")
|
||||||
|
.as_slice(),
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(any(feature = "msgpack", feature = "bincode"))]
|
||||||
|
async fn send_async<T: Serialize>(&self, data: &T) -> Result<(), SendError> {
|
||||||
|
let serialized = to_vec(data)?;
|
||||||
|
self.send_bytes_async(&serialized).await
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct SendStream {
|
||||||
|
pub writer: web_sys::WritableStreamDefaultWriter,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[async_trait(?Send)]
|
||||||
|
impl SendableStream for SendStream {
|
||||||
|
#[inline(always)]
|
||||||
|
fn writer(&self) -> &web_sys::WritableStreamDefaultWriter {
|
||||||
|
&self.writer
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[async_trait(?Send)]
|
||||||
|
impl CloseableStream for SendStream {
|
||||||
|
fn close(&self) {
|
||||||
|
let _ = self.writer.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn close_async(&self) -> Result<(), WebTransportError> {
|
||||||
|
let _ = JsFuture::from(self.writer.close())
|
||||||
|
.await
|
||||||
|
.map_err(|e| WebTransportError::OnCloseWriter(e))?;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct ReceiveStream {
|
||||||
|
pub reader: web_sys::ReadableStreamByobReader,
|
||||||
|
pub message: Signal<Option<Vec<u8>>>,
|
||||||
|
pub close: Rc<dyn Fn()>,
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct BidirStream {
|
||||||
|
pub writer: web_sys::WritableStreamDefaultWriter,
|
||||||
|
pub bytes: Signal<Option<Vec<u8>>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
cfg_if! { if #[cfg(any(feature = "msgpack", feature = "bincode"))] {
|
||||||
|
impl BidirStream {
|
||||||
|
pub fn receive<T: for <'a> Deserialize<'a>>(&self) -> Signal<Option<T>> {
|
||||||
|
let bytes = self.bytes;
|
||||||
|
|
||||||
|
Signal::derive(move || {
|
||||||
|
bytes.get().map(|bytes| from_slice(bytes.as_slice()).expect("Deserialization should not fail"))
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn try_receive<T: for <'a> Deserialize<'a>>(&self) -> Signal<Option<Result<T, ReceiveError>>> {
|
||||||
|
let bytes = self.bytes;
|
||||||
|
|
||||||
|
Signal::derive(move || {
|
||||||
|
bytes.get().map(|bytes| Ok(from_slice(bytes.as_slice())?))
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}}
|
||||||
|
|
||||||
|
#[async_trait(?Send)]
|
||||||
|
impl CloseableStream for BidirStream {
|
||||||
|
fn close(&self) {
|
||||||
|
let _ = self.writer.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn close_async(&self) -> Result<(), WebTransportError> {
|
||||||
|
let _ = JsFuture::from(self.writer.close())
|
||||||
|
.await
|
||||||
|
.map_err(|e| WebTransportError::OnCloseWriter(e))?;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Return type of [`use_webtransport`].
|
/// Return type of [`use_webtransport`].
|
||||||
pub struct UseWebTransportReturn<SendGramsFn>
|
pub struct UseWebTransportReturn {
|
||||||
where
|
transport: Rc<RefCell<Option<web_sys::WebTransport>>>,
|
||||||
SendGramsFn: Fn(Vec<u8>) + Clone + 'static,
|
datagrams_writer: Rc<RefCell<Option<web_sys::WritableStreamDefaultWriter>>>,
|
||||||
{
|
|
||||||
/// The current state of the `WebTransport` connection.
|
/// The current state of the `WebTransport` connection.
|
||||||
pub ready_state: Signal<ConnectionReadyState>,
|
pub ready_state: Signal<ConnectionReadyState>,
|
||||||
/// Latest datagrams message received
|
/// Latest datagrams message received
|
||||||
pub datagrams: Signal<Option<Vec<u8>>>,
|
pub datagrams: Signal<Option<Vec<u8>>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl UseWebTransportReturn {
|
||||||
|
/// Access to the underlying `WebTransport`
|
||||||
|
pub async fn transport(&self) -> Option<web_sys::WebTransport> {
|
||||||
|
self.transport.borrow().clone()
|
||||||
|
}
|
||||||
|
|
||||||
/// Sends binary data through the datagrams stream
|
/// Sends binary data through the datagrams stream
|
||||||
pub send_datagrams: SendGramsFn,
|
pub fn send_datagrams(&self, data: &[u8]) {
|
||||||
|
if let Some(transport) = self.transport.borrow().as_ref() {
|
||||||
|
let writer =
|
||||||
|
get_or_create_datagrams_writer(Rc::clone(&self.datagrams_writer), transport);
|
||||||
|
|
||||||
|
let arr = js_sys::Uint8Array::from(data);
|
||||||
|
let _ = writer.write_with_chunk(&arr);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO : send_datagrams_async
|
||||||
|
|
||||||
|
/// Create a unidirectional send stream
|
||||||
|
pub async fn create_send_stream(&self) -> Result<SendStream, WebTransportError> {
|
||||||
|
if let Some(transport) = self.transport.borrow().as_ref() {
|
||||||
|
let result = JsFuture::from(transport.create_unidirectional_stream())
|
||||||
|
.await
|
||||||
|
.map_err(|e| WebTransportError::FailedToOpenStream(e))?;
|
||||||
|
let stream: web_sys::WritableStream = result.unchecked_into();
|
||||||
|
let writer = stream
|
||||||
|
.get_writer()
|
||||||
|
.map_err(|e| WebTransportError::FailedToOpenWriter(e))?;
|
||||||
|
|
||||||
|
Ok(SendStream { writer })
|
||||||
|
} else {
|
||||||
|
Err(WebTransportError::NotConnected)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Create a bidirectional stream
|
||||||
|
pub async fn create_bidir_stream(&self) -> Result<BidirStream, WebTransportError> {
|
||||||
|
if let Some(transport) = self.transport.borrow().as_ref() {
|
||||||
|
let result = JsFuture::from(transport.create_bidirectional_stream())
|
||||||
|
.await
|
||||||
|
.map_err(|e| WebTransportError::FailedToOpenStream(e))?;
|
||||||
|
let stream: web_sys::WebTransportBidirectionalStream = result.unchecked_into();
|
||||||
|
let writer = stream
|
||||||
|
.writable()
|
||||||
|
.get_writer()
|
||||||
|
.map_err(|e| WebTransportError::FailedToOpenWriter(e))?;
|
||||||
|
|
||||||
|
let bytes = Signal::derive({
|
||||||
|
let reader_initialized = Rc::new(Cell::new(false));
|
||||||
|
let ready_state = self.ready_state;
|
||||||
|
let (message_signal, set_message) = create_signal(None::<Vec<u8>>);
|
||||||
|
let stream = stream.clone();
|
||||||
|
|
||||||
|
move || {
|
||||||
|
let stream = stream.clone();
|
||||||
|
|
||||||
|
lazy_initialize_u8_reader(
|
||||||
|
ready_state,
|
||||||
|
Rc::clone(&reader_initialized),
|
||||||
|
move || stream.readable().unchecked_into(),
|
||||||
|
set_message,
|
||||||
|
);
|
||||||
|
|
||||||
|
message_signal.get()
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
Ok(BidirStream { writer, bytes })
|
||||||
|
} else {
|
||||||
|
Err(WebTransportError::NotConnected)
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Error enum for [`UseWebTransportOptions::on_error`]
|
/// Error enum for [`UseWebTransportOptions::on_error`]
|
||||||
pub enum WebTransportError {}
|
pub enum WebTransportError {
|
||||||
|
/// The `WebTransport` is not connected yet. Call `open` first.
|
||||||
|
NotConnected,
|
||||||
|
FailedToOpenStream(JsValue),
|
||||||
|
FailedToOpenWriter(JsValue),
|
||||||
|
FailedToOpenReader(JsValue),
|
||||||
|
FailedToRead(JsValue),
|
||||||
|
OnCloseWriter(JsValue),
|
||||||
|
OnCloseReader(JsValue),
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Error, Debug)]
|
||||||
|
pub enum SendError {
|
||||||
|
#[error("Failed to write to stream")]
|
||||||
|
FailedToWrite(JsValue),
|
||||||
|
|
||||||
|
#[cfg(feature = "bincode")]
|
||||||
|
#[error("Serialization failed: {0}")]
|
||||||
|
SerializationFailed(#[from] bincode::Error),
|
||||||
|
|
||||||
|
#[cfg(feature = "msgpack")]
|
||||||
|
#[error("Serialization failed: {0}")]
|
||||||
|
SerializationFailed(#[from] rmp_serde::encode::Error),
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Error, Debug)]
|
||||||
|
pub enum ReceiveError {
|
||||||
|
#[cfg(feature = "bincode")]
|
||||||
|
#[error("Serialization failed: {0}")]
|
||||||
|
SerializationFailed(#[from] bincode::Error),
|
||||||
|
|
||||||
|
#[cfg(feature = "msgpack")]
|
||||||
|
#[error("Serialization failed: {0}")]
|
||||||
|
DeserializationFailed(#[from] rmp_serde::decode::Error),
|
||||||
|
}
|
||||||
|
|
Loading…
Add table
Reference in a new issue