use std::fmt::{self, Debug, Formatter};
use futures_util::StreamExt;
use quinn::{RecvStream, SendStream};
use serde::{de::DeserializeOwned, Serialize};
use super::ReceiverStream;
use crate::{error, Receiver, Sender};
/// An incoming stream whose message types have not been chosen yet.
///
/// `T` is the type of the first message read from the stream (see
/// [`Incoming::type`]); it must be deserializable. Call [`Incoming::accept`]
/// to turn the stream into a typed `Sender`/`Receiver` pair.
#[must_use = "`Incoming` does nothing unless accepted with `Incoming::accept`"]
pub struct Incoming<T: DeserializeOwned> {
/// Sending half of the stream; wrapped into a `Sender` by `accept`.
sender: SendStream,
/// Receiving half of the stream, wrapped so it yields deserialized `T`s.
receiver: ReceiverStream<T>,
/// Cached outcome of reading the type message: `None` until
/// [`Incoming::type`] has been called, afterwards the stored result.
r#type: Option<Result<T, error::Incoming>>,
}
impl<T: DeserializeOwned> Debug for Incoming<T> {
    /// Formats `Incoming` as a debug struct.
    ///
    /// Only `sender` is printed through its own `Debug` implementation; the
    /// `receiver` and `type` fields are rendered as fixed placeholder strings,
    /// so no `Debug` bound on `T` is needed.
    fn fmt(&self, formatter: &mut Formatter<'_>) -> fmt::Result {
        let mut builder = formatter.debug_struct("Incoming");
        builder.field("sender", &self.sender);
        // Placeholders: these fields are not formatted from their values.
        builder.field("receiver", &"ReceiverStream");
        builder.field("type", &"Option<Result<T>>");
        builder.finish()
    }
}
impl<T: DeserializeOwned> Incoming<T> {
pub(super) fn new(sender: SendStream, receiver: RecvStream) -> Self {
Self {
sender,
receiver: ReceiverStream::new(receiver),
r#type: None,
}
}
#[allow(clippy::missing_panics_doc)]
pub async fn r#type(&mut self) -> Result<&T, &error::Incoming> {
#[allow(clippy::ref_patterns)]
if let Some(ref r#type) = self.r#type {
r#type.as_ref()
} else {
let r#type = self
.receiver
.next()
.await
.map_or(Err(error::Incoming::Closed), |result| {
result.map_err(error::Incoming::Receiver)
});
self.r#type.insert(r#type).as_ref()
}
}
pub async fn accept<
S: DeserializeOwned + Serialize + Send + 'static,
R: DeserializeOwned + Serialize + Send + 'static,
>(
mut self,
) -> Result<(Sender<S>, Receiver<R>), error::Incoming> {
match self.r#type {
Some(Ok(_)) => (),
Some(Err(error)) => return Err(error),
None => {
let _type = self
.receiver
.next()
.await
.map_or(Err(error::Incoming::Closed), |result| {
result.map_err(error::Incoming::Receiver)
});
}
}
let sender = Sender::new(self.sender);
let receiver = Receiver::new(self.receiver.transmute());
Ok((sender, receiver))
}
}