1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
//! [`Receiver`] part of a stream.

use std::{
	fmt::{self, Debug, Formatter},
	pin::Pin,
	task::{Context, Poll},
};

use futures_util::{stream::Stream, StreamExt};
use serde::de::DeserializeOwned;

use super::{ReceiverStream, Task};
use crate::error;

/// Used to receive data from a stream. Will stop receiving messages if
/// deserialization fails.
#[derive(Clone)]
pub struct Receiver<T: 'static> {
	/// Receiving end of the channel on which the receiving task delivers
	/// [`Deserialize`](serde::Deserialize)d data, or the error that made it
	/// stop.
	receiver: flume::r#async::RecvStream<'static, Result<T, error::Receiver>>,
	/// [`Task`] handle that does the receiving from the stream.
	task: Task<Result<(), error::AlreadyClosed>>,
}

impl<T> Debug for Receiver<T> {
	/// Formats the [`Receiver`] as a struct with its two fields. The channel
	/// stream is shown as the placeholder string `"RecvStream"`; the task is
	/// shown through its own [`Debug`] output.
	fn fmt(&self, formatter: &mut Formatter<'_>) -> fmt::Result {
		let mut output = formatter.debug_struct("Receiver");
		output.field("receiver", &"RecvStream");
		output.field("task", &self.task);
		output.finish()
	}
}

impl<T> Receiver<T> {
	/// Builds a new [`Receiver`] from a raw [`quinn`] type. Spawns a task that
	/// receives data from the stream.
	pub(super) fn new(mut stream: ReceiverStream<T>) -> Self
	where
		T: DeserializeOwned + Send,
	{
		// receiver channels
		let (sender, receiver) = flume::unbounded();
		let receiver = receiver.into_stream();

		// `Task` handling `Receiver`
		let task = Task::new(|mut shutdown| async move {
			/// Help Group Messages
			enum Message<T> {
				/// Data arrived from stream.
				Data(Result<T, error::Receiver>),
				/// [`Receiver`] asked to close.
				Close,
			}

			while let Some(message) = futures_util::select_biased! {
				message = stream.next() => message.map(Message::Data),
				shutdown = shutdown => shutdown.ok().map(|_| Message::Close),
				complete => None,
			} {
				match message {
					Message::Data(message) => {
						let failed = message.is_err();

						// the receiver might have been dropped
						if sender.send(message).is_err() {
							break;
						}

						if failed {
							break;
						}
					}
					Message::Close => {
						stream.stop()?;
						break;
					}
				}
			}

			Ok(())
		});

		Self { receiver, task }
	}

	/// Wait for the [`Receiver`] part of the stream to finish gracefully.
	///
	/// This can only be achieved through the peer's
	/// [`Sender::finish`](crate::Sender::finish) or an error.
	///
	/// # Errors
	/// [`error::AlreadyClosed`] if it has already been closed.
	pub async fn finish(&self) -> Result<(), error::AlreadyClosed> {
		(&self.task).await?
	}

	/// Close the [`Receiver`] immediately. To close a [`Receiver`] gracefully
	/// use [`finish`](Self::finish).
	///
	/// # Errors
	/// [`error::AlreadyClosed`] if it has already been closed.
	pub async fn close(&self) -> Result<(), error::AlreadyClosed> {
		self.task.close(()).await?
	}
}

impl<T> Stream for Receiver<T> {
	type Item = Result<T, error::Receiver>;

	/// Polls the internal channel stream for the next item.
	fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
		let channel = Pin::new(&mut self.receiver);
		channel.poll_next(cx)
	}
}