Primitive voice implementation (feature/voice) (#457)

* Add Webrtc Identify & Ready

* Add more webrtc typings

* Attempt an untested voice gateway implementation

* fmt

* Merge with main

* Same allow as for voice as normal gateway

* Test error observer

* Minor updates

* More derives

* Even more derives

* Small types update

* e

* Minor doc fixes

* Modernise voice gateway

* Add default impl for voicegatewayerror

* Make voice event fields pub

* Event updates via the scientific method

* ??

* Fix bad request in voice gateway init

* Voice gateway updates

* Fix error failing to 'deserialize' properly

* Update voice identify

* Clarify FIXME related to #430

* Update to v7

* Create seperate voice_gateway.rs and voice_udp.rs

* Restructure voice to new module

* fix: deserialization error in speaking bitflags

* feat: kinda janky ip discovery impl

* feat: return ip discovery data + minor update

* feat: packet parsing!

* fix: voice works again

* feat: add voice_media_sink_wants

(comitting uncommited changes to merge)

* chore: rename events/webrtc to events/voice_gateway

* Add UdpHandle

* chore: clippy + other misc updates

* fix: attempt to fix failing wasm build

* chore: yes clippy, that is indeed an unneeded return statement

* feat: add VoiceData struct

* feat: add VoiceData reference to UdpHandler

* feat: decryption?

* chore: formatting

* feat: add ssrc definition (op 12)

* feat: add untested sending & asbtract nonce generation

* feat: Public api! (sorta)

* small updates

* feat: add sequence number

* chore: yes

* feat: merge VoiceHandler into official development

* chore: yes clippy, you are special

* fix: duplicated gateway events

* feat: first try at vgw wasm compat

* fix: blunder

* fix: gateway connect using wrong url

* fix: properly using encrypted data, bad practice for buffer creation

* chore: split voice udp

* feat: udp error handling, create udp/backends

* fix: its the same

* chore: clarify UDP on WASM

* api: split voice gateway and udp features, test for voice gateway in WASM

* feat: new encryption modes, minor code quality

* docs: document voice encryption modes

* chore: unused imports

* chore: update getrandom version to match wasm version

* chore: update on packet size FIXME

* drop buf asap

* Okay can't do that actually

* tests: add nonce test

* normal tests work?

* docs: fix doc warning, fix incorrect refrences to 'webrtc'

* chore: json isn't a doc test

* tests: better gateway auth test

* testing tests

* update voice heartbeat, fix the new test issue

* committed too much

* fix: unused import

* fix: use ip discovery address as string, not as Vec<u8>

* chore: less obnoxious logging

* chore: better unimplemented voice modes handling

* chore: remove unused variable

* chore: use matches macro

* add voice examples, make gateway ones clearer

* rename voice example

* chore: remove unused VoiceHandler

* fix: implement gateway Reconnect and InvalidSession

* Typo

Co-authored-by: Flori <39242991+bitfl0wer@users.noreply.github.com>

* Fix a bunch of typos

Co-authored-by: Flori <39242991+bitfl0wer@users.noreply.github.com>

* fix: error handling while loading native certs

* fix: guh

* use be for nonce bytes

* fix: refactor gw and vgw closures

* remove outdated docs

---------

Co-authored-by: Flori <39242991+bitfl0wer@users.noreply.github.com>
This commit is contained in:
kozabrada123 2024-04-16 17:18:21 +02:00 committed by GitHub
parent 5209348ac1
commit 74d6785e50
54 changed files with 2942 additions and 226 deletions

View File

@ -100,7 +100,7 @@ jobs:
rustup target add wasm32-unknown-unknown
curl -L --proto '=https' --tlsv1.2 -sSf https://raw.githubusercontent.com/cargo-bins/cargo-binstall/main/install-from-binstall-release.sh | bash
cargo binstall --no-confirm wasm-bindgen-cli --version "0.2.88" --force
GECKODRIVER=$(which geckodriver) cargo test --target wasm32-unknown-unknown --no-default-features --features="client, rt"
GECKODRIVER=$(which geckodriver) cargo test --target wasm32-unknown-unknown --no-default-features --features="client, rt, voice_gateway"
wasm-chrome:
runs-on: macos-latest
steps:
@ -128,4 +128,4 @@ jobs:
rustup target add wasm32-unknown-unknown
curl -L --proto '=https' --tlsv1.2 -sSf https://raw.githubusercontent.com/cargo-bins/cargo-binstall/main/install-from-binstall-release.sh | bash
cargo binstall --no-confirm wasm-bindgen-cli --version "0.2.88" --force
CHROMEDRIVER=$(which chromedriver) cargo test --target wasm32-unknown-unknown --no-default-features --features="client, rt"
CHROMEDRIVER=$(which chromedriver) cargo test --target wasm32-unknown-unknown --no-default-features --features="client, rt, voice_gateway"

131
Cargo.lock generated
View File

@ -17,6 +17,16 @@ version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe"
[[package]]
name = "aead"
version = "0.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d122413f284cf2d62fb1b7db97e02edb8cda96d769b16e443a4f6195e35662b0"
dependencies = [
"crypto-common",
"generic-array",
]
[[package]]
name = "ahash"
version = "0.8.7"
@ -206,7 +216,9 @@ dependencies = [
"bitflags 2.4.1",
"chorus-macros",
"chrono",
"crypto_secretbox",
"custom_error",
"discortp",
"futures-util",
"getrandom",
"hostname",
@ -264,6 +276,17 @@ dependencies = [
"windows-targets 0.48.5",
]
[[package]]
name = "cipher"
version = "0.4.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "773f3b9af64447d2ce9850330c473515014aa235e6a783b02db81ff39e4a3dad"
dependencies = [
"crypto-common",
"inout",
"zeroize",
]
[[package]]
name = "console_error_panic_hook"
version = "0.1.7"
@ -342,9 +365,25 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1bfb12502f3fc46cca1bb51ac28df9d618d813cdc3d2f25b9fe775a34af26bb3"
dependencies = [
"generic-array",
"rand_core",
"typenum",
]
[[package]]
name = "crypto_secretbox"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b9d6cf87adf719ddf43a805e92c6870a531aedda35ff640442cbaf8674e141e1"
dependencies = [
"aead",
"cipher",
"generic-array",
"poly1305",
"salsa20",
"subtle",
"zeroize",
]
[[package]]
name = "custom_error"
version = "1.9.2"
@ -425,6 +464,16 @@ dependencies = [
"subtle",
]
[[package]]
name = "discortp"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "524b9439c09174aede2c88d58cfc6b83575b06569d1af4d07562f76595b2896b"
dependencies = [
"pnet_macros",
"pnet_macros_support",
]
[[package]]
name = "dotenvy"
version = "0.15.7"
@ -643,6 +692,7 @@ checksum = "85649ca51fd72272d7821adaf274ad91c288277713d9c18820d8499a7ff69e9a"
dependencies = [
"typenum",
"version_check",
"zeroize",
]
[[package]]
@ -923,6 +973,15 @@ dependencies = [
"serde",
]
[[package]]
name = "inout"
version = "0.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a0c10553d664a4d0bcff9f4215d0aac67a639cc68ef660840afe309b807bc9f5"
dependencies = [
"generic-array",
]
[[package]]
name = "ipnet"
version = "2.9.0"
@ -1123,6 +1182,12 @@ dependencies = [
"libc",
]
[[package]]
name = "no-std-net"
version = "0.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "43794a0ace135be66a25d3ae77d41b91615fb68ae937f904090203e81f755b65"
[[package]]
name = "nom"
version = "7.1.3"
@ -1217,6 +1282,12 @@ version = "1.19.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3fdb12b2476b595f9358c5161aa467c2438859caa136dec86c26fdd2efe17b92"
[[package]]
name = "opaque-debug"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "624a8340c38c1b80fd549087862da4ba43e08858af025b236e509b6649fc13d5"
[[package]]
name = "openssl"
version = "0.10.62"
@ -1363,6 +1434,36 @@ version = "0.3.28"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "69d3587f8a9e599cc7ec2c00e331f71c4e69a5f9a4b8a6efd5b07466b9736f9a"
[[package]]
name = "pnet_base"
version = "0.31.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f9d3a993d49e5fd5d4d854d6999d4addca1f72d86c65adf224a36757161c02b6"
dependencies = [
"no-std-net",
]
[[package]]
name = "pnet_macros"
version = "0.31.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "48dd52a5211fac27e7acb14cfc9f30ae16ae0e956b7b779c8214c74559cef4c3"
dependencies = [
"proc-macro2",
"quote",
"regex",
"syn 1.0.109",
]
[[package]]
name = "pnet_macros_support"
version = "0.31.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "89de095dc7739349559913aed1ef6a11e73ceade4897dadc77c5e09de6740750"
dependencies = [
"pnet_base",
]
[[package]]
name = "poem"
version = "1.3.59"
@ -1406,6 +1507,17 @@ dependencies = [
"syn 2.0.48",
]
[[package]]
name = "poly1305"
version = "0.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8159bd90725d2df49889a078b54f4f79e87f1f8a8444194cdca81d38f5393abf"
dependencies = [
"cpufeatures",
"opaque-debug",
"universal-hash",
]
[[package]]
name = "powerfmt"
version = "0.2.0"
@ -1688,6 +1800,15 @@ version = "1.0.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f98d2aa92eebf49b69786be48e4477826b256916e84a57ff2a4f21923b48eb4c"
[[package]]
name = "salsa20"
version = "0.10.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "97a22f5af31f73a954c10289c93e8a50cc23d971e80ee446f1f6f7137a088213"
dependencies = [
"cipher",
]
[[package]]
name = "schannel"
version = "0.1.23"
@ -2526,6 +2647,16 @@ version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "39ec24b3121d976906ece63c9daad25b85969647682eee313cb5779fdd69e14e"
[[package]]
name = "universal-hash"
version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fc1de2c688dc15305988b563c3854064043356019f97a4b46276fe734c4f07ea"
dependencies = [
"crypto-common",
"subtle",
]
[[package]]
name = "untrusted"
version = "0.7.1"

View File

@ -17,6 +17,9 @@ backend = ["dep:poem", "dep:sqlx"]
rt-multi-thread = ["tokio/rt-multi-thread"]
rt = ["tokio/rt"]
client = []
voice = ["voice_udp", "voice_gateway"]
voice_udp = ["dep:discortp", "dep:crypto_secretbox"]
voice_gateway = []
[dependencies]
tokio = { version = "1.35.1", features = ["macros", "sync"] }
@ -50,6 +53,8 @@ sqlx = { version = "0.7.3", features = [
"runtime-tokio-native-tls",
"any",
], optional = true }
discortp = { version = "0.5.0", optional = true, features = ["rtp", "discord", "demux"] }
crypto_secretbox = { version = "0.1.1", optional = true }
rand = "0.8.5"
[target.'cfg(not(target_arch = "wasm32"))'.dependencies]
@ -61,6 +66,7 @@ tokio-tungstenite = { version = "0.20.1", features = [
] }
native-tls = "0.2.11"
hostname = "0.3.1"
getrandom = { version = "0.2.12" }
[target.'cfg(target_arch = "wasm32")'.dependencies]
getrandom = { version = "0.2.12", features = ["js"] }

View File

@ -119,7 +119,7 @@ like "proxy connection checking" are already disabled on this version, which oth
### wasm
To test for wasm, you will need to `cargo install wasm-pack`. You can then run
`wasm-pack test --<chrome/firefox/safari> --headless -- --target wasm32-unknown-unknown --features="rt, client" --no-default-features`
`wasm-pack test --<chrome/firefox/safari> --headless -- --target wasm32-unknown-unknown --features="rt, client, voice_gateway" --no-default-features`
to run the tests for wasm.
## Versioning

View File

@ -2,6 +2,15 @@
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
// This example showcase how to properly use gateway observers.
//
// To properly run it, you will need to change the token below.
const TOKEN: &str = "";
/// Find the gateway websocket url of the server we want to connect to
const GATEWAY_URL: &str = "wss://gateway.old.server.spacebar.chat/";
use async_trait::async_trait;
use chorus::gateway::Gateway;
use chorus::{
@ -36,11 +45,10 @@ impl Observer<GatewayReady> for ExampleObserver {
#[tokio::main(flavor = "current_thread")]
async fn main() {
// Find the gateway websocket url of the server we want to connect to
let websocket_url_spacebar = "wss://gateway.old.server.spacebar.chat/".to_string();
let gateway_websocket_url = GATEWAY_URL.to_string();
// Initiate the gateway connection
let gateway = Gateway::spawn(websocket_url_spacebar).await.unwrap();
let gateway = Gateway::spawn(gateway_websocket_url).await.unwrap();
// Create an instance of our observer
let observer = ExampleObserver {};
@ -59,7 +67,7 @@ async fn main() {
.subscribe(shared_observer);
// Authenticate so we will receive any events
let token = "SecretToken".to_string();
let token = TOKEN.to_string();
let mut identify = GatewayIdentifyPayload::common();
identify.token = token;
gateway.send_identify(identify).await;

View File

@ -2,6 +2,16 @@
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
// This example showcases how to initiate a gateway connection manually
// (e. g. not through ChorusUser)
//
// To properly run it, you will need to modify the token below.
const TOKEN: &str = "";
/// Find the gateway websocket url of the server we want to connect to
const GATEWAY_URL: &str = "wss://gateway.old.server.spacebar.chat/";
use std::time::Duration;
use chorus::gateway::Gateway;
@ -15,16 +25,15 @@ use wasmtimer::tokio::sleep;
/// This example creates a simple gateway connection and a session with an Identify event
#[tokio::main(flavor = "current_thread")]
async fn main() {
// Find the gateway websocket url of the server we want to connect to
let websocket_url_spacebar = "wss://gateway.old.server.spacebar.chat/".to_string();
let gateway_websocket_url = GATEWAY_URL.to_string();
// Initiate the gateway connection, starting a listener in one thread and a heartbeat handler in another
let gateway = Gateway::spawn(websocket_url_spacebar).await.unwrap();
let gateway = Gateway::spawn(gateway_websocket_url).await.unwrap();
// At this point, we are connected to the server and are sending heartbeats, however we still haven't authenticated
// Get a token for an account on the server
let token = "SecretToken".to_string();
let token = TOKEN.to_string();
// Create an identify event
// An Identify event is how the server authenticates us and gets info about our os and browser, along with our intents / capabilities

View File

@ -0,0 +1,13 @@
[package]
name = "voice_simple"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
async-trait = "*"
chorus = { path = "../../", features = ["rt", "client", "voice"] }
tokio = { version = "*", features = ["full"] }
simplelog = "*"
log = "*"

View File

@ -0,0 +1,311 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
// This example showcases how to use the voice udp channel.
//
// To use this to properly communicate with voice, you will need to bring your own opus bindings
// along with potentially sending some other events, like Speaking
//
// To properly run this example, you will need to change some values below,
// like the token, guild and channel ids.
const TOKEN: &str = "";
const VOICE_GUILD_ID: Option<Snowflake> = None;
const VOICE_CHANNEL_ID: Option<Snowflake> = Some(Snowflake(0_u64));
const GATEWAY_URL: &str = "wss://gateway.discord.gg";
use async_trait::async_trait;
use simplelog::{TermLogger, Config, WriteLogger};
use std::{net::SocketAddrV4, sync::Arc, fs::File, time::Duration};
use chorus::{
gateway::{Observer, Gateway},
types::{
GatewayReady, SelectProtocol, SelectProtocolData, SessionDescription, Snowflake, Speaking,
SpeakingBitflags, SsrcDefinition, VoiceEncryptionMode, VoiceIdentify, VoiceProtocol,
VoiceReady, VoiceServerUpdate, GatewayIdentifyPayload, UpdateVoiceState,
},
voice::{
gateway::{VoiceGateway, VoiceGatewayHandle},
udp::{UdpHandle, UdpHandler},
voice_data::VoiceData,
},
};
use log::{info, LevelFilter};
use tokio::sync::{Mutex, RwLock};
extern crate chorus;
extern crate tokio;
/// Handles in between connections between the gateway and UDP modules
#[derive(Debug, Clone)]
pub struct VoiceHandler {
pub voice_gateway_connection: Arc<Mutex<Option<VoiceGatewayHandle>>>,
pub voice_udp_connection: Arc<Mutex<Option<UdpHandle>>>,
pub data: Arc<RwLock<VoiceData>>,
}
impl VoiceHandler {
/// Creates a new [VoiceHandler], only initializing the data
pub fn new() -> VoiceHandler {
Self {
data: Arc::new(RwLock::new(VoiceData::default())),
voice_gateway_connection: Arc::new(Mutex::new(None)),
voice_udp_connection: Arc::new(Mutex::new(None)),
}
}
}
impl Default for VoiceHandler {
fn default() -> Self {
Self::new()
}
}
#[async_trait]
// On [VoiceServerUpdate] we get our starting data and URL for the voice gateway server.
impl Observer<VoiceServerUpdate> for VoiceHandler {
async fn update(&self, data: &VoiceServerUpdate) {
let mut data_lock = self.data.write().await;
data_lock.server_data = Some(data.clone());
let user_id = data_lock.user_id;
let session_id = data_lock.session_id.clone();
drop(data_lock);
// Create and connect to the voice gateway
let voice_gateway_handle = VoiceGateway::spawn(data.endpoint.clone().unwrap())
.await
.unwrap();
let server_id: Snowflake;
if data.guild_id.is_some() {
server_id = data.guild_id.unwrap();
} else {
server_id = data.channel_id.unwrap();
}
let voice_identify = VoiceIdentify {
server_id,
user_id,
session_id,
token: data.token.clone(),
video: Some(false),
};
voice_gateway_handle.send_identify(voice_identify).await;
let cloned_gateway_handle = voice_gateway_handle.clone();
let mut voice_events = cloned_gateway_handle.events.lock().await;
let self_reference = Arc::new(self.clone());
// Subscribe to voice gateway events
voice_events.voice_ready.subscribe(self_reference.clone());
voice_events
.session_description
.subscribe(self_reference.clone());
voice_events.speaking.subscribe(self_reference.clone());
voice_events
.ssrc_definition
.subscribe(self_reference.clone());
*self.voice_gateway_connection.lock().await = Some(voice_gateway_handle);
}
}
#[async_trait]
// On [VoiceReady] we get info for establishing a UDP connection, and we immediately need said UDP
// connection for ip discovery.
impl Observer<VoiceReady> for VoiceHandler {
async fn update(&self, data: &VoiceReady) {
let mut data_lock = self.data.write().await;
data_lock.ready_data = Some(data.clone());
drop(data_lock);
// Create a udp connection and perform ip discovery
let udp_handle = UdpHandler::spawn(
self.data.clone(),
std::net::SocketAddr::V4(SocketAddrV4::new(data.ip, data.port)),
data.ssrc,
)
.await
.unwrap();
// Subscribe ourself to receiving rtp data
udp_handle
.events
.lock()
.await
.rtp
.subscribe(Arc::new(self.clone()));
let ip_discovery = self.data.read().await.ip_discovery.clone().unwrap();
*self.voice_udp_connection.lock().await = Some(udp_handle.clone());
let string_ip_address =
String::from_utf8(ip_discovery.address).expect("Ip discovery gave non string ip");
// Send a select protocol, which tells the server where we'll be receiving data and what
// mode to encrypt data in
self.voice_gateway_connection
.lock()
.await
.clone()
.unwrap()
.send_select_protocol(SelectProtocol {
protocol: VoiceProtocol::Udp,
data: SelectProtocolData {
address: string_ip_address,
port: ip_discovery.port,
// There are several other voice encryption modes available, though not all are
// implemented in chorus
mode: VoiceEncryptionMode::Xsalsa20Poly1305,
},
..Default::default()
})
.await;
}
}
#[async_trait]
// Session descryption gives us final info regarding codecs and our encryption key
impl Observer<SessionDescription> for VoiceHandler {
async fn update(&self, data: &SessionDescription) {
let mut data_write = self.data.write().await;
data_write.session_description = Some(data.clone());
drop(data_write);
}
}
#[async_trait]
// Ready is used just to obtain some info, like the user id and session id
impl Observer<GatewayReady> for VoiceHandler {
async fn update(&self, data: &GatewayReady) {
let mut lock = self.data.write().await;
lock.user_id = data.user.id;
lock.session_id = data.session_id.clone();
drop(lock);
}
}
#[async_trait]
// This is the received voice data
impl Observer<chorus::voice::discortp::rtp::Rtp> for VoiceHandler {
async fn update(&self, data: &chorus::voice::discortp::rtp::Rtp) {
info!(
"Received decrypted voice data! {:?} (SSRC: {})",
data.payload.clone(),
data.ssrc,
);
}
}
#[async_trait]
// This event gives extra info about who is speaking
impl Observer<Speaking> for VoiceHandler {
async fn update(&self, data: &Speaking) {
println!(
"Received Speaking! (SRRC: {}, flags: {:?})",
data.ssrc,
SpeakingBitflags::from_bits(data.speaking).unwrap()
);
}
}
#[async_trait]
// This event gives some info about which user has which ssrc
impl Observer<SsrcDefinition> for VoiceHandler {
async fn update(&self, data: &SsrcDefinition) {
println!(
"Received SSRC Definition! (User {} has audio ssrc {})",
data.user_id.unwrap(),
data.audio_ssrc
);
}
}
#[tokio::main]
async fn main() {
simplelog::CombinedLogger::init(vec![
TermLogger::new(
LevelFilter::Debug,
Config::default(),
simplelog::TerminalMode::Mixed,
simplelog::ColorChoice::Auto,
),
WriteLogger::new(
LevelFilter::Trace,
Config::default(),
File::create("latest.log").unwrap(),
),
])
.unwrap();
let gateway = Gateway::spawn(GATEWAY_URL.to_string())
.await
.unwrap();
let mut identify = GatewayIdentifyPayload::common();
identify.token = TOKEN.to_string();
gateway.send_identify(identify).await;
let voice_handler = Arc::new(VoiceHandler::new());
// Voice handler needs voice server update
gateway
.events
.lock()
.await
.voice
.server_update
.subscribe(voice_handler.clone());
// It also needs a bit of the data in ready
gateway
.events
.lock()
.await
.session
.ready
.subscribe(voice_handler.clone());
// Data which channel to update the local user to be joined into.
//
// guild_id and channel_id can be some to join guild voice channels
//
// guild_id can be none and channel id some to join dm calls
//
// both can be none to leave all voice channels
let voice_state_update = UpdateVoiceState {
guild_id: VOICE_GUILD_ID,
channel_id: VOICE_CHANNEL_ID,
self_mute: false,
self_deaf: false,
..Default::default()
};
gateway.send_update_voice_state(voice_state_update).await;
loop {
tokio::time::sleep(Duration::from_millis(1000)).await;
// Potentially send some data here
/*let voice_udp_option = voice_handler.voice_udp_connection.lock().await.clone();
if voice_udp_option.is_some() {
voice_udp_option.unwrap().send_opus_data(0, vec![1, 2, 3, 4, 5]).await.unwrap();
}*/
}
}

View File

@ -13,7 +13,7 @@ impl Guild {
/// permission to be present on the current user.
///
/// If the guild/channel you are searching is not yet indexed, the endpoint will return a 202 accepted response.
/// In this case, the method will return a [`ChorusError::InvalidResponse`] error.
/// In this case, the method will return a [`ChorusError::InvalidResponse`](crate::errors::ChorusError::InvalidResponse) error.
///
/// # Reference:
/// See <https://discord-userdoccers.vercel.app/resources/message#search-messages>

View File

@ -67,7 +67,7 @@ custom_error! {
}
custom_error! {
/// For errors we receive from the gateway, see https://discord-userdoccers.vercel.app/topics/opcodes-and-status-codes#gateway-close-event-codes;
/// For errors we receive from the gateway, see <https://discord-userdoccers.vercel.app/topics/opcodes-and-status-codes#gateway-close-event-codes>;
///
/// Supposed to be sent as numbers, though they are sent as string most of the time?
///
@ -100,3 +100,59 @@ custom_error! {
}
impl WebSocketEvent for GatewayError {}
custom_error! {
/// Voice Gateway errors
///
/// Similar to [GatewayError].
///
/// See <https://discord.com/developers/docs/topics/opcodes-and-status-codes#voice-voice-close-event-codes>;
#[derive(Clone, Default, PartialEq, Eq)]
pub VoiceGatewayError
// Errors we receive
#[default]
UnknownOpcode = "You sent an invalid opcode",
FailedToDecodePayload = "You sent an invalid payload in your identifying to the (Voice) Gateway",
NotAuthenticated = "You sent a payload before identifying with the (Voice) Gateway",
AuthenticationFailed = "The token you sent in your identify payload is incorrect",
AlreadyAuthenticated = "You sent more than one identify payload",
SessionNoLongerValid = "Your session is no longer valid",
SessionTimeout = "Your session has timed out",
ServerNotFound = "We can't find the server you're trying to connect to",
UnknownProtocol = "We didn't recognize the protocol you sent",
Disconnected = "Channel was deleted, you were kicked, voice server changed, or the main gateway session was dropped. Should not reconnect.",
VoiceServerCrashed = "The server crashed, try resuming",
UnknownEncryptionMode = "Server failed to decrypt data",
// Errors when initiating a gateway connection
CannotConnect{error: String} = "Cannot connect due to a tungstenite error: {error}",
NonHelloOnInitiate{opcode: u8} = "Received non hello on initial gateway connection ({opcode}), something is definitely wrong",
// Other misc errors
UnexpectedOpcodeReceived{opcode: u8} = "Received an opcode we weren't expecting to receive: {opcode}",
}
impl WebSocketEvent for VoiceGatewayError {}
custom_error! {
/// Voice UDP errors.
#[derive(Clone, PartialEq, Eq)]
pub VoiceUdpError
// General errors
BrokenSocket{error: String} = "Could not write / read from UDP socket: {error}",
NoData = "We have not set received the necessary data to perform this operation.",
// Encryption errors
EncryptionModeNotImplemented{encryption_mode: String} = "Voice encryption mode {encryption_mode} is not yet implemented.",
NoKey = "Tried to encrypt / decrypt rtp data, but no key has been received yet",
FailedEncryption = "Tried to encrypt rtp data, but failed. Most likely this is an issue chorus' nonce generation. Please open an issue on the chorus github: https://github.com/polyphony-chat/chorus/issues/new",
FailedDecryption = "Tried to decrypt rtp data, but failed. Most likely this is an issue chorus' nonce generation. Please open an issue on the chorus github: https://github.com/polyphony-chat/chorus/issues/new",
FailedNonceGeneration{error: String} = "Tried to generate nonce, but failed due to error: {error}.",
// Errors when initiating a socket connection
CannotBind{error: String} = "Cannot bind socket due to a UDP error: {error}",
CannotConnect{error: String} = "Cannot connect due to a UDP error: {error}",
}
impl WebSocketEvent for VoiceUdpError {}

View File

@ -27,8 +27,16 @@ impl TungsteniteBackend {
websocket_url: &str,
) -> Result<(TungsteniteSink, TungsteniteStream), crate::errors::GatewayError> {
let mut roots = rustls::RootCertStore::empty();
for cert in rustls_native_certs::load_native_certs().expect("could not load platform certs")
{
let certs = rustls_native_certs::load_native_certs();
if let Err(e) = certs {
log::error!("Failed to load platform native certs! {:?}", e);
return Err(GatewayError::CannotConnect {
error: format!("{:?}", e),
});
}
for cert in certs.unwrap() {
roots.add(&rustls::Certificate(cert.0)).unwrap();
}
let (websocket_stream, _) = match connect_async_tls_with_config(

View File

@ -46,6 +46,8 @@ pub struct Session {
pub ready: GatewayEvent<types::GatewayReady>,
pub ready_supplemental: GatewayEvent<types::GatewayReadySupplemental>,
pub replace: GatewayEvent<types::SessionsReplace>,
pub reconnect: GatewayEvent<types::GatewayReconnect>,
pub invalid: GatewayEvent<types::GatewayInvalidSession>,
}
#[derive(Default, Debug)]

View File

@ -9,13 +9,14 @@ use log::*;
#[cfg(not(target_arch = "wasm32"))]
use tokio::task;
use self::event::Events;
use super::events::Events;
use super::*;
use super::{Sink, Stream};
use crate::types::{
self, AutoModerationRule, AutoModerationRuleUpdate, Channel, ChannelCreate, ChannelDelete,
ChannelUpdate, Guild, GuildRoleCreate, GuildRoleUpdate, JsonField, RoleObject, SourceUrlField,
ThreadUpdate, UpdateMessage, WebSocketEvent,
ChannelUpdate, GatewayInvalidSession, GatewayReconnect, Guild, GuildRoleCreate,
GuildRoleUpdate, JsonField, RoleObject, SourceUrlField, ThreadUpdate, UpdateMessage,
WebSocketEvent,
};
#[derive(Debug)]
@ -25,6 +26,7 @@ pub struct Gateway {
websocket_send: Arc<Mutex<Sink>>,
websocket_receive: Stream,
kill_send: tokio::sync::broadcast::Sender<()>,
kill_receive: tokio::sync::broadcast::Receiver<()>,
store: Arc<Mutex<HashMap<Snowflake, Arc<RwLock<ObservableObject>>>>>,
url: String,
}
@ -74,6 +76,7 @@ impl Gateway {
websocket_send: shared_websocket_send.clone(),
websocket_receive,
kill_send: kill_send.clone(),
kill_receive: kill_send.subscribe(),
store: store.clone(),
url: websocket_url.clone(),
};
@ -98,14 +101,21 @@ impl Gateway {
}
/// The main gateway listener task;
///
/// Can only be stopped by closing the websocket, cannot be made to listen for kill
pub async fn gateway_listen_task(&mut self) {
loop {
let msg = self.websocket_receive.next().await;
let msg;
tokio::select! {
Ok(_) = self.kill_receive.recv() => {
log::trace!("GW: Closing listener task");
break;
}
message = self.websocket_receive.next() => {
msg = message;
}
}
// PRETTYFYME: Remove inline conditional compiling
// This if chain can be much better but if let is unstable on stable rust
#[cfg(not(target_arch = "wasm32"))]
if let Some(Ok(message)) = msg {
self.handle_message(message.into()).await;
@ -339,10 +349,42 @@ impl Gateway {
.unwrap();
}
GATEWAY_RECONNECT => {
todo!()
trace!("GW: Received Reconnect");
let reconnect = GatewayReconnect {};
self.events
.lock()
.await
.session
.reconnect
.notify(reconnect)
.await;
}
GATEWAY_INVALID_SESSION => {
todo!()
trace!("GW: Received Invalid Session");
let mut resumable: bool = false;
if let Some(raw_value) = gateway_payload.event_data {
if let Ok(deserialized) = serde_json::from_str(raw_value.get()) {
resumable = deserialized;
} else {
warn!("Failed to parse part of INVALID_SESSION ('{}' as bool), assuming non-resumable", raw_value.get());
}
} else {
warn!("Failed to parse part of INVALID_SESSION ('d' missing), assuming non-resumable");
}
let invalid_session = GatewayInvalidSession { resumable };
self.events
.lock()
.await
.session
.invalid
.notify(invalid_session)
.await;
}
// Starts our heartbeat
// We should have already handled this in gateway init
@ -398,165 +440,3 @@ impl Gateway {
}
}
}
pub mod event {
use super::*;
#[derive(Default, Debug)]
pub struct Events {
pub application: Application,
pub auto_moderation: AutoModeration,
pub session: Session,
pub message: Message,
pub user: User,
pub relationship: Relationship,
pub channel: Channel,
pub thread: Thread,
pub guild: Guild,
pub invite: Invite,
pub integration: Integration,
pub interaction: Interaction,
pub stage_instance: StageInstance,
pub call: Call,
pub voice: Voice,
pub webhooks: Webhooks,
pub gateway_identify_payload: GatewayEvent<types::GatewayIdentifyPayload>,
pub gateway_resume: GatewayEvent<types::GatewayResume>,
pub error: GatewayEvent<GatewayError>,
}
#[derive(Default, Debug)]
pub struct Application {
pub command_permissions_update: GatewayEvent<types::ApplicationCommandPermissionsUpdate>,
}
#[derive(Default, Debug)]
pub struct AutoModeration {
pub rule_create: GatewayEvent<types::AutoModerationRuleCreate>,
pub rule_update: GatewayEvent<types::AutoModerationRuleUpdate>,
pub rule_delete: GatewayEvent<types::AutoModerationRuleDelete>,
pub action_execution: GatewayEvent<types::AutoModerationActionExecution>,
}
#[derive(Default, Debug)]
pub struct Session {
pub ready: GatewayEvent<types::GatewayReady>,
pub ready_supplemental: GatewayEvent<types::GatewayReadySupplemental>,
pub replace: GatewayEvent<types::SessionsReplace>,
}
#[derive(Default, Debug)]
pub struct StageInstance {
pub create: GatewayEvent<types::StageInstanceCreate>,
pub update: GatewayEvent<types::StageInstanceUpdate>,
pub delete: GatewayEvent<types::StageInstanceDelete>,
}
#[derive(Default, Debug)]
pub struct Message {
pub create: GatewayEvent<types::MessageCreate>,
pub update: GatewayEvent<types::MessageUpdate>,
pub delete: GatewayEvent<types::MessageDelete>,
pub delete_bulk: GatewayEvent<types::MessageDeleteBulk>,
pub reaction_add: GatewayEvent<types::MessageReactionAdd>,
pub reaction_remove: GatewayEvent<types::MessageReactionRemove>,
pub reaction_remove_all: GatewayEvent<types::MessageReactionRemoveAll>,
pub reaction_remove_emoji: GatewayEvent<types::MessageReactionRemoveEmoji>,
pub ack: GatewayEvent<types::MessageACK>,
}
#[derive(Default, Debug)]
pub struct User {
pub update: GatewayEvent<types::UserUpdate>,
pub guild_settings_update: GatewayEvent<types::UserGuildSettingsUpdate>,
pub presence_update: GatewayEvent<types::PresenceUpdate>,
pub typing_start: GatewayEvent<types::TypingStartEvent>,
}
#[derive(Default, Debug)]
pub struct Relationship {
pub add: GatewayEvent<types::RelationshipAdd>,
pub remove: GatewayEvent<types::RelationshipRemove>,
}
#[derive(Default, Debug)]
pub struct Channel {
pub create: GatewayEvent<types::ChannelCreate>,
pub update: GatewayEvent<types::ChannelUpdate>,
pub unread_update: GatewayEvent<types::ChannelUnreadUpdate>,
pub delete: GatewayEvent<types::ChannelDelete>,
pub pins_update: GatewayEvent<types::ChannelPinsUpdate>,
}
#[derive(Default, Debug)]
pub struct Thread {
pub create: GatewayEvent<types::ThreadCreate>,
pub update: GatewayEvent<types::ThreadUpdate>,
pub delete: GatewayEvent<types::ThreadDelete>,
pub list_sync: GatewayEvent<types::ThreadListSync>,
pub member_update: GatewayEvent<types::ThreadMemberUpdate>,
pub members_update: GatewayEvent<types::ThreadMembersUpdate>,
}
#[derive(Default, Debug)]
pub struct Guild {
pub create: GatewayEvent<types::GuildCreate>,
pub update: GatewayEvent<types::GuildUpdate>,
pub delete: GatewayEvent<types::GuildDelete>,
pub audit_log_entry_create: GatewayEvent<types::GuildAuditLogEntryCreate>,
pub ban_add: GatewayEvent<types::GuildBanAdd>,
pub ban_remove: GatewayEvent<types::GuildBanRemove>,
pub emojis_update: GatewayEvent<types::GuildEmojisUpdate>,
pub stickers_update: GatewayEvent<types::GuildStickersUpdate>,
pub integrations_update: GatewayEvent<types::GuildIntegrationsUpdate>,
pub member_add: GatewayEvent<types::GuildMemberAdd>,
pub member_remove: GatewayEvent<types::GuildMemberRemove>,
pub member_update: GatewayEvent<types::GuildMemberUpdate>,
pub members_chunk: GatewayEvent<types::GuildMembersChunk>,
pub role_create: GatewayEvent<types::GuildRoleCreate>,
pub role_update: GatewayEvent<types::GuildRoleUpdate>,
pub role_delete: GatewayEvent<types::GuildRoleDelete>,
pub role_scheduled_event_create: GatewayEvent<types::GuildScheduledEventCreate>,
pub role_scheduled_event_update: GatewayEvent<types::GuildScheduledEventUpdate>,
pub role_scheduled_event_delete: GatewayEvent<types::GuildScheduledEventDelete>,
pub role_scheduled_event_user_add: GatewayEvent<types::GuildScheduledEventUserAdd>,
pub role_scheduled_event_user_remove: GatewayEvent<types::GuildScheduledEventUserRemove>,
pub passive_update_v1: GatewayEvent<types::PassiveUpdateV1>,
}
#[derive(Default, Debug)]
pub struct Invite {
pub create: GatewayEvent<types::InviteCreate>,
pub delete: GatewayEvent<types::InviteDelete>,
}
#[derive(Default, Debug)]
pub struct Integration {
pub create: GatewayEvent<types::IntegrationCreate>,
pub update: GatewayEvent<types::IntegrationUpdate>,
pub delete: GatewayEvent<types::IntegrationDelete>,
}
#[derive(Default, Debug)]
pub struct Interaction {
pub create: GatewayEvent<types::InteractionCreate>,
}
#[derive(Default, Debug)]
pub struct Call {
pub create: GatewayEvent<types::CallCreate>,
pub update: GatewayEvent<types::CallUpdate>,
pub delete: GatewayEvent<types::CallDelete>,
}
#[derive(Default, Debug)]
pub struct Voice {
pub state_update: GatewayEvent<types::VoiceStateUpdate>,
pub server_update: GatewayEvent<types::VoiceServerUpdate>,
}
#[derive(Default, Debug)]
pub struct Webhooks {
pub update: GatewayEvent<types::WebhooksUpdate>,
}
}

View File

@ -7,7 +7,7 @@ use log::*;
use std::fmt::Debug;
use super::{event::Events, *};
use super::{events::Events, *};
use crate::types::{self, Composite};
/// Represents a handle to a Gateway connection. A Gateway connection will create observable

View File

@ -26,7 +26,7 @@ use super::*;
use crate::types;
/// The amount of time we wait for a heartbeat ack before resending our heartbeat in ms
const HEARTBEAT_ACK_TIMEOUT: u64 = 2000;
pub const HEARTBEAT_ACK_TIMEOUT: u64 = 2000;
/// Handles sending heartbeats to the gateway in another thread
#[allow(dead_code)] // FIXME: Remove this, once HeartbeatHandler is used
@ -77,11 +77,6 @@ impl HeartbeatHandler {
let mut last_seq_number: Option<u64> = None;
loop {
if kill_receive.try_recv().is_ok() {
trace!("GW: Closing heartbeat task");
break;
}
let timeout = if last_heartbeat_acknowledged {
heartbeat_interval
} else {
@ -115,6 +110,10 @@ impl HeartbeatHandler {
}
}
}
Ok(_) = kill_receive.recv() => {
log::trace!("GW: Closing heartbeat task");
break;
}
}
if should_send {

View File

@ -98,6 +98,12 @@ pub struct GatewayEvent<T: WebSocketEvent> {
}
impl<T: WebSocketEvent> GatewayEvent<T> {
pub fn new() -> Self {
Self {
observers: Vec::new(),
}
}
/// Returns true if the GatewayEvent is observed by at least one Observer.
pub fn is_observed(&self) -> bool {
!self.observers.is_empty()
@ -120,7 +126,7 @@ impl<T: WebSocketEvent> GatewayEvent<T> {
}
/// Notifies the observers of the GatewayEvent.
async fn notify(&self, new_event_data: T) {
pub(crate) async fn notify(&self, new_event_data: T) {
for observer in &self.observers {
observer.update(&new_event_data).await;
}

View File

@ -132,7 +132,10 @@ pub mod instance;
#[cfg(feature = "client")]
pub mod ratelimiter;
pub mod types;
#[cfg(feature = "client")]
#[cfg(all(
feature = "client",
any(feature = "voice_udp", feature = "voice_gateway")
))]
pub mod voice;
#[derive(Clone, Default, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]

View File

@ -37,6 +37,7 @@ pub struct VoiceState {
pub channel_id: Option<Snowflake>,
pub user_id: Snowflake,
pub member: Option<Shared<GuildMember>>,
/// Includes alphanumeric characters, not a snowflake
pub session_id: String,
pub token: Option<String>,
pub deaf: bool,

View File

@ -0,0 +1,17 @@
use serde::{Deserialize, Serialize};
use super::WebSocketEvent;
#[derive(Debug, Deserialize, Serialize, Default, Clone)]
/// Your session is now invalid.
///
/// Either reauthenticate and reidentify or resume if possible.
///
/// # Reference
/// See <https://docs.discord.sex/topics/gateway-events#invalid-session>
pub struct GatewayInvalidSession {
#[serde(rename = "d")]
pub resumable: bool,
}
impl WebSocketEvent for GatewayInvalidSession {}

View File

@ -14,12 +14,14 @@ pub use hello::*;
pub use identify::*;
pub use integration::*;
pub use interaction::*;
pub use invalid_session::*;
pub use invite::*;
pub use lazy_request::*;
pub use message::*;
pub use passive_update::*;
pub use presence::*;
pub use ready::*;
pub use reconnect::*;
pub use relationship::*;
pub use request_members::*;
pub use resume::*;
@ -28,8 +30,8 @@ pub use stage_instance::*;
pub use thread::*;
pub use user::*;
pub use voice::*;
pub use voice_gateway::*;
pub use webhooks::*;
pub use webrtc::*;
#[cfg(feature = "client")]
use super::Snowflake;
@ -60,12 +62,14 @@ mod hello;
mod identify;
mod integration;
mod interaction;
mod invalid_session;
mod invite;
mod lazy_request;
mod message;
mod passive_update;
mod presence;
mod ready;
mod reconnect;
mod relationship;
mod request_members;
mod resume;
@ -76,7 +80,7 @@ mod user;
mod voice;
mod webhooks;
mod webrtc;
mod voice_gateway;
pub trait WebSocketEvent: Send + Sync + Debug {}

View File

@ -0,0 +1,12 @@
use serde::{Deserialize, Serialize};
use super::WebSocketEvent;
#[derive(Debug, Deserialize, Serialize, Default, Clone)]
/// "The reconnect event is dispatched when a client should reconnect to the Gateway (and resume their existing session, if they have one). This event usually occurs during deploys to migrate sessions gracefully off old hosts"
///
/// # Reference
/// See <https://docs.discord.sex/topics/gateway-events#reconnect>
pub struct GatewayReconnect {}
impl WebSocketEvent for GatewayReconnect {}

View File

@ -38,7 +38,10 @@ impl WebSocketEvent for VoiceStateUpdate {}
/// Received to indicate which voice endpoint, token and guild_id to use;
pub struct VoiceServerUpdate {
pub token: String,
pub guild_id: Snowflake,
/// The guild this voice server update is for
pub guild_id: Option<Snowflake>,
/// The private channel this voice server update is for
pub channel_id: Option<Snowflake>,
pub endpoint: Option<String>,
}

View File

@ -0,0 +1,40 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
use crate::types::{Snowflake, WebSocketEvent};
use serde::{Deserialize, Serialize};
#[derive(Debug, Default, Deserialize, Serialize, Clone, PartialEq, Copy)]
/// Sent when another user connects to the voice server.
///
/// Contains the user id and "flags".
///
/// Not documented anywhere, if you know what this is, please reach out
///
/// {"op":18,"d":{"user_id":"1234567890","flags":2}}
pub struct VoiceClientConnectFlags {
pub user_id: Snowflake,
// Likely some sort of bitflags
//
// Not always sent, sometimes null?
pub flags: Option<u8>,
}
impl WebSocketEvent for VoiceClientConnectFlags {}
#[derive(Debug, Default, Deserialize, Serialize, Clone, PartialEq, Copy)]
/// Sent when another user connects to the voice server.
///
/// Contains the user id and "platform".
///
/// Not documented anywhere, if you know what this is, please reach out
///
/// {"op":20,"d":{"user_id":"1234567890","platform":0}}
pub struct VoiceClientConnectPlatform {
pub user_id: Snowflake,
// Likely an enum
pub platform: u8,
}
impl WebSocketEvent for VoiceClientConnectPlatform {}

View File

@ -0,0 +1,18 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
use crate::types::{Snowflake, WebSocketEvent};
use serde::{Deserialize, Serialize};
#[derive(Debug, Default, Deserialize, Serialize, Clone, PartialEq, Copy)]
/// Sent when another user disconnects from the voice server.
///
/// When received, the SSRC of the user should be discarded.
///
/// See <https://discord-userdoccers.vercel.app/topics/voice-connections#other-client-disconnection>
pub struct VoiceClientDisconnection {
pub user_id: Snowflake,
}
impl WebSocketEvent for VoiceClientDisconnection {}

View File

@ -0,0 +1,24 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
use crate::types::WebSocketEvent;
use serde::{Deserialize, Serialize};
#[derive(Debug, Default, Deserialize, Serialize, Clone, PartialEq, Copy)]
/// Contains info on how often the client should send heartbeats to the server;
///
/// Differs from the normal hello data in that discord sends heartbeat interval as a float.
///
/// See <https://discord-userdoccers.vercel.app/topics/voice-connections#heartbeating>
pub struct VoiceHelloData {
/// The voice gateway version.
///
/// Note: no idea why this is sent, we already specify the version when establishing a connection.
#[serde(rename = "v")]
pub version: u8,
/// How often a client should send heartbeats, in milliseconds
pub heartbeat_interval: f64,
}
impl WebSocketEvent for VoiceHelloData {}

View File

@ -6,17 +6,20 @@ use crate::types::{Snowflake, WebSocketEvent};
use serde::{Deserialize, Serialize};
#[derive(Debug, Deserialize, Serialize, Default, Clone, PartialEq, Eq)]
/// The identify payload for the webrtc stream;
/// Contains info to begin a webrtc connection;
/// See https://discord.com/developers/docs/topics/voice-connections#establishing-a-voice-websocket-connection-example-voice-identify-payload;
/// The identify payload for the voice gateway connection;
///
/// Contains authentication info and context to authenticate to the voice gateway.
///
/// See <https://discord-userdoccers.vercel.app/topics/voice-connections#identify-structure>
pub struct VoiceIdentify {
server_id: Snowflake,
user_id: Snowflake,
session_id: String,
token: String,
/// The ID of the guild or the private channel being connected to
pub server_id: Snowflake,
pub user_id: Snowflake,
pub session_id: String,
pub token: String,
#[serde(skip_serializing_if = "Option::is_none")]
/// Undocumented field, but is also in discord client comms
video: Option<bool>,
pub video: Option<bool>,
// TODO: Add video streams
}
impl WebSocketEvent for VoiceIdentify {}

View File

@ -0,0 +1,18 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
use crate::types::WebSocketEvent;
use serde::{Deserialize, Serialize};
#[derive(Debug, Default, Deserialize, Serialize, Clone, PartialEq, Copy)]
/// What does this do?
///
/// {"op":15,"d":{"any":100}}
///
/// Opcode from <https://discord-userdoccers.vercel.app/topics/opcodes-and-status-codes#voice-opcodes>
pub struct VoiceMediaSinkWants {
pub any: u16,
}
impl WebSocketEvent for VoiceMediaSinkWants {}

View File

@ -0,0 +1,170 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
use super::WebSocketEvent;
use serde::{Deserialize, Serialize};
use serde_json::{value::RawValue, Value};
pub use client_connect::*;
pub use client_disconnect::*;
pub use hello::*;
pub use identify::*;
pub use media_sink_wants::*;
pub use ready::*;
pub use select_protocol::*;
pub use session_description::*;
pub use speaking::*;
pub use ssrc_definition::*;
pub use voice_backend_version::*;
mod client_connect;
mod client_disconnect;
mod hello;
mod identify;
mod media_sink_wants;
mod ready;
mod select_protocol;
mod session_description;
mod speaking;
mod ssrc_definition;
mod voice_backend_version;
#[derive(Debug, Default, Serialize, Clone)]
/// The payload used for sending events to the voice gateway.
///
/// Similar to [VoiceGatewayReceivePayload], except we send a [Value] for d whilst we receive a [serde_json::value::RawValue]
pub struct VoiceGatewaySendPayload {
#[serde(rename = "op")]
pub op_code: u8,
#[serde(rename = "d")]
pub data: Value,
}
impl WebSocketEvent for VoiceGatewaySendPayload {}
#[derive(Debug, Deserialize, Clone)]
/// The payload used for receiving events from the voice gateway.
///
/// Note that this is similar to the regular gateway, except we no longer have s or t
///
/// Similar to [VoiceGatewaySendPayload], except we send a [Value] for d whilst we receive a [serde_json::value::RawValue]
pub struct VoiceGatewayReceivePayload<'a> {
#[serde(rename = "op")]
pub op_code: u8,
#[serde(borrow)]
#[serde(rename = "d")]
pub data: &'a RawValue,
}
impl<'a> WebSocketEvent for VoiceGatewayReceivePayload<'a> {}
/// The modes of encryption available in voice UDP connections;
///
/// Not all encryption modes are implemented; it is generally recommended
/// to use either [[VoiceEncryptionMode::Xsalsa20Poly1305]] or
/// [[VoiceEncryptionMode::Xsalsa20Poly1305Suffix]]
///
/// See <https://discord-userdoccers.vercel.app/topics/voice-connections#encryption-mode> and <https://discord.com/developers/docs/topics/voice-connections#establishing-a-voice-udp-connection-encryption-modes>
#[derive(Debug, Default, Serialize, Deserialize, Clone, Copy, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum VoiceEncryptionMode {
#[default]
// Officially Documented
/// Use XSalsa20Poly1305 encryption, using the rtp header as a nonce.
///
/// Fully implemented
Xsalsa20Poly1305,
/// Use XSalsa20Poly1305 encryption, using a random 24 byte suffix as a nonce.
///
/// Fully implemented
Xsalsa20Poly1305Suffix,
/// Use XSalsa20Poly1305 encryption, using a 4 byte incremental value as a nonce.
///
/// Fully implemented
Xsalsa20Poly1305Lite,
// Officially Undocumented
/// Not implemented yet, we have no idea what the rtpsize nonces are.
Xsalsa20Poly1305LiteRtpsize,
/// Not implemented yet, we have no idea what the nonce is.
AeadAes256Gcm,
/// Not implemented yet, we have no idea what the rtpsize nonces are.
AeadAes256GcmRtpsize,
/// Not implemented yet, we have no idea what the rtpsize nonces are.
AeadXchacha20Poly1305Rtpsize,
}
impl VoiceEncryptionMode {
/// Returns whether this encryption mode uses Xsalsa20Poly1305 encryption.
pub fn is_xsalsa20_poly1305(&self) -> bool {
matches!(
*self,
VoiceEncryptionMode::Xsalsa20Poly1305
| VoiceEncryptionMode::Xsalsa20Poly1305Lite
| VoiceEncryptionMode::Xsalsa20Poly1305Suffix
| VoiceEncryptionMode::Xsalsa20Poly1305LiteRtpsize
)
}
/// Returns whether this encryption mode uses AeadAes256Gcm encryption.
pub fn is_aead_aes256_gcm(&self) -> bool {
matches!(
*self,
VoiceEncryptionMode::AeadAes256Gcm | VoiceEncryptionMode::AeadAes256GcmRtpsize
)
}
/// Returns whether this encryption mode uses AeadXchacha20Poly1305 encryption.
pub fn is_aead_xchacha20_poly1305(&self) -> bool {
*self == VoiceEncryptionMode::AeadXchacha20Poly1305Rtpsize
}
}
/// The possible audio codecs to use
#[derive(Debug, Default, Serialize, Deserialize, Clone, Copy, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
pub enum AudioCodec {
#[default]
Opus,
}
/// The possible video codecs to use
#[derive(Debug, Default, Serialize, Deserialize, Clone, Copy, PartialEq, Eq)]
#[serde(rename_all = "UPPERCASE")]
pub enum VideoCodec {
#[default]
VP8,
VP9,
H264,
}
// The various voice opcodes
pub const VOICE_IDENTIFY: u8 = 0;
pub const VOICE_SELECT_PROTOCOL: u8 = 1;
pub const VOICE_READY: u8 = 2;
pub const VOICE_HEARTBEAT: u8 = 3;
pub const VOICE_SESSION_DESCRIPTION: u8 = 4;
pub const VOICE_SPEAKING: u8 = 5;
pub const VOICE_HEARTBEAT_ACK: u8 = 6;
pub const VOICE_RESUME: u8 = 7;
pub const VOICE_HELLO: u8 = 8;
pub const VOICE_RESUMED: u8 = 9;
pub const VOICE_SSRC_DEFINITION: u8 = 12;
pub const VOICE_CLIENT_DISCONNECT: u8 = 13;
pub const VOICE_SESSION_UPDATE: u8 = 14;
/// What is this?
///
/// {"op":15,"d":{"any":100}}
///
/// Opcode from <https://discord-userdoccers.vercel.app/topics/opcodes-and-status-codes#voice-opcodes>
pub const VOICE_MEDIA_SINK_WANTS: u8 = 15;
/// See <https://discord-userdoccers.vercel.app/topics/opcodes-and-status-codes#voice-opcodes>
/// Sent with empty data from the client, the server responds with the voice backend version;
pub const VOICE_BACKEND_VERSION: u8 = 16;
// These two get simultaenously fired when a user joins, one has flags and one has a platform
pub const VOICE_CLIENT_CONNECT_FLAGS: u8 = 18;
pub const VOICE_CLIENT_CONNECT_PLATFORM: u8 = 20;

View File

@ -7,15 +7,27 @@ use std::net::Ipv4Addr;
use crate::types::WebSocketEvent;
use serde::{Deserialize, Serialize};
use super::VoiceEncryptionMode;
#[derive(Debug, Deserialize, Serialize, Clone, PartialEq, Eq)]
/// The ready event for the webrtc stream;
/// Used to give info after the identify event;
/// See https://discord.com/developers/docs/topics/voice-connections#establishing-a-voice-websocket-connection-example-voice-ready-payload;
/// The voice gateway's ready event;
///
/// Gives the user info about the UDP connection IP and port, srrc to use,
/// available encryption modes and other data.
///
/// Sent in response to an Identify event.
///
/// See <https://discord-userdoccers.vercel.app/topics/voice-connections#ready-structure>
pub struct VoiceReady {
ssrc: i32,
ip: Ipv4Addr,
port: u32,
modes: Vec<String>,
/// See <https://developer.mozilla.org/en-US/docs/Web/API/RTCRtpStreamStats/ssrc>
pub ssrc: u32,
pub ip: Ipv4Addr,
pub port: u16,
/// The available encryption modes for the UDP connection
pub modes: Vec<VoiceEncryptionMode>,
#[serde(default)]
pub experiments: Vec<String>,
// TODO: Add video streams
// Heartbeat interval is also sent, but is "an erroneous field and should be ignored. The correct heartbeat_interval value comes from the Hello payload."
}
@ -26,6 +38,7 @@ impl Default for VoiceReady {
ip: Ipv4Addr::UNSPECIFIED,
port: 0,
modes: Vec::new(),
experiments: Vec::new(),
}
}
}

View File

@ -0,0 +1,52 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
use serde::{Deserialize, Serialize};
use super::VoiceEncryptionMode;
#[derive(Debug, Deserialize, Serialize, Clone, Default)]
/// An event sent by the client to the voice gateway server,
/// detailing what protocol, address and encryption to use;
///
/// See <https://discord-userdoccers.vercel.app/topics/voice-connections#select-protocol-structure>
pub struct SelectProtocol {
/// The protocol to use. The only option chorus supports is [VoiceProtocol::Udp].
pub protocol: VoiceProtocol,
pub data: SelectProtocolData,
/// The UUID4 RTC connection ID, used for analytics.
///
/// Note: Not recommended to set this
pub rtc_connection_id: Option<String>,
// TODO: Add codecs, what is a codec object
/// The possible experiments we want to enable
#[serde(rename = "experiments")]
pub enabled_experiments: Vec<String>,
}
/// The possible protocol for sending a receiving voice data.
///
/// See <https://discord-userdoccers.vercel.app/topics/voice-connections#select-protocol-structure>
#[derive(Debug, Default, Serialize, Deserialize, Clone, Copy, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum VoiceProtocol {
#[default]
/// Sending data via UDP, documented and the only protocol chorus supports.
Udp,
// Possible value, yet NOT RECOMMENDED, AS CHORUS DOES NOT SUPPORT WEBRTC
//Webrtc,
}
#[derive(Debug, Default, Deserialize, Serialize, Clone)]
/// The data field of the SelectProtocol Event
///
/// See <https://discord-userdoccers.vercel.app/topics/voice-connections#protocol-data-structure>
pub struct SelectProtocolData {
/// Our external IP we got from IP discovery
pub address: String,
/// Our external UDP port we got from IP discovery
pub port: u16,
/// The mode of encryption to use
pub mode: VoiceEncryptionMode,
}

View File

@ -0,0 +1,39 @@
use super::{AudioCodec, VideoCodec, VoiceEncryptionMode};
use crate::types::WebSocketEvent;
use serde::{Deserialize, Serialize};
#[derive(Debug, Deserialize, Serialize, Clone, Default)]
/// Event that describes our encryption mode and secret key for encryption
///
/// See <https://discord-userdoccers.vercel.app/topics/voice-connections#session-description-structure>
pub struct SessionDescription {
pub audio_codec: AudioCodec,
pub video_codec: VideoCodec,
pub media_session_id: String,
/// The encryption mode to use
#[serde(rename = "mode")]
pub encryption_mode: VoiceEncryptionMode,
/// The secret key we'll use for encryption
pub secret_key: [u8; 32],
/// The keyframe interval in milliseconds
pub keyframe_interval: Option<u64>,
}
impl WebSocketEvent for SessionDescription {}
#[derive(Debug, Deserialize, Serialize, Clone, Default)]
/// Event that might be sent to update session parameters
///
/// See <https://discord-userdoccers.vercel.app/topics/voice-connections#session-update-structure>
pub struct SessionUpdate {
#[serde(rename = "audio_codec")]
pub new_audio_codec: Option<AudioCodec>,
#[serde(rename = "video_codec")]
pub new_video_codec: Option<VideoCodec>,
#[serde(rename = "media_session_id")]
pub new_media_session_id: Option<String>,
}
impl WebSocketEvent for SessionUpdate {}

View File

@ -0,0 +1,52 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
use bitflags::bitflags;
use serde::{Deserialize, Serialize};
use crate::types::{Snowflake, WebSocketEvent};
/// Event that tells the server we are speaking;
///
/// Essentially, what allows us to send UDP data and lights up the green circle around your avatar.
///
/// See <https://discord-userdoccers.vercel.app/topics/voice-connections#speaking-structure>
#[derive(Debug, Deserialize, Serialize, Clone, Default)]
pub struct Speaking {
/// Data about the audio we're transmitting.
///
/// See [SpeakingBitflags]
pub speaking: u8,
pub ssrc: u32,
/// The user id of the speaking user, only sent by the server
#[serde(skip_serializing)]
pub user_id: Option<Snowflake>,
/// Delay in milliseconds, not sent by the server
#[serde(default)]
pub delay: u64,
}
impl WebSocketEvent for Speaking {}
bitflags! {
/// Bitflags of speaking types;
///
/// See <https://discord.com/developers/docs/topics/voice-connections#speaking>
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Serialize, Deserialize)]
pub struct SpeakingBitflags: u8 {
/// Whether we'll be transmitting normal voice audio
const MICROPHONE = 1 << 0;
/// Whether we'll be transmitting context audio for video, no speaking indicator
const SOUNDSHARE = 1 << 1;
/// Whether we are a priority speaker, lowering audio of other speakers
const PRIORITY = 1 << 2;
}
}
impl Default for SpeakingBitflags {
/// Returns the default value for these flags, assuming normal microphone audio and not being a priority speaker
fn default() -> Self {
Self::MICROPHONE
}
}

View File

@ -0,0 +1,53 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
use crate::types::{Snowflake, WebSocketEvent};
use serde::{Deserialize, Serialize};
/// Defines an event which provides ssrcs for voice and video for a user id.
///
/// This event is sent when we begin to speak.
///
/// It must be sent before sending audio, or else clients will not be able to play the stream.
///
/// This event is sent via opcode 12.
///
/// Examples of the event:
///
/// When receiving:
/// ```json
/// {"op":12,"d":{"video_ssrc":0,"user_id":"463640391196082177","streams":[{"ssrc":26595,"rtx_ssrc":26596,"rid":"100","quality":100,"max_resolution":{"width":1280,"type":"fixed","height":720},"max_framerate":30,"active":false}],"audio_ssrc":26597}}{"op":12,"d":{"video_ssrc":0,"user_id":"463640391196082177","streams":[{"ssrc":26595,"rtx_ssrc":26596,"rid":"100","quality":100,"max_resolution":{"width":1280,"type":"fixed","height":720},"max_framerate":30,"active":false}],"audio_ssrc":26597}}
/// ```
///
/// When sending:
/// ```json
/// {"op":12,"d":{"audio_ssrc":2307250864,"video_ssrc":0,"rtx_ssrc":0,"streams":[{"type":"video","rid":"100","ssrc":26595,"active":false,"quality":100,"rtx_ssrc":26596,"max_bitrate":2500000,"max_framerate":30,"max_resolution":{"type":"fixed","width":1280,"height":720}}]}}
/// ```
#[derive(Debug, Deserialize, Serialize, Default, Clone, PartialEq, Eq)]
pub struct SsrcDefinition {
/// The ssrc used for video communications.
///
/// Is always sent and received, though is 0 if describing only the audio ssrc.
#[serde(default)]
pub video_ssrc: usize,
/// The ssrc used for audio communications.
///
/// Is always sent and received, though is 0 if describing only the video ssrc.
#[serde(default)]
pub audio_ssrc: usize,
// Not sure what this is
// It is usually 0
#[serde(default)]
pub rtx_ssrc: usize,
/// The user id these ssrcs apply to.
///
/// Is never sent by the user and is filled in by the server
#[serde(skip_serializing)]
pub user_id: Option<Snowflake>,
// TODO: Add video streams
#[serde(default)]
pub streams: Vec<String>,
}
impl WebSocketEvent for SsrcDefinition {}

View File

@ -0,0 +1,21 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
use crate::types::WebSocketEvent;
use serde::{Deserialize, Serialize};
#[derive(Debug, Default, Deserialize, Serialize, Clone, PartialEq)]
/// Received from the voice gateway server to describe the backend version.
///
/// See <https://discord-userdoccers.vercel.app/topics/voice-connections#voice-backend-version>
pub struct VoiceBackendVersion {
/// The voice backend's version
#[serde(rename = "voice")]
pub voice_version: String,
/// The WebRTC worker's version
#[serde(rename = "rtc_worker")]
pub rtc_worker_version: String,
}
impl WebSocketEvent for VoiceBackendVersion {}

View File

@ -1,6 +0,0 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
//! Where the voice chat implementation will be, once it's finished.
//! For development on voice, see the feature/voice branch.

90
src/voice/crypto.rs Normal file
View File

@ -0,0 +1,90 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
//! Defines cryptography functions used within the voice implementation.
//!
//! All functions in this module return a 24 byte long `Vec<u8>`.
/// Gets an `xsalsa20_poly1305` nonce from an rtppacket.
///
/// See <https://discord-userdoccers.vercel.app/topics/voice-connections#encryption-mode>
pub(crate) fn get_xsalsa20_poly1305_nonce(packet: &[u8]) -> Vec<u8> {
let mut rtp_header = Vec::with_capacity(24);
rtp_header.append(&mut packet[0..12].to_vec());
// The header is only 12 bytes, but the nonce has to be 24
while rtp_header.len() < 24 {
rtp_header.push(0);
}
rtp_header
}
/// Gets an `xsalsa20_poly1305_suffix` nonce from an rtppacket.
///
/// See <https://discord-userdoccers.vercel.app/topics/voice-connections#encryption-mode>
pub(crate) fn get_xsalsa20_poly1305_suffix_nonce(packet: &[u8]) -> Vec<u8> {
let mut nonce = Vec::with_capacity(24);
nonce.append(&mut packet[(packet.len() - 24)..packet.len()].to_vec());
nonce
}
/// Gets an `xsalsa20_poly1305_lite` nonce from an rtppacket.
///
/// See <https://discord-userdoccers.vercel.app/topics/voice-connections#encryption-mode>
pub(crate) fn get_xsalsa20_poly1305_lite_nonce(packet: &[u8]) -> Vec<u8> {
let mut nonce = Vec::with_capacity(24);
nonce.append(&mut packet[(packet.len() - 4)..packet.len()].to_vec());
// The suffix is only 4 bytes, but the nonce has to be 24
while nonce.len() < 24 {
nonce.push(0);
}
nonce
}
#[test]
// Asserts all functions that retrieve a nonce from packet bytes
fn test_packet_nonce_derives() {
let test_packet_bytes = vec![
144, 120, 98, 5, 71, 174, 52, 64, 0, 4, 85, 36, 178, 8, 37, 146, 35, 154, 141, 36, 125, 15,
65, 179, 227, 108, 165, 56, 68, 68, 3, 62, 87, 233, 7, 81, 147, 93, 22, 95, 115, 202, 48,
66, 190, 229, 69, 146, 66, 108, 60, 114, 2, 228, 111, 40, 108, 5, 68, 226, 76, 240, 20,
231, 210, 214, 123, 175, 188, 161, 10, 125, 13, 196, 114, 248, 50, 84, 103, 139, 86, 223,
82, 173, 8, 209, 78, 188, 169, 151, 157, 42, 189, 153, 228, 105, 199, 19, 185, 16, 33, 133,
113, 253, 145, 36, 106, 14, 222, 128, 226, 239, 10, 39, 72, 113, 33, 113,
];
let nonce_1 = get_xsalsa20_poly1305_nonce(&test_packet_bytes);
let nonce_1_expected = vec![
144, 120, 98, 5, 71, 174, 52, 64, 0, 4, 85, 36, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
];
let nonce_2 = get_xsalsa20_poly1305_suffix_nonce(&test_packet_bytes);
let nonce_2_expected = vec![
228, 105, 199, 19, 185, 16, 33, 133, 113, 253, 145, 36, 106, 14, 222, 128, 226, 239, 10,
39, 72, 113, 33, 113,
];
let nonce_3 = get_xsalsa20_poly1305_lite_nonce(&test_packet_bytes);
let nonce_3_expected = vec![
72, 113, 33, 113, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
];
println!("nonce 1: {:?}", nonce_1);
println!("nonce 2: {:?}", nonce_2);
println!("nonce 3: {:?}", nonce_3);
assert_eq!(nonce_1.len(), 24);
assert_eq!(nonce_2.len(), 24);
assert_eq!(nonce_3.len(), 24);
assert_eq!(nonce_1, nonce_1_expected);
assert_eq!(nonce_2, nonce_2_expected);
assert_eq!(nonce_3, nonce_3_expected);
}

View File

@ -0,0 +1,27 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
#[cfg(all(not(target_arch = "wasm32"), feature = "voice_gateway"))]
pub mod tungstenite;
#[cfg(all(not(target_arch = "wasm32"), feature = "voice_gateway"))]
pub use tungstenite::*;
#[cfg(all(target_arch = "wasm32", feature = "voice_gateway"))]
pub mod wasm;
#[cfg(all(target_arch = "wasm32", feature = "voice_gateway"))]
pub use wasm::*;
#[cfg(all(not(target_arch = "wasm32"), feature = "voice_gateway"))]
pub type Sink = tungstenite::TungsteniteSink;
#[cfg(all(not(target_arch = "wasm32"), feature = "voice_gateway"))]
pub type Stream = tungstenite::TungsteniteStream;
#[cfg(all(not(target_arch = "wasm32"), feature = "voice_gateway"))]
pub type WebSocketBackend = tungstenite::TungsteniteBackend;
#[cfg(all(target_arch = "wasm32", feature = "voice_gateway"))]
pub type Sink = wasm::WasmSink;
#[cfg(all(target_arch = "wasm32", feature = "voice_gateway"))]
pub type Stream = wasm::WasmStream;
#[cfg(all(target_arch = "wasm32", feature = "voice_gateway"))]
pub type WebSocketBackend = wasm::WasmBackend;

View File

@ -0,0 +1,77 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
use futures_util::{
stream::{SplitSink, SplitStream},
StreamExt,
};
use tokio::net::TcpStream;
use tokio_tungstenite::{
connect_async_tls_with_config, tungstenite, Connector, MaybeTlsStream, WebSocketStream,
};
use crate::{errors::VoiceGatewayError, voice::gateway::VoiceGatewayMessage};
#[derive(Debug, Clone)]
pub struct TungsteniteBackend;
// These could be made into inherent associated types when that's stabilized
pub type TungsteniteSink =
SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, tungstenite::Message>;
pub type TungsteniteStream = SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>;
impl TungsteniteBackend {
pub async fn connect(
websocket_url: &str,
) -> Result<(TungsteniteSink, TungsteniteStream), crate::errors::VoiceGatewayError> {
let mut roots = rustls::RootCertStore::empty();
let certs = rustls_native_certs::load_native_certs();
if let Err(e) = certs {
log::error!("Failed to load platform native certs! {:?}", e);
return Err(VoiceGatewayError::CannotConnect {
error: format!("{:?}", e),
});
}
for cert in certs.unwrap() {
roots.add(&rustls::Certificate(cert.0)).unwrap();
}
let (websocket_stream, _) = match connect_async_tls_with_config(
websocket_url,
None,
false,
Some(Connector::Rustls(
rustls::ClientConfig::builder()
.with_safe_defaults()
.with_root_certificates(roots)
.with_no_client_auth()
.into(),
)),
)
.await
{
Ok(websocket_stream) => websocket_stream,
Err(e) => {
return Err(VoiceGatewayError::CannotConnect {
error: e.to_string(),
})
}
};
Ok(websocket_stream.split())
}
}
impl From<VoiceGatewayMessage> for tungstenite::Message {
fn from(message: VoiceGatewayMessage) -> Self {
Self::Text(message.0)
}
}
impl From<tungstenite::Message> for VoiceGatewayMessage {
fn from(value: tungstenite::Message) -> Self {
Self(value.to_string())
}
}

View File

@ -0,0 +1,52 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
use futures_util::{
stream::{SplitSink, SplitStream},
StreamExt,
};
use ws_stream_wasm::*;
use crate::errors::VoiceGatewayError;
use crate::voice::gateway::VoiceGatewayMessage;
#[derive(Debug, Clone)]
pub struct WasmBackend;
// These could be made into inherent associated types when that's stabilized
pub type WasmSink = SplitSink<WsStream, WsMessage>;
pub type WasmStream = SplitStream<WsStream>;
impl WasmBackend {
pub async fn connect(websocket_url: &str) -> Result<(WasmSink, WasmStream), VoiceGatewayError> {
let (_, websocket_stream) = match WsMeta::connect(websocket_url, None).await {
Ok(stream) => Ok(stream),
Err(e) => Err(VoiceGatewayError::CannotConnect {
error: e.to_string(),
}),
}?;
Ok(websocket_stream.split())
}
}
impl From<VoiceGatewayMessage> for WsMessage {
fn from(message: VoiceGatewayMessage) -> Self {
Self::Text(message.0)
}
}
impl From<WsMessage> for VoiceGatewayMessage {
fn from(value: WsMessage) -> Self {
match value {
WsMessage::Text(text) => Self(text),
WsMessage::Binary(bin) => {
let mut text = String::new();
let _ = bin.iter().map(|v| text.push_str(&v.to_string()));
Self(text)
}
}
}
}

View File

@ -0,0 +1,28 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
use crate::{
errors::VoiceGatewayError,
gateway::GatewayEvent,
types::{
SessionDescription, SessionUpdate, Speaking, SsrcDefinition, VoiceBackendVersion,
VoiceClientConnectFlags, VoiceClientConnectPlatform, VoiceClientDisconnection,
VoiceMediaSinkWants, VoiceReady,
},
};
#[derive(Default, Debug)]
pub struct VoiceEvents {
pub voice_ready: GatewayEvent<VoiceReady>,
pub backend_version: GatewayEvent<VoiceBackendVersion>,
pub session_description: GatewayEvent<SessionDescription>,
pub session_update: GatewayEvent<SessionUpdate>,
pub speaking: GatewayEvent<Speaking>,
pub ssrc_definition: GatewayEvent<SsrcDefinition>,
pub client_disconnect: GatewayEvent<VoiceClientDisconnection>,
pub client_connect_flags: GatewayEvent<VoiceClientConnectFlags>,
pub client_connect_platform: GatewayEvent<VoiceClientConnectPlatform>,
pub media_sink_wants: GatewayEvent<VoiceMediaSinkWants>,
pub error: GatewayEvent<VoiceGatewayError>,
}

View File

@ -0,0 +1,349 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
use std::{sync::Arc, time::Duration};
use log::*;
use tokio::sync::Mutex;
use futures_util::SinkExt;
use futures_util::StreamExt;
use crate::{
errors::VoiceGatewayError,
gateway::GatewayEvent,
types::{
VoiceGatewayReceivePayload, VoiceHelloData, WebSocketEvent, VOICE_BACKEND_VERSION,
VOICE_CLIENT_CONNECT_FLAGS, VOICE_CLIENT_CONNECT_PLATFORM, VOICE_CLIENT_DISCONNECT,
VOICE_HEARTBEAT, VOICE_HEARTBEAT_ACK, VOICE_HELLO, VOICE_IDENTIFY, VOICE_MEDIA_SINK_WANTS,
VOICE_READY, VOICE_RESUME, VOICE_SELECT_PROTOCOL, VOICE_SESSION_DESCRIPTION,
VOICE_SESSION_UPDATE, VOICE_SPEAKING, VOICE_SSRC_DEFINITION,
},
voice::gateway::{
heartbeat::VoiceHeartbeatThreadCommunication, VoiceGatewayMessage, WebSocketBackend,
},
};
use super::{
events::VoiceEvents, heartbeat::VoiceHeartbeatHandler, Sink, Stream, VoiceGatewayHandle,
};
#[derive(Debug)]
pub struct VoiceGateway {
events: Arc<Mutex<VoiceEvents>>,
heartbeat_handler: VoiceHeartbeatHandler,
websocket_send: Arc<Mutex<Sink>>,
websocket_receive: Stream,
kill_send: tokio::sync::broadcast::Sender<()>,
kill_receive: tokio::sync::broadcast::Receiver<()>,
}
impl VoiceGateway {
#[allow(clippy::new_ret_no_self)]
pub async fn spawn(websocket_url: String) -> Result<VoiceGatewayHandle, VoiceGatewayError> {
// Append the needed things to the websocket url
let processed_url = format!("wss://{}/?v=7", websocket_url);
trace!("Created voice socket url: {}", processed_url.clone());
let (websocket_send, mut websocket_receive) =
WebSocketBackend::connect(&processed_url).await?;
let shared_websocket_send = Arc::new(Mutex::new(websocket_send));
// Create a shared broadcast channel for killing all gateway tasks
let (kill_send, mut _kill_receive) = tokio::sync::broadcast::channel::<()>(16);
// Wait for the first hello and then spawn both tasks so we avoid nested tasks
// This automatically spawns the heartbeat task, but from the main thread
#[cfg(not(target_arch = "wasm32"))]
let msg: VoiceGatewayMessage = websocket_receive.next().await.unwrap().unwrap().into();
#[cfg(target_arch = "wasm32")]
let msg: VoiceGatewayMessage = websocket_receive.next().await.unwrap().into();
let gateway_payload: VoiceGatewayReceivePayload = serde_json::from_str(&msg.0).unwrap();
if gateway_payload.op_code != VOICE_HELLO {
return Err(VoiceGatewayError::NonHelloOnInitiate {
opcode: gateway_payload.op_code,
});
}
info!("VGW: Received Hello");
// The hello data for voice gateways is in float milliseconds, so we convert it to f64 seconds
let gateway_hello: VoiceHelloData =
serde_json::from_str(gateway_payload.data.get()).unwrap();
let heartbeat_interval_seconds: f64 = gateway_hello.heartbeat_interval / 1000.0;
let voice_events = VoiceEvents::default();
let shared_events = Arc::new(Mutex::new(voice_events));
let mut gateway = VoiceGateway {
events: shared_events.clone(),
heartbeat_handler: VoiceHeartbeatHandler::new(
Duration::from_secs_f64(heartbeat_interval_seconds),
1, // to:do actually compute nonce
shared_websocket_send.clone(),
kill_send.subscribe(),
),
websocket_send: shared_websocket_send.clone(),
websocket_receive,
kill_send: kill_send.clone(),
kill_receive: kill_send.subscribe(),
};
// Now we can continuously check for messages in a different task, since we aren't going to receive another hello
#[cfg(not(target_arch = "wasm32"))]
tokio::task::spawn(async move {
gateway.gateway_listen_task().await;
});
#[cfg(target_arch = "wasm32")]
wasm_bindgen_futures::spawn_local(async move {
gateway.gateway_listen_task().await;
});
Ok(VoiceGatewayHandle {
url: websocket_url.clone(),
events: shared_events,
websocket_send: shared_websocket_send.clone(),
kill_send: kill_send.clone(),
})
}
/// The main gateway listener task;
pub async fn gateway_listen_task(&mut self) {
loop {
let msg;
tokio::select! {
Ok(_) = self.kill_receive.recv() => {
log::trace!("VGW: Closing listener task");
break;
}
message = self.websocket_receive.next() => {
msg = message;
}
}
// PRETTYFYME: Remove inline conditional compiling
#[cfg(not(target_arch = "wasm32"))]
if let Some(Ok(message)) = msg {
self.handle_message(message.into()).await;
continue;
}
#[cfg(target_arch = "wasm32")]
if let Some(message) = msg {
self.handle_message(message.into()).await;
continue;
}
// We couldn't receive the next message or it was an error, something is wrong with the websocket, close
warn!("VGW: Websocket is broken, stopping gateway");
break;
}
}
/// Closes the websocket connection and stops all tasks
async fn close(&mut self) {
self.kill_send.send(()).unwrap();
self.websocket_send.lock().await.close().await.unwrap();
}
/// Deserializes and updates a dispatched event, when we already know its type;
/// (Called for every event in handle_message)
async fn handle_event<'a, T: WebSocketEvent + serde::Deserialize<'a>>(
data: &'a str,
event: &mut GatewayEvent<T>,
) -> Result<(), serde_json::Error> {
let data_deserialize_result: Result<T, serde_json::Error> = serde_json::from_str(data);
if data_deserialize_result.is_err() {
return Err(data_deserialize_result.err().unwrap());
}
event.notify(data_deserialize_result.unwrap()).await;
Ok(())
}
/// This handles a message as a websocket event and updates its events along with the events' observers
pub async fn handle_message(&mut self, msg: VoiceGatewayMessage) {
if msg.0.is_empty() {
return;
}
let Ok(gateway_payload) = msg.payload() else {
if let Some(error) = msg.error() {
warn!("GW: Received error {:?}, connection will close..", error);
self.close().await;
self.events.lock().await.error.notify(error).await;
} else {
warn!(
"Message unrecognised: {:?}, please open an issue on the chorus github",
msg.0
);
}
return;
};
// See <https://discord.com/developers/docs/topics/voice-connections>
match gateway_payload.op_code {
VOICE_READY => {
trace!("VGW: Received READY!");
let event = &mut self.events.lock().await.voice_ready;
let result = VoiceGateway::handle_event(gateway_payload.data.get(), event).await;
if result.is_err() {
warn!("Failed to parse VOICE_READY ({})", result.err().unwrap());
}
}
VOICE_BACKEND_VERSION => {
trace!("VGW: Received Backend Version");
let event = &mut self.events.lock().await.backend_version;
let result = VoiceGateway::handle_event(gateway_payload.data.get(), event).await;
if result.is_err() {
warn!(
"Failed to parse VOICE_BACKEND_VERSION ({})",
result.err().unwrap()
);
}
}
VOICE_SESSION_DESCRIPTION => {
trace!("VGW: Received Session Description");
let event = &mut self.events.lock().await.session_description;
let result = VoiceGateway::handle_event(gateway_payload.data.get(), event).await;
if result.is_err() {
warn!(
"Failed to parse VOICE_SESSION_DESCRIPTION ({})",
result.err().unwrap()
);
}
}
VOICE_SESSION_UPDATE => {
trace!("VGW: Received Session Update");
let event = &mut self.events.lock().await.session_update;
let result = VoiceGateway::handle_event(gateway_payload.data.get(), event).await;
if result.is_err() {
warn!(
"Failed to parse VOICE_SESSION_UPDATE ({})",
result.err().unwrap()
);
}
}
VOICE_SPEAKING => {
trace!("VGW: Received Speaking");
let event = &mut self.events.lock().await.speaking;
let result = VoiceGateway::handle_event(gateway_payload.data.get(), event).await;
if result.is_err() {
warn!("Failed to parse VOICE_SPEAKING ({})", result.err().unwrap());
}
}
VOICE_SSRC_DEFINITION => {
trace!("VGW: Received Ssrc Definition");
let event = &mut self.events.lock().await.ssrc_definition;
let result = VoiceGateway::handle_event(gateway_payload.data.get(), event).await;
if result.is_err() {
warn!(
"Failed to parse VOICE_SSRC_DEFINITION ({})",
result.err().unwrap()
);
}
}
VOICE_CLIENT_DISCONNECT => {
trace!("VGW: Received Client Disconnect");
let event = &mut self.events.lock().await.client_disconnect;
let result = VoiceGateway::handle_event(gateway_payload.data.get(), event).await;
if result.is_err() {
warn!(
"Failed to parse VOICE_CLIENT_DISCONNECT ({})",
result.err().unwrap()
);
}
}
VOICE_CLIENT_CONNECT_FLAGS => {
trace!("VGW: Received Client Connect Flags");
let event = &mut self.events.lock().await.client_connect_flags;
let result = VoiceGateway::handle_event(gateway_payload.data.get(), event).await;
if result.is_err() {
warn!(
"Failed to parse VOICE_CLIENT_CONNECT_FLAGS ({})",
result.err().unwrap()
);
}
}
VOICE_CLIENT_CONNECT_PLATFORM => {
trace!("VGW: Received Client Connect Platform");
let event = &mut self.events.lock().await.client_connect_platform;
let result = VoiceGateway::handle_event(gateway_payload.data.get(), event).await;
if result.is_err() {
warn!(
"Failed to parse VOICE_CLIENT_CONNECT_PLATFORM ({})",
result.err().unwrap()
);
}
}
VOICE_MEDIA_SINK_WANTS => {
trace!("VGW: Received Media Sink Wants");
let event = &mut self.events.lock().await.media_sink_wants;
let result = VoiceGateway::handle_event(gateway_payload.data.get(), event).await;
if result.is_err() {
warn!(
"Failed to parse VOICE_MEDIA_SINK_WANTS ({})",
result.err().unwrap()
);
}
}
// We received a heartbeat from the server
// "Discord may send the app a Heartbeat (opcode 1) event, in which case the app should send a Heartbeat event immediately."
VOICE_HEARTBEAT => {
trace!("VGW: Received Heartbeat // Heartbeat Request");
// Tell the heartbeat handler it should send a heartbeat right away
let heartbeat_communication = VoiceHeartbeatThreadCommunication {
updated_nonce: None,
op_code: Some(VOICE_HEARTBEAT),
};
self.heartbeat_handler
.send
.send(heartbeat_communication)
.await
.unwrap();
}
VOICE_HEARTBEAT_ACK => {
trace!("VGW: Received Heartbeat ACK");
// Tell the heartbeat handler we received an ack
let heartbeat_communication = VoiceHeartbeatThreadCommunication {
updated_nonce: None,
op_code: Some(VOICE_HEARTBEAT_ACK),
};
self.heartbeat_handler
.send
.send(heartbeat_communication)
.await
.unwrap();
}
VOICE_IDENTIFY | VOICE_SELECT_PROTOCOL | VOICE_RESUME => {
info!(
"VGW: Received unexpected opcode ({}) for current state. This might be due to a faulty server implementation and is likely not the fault of chorus.",
gateway_payload.op_code
);
}
_ => {
warn!("VGW: Received unrecognized voice gateway op code ({})! Please open an issue on the chorus github so we can implement it", gateway_payload.op_code);
}
}
}
}

105
src/voice/gateway/handle.rs Normal file
View File

@ -0,0 +1,105 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
use std::sync::Arc;
use log::*;
use futures_util::SinkExt;
use serde_json::json;
use tokio::sync::Mutex;
use crate::types::{
SelectProtocol, Speaking, SsrcDefinition, VoiceGatewaySendPayload, VoiceIdentify,
VOICE_BACKEND_VERSION, VOICE_IDENTIFY, VOICE_SELECT_PROTOCOL, VOICE_SPEAKING,
VOICE_SSRC_DEFINITION,
};
use super::{events::VoiceEvents, Sink, VoiceGatewayMessage};
/// Represents a handle to a Voice Gateway connection.
/// Using this handle you can send Gateway Events directly.
#[derive(Debug, Clone)]
pub struct VoiceGatewayHandle {
pub url: String,
pub events: Arc<Mutex<VoiceEvents>>,
pub websocket_send: Arc<Mutex<Sink>>,
/// Tells gateway tasks to close
pub(super) kill_send: tokio::sync::broadcast::Sender<()>,
}
impl VoiceGatewayHandle {
/// Sends json to the gateway with an opcode
async fn send_json(&self, op_code: u8, to_send: serde_json::Value) {
let gateway_payload = VoiceGatewaySendPayload {
op_code,
data: to_send,
};
let payload_json = serde_json::to_string(&gateway_payload).unwrap();
let message = VoiceGatewayMessage(payload_json);
self.websocket_send
.lock()
.await
.send(message.into())
.await
.unwrap();
}
/// Sends a voice identify event to the gateway
pub async fn send_identify(&self, to_send: VoiceIdentify) {
let to_send_value = serde_json::to_value(&to_send).unwrap();
trace!("VGW: Sending Identify..");
self.send_json(VOICE_IDENTIFY, to_send_value).await;
}
/// Sends a select protocol event to the gateway
pub async fn send_select_protocol(&self, to_send: SelectProtocol) {
let to_send_value = serde_json::to_value(&to_send).unwrap();
trace!("VGW: Sending Select Protocol");
self.send_json(VOICE_SELECT_PROTOCOL, to_send_value).await;
}
/// Sends a speaking event to the gateway
pub async fn send_speaking(&self, to_send: Speaking) {
let to_send_value = serde_json::to_value(&to_send).unwrap();
trace!("VGW: Sending Speaking");
self.send_json(VOICE_SPEAKING, to_send_value).await;
}
/// Sends an ssrc definition event
pub async fn send_ssrc_definition(&self, to_send: SsrcDefinition) {
let to_send_value = serde_json::to_value(&to_send).unwrap();
trace!("VGW: Sending SsrcDefinition");
self.send_json(VOICE_SSRC_DEFINITION, to_send_value).await;
}
/// Sends a voice backend version request to the gateway
pub async fn send_voice_backend_version_request(&self) {
let data_empty_object = json!("{}");
trace!("VGW: Requesting voice backend version");
self.send_json(VOICE_BACKEND_VERSION, data_empty_object)
.await;
}
/// Closes the websocket connection and stops all gateway tasks;
///
/// Essentially pulls the plug on the voice gateway, leaving it possible to resume;
pub async fn close(&self) {
self.kill_send.send(()).unwrap();
self.websocket_send.lock().await.close().await.unwrap();
}
}

View File

@ -0,0 +1,174 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
use futures_util::SinkExt;
use log::*;
#[cfg(not(target_arch = "wasm32"))]
use tokio::time::Instant;
#[cfg(target_arch = "wasm32")]
use wasmtimer::std::Instant;
#[cfg(not(target_arch = "wasm32"))]
use tokio::time::sleep_until;
#[cfg(target_arch = "wasm32")]
use wasmtimer::tokio::sleep_until;
use std::{sync::Arc, time::Duration};
use tokio::sync::{
mpsc::{Receiver, Sender},
Mutex,
};
#[cfg(not(target_arch = "wasm32"))]
use tokio::task;
use crate::{
gateway::heartbeat::HEARTBEAT_ACK_TIMEOUT,
types::{VoiceGatewaySendPayload, VOICE_HEARTBEAT, VOICE_HEARTBEAT_ACK},
voice::gateway::VoiceGatewayMessage,
};
use super::Sink;
/// Handles sending heartbeats to the voice gateway in another thread
#[allow(dead_code)] // FIXME: Remove this, once all fields of VoiceHeartbeatHandler are used
#[derive(Debug)]
pub(super) struct VoiceHeartbeatHandler {
/// The heartbeat interval in milliseconds
pub heartbeat_interval: Duration,
/// The send channel for the heartbeat thread
pub send: Sender<VoiceHeartbeatThreadCommunication>,
}
impl VoiceHeartbeatHandler {
pub fn new(
heartbeat_interval: Duration,
starting_nonce: u64,
websocket_tx: Arc<Mutex<Sink>>,
kill_rc: tokio::sync::broadcast::Receiver<()>,
) -> Self {
let (send, receive) = tokio::sync::mpsc::channel(32);
let kill_receive = kill_rc.resubscribe();
#[cfg(not(target_arch = "wasm32"))]
task::spawn(async move {
Self::heartbeat_task(
websocket_tx,
heartbeat_interval,
starting_nonce,
receive,
kill_receive,
)
.await;
});
#[cfg(target_arch = "wasm32")]
wasm_bindgen_futures::spawn_local(async move {
Self::heartbeat_task(
websocket_tx,
heartbeat_interval,
starting_nonce,
receive,
kill_receive,
)
.await;
});
Self {
heartbeat_interval,
send,
}
}
/// The main heartbeat task;
///
/// Can be killed by the kill broadcast;
/// If the websocket is closed, will die out next time it tries to send a heartbeat;
pub async fn heartbeat_task(
websocket_tx: Arc<Mutex<Sink>>,
heartbeat_interval: Duration,
starting_nonce: u64,
mut receive: Receiver<VoiceHeartbeatThreadCommunication>,
mut kill_receive: tokio::sync::broadcast::Receiver<()>,
) {
let mut last_heartbeat_timestamp: Instant = Instant::now();
let mut last_heartbeat_acknowledged = true;
let mut nonce: u64 = starting_nonce;
loop {
let timeout = if last_heartbeat_acknowledged {
heartbeat_interval
} else {
// If the server hasn't acknowledged our heartbeat we should resend it
Duration::from_millis(HEARTBEAT_ACK_TIMEOUT)
};
let mut should_send = false;
tokio::select! {
() = sleep_until(last_heartbeat_timestamp + timeout) => {
should_send = true;
}
Some(communication) = receive.recv() => {
// If we received a nonce update, use that nonce now
if communication.updated_nonce.is_some() {
nonce = communication.updated_nonce.unwrap();
}
if let Some(op_code) = communication.op_code {
match op_code {
VOICE_HEARTBEAT => {
// As per the api docs, if the server sends us a Heartbeat, that means we need to respond with a heartbeat immediately
should_send = true;
}
VOICE_HEARTBEAT_ACK => {
// The server received our heartbeat
last_heartbeat_acknowledged = true;
}
_ => {}
}
}
}
Ok(_) = kill_receive.recv() => {
log::trace!("VGW: Closing heartbeat task");
break;
}
}
if should_send {
trace!("VGW: Sending Heartbeat..");
let heartbeat = VoiceGatewaySendPayload {
op_code: VOICE_HEARTBEAT,
data: nonce.into(),
};
let heartbeat_json = serde_json::to_string(&heartbeat).unwrap();
let msg = VoiceGatewayMessage(heartbeat_json);
let send_result = websocket_tx.lock().await.send(msg.into()).await;
if send_result.is_err() {
// We couldn't send, the websocket is broken
warn!("VGW: Couldnt send heartbeat, websocket seems broken");
break;
}
last_heartbeat_timestamp = Instant::now();
last_heartbeat_acknowledged = false;
}
}
}
}
/// Used for communications between the voice heartbeat and voice gateway thread.
/// Either signifies a nonce update, a heartbeat ACK or a Heartbeat request by the server
#[derive(Clone, Copy, Debug)]
pub(super) struct VoiceHeartbeatThreadCommunication {
/// The opcode for the communication we received, if relevant
pub(super) op_code: Option<u8>,
/// The new nonce to use, if any
pub(super) updated_nonce: Option<u64>,
}

View File

@ -0,0 +1,46 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
use crate::{errors::VoiceGatewayError, types::VoiceGatewayReceivePayload};
/// Represents a message received from the voice websocket connection.
///
/// This will be either a [VoiceGatewayReceivePayload], containing voice gateway events, or a [VoiceGatewayError].
///
/// This struct is used internally when handling messages.
#[derive(Clone, Debug)]
pub struct VoiceGatewayMessage(pub String);
impl VoiceGatewayMessage {
/// Parses the message as an error;
/// Returns the error if successfully parsed, None if the message isn't an error
pub fn error(&self) -> Option<VoiceGatewayError> {
// Some error strings have dots on the end, which we don't care about
let processed_content = self.0.to_lowercase().replace('.', "");
match processed_content.as_str() {
"unknown opcode" | "4001" => Some(VoiceGatewayError::UnknownOpcode),
"decode error" | "failed to decode payload" | "4002" => {
Some(VoiceGatewayError::FailedToDecodePayload)
}
"not authenticated" | "4003" => Some(VoiceGatewayError::NotAuthenticated),
"authentication failed" | "4004" => Some(VoiceGatewayError::AuthenticationFailed),
"already authenticated" | "4005" => Some(VoiceGatewayError::AlreadyAuthenticated),
"session is no longer valid" | "4006" => Some(VoiceGatewayError::SessionNoLongerValid),
"session timeout" | "4009" => Some(VoiceGatewayError::SessionTimeout),
"server not found" | "4011" => Some(VoiceGatewayError::ServerNotFound),
"unknown protocol" | "4012" => Some(VoiceGatewayError::UnknownProtocol),
"disconnected" | "4014" => Some(VoiceGatewayError::Disconnected),
"voice server crashed" | "4015" => Some(VoiceGatewayError::VoiceServerCrashed),
"unknown encryption mode" | "4016" => Some(VoiceGatewayError::UnknownEncryptionMode),
_ => None,
}
}
/// Parses the message as a payload;
/// Returns a result of deserializing
pub fn payload(&self) -> Result<VoiceGatewayReceivePayload, serde_json::Error> {
serde_json::from_str(&self.0)
}
}

View File

@ -2,8 +2,14 @@
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
pub use identify::*;
pub use ready::*;
pub mod backends;
pub mod events;
pub mod gateway;
pub mod handle;
pub mod heartbeat;
pub mod message;
mod identify;
mod ready;
pub use backends::*;
pub use gateway::*;
pub use handle::*;
pub use message::*;

17
src/voice/mod.rs Normal file
View File

@ -0,0 +1,17 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
//! Module for all voice functionality within chorus.
mod crypto;
#[cfg(feature = "voice_gateway")]
pub mod gateway;
#[cfg(feature = "voice_udp")]
pub mod udp;
#[cfg(feature = "voice_udp")]
pub mod voice_data;
// Pub use this so users can interact with packet types if they want
#[cfg(feature = "voice_udp")]
pub use discortp;

View File

@ -0,0 +1,16 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
#[cfg(all(not(target_arch = "wasm32"), feature = "voice_udp"))]
pub mod tokio;
#[cfg(all(not(target_arch = "wasm32"), feature = "voice_udp"))]
pub use tokio::*;
#[cfg(all(not(target_arch = "wasm32"), feature = "voice_udp"))]
pub type UdpSocket = tokio::TokioSocket;
#[cfg(all(not(target_arch = "wasm32"), feature = "voice_udp"))]
pub type UdpBackend = tokio::TokioBackend;
#[cfg(all(target_arch = "wasm32", feature = "voice_udp"))]
compile_error!("UDP Voice support is not (and will likely never be) supported for WASM. This is because UDP cannot be used in the browser. We are however looking into Webrtc for WASM voice support.");

View File

@ -0,0 +1,37 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
use std::net::SocketAddr;
use crate::errors::VoiceUdpError;
#[derive(Debug, Clone)]
pub struct TokioBackend;
pub type TokioSocket = tokio::net::UdpSocket;
impl TokioBackend {
pub async fn connect(url: SocketAddr) -> Result<TokioSocket, VoiceUdpError> {
// Bind with a port number of 0, so the os assigns this listener a port
let udp_socket_result = TokioSocket::bind("0.0.0.0:0").await;
if let Err(e) = udp_socket_result {
return Err(VoiceUdpError::CannotBind {
error: format!("{:?}", e),
});
}
let udp_socket = udp_socket_result.unwrap();
let connection_result = udp_socket.connect(url).await;
if let Err(e) = connection_result {
return Err(VoiceUdpError::CannotConnect {
error: format!("{:?}", e),
});
}
Ok(udp_socket)
}
}

25
src/voice/udp/events.rs Normal file
View File

@ -0,0 +1,25 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
use discortp::{rtcp::Rtcp, rtp::Rtp};
use crate::{gateway::GatewayEvent, types::WebSocketEvent};
impl WebSocketEvent for Rtp {}
impl WebSocketEvent for Rtcp {}
#[derive(Debug)]
pub struct VoiceUDPEvents {
pub rtp: GatewayEvent<Rtp>,
pub rtcp: GatewayEvent<Rtcp>,
}
impl Default for VoiceUDPEvents {
fn default() -> Self {
Self {
rtp: GatewayEvent::new(),
rtcp: GatewayEvent::new(),
}
}
}

256
src/voice/udp/handle.rs Normal file
View File

@ -0,0 +1,256 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
use std::sync::Arc;
use crypto_secretbox::{
aead::Aead, cipher::generic_array::GenericArray, KeyInit, XSalsa20Poly1305,
};
use discortp::Packet;
use getrandom::getrandom;
use log::*;
use tokio::{sync::Mutex, sync::RwLock};
use super::UdpSocket;
use crate::{
errors::VoiceUdpError,
types::VoiceEncryptionMode,
voice::{crypto::get_xsalsa20_poly1305_nonce, voice_data::VoiceData},
};
use super::{events::VoiceUDPEvents, RTP_HEADER_SIZE};
/// Handle to a voice UDP connection
///
/// Can be safely cloned and will still correspond to the same connection.
#[derive(Debug, Clone)]
pub struct UdpHandle {
pub events: Arc<Mutex<VoiceUDPEvents>>,
pub(super) socket: Arc<UdpSocket>,
pub data: Arc<RwLock<VoiceData>>,
}
impl UdpHandle {
/// Constructs and sends encoded opus rtp data.
///
/// Automatically makes an [RtpPacket](discortp::rtp::RtpPacket), encrypts it and sends it.
///
/// # Errors
/// If we do not have VoiceReady data, which contains our ssrc, this returns a
/// [VoiceUdpError::NoData] error.
///
/// If we have not received an encryption key, this returns a [VoiceUdpError::NoKey] error.
///
/// If the UDP socket is broken, this returns a [VoiceUdpError::BrokenSocket] error.
pub async fn send_opus_data(
&self,
timestamp: u32,
payload: Vec<u8>,
) -> Result<(), VoiceUdpError> {
let voice_ready_data_result = self.data.read().await.ready_data.clone();
if voice_ready_data_result.is_none() {
return Err(VoiceUdpError::NoData);
}
let ssrc = voice_ready_data_result.unwrap().ssrc;
let sequence_number = self.data.read().await.last_sequence_number.wrapping_add(1);
self.data.write().await.last_sequence_number = sequence_number;
let payload_len = payload.len();
let rtp_data = discortp::rtp::Rtp {
// Always the same
version: 2,
padding: 0,
extension: 0,
csrc_count: 0,
csrc_list: Vec::new(),
marker: 0,
payload_type: discortp::rtp::RtpType::Dynamic(120),
// Actually variable
sequence: sequence_number.into(),
timestamp: timestamp.into(),
ssrc,
payload,
};
let buffer_size = payload_len + RTP_HEADER_SIZE as usize;
let mut buffer = vec![0; buffer_size];
let mut rtp_packet = discortp::rtp::MutableRtpPacket::new(&mut buffer).expect("Mangled rtp packet creation buffer, something is very wrong. Please open an issue on the chorus github: https://github.com/polyphony-chat/chorus/issues/new");
rtp_packet.populate(&rtp_data);
self.send_rtp_packet(rtp_packet).await
}
/// Encrypts and sends and rtp packet.
///
/// # Errors
/// If we have not received an encryption key, this returns a [VoiceUdpError::NoKey] error.
///
/// If the Udp socket is broken, this returns a [VoiceUdpError::BrokenSocket] error.
pub async fn send_rtp_packet(
&self,
packet: discortp::rtp::MutableRtpPacket<'_>,
) -> Result<(), VoiceUdpError> {
let mut buffer = self.encrypt_rtp_packet_payload(&packet).await?;
let new_packet = discortp::rtp::MutableRtpPacket::new(&mut buffer).unwrap();
self.send_encrypted_rtp_packet(new_packet.consume_to_immutable())
.await?;
Ok(())
}
/// Encrypts an unencrypted rtp packet, returning a copy of the packet's bytes with an
/// encrypted payload
///
/// # Errors
/// If we have not received an encryption key, this returns a [VoiceUdpError::NoKey] error.
///
/// When using voice encryption modes which require special nonce generation, and said generation fails, this returns a [VoiceUdpError::FailedNonceGeneration] error.
pub async fn encrypt_rtp_packet_payload(
&self,
packet: &discortp::rtp::MutableRtpPacket<'_>,
) -> Result<Vec<u8>, VoiceUdpError> {
let payload = packet.payload();
let session_description_result = self.data.read().await.session_description.clone();
// We are trying to encrypt, but have not received SessionDescription yet,
// which contains the secret key.
if session_description_result.is_none() {
return Err(VoiceUdpError::NoKey);
}
let session_description = session_description_result.unwrap();
let mut nonce_bytes = match session_description.encryption_mode {
VoiceEncryptionMode::Xsalsa20Poly1305 => get_xsalsa20_poly1305_nonce(packet.packet()),
VoiceEncryptionMode::Xsalsa20Poly1305Suffix => {
// Generate 24 random bytes
let mut random_destinaton: Vec<u8> = vec![0; 24];
let random_result = getrandom(&mut random_destinaton);
if let Err(e) = random_result {
return Err(VoiceUdpError::FailedNonceGeneration {
error: format!("{:?}", e),
});
}
random_destinaton
}
VoiceEncryptionMode::Xsalsa20Poly1305Lite => {
// "Incremental 4 bytes (32bit) int value"
let mut data_lock = self.data.write().await;
let nonce = data_lock
.last_udp_encryption_nonce
.unwrap_or_default()
.wrapping_add(1);
data_lock.last_udp_encryption_nonce = Some(nonce);
drop(data_lock);
// TODO: Is big endian correct? This is not documented anywhere
let mut bytes = nonce.to_be_bytes().to_vec();
// This is 4 bytes, it has to be a different size, appends 0s
while bytes.len() < 24 {
bytes.push(0);
}
bytes
}
_ => {
error!(
"This voice encryption mode ({:?}) is not yet implemented.",
session_description.encryption_mode
);
return Err(VoiceUdpError::EncryptionModeNotImplemented {
encryption_mode: format!("{:?}", session_description.encryption_mode),
});
}
};
let key = GenericArray::from_slice(&session_description.secret_key);
let encryption_result;
if session_description.encryption_mode.is_xsalsa20_poly1305() {
let nonce = GenericArray::from_slice(&nonce_bytes);
let encryptor = XSalsa20Poly1305::new(key);
encryption_result = encryptor.encrypt(nonce, payload);
}
// Note: currently unused because I have no idea what the AeadAes256Gcm nonce is
/*else if session_description.encryption_mode.is_aead_aes256_gcm() {
let nonce = GenericArray::from_slice(&nonce_bytes);
let encryptor = Aes256Gcm::new(key);
encryption_result = encryptor.encrypt(nonce, payload);
}*/
else {
error!(
"This voice encryption mode ({:?}) is not yet implemented.",
session_description.encryption_mode
);
return Err(VoiceUdpError::EncryptionModeNotImplemented {
encryption_mode: format!("{:?}", session_description.encryption_mode),
});
}
if encryption_result.is_err() {
// Safety: If encryption fails here, it's chorus' fault, and it makes no sense to
// return the error to the user.
//
// This is not an error the user should account for, which is why we throw it here.
panic!("{}", VoiceUdpError::FailedEncryption);
}
let mut encrypted_payload = encryption_result.unwrap();
// Append the nonce bytes, if needed
// All other encryption modes have an explicit nonce, whereas Xsalsa20Poly1305
// has the nonce as the rtp header.
if session_description.encryption_mode != VoiceEncryptionMode::Xsalsa20Poly1305 {
encrypted_payload.append(&mut nonce_bytes);
}
// We need to allocate a new buffer, since the old one is too small for our new encrypted
// data
let buffer_size = encrypted_payload.len() + RTP_HEADER_SIZE as usize;
let mut new_buffer: Vec<u8> = Vec::with_capacity(buffer_size);
let mut rtp_header = packet.packet().to_vec()[0..RTP_HEADER_SIZE as usize].to_vec();
new_buffer.append(&mut rtp_header);
new_buffer.append(&mut encrypted_payload);
Ok(new_buffer)
}
/// Sends an (already encrypted) rtp packet to the connection.
///
/// # Errors
/// If the Udp socket is broken, this returns a [VoiceUdpError::BrokenSocket] error.
pub async fn send_encrypted_rtp_packet(
&self,
packet: discortp::rtp::RtpPacket<'_>,
) -> Result<(), VoiceUdpError> {
let raw_bytes = packet.packet();
let send_res = self.socket.send(raw_bytes).await;
if let Err(e) = send_res {
return Err(VoiceUdpError::BrokenSocket {
error: format!("{:?}", e),
});
}
trace!("VUDP: Sent rtp packet!");
Ok(())
}
}

356
src/voice/udp/handler.rs Normal file
View File

@ -0,0 +1,356 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
use std::{net::SocketAddr, sync::Arc};
use crypto_secretbox::aead::Aead;
use crypto_secretbox::cipher::generic_array::GenericArray;
use crypto_secretbox::KeyInit;
use crypto_secretbox::XSalsa20Poly1305;
use discortp::demux::Demuxed;
use discortp::discord::{
IpDiscovery, IpDiscoveryPacket, IpDiscoveryType, MutableIpDiscoveryPacket,
};
use discortp::rtcp::report::ReceiverReport;
use discortp::rtcp::report::SenderReport;
use discortp::{demux::demux, Packet};
use tokio::sync::{Mutex, RwLock};
use super::UdpBackend;
use super::UdpSocket;
use super::RTP_HEADER_SIZE;
use crate::errors::VoiceUdpError;
use crate::types::VoiceEncryptionMode;
use crate::voice::crypto::get_xsalsa20_poly1305_lite_nonce;
use crate::voice::crypto::get_xsalsa20_poly1305_nonce;
use crate::voice::crypto::get_xsalsa20_poly1305_suffix_nonce;
use crate::voice::voice_data::VoiceData;
use super::{events::VoiceUDPEvents, UdpHandle};
use log::*;
#[derive(Debug)]
/// The main UDP struct, which handles receiving, parsing and decrypting the rtp packets
pub struct UdpHandler {
events: Arc<Mutex<VoiceUDPEvents>>,
pub data: Arc<RwLock<VoiceData>>,
socket: Arc<UdpSocket>,
}
impl UdpHandler {
/// Spawns a new UDP handler and performs IP discovery.
///
/// Mutates the given data_reference with the IP discovery data.
pub async fn spawn(
data_reference: Arc<RwLock<VoiceData>>,
url: SocketAddr,
ssrc: u32,
) -> Result<UdpHandle, VoiceUdpError> {
let udp_socket = UdpBackend::connect(url).await?;
// First perform ip discovery
let ip_discovery = IpDiscovery {
pkt_type: IpDiscoveryType::Request,
ssrc,
length: 70,
address: Vec::new(),
port: 0,
payload: Vec::new(),
};
// Minimum size with an empty Address value, + 64 bytes for the actual address size
let size = IpDiscoveryPacket::minimum_packet_size() + 64;
let mut buf: Vec<u8> = vec![0; size];
// Safety: expect is justified here, since this is an error which should never happen.
// If this errors, the code at fault is the buffer size calculation.
let mut ip_discovery_packet =
MutableIpDiscoveryPacket::new(&mut buf).expect("Mangled ip discovery packet creation buffer, something is very wrong. Please open an issue on the chorus github: https://github.com/polyphony-chat/chorus/issues/new");
ip_discovery_packet.populate(&ip_discovery);
let data = ip_discovery_packet.packet();
debug!("VUDP: Sending Ip Discovery {:?}", &data);
let send_res = udp_socket.send(data).await;
if let Err(e) = send_res {
return Err(VoiceUdpError::BrokenSocket {
error: format!("{:?}", e),
});
}
debug!("VUDP: Sent packet discovery request");
// Handle the ip discovery response
let received_size_or_err = udp_socket.recv(&mut buf).await;
if let Err(e) = received_size_or_err {
return Err(VoiceUdpError::BrokenSocket {
error: format!("{:?}", e),
});
}
let receieved_ip_discovery = IpDiscoveryPacket::new(&buf).expect("Could not make ipdiscovery packet from received data, something is very wrong. Please open an issue on the chorus github: https://github.com/polyphony-chat/chorus/issues/new");
debug!(
"VUDP: Received ip discovery!!! {:?}",
receieved_ip_discovery
);
let ip_discovery = IpDiscovery {
pkt_type: receieved_ip_discovery.get_pkt_type(),
length: receieved_ip_discovery.get_length(),
ssrc: receieved_ip_discovery.get_ssrc(),
address: receieved_ip_discovery.get_address(),
port: receieved_ip_discovery.get_port(),
payload: Vec::new(),
};
let mut data_reference_lock = data_reference.write().await;
data_reference_lock.ip_discovery = Some(ip_discovery);
drop(data_reference_lock);
let socket = Arc::new(udp_socket);
let events = VoiceUDPEvents::default();
let shared_events = Arc::new(Mutex::new(events));
let mut handler = UdpHandler {
events: shared_events.clone(),
data: data_reference.clone(),
socket: socket.clone(),
};
// Now we can continuously check for messages in a different task
tokio::spawn(async move {
handler.listen_task().await;
});
Ok(UdpHandle {
events: shared_events,
socket,
data: data_reference,
})
}
/// The main listen task;
///
/// Receives UDP messages and parses them.
async fn listen_task(&mut self) {
loop {
// FIXME: is there a max size for these packets?
// Allocating 512 bytes seems a bit extreme
//
// Update: see <https://stackoverflow.com/questions/58097580/rtp-packet-maximum-size>
// > "The RTP standard does not set a maximum size.."
//
// The theoretical max for this buffer would be 1458 bytes, but that is imo
// unreasonable to allocate for every message.
let mut buf: Vec<u8> = vec![0; 512];
let result = self.socket.recv(&mut buf).await;
if let Ok(size) = result {
self.handle_message(&buf[0..size]).await;
continue;
}
warn!("VUDP: Voice UDP is broken, closing connection");
break;
}
}
/// Handles a message buf
async fn handle_message(&self, buf: &[u8]) {
let parsed = demux(buf);
match parsed {
Demuxed::Rtp(rtp) => {
trace!("VUDP: Parsed packet as rtp! {:?}", buf);
let decryption_result = self.decrypt_rtp_packet_payload(&rtp).await;
if let Err(err) = decryption_result {
match err {
VoiceUdpError::NoKey => {
warn!("VUDP: Received encyrpted voice data, but no encryption key, CANNOT DECRYPT!");
return;
}
VoiceUdpError::FailedDecryption => {
warn!("VUDP: Failed to decrypt voice data!");
return;
}
_ => {
error!("VUDP: Failed to decrypt voice data: {}", err);
return;
}
}
}
let decrypted = decryption_result.unwrap();
trace!("VUDP: Successfully decrypted voice data!");
let rtp_with_decrypted_data = discortp::rtp::Rtp {
ssrc: rtp.get_ssrc(),
marker: rtp.get_marker(),
version: rtp.get_version(),
padding: rtp.get_padding(),
sequence: rtp.get_sequence(),
extension: rtp.get_extension(),
timestamp: rtp.get_timestamp(),
csrc_list: rtp.get_csrc_list(),
csrc_count: rtp.get_csrc_count(),
payload_type: rtp.get_payload_type(),
payload: decrypted,
};
self.events
.lock()
.await
.rtp
.notify(rtp_with_decrypted_data)
.await;
}
Demuxed::Rtcp(rtcp) => {
trace!("VUDP: Parsed packet as rtcp!");
let rtcp_data = match rtcp {
discortp::rtcp::RtcpPacket::KnownType(knowntype) => {
discortp::rtcp::Rtcp::KnownType(knowntype)
}
discortp::rtcp::RtcpPacket::SenderReport(senderreport) => {
discortp::rtcp::Rtcp::SenderReport(SenderReport {
payload: senderreport.payload().to_vec(),
padding: senderreport.get_padding(),
version: senderreport.get_version(),
ssrc: senderreport.get_ssrc(),
pkt_length: senderreport.get_pkt_length(),
packet_type: senderreport.get_packet_type(),
rx_report_count: senderreport.get_rx_report_count(),
})
}
discortp::rtcp::RtcpPacket::ReceiverReport(receiverreport) => {
discortp::rtcp::Rtcp::ReceiverReport(ReceiverReport {
payload: receiverreport.payload().to_vec(),
padding: receiverreport.get_padding(),
version: receiverreport.get_version(),
ssrc: receiverreport.get_ssrc(),
pkt_length: receiverreport.get_pkt_length(),
packet_type: receiverreport.get_packet_type(),
rx_report_count: receiverreport.get_rx_report_count(),
})
}
_ => {
unreachable!();
}
};
self.events.lock().await.rtcp.notify(rtcp_data).await;
}
Demuxed::FailedParse(e) => {
trace!("VUDP: Failed to parse packet: {:?}", e);
}
Demuxed::TooSmall => {
unreachable!()
}
}
}
/// Decrypts an encrypted rtp packet, returning a decrypted copy of the packet's payload
/// bytes.
///
/// # Errors
/// If we have not received an encryption key, this returns a [VoiceUdpError::NoKey] error.
///
/// If the decryption fails, this returns a [VoiceUdpError::FailedDecryption].
pub async fn decrypt_rtp_packet_payload(
&self,
rtp: &discortp::rtp::RtpPacket<'_>,
) -> Result<Vec<u8>, VoiceUdpError> {
let packet_bytes = rtp.packet();
let mut ciphertext: Vec<u8> =
packet_bytes[(RTP_HEADER_SIZE as usize)..packet_bytes.len()].to_vec();
let session_description_result = self.data.read().await.session_description.clone();
// We are trying to decrypt, but have not received SessionDescription yet,
// which contains the secret key
if session_description_result.is_none() {
return Err(VoiceUdpError::NoKey);
}
let session_description = session_description_result.unwrap();
let nonce_bytes = match session_description.encryption_mode {
VoiceEncryptionMode::Xsalsa20Poly1305 => get_xsalsa20_poly1305_nonce(packet_bytes),
VoiceEncryptionMode::Xsalsa20Poly1305Suffix => {
// Remove the suffix from the ciphertext
ciphertext = ciphertext[0..ciphertext.len() - 24].to_vec();
get_xsalsa20_poly1305_suffix_nonce(packet_bytes)
}
// Note: Rtpsize is documented by userdoccers to be the same, yet decryption
// doesn't work.
//
// I have no idea how Rtpsize works.
VoiceEncryptionMode::Xsalsa20Poly1305Lite => {
// Remove the suffix from the ciphertext
ciphertext = ciphertext[0..ciphertext.len() - 4].to_vec();
get_xsalsa20_poly1305_lite_nonce(packet_bytes)
}
_ => {
error!(
"This voice encryption mode ({:?}) is not yet implemented.",
session_description.encryption_mode
);
return Err(VoiceUdpError::EncryptionModeNotImplemented {
encryption_mode: format!("{:?}", session_description.encryption_mode),
});
}
};
let key = GenericArray::from_slice(&session_description.secret_key);
let decryption_result;
if session_description.encryption_mode.is_xsalsa20_poly1305() {
let nonce = GenericArray::from_slice(&nonce_bytes);
let decryptor = XSalsa20Poly1305::new(key);
decryption_result = decryptor.decrypt(nonce, ciphertext.as_ref());
}
// Note: currently unused because I have no idea what the AeadAes256Gcm nonce is
/*else if session_description.encryption_mode.is_aead_aes256_gcm() {
let nonce = GenericArray::from_slice(&nonce_bytes);
let decryptor = Aes256Gcm::new(key);
decryption_result = decryptor.decrypt(nonce, ciphertext.as_ref());
}*/
else {
error!(
"This voice encryption mode ({:?}) is not yet implemented.",
session_description.encryption_mode
);
return Err(VoiceUdpError::EncryptionModeNotImplemented {
encryption_mode: format!("{:?}", session_description.encryption_mode),
});
}
// Note: this may seem like we are throwing away valuable error handling data,
// but the decryption error provides no extra info.
if decryption_result.is_err() {
return Err(VoiceUdpError::FailedDecryption);
}
Ok(decryption_result.unwrap())
}
}

18
src/voice/udp/mod.rs Normal file
View File

@ -0,0 +1,18 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
//! Defines the UDP component of voice communications, sending and receiving raw rtp data.
/// See <https://discord-userdoccers.vercel.app/topics/voice-connections#voice-packet-structure>
/// This always adds up to 12 bytes
const RTP_HEADER_SIZE: u8 = 12;
pub mod backends;
pub mod events;
pub mod handle;
pub mod handler;
pub use backends::*;
pub use handle::*;
pub use handler::*;

25
src/voice/voice_data.rs Normal file
View File

@ -0,0 +1,25 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
use discortp::discord::IpDiscovery;
use crate::types::{SessionDescription, Snowflake, VoiceReady, VoiceServerUpdate};
#[derive(Debug, Default)]
/// Saves data shared between parts of the voice architecture;
///
/// Struct used to give the UDP connection data received from the gateway.
pub struct VoiceData {
pub server_data: Option<VoiceServerUpdate>,
pub ready_data: Option<VoiceReady>,
pub session_description: Option<SessionDescription>,
pub user_id: Snowflake,
pub session_id: String,
/// The last sequence number we used, has to be incremented by one every time we send a message
pub last_sequence_number: u16,
pub ip_discovery: Option<IpDiscovery>,
/// The last UDP encryption nonce, if we are using an encryption mode with incremental nonces.
pub last_udp_encryption_nonce: Option<u32>,
}

View File

@ -4,17 +4,26 @@
mod common;
use std::sync::Arc;
use std::time::Duration;
use async_trait::async_trait;
use chorus::errors::GatewayError;
use chorus::gateway::*;
use chorus::types::{
self, Channel, ChannelCreateSchema, ChannelModifySchema, IntoShared, RoleCreateModifySchema,
RoleObject,
self, Channel, ChannelCreateSchema, ChannelModifySchema, GatewayReady, IntoShared,
RoleCreateModifySchema, RoleObject,
};
#[cfg(target_arch = "wasm32")]
use wasm_bindgen_test::*;
#[cfg(target_arch = "wasm32")]
wasm_bindgen_test_configure!(run_in_browser);
#[cfg(not(target_arch = "wasm32"))]
use tokio::time::sleep;
#[cfg(target_arch = "wasm32")]
use wasmtimer::tokio::sleep;
#[cfg_attr(target_arch = "wasm32", wasm_bindgen_test::wasm_bindgen_test)]
#[cfg_attr(not(target_arch = "wasm32"), tokio::test)]
/// Tests establishing a connection (hello and heartbeats) on the local gateway;
@ -25,6 +34,18 @@ async fn test_gateway_establish() {
common::teardown(bundle).await
}
#[derive(Debug)]
struct GatewayReadyObserver {
channel: tokio::sync::mpsc::Sender<()>,
}
#[async_trait]
impl Observer<GatewayReady> for GatewayReadyObserver {
async fn update(&self, _data: &GatewayReady) {
self.channel.send(()).await.unwrap();
}
}
#[cfg_attr(target_arch = "wasm32", wasm_bindgen_test::wasm_bindgen_test)]
#[cfg_attr(not(target_arch = "wasm32"), tokio::test)]
/// Tests establishing a connection and authenticating
@ -33,10 +54,35 @@ async fn test_gateway_authenticate() {
let gateway: GatewayHandle = Gateway::spawn(bundle.urls.wss.clone()).await.unwrap();
let (ready_send, mut ready_receive) = tokio::sync::mpsc::channel(1);
let observer = Arc::new(GatewayReadyObserver {
channel: ready_send,
});
gateway
.events
.lock()
.await
.session
.ready
.subscribe(observer);
let mut identify = types::GatewayIdentifyPayload::common();
identify.token = bundle.user.token.clone();
gateway.send_identify(identify).await;
tokio::select! {
// Fail, we timed out waiting for it
() = sleep(Duration::from_secs(20)) => {
println!("Timed out waiting for event, failing..");
assert!(false);
}
// Sucess, we have received it
Some(_) = ready_receive.recv() => {}
};
common::teardown(bundle).await
}