From 85d79bb304bed699e5d771f8f53010015e0a5174 Mon Sep 17 00:00:00 2001 From: kozabrada123 <“kozabrada123@users.noreply.github.com”> Date: Fri, 5 May 2023 18:38:04 +0200 Subject: [PATCH] Experimental heartbeats --- src/gateway.rs | 82 +++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 81 insertions(+), 1 deletion(-) diff --git a/src/gateway.rs b/src/gateway.rs index 83cf07c..94beeb1 100644 --- a/src/gateway.rs +++ b/src/gateway.rs @@ -17,9 +17,12 @@ use serde::Serialize; use serde_json::from_str; use tokio::io; use tokio::net::TcpStream; +use tokio::sync::mpsc; use tokio::sync::mpsc::{channel, Receiver, Sender}; use tokio::sync::Mutex; use tokio::task; +use tokio::time; +use tokio::time::Instant; use tokio_tungstenite::tungstenite::error::UrlError; use tokio_tungstenite::WebSocketStream; use tokio_tungstenite::{connect_async, connect_async_tls_with_config}; @@ -35,6 +38,7 @@ pub struct Gateway<'a> { pub token: String, pub events: Events<'a>, websocket: WebSocketConnection, + heartbeat_handler: Option } impl<'a> Gateway<'a> { @@ -47,6 +51,7 @@ impl<'a> Gateway<'a> { token, events: Events::default(), websocket: WebSocketConnection::new(websocket_url).await, + heartbeat_handler: None, }); } @@ -167,19 +172,94 @@ impl<'a> Gateway<'a> { // Invalid Session 9 => {todo!()} // Hello - // Should start our heartbeat + // Starts our heartbeat 10 => { let gateway_hello: HelloData = serde_json::from_str(gateway_payload.d.unwrap().as_ref()).unwrap(); + self.heartbeat_handler = Some(HeartbeatHandler::new(gateway_hello.heartbeat_interval, self.websocket.tx.clone())); } // Heartbeat ACK 11 => {} 2 | 3 | 4 | 6 | 8 => {panic!("Received Gateway op code that's meant to be sent, not received ({})", gateway_payload.op)} _ => {panic!("Received Invalid Gateway op code ({})", gateway_payload.op)} } + + // If we have an active heartbeat thread and we received a seq number we should let it know + if gateway_payload.s.is_some() { + if self.heartbeat_handler.is_some() { + + let heartbeat_communication = HeartbeatThreadCommunication { op: gateway_payload.op, d: gateway_payload.s.unwrap() }; + + self.heartbeat_handler.as_mut().unwrap().tx.send(heartbeat_communication).await.unwrap(); + } + } } } } +/** +Handles sending heartbeats to the gateway in another thread +*/ +struct HeartbeatHandler { + /// The heartbeat interval in milliseconds + heartbeat_interval: u128, + tx: Sender, +} + +impl HeartbeatHandler { + pub fn new(heartbeat_interval: u128, websocket_tx: Arc>>) -> HeartbeatHandler { + let (mut tx, mut rx) = mpsc::channel(32); + + task::spawn(async move { + let mut last_heartbeat: Instant = time::Instant::now(); + let mut last_seq_number: Option = None; + + loop { + + // If we received a seq number update, use that as the last seq number + let hb_communication: Option = rx.recv().await; + while hb_communication.is_some() { + last_seq_number = Some(hb_communication.unwrap().d); + } + + if last_heartbeat.elapsed().as_millis() > heartbeat_interval { + + let heartbeat = GatewayHeartbeat { + op: 1, + d: last_seq_number + }; + + let heartbeat_json = serde_json::to_string(&heartbeat).unwrap(); + + let msg = tokio_tungstenite::tungstenite::Message::text(heartbeat_json); + + websocket_tx.lock() + .await + .send(msg) + .await + .unwrap(); + + last_heartbeat = time::Instant::now(); + } + + } + }); + + Self { heartbeat_interval, tx } + } +} + +/** +Used to communicate with the main thread. +Either signifies a sequence number update or a received heartbeat ack +*/ +#[derive(Clone, Copy, Debug)] +struct HeartbeatThreadCommunication { + /// An opcode for the communication we received + op: u8, + /// The sequence number we got from discord + d: u64 +} + struct WebSocketConnection { rx: Arc>>, tx: Arc>>,