leptos-use/src/use_webtransport.rs

684 lines
20 KiB
Rust
Raw Normal View History

use crate::core::ConnectionReadyState;
2023-08-22 14:34:04 +01:00
use async_trait::async_trait;
use cfg_if::cfg_if;
use default_struct_builder::DefaultBuilder;
2023-08-22 14:34:04 +01:00
use js_sys::Reflect;
use leptos::leptos_dom::helpers::TimeoutHandle;
use leptos::*;
use std::cell::{Cell, RefCell};
use std::rc::Rc;
2023-08-10 21:39:32 +01:00
use std::time::Duration;
use wasm_bindgen::prelude::*;
use wasm_bindgen_futures::JsFuture;
2023-08-22 14:34:04 +01:00
#[cfg(any(feature = "bincode", feature = "msgpack"))]
use serde::{Deserialize, Serialize};
#[cfg(feature = "msgpack")]
use rmp_serde::{from_slice, to_vec};
use thiserror::Error;
#[cfg(feature = "bincode")]
use bincode::serde::{decode_from_slice as from_slice, encode_to_vec as to_vec};
///
///
/// ## Demo
///
/// [Link to Demo](https://github.com/Synphonyte/leptos-use/tree/main/examples/use_webtransport)
///
/// ## Usage
///
/// ```
/// # use leptos::*;
/// # use leptos_use::use_webtransport;
/// #
/// # #[component]
/// # fn Demo() -> impl IntoView {
/// use_webtransport();
/// #
/// # view! { }
/// # }
/// ```
2023-08-22 14:34:04 +01:00
pub fn use_webtransport(url: &str) -> UseWebTransportReturn {
use_webtransport_with_options(url, UseWebTransportOptions::default())
}
/// Version of [`use_webtransport`] that takes a `UseWebtransportOptions`. See [`use_webtransport`] for how to use.
pub fn use_webtransport_with_options(
url: &str,
options: UseWebTransportOptions,
2023-08-22 14:34:04 +01:00
) -> UseWebTransportReturn {
let UseWebTransportOptions {
on_open,
2023-08-31 20:12:48 +01:00
// on_error,
on_close,
2023-08-22 14:34:04 +01:00
on_receive_stream,
on_bidir_stream,
reconnect_limit,
reconnect_interval,
immediate,
} = options;
2023-08-22 14:34:04 +01:00
let url = url.to_string();
let (ready_state, set_ready_state) = create_signal(ConnectionReadyState::Closed);
2023-08-22 14:34:04 +01:00
let ready_state: Signal<_> = ready_state.into();
let transport = Rc::new(RefCell::new(None::<web_sys::WebTransport>));
let datagrams_reader_initialized = Rc::new(Cell::new(false));
let datagrams_writer = Rc::new(RefCell::new(None::<web_sys::WritableStreamDefaultWriter>));
let reconnect_timer = Rc::new(Cell::new(None::<TimeoutHandle>));
let reconnect_count = Rc::new(Cell::new(0_u64));
let unmounted = Rc::new(Cell::new(false));
2023-08-10 21:39:32 +01:00
let connect_ref = store_value(None::<Rc<dyn Fn()>>);
let reconnect = Rc::new({
let reconnect_timer = Rc::clone(&reconnect_timer);
let reconnect_count = Rc::clone(&reconnect_count);
move || {
if reconnect_count.get() < reconnect_limit
&& ready_state.get_untracked() == ConnectionReadyState::Open
{
reconnect_timer.set(
set_timeout_with_handle(
move || {
if let Some(connect) = connect_ref.get_value() {
connect();
reconnect_count.set(reconnect_count.get() + 1);
}
},
Duration::from_millis(reconnect_interval),
)
.ok(),
2023-08-10 21:39:32 +01:00
)
}
2023-08-10 21:39:32 +01:00
}
});
connect_ref.set_value(Some(Rc::new({
let transport = Rc::clone(&transport);
let reconnect_timer = Rc::clone(&reconnect_timer);
2023-08-22 14:34:04 +01:00
let on_open = Rc::clone(&on_open);
move || {
reconnect_timer.set(None);
2023-08-31 18:26:03 +01:00
if let Some(transport) = transport.borrow().as_ref() {
transport.close();
}
2023-08-22 14:34:04 +01:00
let options = web_sys::WebTransportOptions::new();
2023-08-31 18:26:03 +01:00
transport.replace(Some(
web_sys::WebTransport::new_with_options(&url, &options).unwrap_throw(),
));
set_ready_state.set(ConnectionReadyState::Connecting);
2023-08-22 14:34:04 +01:00
spawn_local({
let on_open = Rc::clone(&on_open);
2023-08-31 18:26:03 +01:00
let transport = Rc::clone(&transport);
2023-08-22 14:34:04 +01:00
async move {
2023-08-31 18:26:03 +01:00
let transport = transport.borrow();
let transport = transport.as_ref().expect("Transport should be set");
2023-08-22 14:34:04 +01:00
match JsFuture::from(transport.ready()).await {
Ok(_) => {
set_ready_state.set(ConnectionReadyState::Open);
on_open();
}
Err(e) => {
// TODO : handle error?
set_ready_state.set(ConnectionReadyState::Closed);
}
}
}
});
}
2023-08-10 21:39:32 +01:00
})));
let open = {
let reconnect_count = Rc::clone(&reconnect_count);
move || {
reconnect_count.set(0);
2023-08-10 21:39:32 +01:00
if let Some(connect) = connect_ref.get_value() {
connect();
}
}
};
2023-08-31 04:42:01 +01:00
let on_closed = {
let reconnect = Rc::clone(&reconnect);
let unmounted = Rc::clone(&unmounted);
move || {
if unmounted.get() {
return;
}
// TODO
// reconnect();
}
};
let close = {
let transport = Rc::clone(&transport);
let reconnect_count = Rc::clone(&reconnect_count);
2023-08-10 21:39:32 +01:00
move || {
reconnect_count.set(reconnect_limit);
2023-08-10 21:39:32 +01:00
if let Some(transport) = transport.take() {
transport.close();
set_ready_state.set(ConnectionReadyState::Closing);
spawn_local(async move {
let result = JsFuture::from(transport.closed()).await;
set_ready_state.set(ConnectionReadyState::Closed);
match result {
Ok(_) => {
on_close();
}
Err(e) => {
// TODO : handle error?
}
2023-08-10 21:39:32 +01:00
}
});
}
}
};
let (datagrams_signal, set_datagrams) = create_signal(None::<Vec<u8>>);
let datagrams = Signal::derive({
let transport = Rc::clone(&transport);
let datagrams_reader_initialized = Rc::clone(&datagrams_reader_initialized);
move || {
2023-08-22 14:34:04 +01:00
let transport = Rc::clone(&transport);
lazy_initialize_u8_reader(
ready_state,
2023-08-22 14:34:04 +01:00
Rc::clone(&datagrams_reader_initialized),
move || {
transport
.borrow()
.as_ref()
.expect("transport should be set a this point")
.datagrams()
.readable()
},
set_datagrams,
);
datagrams_signal.get()
}
});
// TODO : reliable streams
2023-08-31 04:42:01 +01:00
{
let unmounted = Rc::clone(&unmounted);
on_cleanup(move || {
unmounted.set(true);
close();
});
}
2023-08-31 18:26:03 +01:00
if immediate {
open();
}
UseWebTransportReturn {
2023-08-22 14:34:04 +01:00
transport,
ready_state,
datagrams,
2023-08-22 14:34:04 +01:00
datagrams_writer,
}
}
/// Returns the cached datagrams writer, creating and caching one from `transport`
/// when the cache is still empty.
///
/// Panics if the writer cannot be obtained from the transport's datagram stream.
fn get_or_create_datagrams_writer(
    datagrams_writer: Rc<RefCell<Option<web_sys::WritableStreamDefaultWriter>>>,
    transport: &web_sys::WebTransport,
) -> web_sys::WritableStreamDefaultWriter {
    // Fast path: a writer was already created earlier.
    if let Some(cached) = datagrams_writer.borrow().as_ref() {
        return cached.clone();
    }

    let fresh = transport
        .datagrams()
        .writable()
        .get_writer()
        .expect("should be able to get the writer");

    *datagrams_writer.borrow_mut() = Some(fresh.clone());

    fresh
}
2023-08-22 14:34:04 +01:00
fn lazy_initialize_u8_reader(
ready_state: Signal<ConnectionReadyState>,
initialized: Rc<Cell<bool>>,
get_readable_stream: impl Fn() -> web_sys::ReadableStream,
set_signal: WriteSignal<Option<Vec<u8>>>,
) {
lazy_initialize_reader(
ready_state,
initialized,
get_readable_stream,
move |value| {
let value: js_sys::Uint8Array = value.into();
set_signal.set(Some(value.to_vec()));
},
);
}
fn lazy_initialize_reader(
2023-08-22 14:34:04 +01:00
ready_state: Signal<ConnectionReadyState>,
initialized: Rc<Cell<bool>>,
2023-08-22 14:34:04 +01:00
get_readable_stream: impl Fn() -> web_sys::ReadableStream,
on_value: impl Fn(JsValue) + 'static,
) {
if ready_state.get() == ConnectionReadyState::Open {
if !initialized.get() {
2023-08-22 14:34:04 +01:00
initialized.set(true);
2023-08-31 04:42:01 +01:00
listen_to_stream(get_readable_stream(), on_value, move || {
initialized.set(false)
});
}
}
}
fn listen_to_stream(
readable_stream: web_sys::ReadableStream,
2023-08-22 14:34:04 +01:00
on_value: impl Fn(JsValue) + 'static,
2023-08-31 04:42:01 +01:00
on_done: impl Fn() + 'static,
) {
2023-08-31 18:26:03 +01:00
let reader: web_sys::ReadableStreamDefaultReader =
readable_stream.get_reader().unchecked_into();
spawn_local(async move {
loop {
2023-08-31 18:26:03 +01:00
let result = JsFuture::from(reader.read()).await;
match result {
Ok(result) => {
let done = Reflect::get(&result, &"done".into())
.expect("done should always be there")
.as_bool()
.unwrap_or(true);
if done {
// TODO : close connection?
break;
}
2023-08-22 14:34:04 +01:00
let value = Reflect::get(&result, &"value".into())
.expect("if not done there should be a value");
2023-08-22 14:34:04 +01:00
on_value(value);
}
Err(..) => {
// TODO : error handling?
break;
}
}
}
on_done();
});
}
/// Options for [`use_webtransport_with_options`].
#[derive(DefaultBuilder)]
pub struct UseWebTransportOptions {
/// Callback when `WebTransport` is ready.
2023-08-10 21:39:32 +01:00
on_open: Rc<dyn Fn()>,
/// Error callback.
2023-08-31 20:12:48 +01:00
// TODO : ? on_error: Rc<dyn Fn(WebTransportError)>,
/// Callback when `WebTransport` is closed.
2023-08-10 21:39:32 +01:00
on_close: Rc<dyn Fn()>,
2023-08-22 14:34:04 +01:00
/// Callback when the server opens a one-way stream.
on_receive_stream: Rc<dyn Fn(ReceiveStream)>,
/// Callback when the server opens a bidirectional stream.
on_bidir_stream: Rc<dyn Fn(BidirStream)>,
/// Retry times. Defaults to 3.
reconnect_limit: u64,
/// Retry interval in ms. Defaults to 3000.
reconnect_interval: u64,
/// If `true` the `WebSocket` connection will immediately be opened when calling this function.
/// If `false` you have to manually call the `open` function.
/// Defaults to `true`.
immediate: bool,
}
impl Default for UseWebTransportOptions {
fn default() -> Self {
Self {
2023-08-10 21:39:32 +01:00
on_open: Rc::new(|| {}),
2023-08-31 20:12:48 +01:00
// on_error: Rc::new(|_| {}),
2023-08-10 21:39:32 +01:00
on_close: Rc::new(|| {}),
2023-08-22 14:34:04 +01:00
on_receive_stream: Rc::new(|_| {}),
on_bidir_stream: Rc::new(|_| {}),
reconnect_limit: 3,
reconnect_interval: 3000,
immediate: true,
}
}
}
/// Whether the stream is open or closed
///
/// `Clone`/`Copy` allow a `Signal<StreamState>` to be read by value, and
/// `PartialEq`/`Debug` allow states to be compared and logged.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StreamState {
    /// The stream can be used.
    Open,
    /// The stream has been closed.
    Closed,
}
2023-08-22 14:34:04 +01:00
#[async_trait(?Send)]
/// Trait to close a stream
2023-08-22 14:34:04 +01:00
pub trait CloseableStream {
/// Getter for the stream state (open/closed)
fn state(&self) -> Signal<StreamState>;
/// Close the stream ignoring any potential errors
2023-08-22 14:34:04 +01:00
fn close(&self);
/// Close the stream asynchronously with a result providing potential errors
2023-08-22 14:34:04 +01:00
async fn close_async(&self) -> Result<(), WebTransportError>;
}
#[async_trait(?Send)]
/// Trait to send data in a stream
2023-08-22 14:34:04 +01:00
pub trait SendableStream {
/// Getter for the stream writer
2023-08-22 14:34:04 +01:00
fn writer(&self) -> &web_sys::WritableStreamDefaultWriter;
2023-08-31 20:12:48 +01:00
/// Send data in the form of bytes ignoring potential errors
2023-08-22 14:34:04 +01:00
fn send_bytes(&self, data: &[u8]) {
let arr = js_sys::Uint8Array::from(data);
let _ = self.writer().write_with_chunk(&arr);
}
/// Send data in the form of bytes asynchronously with a result providing potential errors
2023-08-22 14:34:04 +01:00
async fn send_bytes_async(&self, data: &[u8]) -> Result<(), SendError> {
let arr = js_sys::Uint8Array::from(data);
let _ = JsFuture::from(self.writer().write_with_chunk(&arr))
.await
.map_err(|e| SendError::FailedToWrite(e));
Ok(())
}
#[cfg(any(feature = "msgpack", feature = "bincode"))]
/// Send data in the form of a serializable object ignoring potential errors
/// Requires the feature `msgpack` or `bincode`
2023-08-22 14:34:04 +01:00
fn send<T: Serialize>(&self, data: &T) {
self.send_bytes(
to_vec(data)
.expect("Serialization should not fail")
.as_slice(),
);
}
#[cfg(any(feature = "msgpack", feature = "bincode"))]
/// Send data in the form of a serializable object asynchronously with a result providing potential errors
/// Requires the feature `msgpack` or `bincode`
2023-08-22 14:34:04 +01:00
async fn send_async<T: Serialize>(&self, data: &T) -> Result<(), SendError> {
let serialized = to_vec(data)?;
self.send_bytes_async(&serialized).await
}
}
2023-08-31 20:12:48 +01:00
#[derive(Clone, Debug)]
/// Stream for sending data
2023-08-22 14:34:04 +01:00
pub struct SendStream {
writer: web_sys::WritableStreamDefaultWriter,
state: Signal<StreamState>,
set_state: WriteSignal<StreamState>,
2023-08-22 14:34:04 +01:00
}
// Sending uses the default methods of `SendableStream`; only the writer is provided here.
#[async_trait(?Send)]
impl SendableStream for SendStream {
    /// Borrows the writer owned by this stream.
    #[inline(always)]
    fn writer(&self) -> &web_sys::WritableStreamDefaultWriter {
        let Self { writer, .. } = self;
        writer
    }
}
#[async_trait(?Send)]
impl CloseableStream for SendStream {
#[inline(always)]
fn state(&self) -> Signal<StreamState> {
self.state
}
#[inline(always)]
2023-08-22 14:34:04 +01:00
fn close(&self) {
let _ = self.writer.close();
}
async fn close_async(&self) -> Result<(), WebTransportError> {
let _ = JsFuture::from(self.writer.close())
.await
.map_err(|e| WebTransportError::OnCloseWriter(e))?;
Ok(())
}
}
2023-08-31 20:12:48 +01:00
#[derive(Clone, Debug)]
/// Stream for receiving data
2023-08-22 14:34:04 +01:00
pub struct ReceiveStream {
2023-08-31 18:26:03 +01:00
pub reader: web_sys::ReadableStreamDefaultReader,
2023-08-22 14:34:04 +01:00
pub message: Signal<Option<Vec<u8>>>,
2023-08-31 20:12:48 +01:00
// pub close: Rc<dyn Fn()>,
state: Signal<StreamState>,
set_state: WriteSignal<StreamState>,
2023-08-22 14:34:04 +01:00
}
2023-08-31 20:12:48 +01:00
// TODO : implement ReceiveStream
#[derive(Clone, Debug)]
/// Bidirectional stream for sending and receiving data
2023-08-22 14:34:04 +01:00
pub struct BidirStream {
writer: web_sys::WritableStreamDefaultWriter,
2023-08-22 14:34:04 +01:00
pub bytes: Signal<Option<Vec<u8>>>,
state: Signal<StreamState>,
set_state: WriteSignal<StreamState>,
2023-08-22 14:34:04 +01:00
}
cfg_if! { if #[cfg(any(feature = "msgpack", feature = "bincode"))] {
    impl BidirStream {
        /// Derived signal that deserializes the received bytes into `T`.
        ///
        /// Panics when reading the signal if the bytes cannot be deserialized.
        pub fn receive<T: for <'a> Deserialize<'a>>(&self) -> Signal<Option<T>> {
            let raw = self.bytes;

            Signal::derive(move || {
                raw.get().map(|payload| {
                    let decoded: T = from_slice(payload.as_slice())
                        .expect("Deserialization should not fail");
                    decoded
                })
            })
        }

        /// Derived signal that deserializes the received bytes into `T`, yielding a
        /// [`ReceiveError`] instead of panicking when deserialization fails.
        pub fn try_receive<T: for <'a> Deserialize<'a>>(&self) -> Signal<Option<Result<T, ReceiveError>>> {
            let raw = self.bytes;

            Signal::derive(move || {
                raw.get()
                    .map(|payload| from_slice(payload.as_slice()).map_err(ReceiveError::from))
            })
        }
    }
}}
#[async_trait(?Send)]
impl CloseableStream for BidirStream {
#[inline(always)]
fn state(&self) -> Signal<StreamState> {
self.state
}
#[inline(always)]
2023-08-22 14:34:04 +01:00
fn close(&self) {
let _ = self.writer.close();
}
async fn close_async(&self) -> Result<(), WebTransportError> {
let _ = JsFuture::from(self.writer.close())
.await
.map_err(|e| WebTransportError::OnCloseWriter(e))?;
Ok(())
}
}
2023-08-31 20:12:48 +01:00
// Sending uses the default methods of `SendableStream`; only the writer is provided here.
#[async_trait(?Send)]
impl SendableStream for BidirStream {
    /// Borrows the writer owned by this stream.
    #[inline(always)]
    fn writer(&self) -> &web_sys::WritableStreamDefaultWriter {
        let Self { writer, .. } = self;
        writer
    }
}
/// Return type of [`use_webtransport`].
2023-08-31 04:42:01 +01:00
#[derive(Clone, Debug)]
2023-08-22 14:34:04 +01:00
pub struct UseWebTransportReturn {
transport: Rc<RefCell<Option<web_sys::WebTransport>>>,
datagrams_writer: Rc<RefCell<Option<web_sys::WritableStreamDefaultWriter>>>,
/// The current state of the `WebTransport` connection.
pub ready_state: Signal<ConnectionReadyState>,
/// Latest datagrams message received
pub datagrams: Signal<Option<Vec<u8>>>,
2023-08-22 14:34:04 +01:00
}
impl UseWebTransportReturn {
/// Access to the underlying `WebTransport`
pub async fn transport(&self) -> Option<web_sys::WebTransport> {
self.transport.borrow().clone()
}
/// Sends binary data through the datagrams stream
2023-08-22 14:34:04 +01:00
pub fn send_datagrams(&self, data: &[u8]) {
if let Some(transport) = self.transport.borrow().as_ref() {
let writer =
get_or_create_datagrams_writer(Rc::clone(&self.datagrams_writer), transport);
let arr = js_sys::Uint8Array::from(data);
let _ = writer.write_with_chunk(&arr);
}
}
// TODO : send_datagrams_async
/// Create a unidirectional send stream
pub async fn create_send_stream(&self) -> Result<SendStream, WebTransportError> {
if let Some(transport) = self.transport.borrow().as_ref() {
let result = JsFuture::from(transport.create_unidirectional_stream())
.await
.map_err(|e| WebTransportError::FailedToOpenStream(e))?;
let stream: web_sys::WritableStream = result.unchecked_into();
let writer = stream
.get_writer()
.map_err(|e| WebTransportError::FailedToOpenWriter(e))?;
let (state, set_state) = create_signal(StreamState::Open);
Ok(SendStream {
writer,
state: state.into(),
set_state,
})
2023-08-22 14:34:04 +01:00
} else {
Err(WebTransportError::NotConnected)
}
}
/// Create a bidirectional stream
pub async fn create_bidir_stream(&self) -> Result<BidirStream, WebTransportError> {
if let Some(transport) = self.transport.borrow().as_ref() {
let result = JsFuture::from(transport.create_bidirectional_stream())
.await
.map_err(|e| WebTransportError::FailedToOpenStream(e))?;
let stream: web_sys::WebTransportBidirectionalStream = result.unchecked_into();
let writer = stream
.writable()
.get_writer()
.map_err(|e| WebTransportError::FailedToOpenWriter(e))?;
let bytes = Signal::derive({
let reader_initialized = Rc::new(Cell::new(false));
let ready_state = self.ready_state;
let (message_signal, set_message) = create_signal(None::<Vec<u8>>);
let stream = stream.clone();
move || {
let stream = stream.clone();
lazy_initialize_u8_reader(
ready_state,
Rc::clone(&reader_initialized),
move || stream.readable().unchecked_into(),
set_message,
);
message_signal.get()
}
});
Ok(BidirStream { writer, bytes })
} else {
Err(WebTransportError::NotConnected)
}
}
}
/// Error enum for [`UseWebTransportOptions::on_error`]
2023-08-31 18:26:03 +01:00
#[derive(Debug)]
2023-08-22 14:34:04 +01:00
pub enum WebTransportError {
/// The `WebTransport` is not connected yet. Call `open` first.
NotConnected,
FailedToOpenStream(JsValue),
FailedToOpenWriter(JsValue),
FailedToOpenReader(JsValue),
FailedToRead(JsValue),
OnCloseWriter(JsValue),
OnCloseReader(JsValue),
}
2023-08-31 20:12:48 +01:00
/// Error enum for [`SendStream::send`]
2023-08-22 14:34:04 +01:00
#[derive(Error, Debug)]
pub enum SendError {
#[error("Failed to write to stream")]
FailedToWrite(JsValue),
#[cfg(feature = "bincode")]
#[error("Serialization failed: {0}")]
SerializationFailed(#[from] bincode::Error),
#[cfg(feature = "msgpack")]
#[error("Serialization failed: {0}")]
SerializationFailed(#[from] rmp_serde::encode::Error),
}
/// Error enum for deserializing received data, see `BidirStream::try_receive`
#[derive(Error, Debug)]
pub enum ReceiveError {
    // NOTE(review): this variant wraps a *de*serialization error but is named
    // `SerializationFailed`; the name is kept so existing matches keep compiling.
    /// The received bytes could not be deserialized (feature `bincode`).
    #[cfg(feature = "bincode")]
    #[error("Deserialization failed: {0}")]
    SerializationFailed(#[from] bincode::Error),

    /// The received bytes could not be deserialized (feature `msgpack`).
    #[cfg(feature = "msgpack")]
    #[error("Deserialization failed: {0}")]
    DeserializationFailed(#[from] rmp_serde::decode::Error),
}