1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
//! [`Sender`] part of a stream.

use std::{marker::PhantomData, mem::size_of};

use bytes::{BufMut, Bytes, BytesMut};
use futures_util::StreamExt;
use quinn::{SendStream, VarInt};
use serde::Serialize;

use super::Task;
use crate::error;

/// Used to send data to a stream.
#[derive(Clone, Debug)]
pub struct Sender<T: Serialize> {
	/// Send [`Serialize`]d data to the sending task.
	sender: flume::Sender<Bytes>,
	/// Holds the type to [`Serialize`] too.
	_type: PhantomData<T>,
	/// [`Task`] handle that does the sending into the stream.
	task: Task<Result<(), error::Sender>, Message>,
}

/// Messages sent to the [`Sender`] task.
#[derive(Clone, Debug)]
enum Message {
	/// Data to be sent.
	Data(Bytes),
	/// Tell [`Task`] to finish the [`Sender`] part of the stream and close it.
	Finish,
	/// Tell [`Task`] to close the [`Sender`].
	Close,
}

impl<T: Serialize> Sender<T> {
	/// Builds a new [`Sender`] from a raw [`quinn`] type. Spawns a task that
	/// sends data into the stream.
	pub(super) fn new(mut stream_sender: SendStream) -> Self {
		// sender channels
		let (sender, receiver) = flume::unbounded();

		// `Task` handling `Sender`
		let task = Task::new(|mut shutdown| async move {
			let mut receiver = receiver.into_stream().fuse();

			while let Some(message) = futures_util::select_biased! {
				message = receiver.next() => message.map(Message::Data),
				shutdown = shutdown => shutdown.ok(),
				complete => None,
			} {
				match message {
					Message::Data(bytes) => stream_sender.write_chunk(bytes).await?,
					Message::Finish => {
						stream_sender.finish().await?;
						break;
					}
					Message::Close => {
						stream_sender
							.reset(VarInt::from_u32(0))
							.map_err(|_error| error::AlreadyClosed)?;
						break;
					}
				}
			}

			Ok(())
		});

		Self {
			sender,
			_type: PhantomData,
			task,
		}
	}

	/// Send `data` into the stream.
	///
	/// # Errors
	/// - [`error::Sender::Serialize`] if `data` failed to be serialized
	/// - [`error::Sender::Write`] if the [`Sender`] failed to to write to the
	///   stream
	/// - [`error::Sender::Closed`] if the [`Sender`] is closed
	pub fn send(&self, data: &T) -> Result<(), error::Sender> {
		self.send_any(data)
	}

	/// Send any `data` into the stream. This will fail on the receiving end if
	/// not decoded into the proper type.
	///
	/// # Errors
	/// - [`error::Sender::Serialize`] if `data` failed to be serialized
	/// - [`error::Sender::Write`] if the [`Sender`] failed to to write to the
	///   stream
	/// - [`error::Sender::Closed`] if the [`Sender`] is closed
	pub(super) fn send_any<A: Serialize>(&self, data: &A) -> Result<(), error::Sender> {
		let mut bytes = BytesMut::new();

		// get size
		let len = bincode::serialized_size(&data)?;
		// reserve an appropriate amount of space
		bytes.reserve(
			usize::try_from(len)
				.expect("not a 64-bit system")
				.checked_add(size_of::<u64>())
				.expect("data trying to be sent is too big"),
		);
		// insert length first, this enables framing
		bytes.put_u64_le(len);

		let mut bytes = bytes.writer();

		// serialize `data` into `bytes`
		bincode::serialize_into(&mut bytes, &data)?;

		// send data to task
		let bytes = bytes.into_inner().freeze();

		// make sure that our length is correct
		debug_assert_eq!(
			u64::try_from(bytes.len()).expect("not a 64-bit system"),
			u64::try_from(size_of::<u64>())
				.expect("not a 64-bit system")
				.checked_add(len)
				.expect("message to long")
		);

		// if the sender task has been dropped, return it's error
		if self.sender.send(bytes).is_err() {
			// TODO: configurable executor
			futures_executor::block_on(async { (&self.task).await })?
		} else {
			Ok(())
		}
	}

	/// Shut down the [`Send`] part of the stream gracefully.
	///
	/// No new data may be written after calling this method. Completes when the
	/// peer has acknowledged all sent data, retransmitting data as needed.
	///
	/// # Errors
	/// This can only return [`error::Sender::Closed`] as an [`Err`], if it was
	/// already closed, but if the [`Sender`] failed to write to the stream it
	/// will return a queued [`error::Sender::Write`].
	pub async fn finish(&self) -> Result<(), error::Sender> {
		self.task.close(Message::Finish).await?
	}

	/// Close the [`Sender`] immediately.
	///
	/// To close a [`Sender`] gracefully use [`Sender::finish`].
	///
	/// # Errors
	/// This can only return [`error::Sender::Closed`] as an [`Err`], if it was
	/// already closed, but if the [`Sender`] failed to write to the stream it
	/// will return a queued [`error::Sender::Write`].
	pub async fn close(&self) -> Result<(), error::Sender> {
		self.task.close(Message::Close).await?
	}
}