Impl spawn() for wasm gateway
This commit is contained in:
parent
9ae07a15d7
commit
565a0cd745
|
@ -1,11 +1,14 @@
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
use std::u8;
|
||||||
|
|
||||||
use super::events::Events;
|
use super::events::Events;
|
||||||
use super::*;
|
use super::*;
|
||||||
use pharos::*;
|
use futures_util::StreamExt;
|
||||||
|
use tokio::task;
|
||||||
|
use tokio_stream::StreamExt;
|
||||||
use ws_stream_wasm::*;
|
use ws_stream_wasm::*;
|
||||||
|
|
||||||
use crate::types::{self, WebSocketEvent};
|
use crate::types::{self, GatewayReceivePayload, WebSocketEvent};
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub struct WasmGateway {
|
pub struct WasmGateway {
|
||||||
events: Arc<Mutex<Events>>,
|
events: Arc<Mutex<Events>>,
|
||||||
|
@ -37,19 +40,73 @@ impl GatewayCapable<WsMessage, WsStream> for WasmGateway {
|
||||||
async fn spawn<G: GatewayHandleCapable<WsMessage, WsStream>>(
|
async fn spawn<G: GatewayHandleCapable<WsMessage, WsStream>>(
|
||||||
websocket_url: String,
|
websocket_url: String,
|
||||||
) -> Result<G, GatewayError> {
|
) -> Result<G, GatewayError> {
|
||||||
let (mut websocket_stream, _) = match WsMeta::connect(websocket_url, None).await {
|
let (_, mut websocket_stream) = match WsMeta::connect(websocket_url.clone(), None).await {
|
||||||
Ok(ws) => Ok(ws),
|
Ok(ws) => Ok(ws),
|
||||||
Err(e) => Err(GatewayError::CannotConnect {
|
Err(e) => Err(GatewayError::CannotConnect {
|
||||||
error: e.to_string(),
|
error: e.to_string(),
|
||||||
}),
|
}),
|
||||||
}?;
|
}?;
|
||||||
|
|
||||||
let mut event = match websocket_stream
|
let (kill_send, mut _kill_receive) = tokio::sync::broadcast::channel::<()>(16);
|
||||||
.observe(ObserveConfig::channel(self, Channel::Unbounded))
|
let (websocket_send, mut websocket_receive) = websocket_stream.split();
|
||||||
.await
|
let shared_websocket_send = Arc::new(Mutex::new(websocket_send));
|
||||||
{
|
|
||||||
Ok(ok) => Ok(ok),
|
let msg = match websocket_receive.next().await {
|
||||||
Err(e) => Err(GatewayError::CannotConnect { error: e }),
|
Some(msg) => match msg {
|
||||||
|
WsMessage::Text(text) => Ok(text),
|
||||||
|
WsMessage::Binary(vec) => Err(GatewayError::NonHelloOnInitiate {
|
||||||
|
opcode: vec.into_iter().next().unwrap_or(u8::MIN),
|
||||||
|
}),
|
||||||
|
},
|
||||||
|
None => Err(GatewayError::CannotConnect {
|
||||||
|
error: "No 'Hello' message received!".to_string(),
|
||||||
|
}),
|
||||||
}?;
|
}?;
|
||||||
|
|
||||||
|
let payload: GatewayReceivePayload = match serde_json::from_str(msg.as_str()) {
|
||||||
|
Ok(msg) => Ok(msg),
|
||||||
|
Err(_) => Err(GatewayError::Decode),
|
||||||
|
}?;
|
||||||
|
if payload.op_code != GATEWAY_HELLO {
|
||||||
|
return Err(GatewayError::NonHelloOnInitiate {
|
||||||
|
opcode: payload.op_code,
|
||||||
|
});
|
||||||
|
};
|
||||||
|
|
||||||
|
info!("GW: Received Hello");
|
||||||
|
|
||||||
|
let gateway_hello: types::HelloData =
|
||||||
|
serde_json::from_str(payload.event_data.unwrap().get()).unwrap();
|
||||||
|
|
||||||
|
let events = Events::default();
|
||||||
|
let shared_events: Arc<Mutex<Events>> = Arc::new(Mutex::new(events));
|
||||||
|
let store: GatewayStore = Arc::new(Mutex::new(HashMap::new()));
|
||||||
|
|
||||||
|
let mut gateway = WasmGateway {
|
||||||
|
events: shared_events.clone(),
|
||||||
|
heartbeat_handler: todo!(),
|
||||||
|
websocket_send: shared_websocket_send.clone(),
|
||||||
|
websocket_receive,
|
||||||
|
store: store.clone(),
|
||||||
|
url: websocket_url.clone(),
|
||||||
|
};
|
||||||
|
|
||||||
|
task::spawn_local(async move {
|
||||||
|
gateway.gateway_listen_task().await;
|
||||||
|
});
|
||||||
|
|
||||||
|
Ok(G::new(
|
||||||
|
websocket_url.clone(),
|
||||||
|
shared_events.clone(),
|
||||||
|
shared_websocket_send.clone(),
|
||||||
|
kill_send.clone(),
|
||||||
|
store,
|
||||||
|
))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl WasmGateway {
|
||||||
|
async fn gateway_listen_task(&mut self) {
|
||||||
|
todo!()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue