better messaging
This commit is contained in:
parent
4fb917eb08
commit
ee3c392f9f
5 changed files with 109 additions and 68 deletions
|
@ -60,10 +60,13 @@ impl GameHandler {
|
||||||
.is_some()
|
.is_some()
|
||||||
{
|
{
|
||||||
// Broadcast game browser update
|
// Broadcast game browser update
|
||||||
self.state
|
if let Err(e) = self
|
||||||
|
.state
|
||||||
.broadcast_tx
|
.broadcast_tx
|
||||||
.send(meta_games_browser_update(&self.state))
|
.send(meta_games_browser_update(&self.state))
|
||||||
.unwrap();
|
{
|
||||||
|
tracing::error!("Error broadcasting game browser update: {}", e);
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
tracing::error!("User tried to delete a nonexistent game!");
|
tracing::error!("User tried to delete a nonexistent game!");
|
||||||
}
|
}
|
||||||
|
@ -112,17 +115,24 @@ impl GameHandler {
|
||||||
let message = ChatMessage { text: err };
|
let message = ChatMessage { text: err };
|
||||||
let tx = self.state.users_tx.clone();
|
let tx = self.state.users_tx.clone();
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
let _ = tx.send(DmUser(SendChatMessage(message), Addr(addr))).await;
|
if let Err(e) =
|
||||||
|
tx.send(DmUser(SendChatMessage(message), Addr(addr))).await
|
||||||
|
{
|
||||||
|
tracing::error!("Could not send message: {}", e);
|
||||||
|
};
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
Ok(None) => tokio::spawn(async move { tracing::debug!("None") }),
|
Ok(None) => tokio::spawn(async move { tracing::debug!("TODO") }),
|
||||||
Ok(Some((judge_round, czar_id))) => {
|
Ok(Some((judge_round, czar_id))) => {
|
||||||
let tx = self.state.users_tx.clone();
|
let tx = self.state.users_tx.clone();
|
||||||
|
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
let _ = tx
|
if let Err(e) = tx
|
||||||
.send(DmUser(SendJudgeRound(judge_round), Id(czar_id)))
|
.send(DmUser(SendJudgeRound(judge_round), Id(czar_id)))
|
||||||
.await;
|
.await
|
||||||
|
{
|
||||||
|
tracing::error!("Could not send message: {}", e);
|
||||||
|
};
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
@ -188,22 +198,27 @@ impl GameHandler {
|
||||||
tracing::error!("User tried to join a nonexistent game!");
|
tracing::error!("User tried to join a nonexistent game!");
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
tracing::debug!("{:#?}", self.state.games_by_user.read().unwrap());
|
|
||||||
|
|
||||||
// Send updates for all players
|
// Send updates for all players
|
||||||
self.send_game_meta_update(vec![game_id.clone()]);
|
self.send_game_meta_update(vec![game_id.clone()]);
|
||||||
|
|
||||||
// Broadcast game browser update
|
// Broadcast game browser update
|
||||||
self.state
|
if let Err(e) = self
|
||||||
|
.state
|
||||||
.broadcast_tx
|
.broadcast_tx
|
||||||
.send(meta_games_browser_update(&self.state))
|
.send(meta_games_browser_update(&self.state))
|
||||||
.unwrap();
|
{
|
||||||
|
tracing::error!("Could not broadcast game browser update: {}", e);
|
||||||
|
};
|
||||||
|
|
||||||
// Broadcast server meta update
|
// Broadcast server meta update
|
||||||
self.state
|
if let Err(e) = self
|
||||||
|
.state
|
||||||
.broadcast_tx
|
.broadcast_tx
|
||||||
.send(meta_server_summary_update(&self.state))
|
.send(meta_server_summary_update(&self.state))
|
||||||
.unwrap();
|
{
|
||||||
|
tracing::error!("Could not broadcast server meta update: {}", e);
|
||||||
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Send game meta update for all players of a game
|
/// Send game meta update for all players of a game
|
||||||
|
@ -245,7 +260,11 @@ impl GameHandler {
|
||||||
// Send user's update
|
// Send user's update
|
||||||
let msg = serde_json::to_string(&meta).unwrap();
|
let msg = serde_json::to_string(&meta).unwrap();
|
||||||
let user_tx = player.user.read().unwrap().tx.clone();
|
let user_tx = player.user.read().unwrap().tx.clone();
|
||||||
tokio::spawn(async move { user_tx.send(msg).await });
|
tokio::spawn(async move {
|
||||||
|
if let Err(e) = user_tx.send(msg).await {
|
||||||
|
tracing::error!("Error sending user update: {}", e)
|
||||||
|
}
|
||||||
|
});
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
tracing::error!("Attempted to create game meta update for nonexistent game!");
|
tracing::error!("Attempted to create game meta update for nonexistent game!");
|
||||||
|
@ -369,16 +388,22 @@ impl GameHandler {
|
||||||
self.send_game_meta_update(vec![game_id]);
|
self.send_game_meta_update(vec![game_id]);
|
||||||
|
|
||||||
// Broadcast game browser update
|
// Broadcast game browser update
|
||||||
self.state
|
if let Err(e) = self
|
||||||
|
.state
|
||||||
.broadcast_tx
|
.broadcast_tx
|
||||||
.send(meta_games_browser_update(&self.state))
|
.send(meta_games_browser_update(&self.state))
|
||||||
.unwrap();
|
{
|
||||||
|
tracing::error!("Could not broadcast game browser update: {}", e);
|
||||||
|
};
|
||||||
|
|
||||||
// Broadcast server meta update
|
// Broadcast server meta update
|
||||||
self.state
|
if let Err(e) = self
|
||||||
|
.state
|
||||||
.broadcast_tx
|
.broadcast_tx
|
||||||
.send(meta_server_summary_update(&self.state))
|
.send(meta_server_summary_update(&self.state))
|
||||||
.unwrap();
|
{
|
||||||
|
tracing::error!("Could not broadcast server meta update: {}", e);
|
||||||
|
};
|
||||||
} else {
|
} else {
|
||||||
tracing::error!("Attempted to create game for nonexistent player!");
|
tracing::error!("Attempted to create game for nonexistent player!");
|
||||||
}
|
}
|
||||||
|
|
|
@ -128,10 +128,10 @@ async fn main() -> Result<()> {
|
||||||
.route("/websocket", get(websocket_connection_handler))
|
.route("/websocket", get(websocket_connection_handler))
|
||||||
.nest_service("/", ServeDir::new("dist"))
|
.nest_service("/", ServeDir::new("dist"))
|
||||||
.layer(ServiceBuilder::new().layer(CompressionLayer::new()))
|
.layer(ServiceBuilder::new().layer(CompressionLayer::new()))
|
||||||
.with_state(app_state)
|
|
||||||
.layer(GovernorLayer {
|
.layer(GovernorLayer {
|
||||||
config: governor_conf,
|
config: governor_conf,
|
||||||
});
|
})
|
||||||
|
.with_state(app_state);
|
||||||
|
|
||||||
// send it
|
// send it
|
||||||
let listener = tokio::net::TcpListener::bind(bind_addr.to_owned())
|
let listener = tokio::net::TcpListener::bind(bind_addr.to_owned())
|
||||||
|
|
|
@ -33,35 +33,36 @@ impl MessageHandler {
|
||||||
msg = format! {"{0}: {1}", self.state.online_users.read().unwrap().get(&addr).unwrap().read().unwrap().name, chat_message.text};
|
msg = format! {"{0}: {1}", self.state.online_users.read().unwrap().get(&addr).unwrap().read().unwrap().name, chat_message.text};
|
||||||
}
|
}
|
||||||
|
|
||||||
tracing::debug!("{msg}");
|
if let Err(e) = self
|
||||||
self.state
|
.state
|
||||||
.broadcast_tx
|
.broadcast_tx
|
||||||
.send(to_string::<ChatMessage>(&ChatMessage { text: msg }).unwrap())
|
.send(to_string::<ChatMessage>(&ChatMessage { text: msg }).unwrap())
|
||||||
.unwrap();
|
{
|
||||||
|
tracing::error!("Error broadcasting Chat message: {}", e)
|
||||||
|
};
|
||||||
}
|
}
|
||||||
_user_log_in_request
|
_user_log_in_request
|
||||||
if let Ok(user_log_in) = from_str::<UserLogInRequest>(&text) =>
|
if let Ok(user_log_in) = from_str::<UserLogInRequest>(&text) =>
|
||||||
{
|
{
|
||||||
self.state
|
if let Err(e) = self.state.users_tx.send(UserLogIn(user_log_in, addr)).await {
|
||||||
.users_tx
|
tracing::error!("Error sending user login: {}", e)
|
||||||
.send(UserLogIn(user_log_in, addr))
|
};
|
||||||
.await
|
|
||||||
.unwrap();
|
|
||||||
}
|
}
|
||||||
_new_game_request if let Ok(new_game) = from_str::<NewGameRequest>(&text) => {
|
_new_game_request if let Ok(new_game) = from_str::<NewGameRequest>(&text) => {
|
||||||
self.state
|
if let Err(e) = self.state.games_tx.send(NewGame(new_game, addr)).await {
|
||||||
.games_tx
|
tracing::error!("Error requesting new game: {}", e)
|
||||||
.send(NewGame(new_game, addr))
|
};
|
||||||
.await
|
|
||||||
.unwrap();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
_join_game_request if let Ok(join_request) = from_str::<GameJoinRequest>(&text) => {
|
_join_game_request if let Ok(join_request) = from_str::<GameJoinRequest>(&text) => {
|
||||||
self.state
|
if let Err(e) = self
|
||||||
|
.state
|
||||||
.games_tx
|
.games_tx
|
||||||
.send(JoinGame(join_request.id, addr))
|
.send(JoinGame(join_request.id, addr))
|
||||||
.await
|
.await
|
||||||
.unwrap();
|
{
|
||||||
|
tracing::error!("Error requesting game join: {}", e)
|
||||||
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
_player_move_request
|
_player_move_request
|
||||||
|
@ -75,11 +76,14 @@ impl MessageHandler {
|
||||||
tracing::error!("Move request game_id is empty! Ignoring...");
|
tracing::error!("Move request game_id is empty! Ignoring...");
|
||||||
return;
|
return;
|
||||||
} else {
|
} else {
|
||||||
self.state
|
if let Err(e) = self
|
||||||
|
.state
|
||||||
.games_tx
|
.games_tx
|
||||||
.send(MoveRequest(move_request, addr))
|
.send(MoveRequest(move_request, addr))
|
||||||
.await
|
.await
|
||||||
.unwrap();
|
{
|
||||||
|
tracing::error!("Error sending move request: {}", e)
|
||||||
|
};
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -87,39 +91,44 @@ impl MessageHandler {
|
||||||
if let Ok(judge_request) = from_str::<JudgeDecisionRequest>(&text) =>
|
if let Ok(judge_request) = from_str::<JudgeDecisionRequest>(&text) =>
|
||||||
{
|
{
|
||||||
if !judge_request.winning_cards.is_empty() {
|
if !judge_request.winning_cards.is_empty() {
|
||||||
self.state
|
if let Err(e) = self
|
||||||
|
.state
|
||||||
.games_tx
|
.games_tx
|
||||||
.send(JudgeDecision(judge_request, addr))
|
.send(JudgeDecision(judge_request, addr))
|
||||||
.await
|
.await
|
||||||
.unwrap();
|
{
|
||||||
|
tracing::error!("Error sending Judge Decision: {}", e)
|
||||||
|
};
|
||||||
} else {
|
} else {
|
||||||
tracing::debug!("Judge request received with empty cards");
|
tracing::error!("Judge request received with empty cards");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
_delete_game if let Ok(delete_request) = from_str::<GameDeleteRequest>(&text) => {
|
_delete_game if let Ok(delete_request) = from_str::<GameDeleteRequest>(&text) => {
|
||||||
self.state
|
if let Err(e) = self.state.games_tx.send(DeleteGame(delete_request)).await {
|
||||||
.games_tx
|
tracing::error!("Error sending delete game: {}", e)
|
||||||
.send(DeleteGame(delete_request))
|
};
|
||||||
.await
|
|
||||||
.unwrap();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
_ => tracing::debug!("Unhandled text from {}\n{:#?}", addr, &text),
|
_ => tracing::error!(
|
||||||
|
"Unhandled text from {}\n{:#?}\nTODO: Probably close connection here",
|
||||||
|
addr,
|
||||||
|
&text
|
||||||
|
),
|
||||||
},
|
},
|
||||||
|
|
||||||
Message::Binary(data) => tracing::debug!("{} sent binary: {:?}", addr, data),
|
Message::Binary(data) => tracing::error!("{} sent binary: {:?}\n TODO: Probably close connection here", addr, data),
|
||||||
|
|
||||||
Message::Close(close_frame) => {
|
Message::Close(close_frame) => {
|
||||||
self.handle_close(close_frame, addr);
|
self.handle_close(close_frame, addr);
|
||||||
}
|
}
|
||||||
|
|
||||||
Message::Ping(ping) => {
|
Message::Ping(ping) => {
|
||||||
tracing::debug!("Pong received with: {:?}", ping);
|
tracing::info!("Pong received with: {:?}", ping);
|
||||||
}
|
}
|
||||||
|
|
||||||
Message::Pong(pong) => {
|
Message::Pong(pong) => {
|
||||||
tracing::debug!("Pong received with: {:?}", pong);
|
tracing::info!("Pong received with: {:?}", pong);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -142,22 +151,24 @@ impl MessageHandler {
|
||||||
let msg = ChatMessage {
|
let msg = ChatMessage {
|
||||||
text: format!("{0} left.", &user_name),
|
text: format!("{0} left.", &user_name),
|
||||||
};
|
};
|
||||||
tracing::debug!("{}", msg.text);
|
if let Err(e) = self
|
||||||
self.state
|
.state
|
||||||
.broadcast_tx
|
.broadcast_tx
|
||||||
.send(to_string::<ChatMessage>(&msg).unwrap())
|
.send(to_string::<ChatMessage>(&msg).unwrap())
|
||||||
.unwrap();
|
{
|
||||||
|
tracing::error!("Error broadcasting user leave message: {}", e)
|
||||||
|
};
|
||||||
|
|
||||||
// Process close frame
|
// Process close frame
|
||||||
if let Some(cf) = close_frame {
|
if let Some(cf) = close_frame {
|
||||||
tracing::debug!(
|
tracing::info!(
|
||||||
"Close received from {0} with code: {1} and reason: {2}",
|
"Close received from {0} with code: {1} and reason: {2}",
|
||||||
&user_name,
|
&user_name,
|
||||||
cf.code,
|
cf.code,
|
||||||
cf.reason
|
cf.reason
|
||||||
)
|
)
|
||||||
} else {
|
} else {
|
||||||
tracing::debug!("close received without close frame")
|
tracing::info!("close received without close frame")
|
||||||
}
|
}
|
||||||
|
|
||||||
// Move user to offline
|
// Move user to offline
|
||||||
|
@ -172,13 +183,15 @@ impl MessageHandler {
|
||||||
);
|
);
|
||||||
|
|
||||||
// Send client updates
|
// Send client updates
|
||||||
self.state
|
if let Err(e) = self
|
||||||
|
.state
|
||||||
.broadcast_tx
|
.broadcast_tx
|
||||||
.send(meta_server_summary_update(&self.state))
|
.send(meta_server_summary_update(&self.state))
|
||||||
.unwrap();
|
{
|
||||||
self.state
|
tracing::error!("Error broadcasting server summary update: {}", e)
|
||||||
.broadcast_tx
|
};
|
||||||
.send(meta_chat_update(&self.state))
|
if let Err(e) = self.state.broadcast_tx.send(meta_chat_update(&self.state)) {
|
||||||
.unwrap();
|
tracing::error!("Error broadcasting chat update: {}", e)
|
||||||
|
};
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -83,15 +83,21 @@ impl UserHandler {
|
||||||
match message {
|
match message {
|
||||||
SendUserUpdate(message) => {
|
SendUserUpdate(message) => {
|
||||||
let msg = to_string::<UserUpdate>(&message).unwrap();
|
let msg = to_string::<UserUpdate>(&message).unwrap();
|
||||||
let _ = tx.send(msg).await;
|
if let Err(e) = tx.send(msg).await {
|
||||||
|
tracing::error!("Error sending user update: {}", e)
|
||||||
|
};
|
||||||
}
|
}
|
||||||
SendChatMessage(message) => {
|
SendChatMessage(message) => {
|
||||||
let msg = to_string::<ChatMessage>(&message).unwrap();
|
let msg = to_string::<ChatMessage>(&message).unwrap();
|
||||||
let _ = tx.send(msg).await;
|
if let Err(e) = tx.send(msg).await {
|
||||||
|
tracing::error!("Error sending chat message: {}", e)
|
||||||
|
};
|
||||||
}
|
}
|
||||||
SendJudgeRound(message) => {
|
SendJudgeRound(message) => {
|
||||||
let msg = to_string::<JudgeRound>(&message).unwrap();
|
let msg = to_string::<JudgeRound>(&message).unwrap();
|
||||||
let _ = tx.send(msg).await;
|
if let Err(e) = tx.send(msg).await {
|
||||||
|
tracing::error!("Error sending judge round: {}", e)
|
||||||
|
};
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -193,7 +199,6 @@ impl UserHandler {
|
||||||
old_name,
|
old_name,
|
||||||
new_name
|
new_name
|
||||||
};
|
};
|
||||||
tracing::debug!("{}", &msg);
|
|
||||||
let _ = broadcast_tx.send(to_string(&ChatMessage { text: msg }).unwrap());
|
let _ = broadcast_tx.send(to_string(&ChatMessage { text: msg }).unwrap());
|
||||||
}
|
}
|
||||||
// Check if name is taken by an online user
|
// Check if name is taken by an online user
|
||||||
|
@ -211,7 +216,6 @@ impl UserHandler {
|
||||||
Addr(addr),
|
Addr(addr),
|
||||||
)
|
)
|
||||||
.await;
|
.await;
|
||||||
tracing::debug!("{}", old_name.clone());
|
|
||||||
self.dm_user(
|
self.dm_user(
|
||||||
SendUserUpdate(UserUpdate {
|
SendUserUpdate(UserUpdate {
|
||||||
username: old_name.clone(),
|
username: old_name.clone(),
|
||||||
|
@ -276,7 +280,6 @@ impl UserHandler {
|
||||||
let tx = self.state.games_tx.clone();
|
let tx = self.state.games_tx.clone();
|
||||||
|
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
tracing::debug!("msg: {:#?}", &msg);
|
|
||||||
let _ = tx.send(msg).await;
|
let _ = tx.send(msg).await;
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
@ -322,7 +325,6 @@ pub fn meta_announce_user_join(state: &Arc<AppState>, addr: &SocketAddr) -> Stri
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
tracing::debug!("{}", &msg);
|
|
||||||
to_string::<ChatMessage>(&ChatMessage { text: msg }).unwrap()
|
to_string::<ChatMessage>(&ChatMessage { text: msg }).unwrap()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -22,7 +22,6 @@ pub async fn websocket_connection_handler(
|
||||||
ConnectInfo(addr): ConnectInfo<SocketAddr>,
|
ConnectInfo(addr): ConnectInfo<SocketAddr>,
|
||||||
State(state): State<Arc<AppState>>,
|
State(state): State<Arc<AppState>>,
|
||||||
) -> impl IntoResponse {
|
) -> impl IntoResponse {
|
||||||
tracing::debug!("New connection from {}", &addr);
|
|
||||||
ws.on_upgrade(move |socket| websocket_on_connection(socket, state, addr))
|
ws.on_upgrade(move |socket| websocket_on_connection(socket, state, addr))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -92,11 +91,13 @@ pub async fn websocket_on_connection(stream: WebSocket, state: Arc<AppState>, ad
|
||||||
// Receive messages from this client
|
// Receive messages from this client
|
||||||
let mut recv_task = tokio::spawn(async move {
|
let mut recv_task = tokio::spawn(async move {
|
||||||
while let Some(Ok(message)) = receiver.next().await {
|
while let Some(Ok(message)) = receiver.next().await {
|
||||||
state
|
if let Err(e) = state
|
||||||
.messages_tx
|
.messages_tx
|
||||||
.send((addr.clone(), message.clone()))
|
.send((addr.clone(), message.clone()))
|
||||||
.await
|
.await
|
||||||
.unwrap();
|
{
|
||||||
|
tracing::error!("Error relaying received message: {}", e)
|
||||||
|
};
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
|
|
Loading…
Add table
Reference in a new issue