From b80553751b6fe3cfc149d52f5b08bfbd8938c872 Mon Sep 17 00:00:00 2001 From: Maccesch Date: Tue, 15 Aug 2023 12:15:58 +0100 Subject: [PATCH] ported internal callbacks to Rc RefCells --- src/use_webtransport.rs | 223 +++++++++++++++++++++++----------------- 1 file changed, 131 insertions(+), 92 deletions(-) diff --git a/src/use_webtransport.rs b/src/use_webtransport.rs index f13139c..c856e8d 100644 --- a/src/use_webtransport.rs +++ b/src/use_webtransport.rs @@ -3,13 +3,11 @@ use default_struct_builder::DefaultBuilder; use js_sys::{Reflect, Uint8Array}; use leptos::leptos_dom::helpers::TimeoutHandle; use leptos::*; +use std::cell::{Cell, RefCell}; use std::rc::Rc; use std::time::Duration; use wasm_bindgen::prelude::*; use wasm_bindgen_futures::JsFuture; -use web_sys::{ - ReadableStream, ReadableStreamReaderMode, WebTransport, WritableStreamDefaultWriter, -}; /// /// @@ -50,118 +48,151 @@ pub fn use_webtransport_with_options( let (ready_state, set_ready_state) = create_signal(ConnectionReadyState::Closed); - let transport_ref = store_value(None::); - let datagrams_reader_initialized_ref = store_value(false); - let datagrams_writer_ref = store_value(None::); + let transport = Rc::new(RefCell::new(None::)); + let datagrams_reader_initialized = Rc::new(Cell::new(false)); + let datagrams_writer = Rc::new(RefCell::new(None::)); - let reconnect_timer_ref = store_value(None::); - let reconnect_count_ref = store_value(0_u64); + let reconnect_timer = Rc::new(Cell::new(None::)); + let reconnect_count = Rc::new(Cell::new(0_u64)); + + let unmounted = Rc::new(Cell::new(false)); let connect_ref = store_value(None::>); - let reconnect = Rc::new(move || { - if reconnect_count_ref.get_value() < reconnect_limit - && ready_state.get_untracked() == ConnectionReadyState::Open - { - reconnect_timer_ref.set_value( - set_timeout_with_handle( - move || { - if let Some(connect) = connect_ref.get_value() { - connect(); - reconnect_count_ref.update_value(|current| *current += 1); - } - }, - Duration::from_millis(reconnect_interval), + let reconnect = Rc::new({ + let reconnect_timer = Rc::clone(&reconnect_timer); + let reconnect_count = Rc::clone(&reconnect_count); + + move || { + if reconnect_count.get() < reconnect_limit + && ready_state.get_untracked() == ConnectionReadyState::Open + { + reconnect_timer.set( + set_timeout_with_handle( + move || { + if let Some(connect) = connect_ref.get_value() { + connect(); + reconnect_count.set(reconnect_count.get() + 1); + } + }, + Duration::from_millis(reconnect_interval), + ) + .ok(), ) - .ok(), - ) + } } }); - connect_ref.set_value(Some(Rc::new(move || { - let transport = transport_ref.get_value(); + connect_ref.set_value(Some(Rc::new({ + let transport = Rc::clone(&transport); + let reconnect_timer = Rc::clone(&reconnect_timer); - reconnect_timer_ref.set_value(None); + move || { + let transport = transport.borrow(); - if let Some(transport) = transport.as_ref() { - transport.close(); - } + reconnect_timer.set(None); - let transport = - web_sys::WebTransport::new_with_options(url, &options.into()).unwrap_throw(); - - set_ready_state.set(ConnectionReadyState::Connecting); - - spawn_local(async move { - match JsFuture::from(transport.ready()).await { - Ok(_) => { - set_ready_state.set(ConnectionReadyState::Open); - on_open(); - } - Err(e) => { - // TODO : handle error? - set_ready_state.set(ConnectionReadyState::Closed); - } + if let Some(transport) = transport.as_ref() { + transport.close(); } - }); + + let transport = + web_sys::WebTransport::new_with_options(url, &options.into()).unwrap_throw(); + + set_ready_state.set(ConnectionReadyState::Connecting); + + spawn_local(async move { + match JsFuture::from(transport.ready()).await { + Ok(_) => { + set_ready_state.set(ConnectionReadyState::Open); + on_open(); + } + Err(e) => { + // TODO : handle error? + set_ready_state.set(ConnectionReadyState::Closed); + } + } + }); + } }))); let open = { + let reconnect_count = Rc::clone(&reconnect_count); + move || { - reconnect_count_ref.set_value(0); + reconnect_count.set(0); if let Some(connect) = connect_ref.get_value() { connect(); } } }; - let close = move || { - reconnect_count_ref.set_value(reconnect_limit); - if let Some(transport) = transport_ref.get_value() { - transport.close(); - set_ready_state.set(ConnectionReadyState::Closing); + let close = { + let transport = Rc::clone(&transport); + let reconnect_count = Rc::clone(&reconnect_count); - spawn_local(async move { - let result = JsFuture::from(transport.closed()).await; - set_ready_state.set(ConnectionReadyState::Closed); + move || { + reconnect_count.set(reconnect_limit); - match result { - Ok(_) => { - on_close(); + if let Some(transport) = transport.take() { + transport.close(); + set_ready_state.set(ConnectionReadyState::Closing); + + spawn_local(async move { + let result = JsFuture::from(transport.closed()).await; + set_ready_state.set(ConnectionReadyState::Closed); + + match result { + Ok(_) => { + on_close(); + } + Err(e) => { + // TODO : handle error? + } } - Err(e) => { - // TODO : handle error? - } - } - }); + }); + } } }; let (datagrams_signal, set_datagrams) = create_signal(None::>); - let datagrams = Signal::derive(move || { - // lazily create datagrams reader listener - if ready_state.get() == ConnectionReadyState::Open { - initialize_datagrams_reader( - transport_ref, - datagrams_reader_initialized_ref, + let datagrams = Signal::derive({ + let transport = Rc::clone(&transport); + let datagrams_reader_initialized = Rc::clone(&datagrams_reader_initialized); + + move || { + lazy_initialize_reader( + ready_state, + transport, + datagrams_reader_initialized, set_datagrams, ); - } - datagrams_signal.get() + datagrams_signal.get() + } }); - let send_datagrams = move |data| { - if let Some(transport) = transport_ref.get_value() { - let writer = get_or_create_datagrams_writer(datagrams_writer_ref, transport); + let send_datagrams = { + let transport = Rc::clone(&transport); + let datagrams_writer = Rc::clone(&datagrams_writer); - let _ = writer.write_with_chunk(&data.into()); + move |data| { + if let Some(transport) = transport.borrow().as_ref() { + let writer = get_or_create_datagrams_writer(datagrams_writer, transport); + + let _ = writer.write_with_chunk(&data.into()); + } } }; // TODO : reliable streams + on_cleanup(move || { + unmounted.set(true); + close(); + }); + UseWebTransportReturn { ready_state: ready_state.into(), datagrams, @@ -170,10 +201,10 @@ pub fn use_webtransport_with_options( } fn get_or_create_datagrams_writer( - datagrams_writer_ref: StoredValue>, - transport: WebTransport, -) -> WritableStreamDefaultWriter { - if let Some(writer) = datagrams_writer_ref.get_value() { + datagrams_writer: Rc>>, + transport: &web_sys::WebTransport, +) -> web_sys::WritableStreamDefaultWriter { + if let Some(writer) = datagrams_writer.borrow().clone() { writer } else { let writer = transport @@ -181,36 +212,39 @@ fn get_or_create_datagrams_writer( .writable() .get_writer() .expect("should be able to get the writer"); - datagrams_writer_ref.set_value(Some(writer.clone())); + datagrams_writer.replace(Some(writer.clone())); writer } } -fn initialize_datagrams_reader( - transport_ref: StoredValue>, - datagrams_reader_initialized_ref: StoredValue, +fn lazy_initialize_reader( + ready_state: ReadSignal, + transport: Rc>>, + initialized: Rc>, set_datagrams: WriteSignal>>, ) { - if !datagrams_reader_initialized_ref.get_value() { - if let Some(transport) = transport_ref.get_value() { - datagrams_reader_initialized_ref.set_value(true); + if ready_state.get() == ConnectionReadyState::Open { + if !initialized.get() { + if let Some(transport) = transport.borrow().as_ref() { + initialized.set(true); - listen_to_stream( - transport.datagrams().readable(), - move || datagrams_reader_initialized_ref.set_value(false), - set_datagrams, - ); + listen_to_stream( + transport.datagrams().readable(), + move || initialized.set(false), + set_datagrams, + ); + } } } } fn listen_to_stream( - readable_stream: ReadableStream, + readable_stream: web_sys::ReadableStream, on_done: fn(), set_signal: WriteSignal>>, ) { let mut reader_options = web_sys::ReadableStreamGetReaderOptions::new(); - reader_options.mode(ReadableStreamReaderMode::Byob); + reader_options.mode(web_sys::ReadableStreamReaderMode::Byob); let reader: web_sys::ReadableStreamByobReader = readable_stream .get_reader_with_options(&reader_options) @@ -257,14 +291,19 @@ fn listen_to_stream( pub struct UseWebTransportOptions { /// Callback when `WebTransport` is ready. on_open: Rc, + /// Error callback. on_error: Rc, + /// Callback when `WebTransport` is closed. on_close: Rc, + /// Retry times. Defaults to 3. reconnect_limit: u64, + /// Retry interval in ms. Defaults to 3000. reconnect_interval: u64, + /// If `true` the `WebSocket` connection will immediately be opened when calling this function. /// If `false` you have to manually call the `open` function. /// Defaults to `true`.