basic test for server opened bidir streams

This commit is contained in:
Maccesch 2023-09-18 01:01:36 +01:00
parent b146bb864e
commit d79b594391
8 changed files with 88 additions and 76 deletions

View file

@ -14,6 +14,7 @@ use stream_send::*;
#[component]
fn Demo() -> impl IntoView {
let (datagrams_log, set_datagrams_log) = create_signal(vec![]);
let (bidir_streams, set_bidir_streams) = create_signal(vec![]);
let id = store_value(0);
@ -25,6 +26,14 @@ fn Demo() -> impl IntoView {
})
.on_close(move || {
set_datagrams_log.update(|log| log.push("Connection closed".to_string()))
})
.on_bidir_stream(move |bidir_stream| {
let i = id.get_value();
id.set_value(i + 1);
set_datagrams_log
.update(|log| log.push("Server opened bidirectional stream".to_string()));
set_bidir_streams.update(|s| s.push((i, bidir_stream, "server")));
}),
);
@ -56,8 +65,6 @@ fn Demo() -> impl IntoView {
false,
);
let (bidir_streams, set_bidir_streams) = create_signal(vec![]);
let on_open_bidir_stream = {
let transport = transport.clone();
@ -70,7 +77,7 @@ fn Demo() -> impl IntoView {
let i = id.get_value();
id.set_value(i + 1);
set_bidir_streams.update(|s| s.push((i, bidir_stream)));
set_bidir_streams.update(|s| s.push((i, bidir_stream, "client")));
}
Err(e) => {
set_datagrams_log.update(|log| {
@ -95,8 +102,14 @@ fn Demo() -> impl IntoView {
<h2>Bidir Streams</h2>
<For
each=bidir_streams
key=|(i, _)| *i
view=move |(_, bidir_stream)| view! { <StreamBidir ready_state=ready_state stream=bidir_stream.clone() /> }
key=|(i, _, _)| *i
view=move |(i, bidir_stream, opened_by)| view! {
<StreamBidir
ready_state=ready_state
stream=bidir_stream.clone()
opened_by=opened_by
/>
}
/>
}
}

View file

@ -1,12 +1,13 @@
use crate::{LogDisplay, StreamSend};
use leptos::*;
use leptos_use::core::ConnectionReadyState;
use leptos_use::BidirStream;
use leptos_use::{BidirStream, CloseableStream, StreamState};
#[component]
pub fn StreamBidir(
#[prop(into)] ready_state: Signal<ConnectionReadyState>,
stream: BidirStream,
opened_by: &'static str,
) -> impl IntoView {
let (log, set_log) = create_signal(vec![]);
@ -20,7 +21,7 @@ pub fn StreamBidir(
if let Some(bytes) = bytes {
set_log.update(|log| {
log.push(format!(
"Received datagrams: '{}'",
"Received bidir: '{}'",
String::from_utf8(bytes.clone()).expect("valid utf8")
))
});
@ -29,8 +30,20 @@ pub fn StreamBidir(
false,
);
let on_close = {
let stream = stream.clone();
move |_| {
stream.close();
}
};
view! {
<StreamSend ready_state=ready_state send_stream=stream.clone() on_send=on_send />
<p>Opened by {opened_by}</p>
<StreamSend ready_state=ready_state stream_state=stream.state() send_stream=stream.clone() on_send=on_send />
<LogDisplay log=log />
// TODO : make component out of this:
<p>Stream state: { let stream = stream.clone(); move || format!("{:?}", stream.state().get()) }</p>
<button on:click=on_close disabled=move || ready_state() != ConnectionReadyState::Open || stream.state().get() != StreamState::Open>Close</button>
}
}

View file

@ -1,10 +1,11 @@
use leptos::*;
use leptos_use::core::ConnectionReadyState;
use leptos_use::SendableStream;
use leptos_use::{SendableStream, StreamState};
#[component]
pub fn StreamSend<S, F>(
#[prop(into)] ready_state: Signal<ConnectionReadyState>,
stream_state: Signal<StreamState>,
send_stream: S,
on_send: F,
) -> impl IntoView
@ -24,6 +25,6 @@ where
view! {
<textarea on:change=move |e| set_text(event_target_value(&e)) prop:value=text />
<button on:click=on_send disabled=move || ready_state() != ConnectionReadyState::Open>"Send"</button>
<button on:click=on_send disabled=move || ready_state() != ConnectionReadyState::Open || stream_state() != StreamState::Open>"Send"</button>
}
}

View file

@ -0,0 +1,3 @@
*.crt
*.key
*.pem

View file

@ -1,9 +0,0 @@
-----BEGIN CERTIFICATE-----
MIIBNTCB3KADAgECAghq294raE5e9TAKBggqhkjOPQQDAjAUMRIwEAYDVQQDDAls
b2NhbGhvc3QwHhcNMjMwOTEzMDEyNzIxWhcNMjMwOTE3MDEyNzIxWjAUMRIwEAYD
VQQDDAlsb2NhbGhvc3QwWTATBgcqhkjOPQIBBggqhkjOPQMBBwNCAARel++iw3/0
CPZTew09XTEmyKWGyUW7clEok17gb6BIqVZHy5pZ+P1F/qHSSuJtzZ+WFueVS1ZO
MCDfQql88sIyoxgwFjAUBgNVHREEDTALgglsb2NhbGhvc3QwCgYIKoZIzj0EAwID
SAAwRQIhAMQi1xToDZY8y4zZMOk7JJ3qqFOei6JkNcZf/68Zdgk3AiBCbPgLxaIw
n4bYLm0yqzQ5jzaJcOkrLMBQakaHJMyFoA==
-----END CERTIFICATE-----

View file

@ -1,5 +0,0 @@
-----BEGIN PRIVATE KEY-----
MIGHAgEAMBMGByqGSM49AgEGCCqGSM49AwEHBG0wawIBAQQgsOnsJtFGWK8w011r
400WvxgSScmnnAWFyuQ+o/dhvDGhRANCAARel++iw3/0CPZTew09XTEmyKWGyUW7
clEok17gb6BIqVZHy5pZ+P1F/qHSSuJtzZ+WFueVS1ZOMCDfQql88sIy
-----END PRIVATE KEY-----

View file

@ -1,50 +0,0 @@
use base64::engine::general_purpose::STANDARD as Base64Engine;
use base64::Engine;
use rcgen::CertificateParams;
use rcgen::DistinguishedName;
use rcgen::DnType;
use rcgen::KeyPair;
use rcgen::PKCS_ECDSA_P256_SHA256;
use ring::digest::digest;
use ring::digest::SHA256;
use std::fs;
use std::io::Write;
use time::Duration;
use time::OffsetDateTime;
fn main() {
const COMMON_NAME: &str = "localhost";
let mut dname = DistinguishedName::new();
dname.push(DnType::CommonName, COMMON_NAME);
let keypair = KeyPair::generate(&PKCS_ECDSA_P256_SHA256).unwrap();
let digest = digest(&SHA256, &keypair.public_key_der());
let mut cert_params = CertificateParams::new(vec![COMMON_NAME.to_string()]);
cert_params.distinguished_name = dname;
cert_params.alg = &PKCS_ECDSA_P256_SHA256;
cert_params.key_pair = Some(keypair);
cert_params.not_before = OffsetDateTime::now_utc()
.checked_sub(Duration::days(2))
.unwrap();
cert_params.not_after = OffsetDateTime::now_utc()
.checked_add(Duration::days(2))
.unwrap();
let certificate = rcgen::Certificate::from_params(cert_params).unwrap();
fs::File::create("cert.pem")
.unwrap()
.write_all(certificate.serialize_pem().unwrap().as_bytes())
.unwrap();
fs::File::create("key.pem")
.unwrap()
.write_all(certificate.serialize_private_key_pem().as_bytes())
.unwrap();
println!("Certificate generated");
println!("Fingerprint: {}", Base64Engine.encode(digest));
}

View file

@ -1,5 +1,6 @@
use anyhow::Result;
use std::time::Duration;
use tokio::time::sleep;
use tracing::error;
use tracing::info;
use tracing::info_span;
@ -8,8 +9,8 @@ use tracing_subscriber::filter::LevelFilter;
use tracing_subscriber::EnvFilter;
use wtransport::endpoint::IncomingSession;
use wtransport::tls::Certificate;
use wtransport::Endpoint;
use wtransport::ServerConfig;
use wtransport::{Connection, Endpoint};
#[tokio::main]
async fn main() -> Result<()> {
@ -17,7 +18,7 @@ async fn main() -> Result<()> {
let config = ServerConfig::builder()
.with_bind_default(4433)
.with_certificate(Certificate::load("cert.pem", "key.pem")?)
.with_certificate(Certificate::load("cert.crt", "cert.key")?)
.keep_alive_interval(Some(Duration::from_secs(3)))
.build();
@ -55,6 +56,9 @@ async fn handle_connection_impl(incoming_session: IncomingSession) -> Result<()>
info!("Waiting for data from client...");
open_bidir_stream(&connection).await?;
open_uni_stream(&connection).await?;
loop {
tokio::select! {
stream = connection.accept_bi() => {
@ -100,6 +104,48 @@ async fn handle_connection_impl(incoming_session: IncomingSession) -> Result<()>
}
}
async fn open_bidir_stream(connection: &Connection) -> Result<()> {
sleep(Duration::from_millis(2000)).await;
let mut buffer = vec![0; 65536].into_boxed_slice();
let mut stream = connection.open_bi().await?.await?;
info!("Opened BI stream");
stream.0.write_all(b"Hello from server").await?;
info!("Sent 'Hello from server' to client");
let bytes_read = match stream.1.read(&mut buffer).await? {
Some(bytes_read) => bytes_read,
None => {
info!("Server opened bidir stream closed");
return Ok(());
}
};
let str_data = std::str::from_utf8(&buffer[..bytes_read])?;
info!("Received (bi) '{str_data}' from client");
stream.0.write_all(b"ACK").await?;
Ok(())
}
async fn open_uni_stream(connection: &Connection) -> Result<()> {
sleep(Duration::from_millis(2000)).await;
let mut stream = connection.open_uni().await?.await?;
info!("Opened UNI stream");
stream.write_all(b"Hello from server").await?;
info!("Sent 'Hello from server' to client");
Ok(())
}
fn init_logging() {
let env_filter = EnvFilter::builder()
.with_default_directive(LevelFilter::INFO.into())